You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/12/06 23:15:42 UTC

[9/10] git commit: Improve schema propagation performance patch by jbellis; reviewed by Chris Herron and xedin for CASSANDRA-5025

Improve schema propagation performance
patch by jbellis; reviewed by Chris Herron and xedin for CASSANDRA-5025


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8d55474f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8d55474f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8d55474f

Branch: refs/heads/cassandra-1.2.0
Commit: 8d55474febf31980f2eb158ae0ff9717b0761f1e
Parents: 5abeecc
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Dec 6 16:12:31 2012 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Dec 6 16:12:38 2012 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../apache/cassandra/service/MigrationManager.java |   39 ++++++++++-----
 2 files changed, 28 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8d55474f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a0e3eb7..5d1fd8d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.1.8
+ * Improve schema propagation performance (CASSANDRA-5025)
  * Fall back to old describe_splits if d_s_ex is not available (CASSANDRA-4803)
  * Improve error reporting when streaming ranges fail (CASSANDRA-5009)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8d55474f/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 94c0dcc..102ea12 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -25,12 +25,12 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,7 +72,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
         if (state != ApplicationState.SCHEMA || endpoint.equals(FBUtilities.getBroadcastAddress()))
             return;
 
-        rectifySchema(UUID.fromString(value.value), endpoint);
+        maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint);
     }
 
     public void onAlive(InetAddress endpoint, EndpointState state)
@@ -80,7 +80,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
         VersionedValue value = state.getApplicationState(ApplicationState.SCHEMA);
 
         if (value != null)
-            rectifySchema(UUID.fromString(value.value), endpoint);
+            maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint);
     }
 
     public void onDead(InetAddress endpoint, EndpointState state)
@@ -92,7 +92,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
     public void onRemove(InetAddress endpoint)
     {}
 
-    private static void rectifySchema(UUID theirVersion, final InetAddress endpoint)
+    private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint)
     {
         // Can't request migrations from nodes with versions younger than 1.1.7
         if (Gossiper.instance.getVersion(endpoint) < MessagingService.VERSION_117)
@@ -101,14 +101,29 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
         if (Schema.instance.getVersion().equals(theirVersion))
             return;
 
-        /**
-         * if versions differ this node sends request with local migration list to the endpoint
-         * and expecting to receive a list of migrations to apply locally.
-         *
-         * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are
-         * running in the gossip stage.
-         */
-        StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
+        // check our schema vs theirs, after a delay to make sure we have a chance to apply any changes
+        // being pushed out simultaneously.  See CASSANDRA-5025
+        Runnable runnable = new Runnable()
+        {
+            public void run()
+            {
+                // grab the latest version of the schema since it may have changed again since the initial scheduling
+                VersionedValue value = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA);
+                UUID currentVersion = UUID.fromString(value.value);
+                if (Schema.instance.getVersion().equals(currentVersion))
+                    return;
+
+                /**
+                 * if versions differ this node sends request with local migration list to the endpoint
+                 * and expecting to receive a list of migrations to apply locally.
+                 *
+                 * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are
+                 * running in the gossip stage.
+                 */
+                StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
+            }
+        };
+        StorageService.optionalTasks.schedule(runnable, 1, TimeUnit.MINUTES);
     }
 
     public static boolean isReadyForBootstrap()