You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/12/07 23:56:13 UTC

[16/16] git commit: Pull schema immediately when bootstrapping. Patch by Chris Herron and brandonwilliams, reviewed by jbellis for CASSANDRA-5025

Pull schema immediately when bootstrapping.
Patch by Chris Herron and brandonwilliams, reviewed by jbellis for
CASSANDRA-5025


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c9ce1f01
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c9ce1f01
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c9ce1f01

Branch: refs/heads/cassandra-1.2
Commit: c9ce1f0111202044d686f335fc2ebf0fe771ed79
Parents: 1ac9058
Author: Brandon Williams <br...@apache.org>
Authored: Fri Dec 7 15:31:25 2012 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Dec 7 15:44:26 2012 -0600

----------------------------------------------------------------------
 .../apache/cassandra/service/MigrationManager.java |   57 +++++++++------
 .../apache/cassandra/service/StorageService.java   |    7 ++-
 2 files changed, 41 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ce1f01/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 102ea12..72a9a84 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.ArrayList;
@@ -92,6 +93,10 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
     public void onRemove(InetAddress endpoint)
     {}
 
+    /**
+     * If versions differ, this node sends a request with its local migration list to the endpoint
+     * and expects to receive a list of migrations to apply locally.
+     */
     private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint)
     {
         // Can't request migrations from nodes with versions younger than 1.1.7
@@ -101,29 +106,39 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
         if (Schema.instance.getVersion().equals(theirVersion))
             return;
 
-        // check our schema vs theirs, after a delay to make sure we have a chance to apply any changes
-        // being pushed out simultaneously.  See CASSANDRA-5025
-        Runnable runnable = new Runnable()
+        if (Schema.emptyVersion.equals(Schema.instance.getVersion()))
         {
-            public void run()
+            // If we think we may be bootstrapping, submit MigrationTask immediately
+            submitMigrationTask(endpoint);
+        }
+        else
+        {
+            // Include a delay to make sure we have a chance to apply any changes being
+            // pushed out simultaneously. See CASSANDRA-5025
+            Runnable runnable = new Runnable()
             {
-                // grab the latest version of the schema since it may have changed again since the initial scheduling
-                VersionedValue value = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA);
-                UUID currentVersion = UUID.fromString(value.value);
-                if (Schema.instance.getVersion().equals(currentVersion))
-                    return;
-
-                /**
-                 * if versions differ this node sends request with local migration list to the endpoint
-                 * and expecting to receive a list of migrations to apply locally.
-                 *
-                 * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are
-                 * running in the gossip stage.
-                 */
-                StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
-            }
-        };
-        StorageService.optionalTasks.schedule(runnable, 1, TimeUnit.MINUTES);
+                public void run()
+                {
+                    // grab the latest version of the schema since it may have changed again since the initial scheduling
+                    VersionedValue value = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA);
+                    UUID currentVersion = UUID.fromString(value.value);
+                    if (Schema.instance.getVersion().equals(currentVersion))
+                        return;
+
+                    submitMigrationTask(endpoint);
+                }
+            };
+            StorageService.optionalTasks.schedule(runnable, 1, TimeUnit.MINUTES);
+        }
+    }
+
+    private static void submitMigrationTask(InetAddress endpoint)
+    {
+        /*
+         * Do not de-ref the future: we are running in the gossip stage, so blocking on it
+         * causes distributed deadlock (CASSANDRA-3832).
+         */
+        StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
     }
 
     public static boolean isReadyForBootstrap()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ce1f01/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 30da45c..d041279 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -544,8 +544,11 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         Gossiper.instance.register(this);
         Gossiper.instance.register(migrationManager);
         Gossiper.instance.start(SystemTable.incrementAndGetGeneration()); // needed for node-ring gathering.
-        // gossip schema version when gossiper is running
-        Schema.instance.updateVersionAndAnnounce();
+
+        // gossip Schema.emptyVersion forcing immediate check for schema updates (see MigrationManager#maybeScheduleSchemaPull)
+        Schema.instance.updateVersion(); // Ensure we know our own actual Schema UUID in preparation for updates
+        MigrationManager.passiveAnnounce(Schema.emptyVersion);
+
         // add rpc listening info
         Gossiper.instance.addLocalApplicationState(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(DatabaseDescriptor.getRpcAddress()));
         if (null != DatabaseDescriptor.getReplaceToken())