You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2013/08/04 01:01:10 UTC

git commit: Future-proof inter-major-version schema migrations

Updated Branches:
  refs/heads/cassandra-1.2 88bc4f237 -> 65daaf9d6


Future-proof inter-major-version schema migrations

patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for
CASSANDRA-5845


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/65daaf9d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/65daaf9d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/65daaf9d

Branch: refs/heads/cassandra-1.2
Commit: 65daaf9d6ae90c1cd01cdbe23e36998411f0f2ed
Parents: 88bc4f2
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Sun Aug 4 00:58:28 2013 +0200
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Aug 4 00:58:28 2013 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/service/MigrationManager.java     | 50 ++++++++++----------
 2 files changed, 25 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/65daaf9d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8eff1fd..960a8c2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@
  * (Hadoop) fix support for Thrift tables in CqlPagingRecordReader 
    (CASSANDRA-5752)
  * add "all time blocked" to StatusLogger output (CASSANDRA-5825)
+ * Future-proof inter-major-version schema migrations (CASSANDRA-5845)
 
 
 1.2.8

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65daaf9d/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index de34785..9aa6f22 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -110,14 +110,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
      */
     private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint)
     {
-        // Can't request migrations from nodes with versions younger than 1.1.7
-        if (MessagingService.instance().getVersion(endpoint) < MessagingService.VERSION_117)
-            return;
-
-        if (Gossiper.instance.isFatClient(endpoint))
-            return;
-
-        if (Schema.instance.getVersion().equals(theirVersion))
+        if (Schema.instance.getVersion().equals(theirVersion) || !shouldPullSchemaFrom(endpoint))
             return;
 
         if (Schema.emptyVersion.equals(Schema.instance.getVersion()) || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
@@ -146,13 +139,25 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
         }
     }
 
-    private static void submitMigrationTask(InetAddress endpoint)
+    private static Future<?> submitMigrationTask(InetAddress endpoint)
     {
         /*
          * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are
          * running in the gossip stage.
          */
-        StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
+        return StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
+    }
+
+    private static boolean shouldPullSchemaFrom(InetAddress endpoint)
+    {
+        /*
+         * Don't request schema from nodes with versions younger than 1.1.7 (timestamps in versions prior to 1.1.7 are broken)
+         * Don't request schema from nodes with a higher major (may have incompatible schema)
+         * Don't request schema from fat clients
+         */
+        return MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_117
+            && MessagingService.instance().getVersion(endpoint) <= MessagingService.current_version
+            && !Gossiper.instance.isFatClient(endpoint);
     }
 
     public static boolean isReadyForBootstrap()
@@ -303,10 +308,10 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
         for (InetAddress endpoint : Gossiper.instance.getLiveMembers())
         {
             if (endpoint.equals(FBUtilities.getBroadcastAddress()))
-                continue; // we've delt with localhost already
+                continue; // we've dealt with localhost already
 
-            // don't send migrations to the nodes with the versions older than < 1.2
-            if (MessagingService.instance().getVersion(endpoint) < MessagingService.VERSION_12)
+            // don't send schema to the nodes with the versions older than current major
+            if (MessagingService.instance().getVersion(endpoint) < MessagingService.current_version)
                 continue;
 
             pushSchemaMutation(endpoint, schema);
@@ -338,8 +343,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
 
         try
         {
-            if (logger.isDebugEnabled())
-                logger.debug("Truncating schema tables...");
+            logger.debug("Truncating schema tables...");
 
             // truncate schema tables
             FBUtilities.waitOnFutures(new ArrayList<Future<?>>(3)
@@ -349,26 +353,20 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
                 SystemTable.schemaCFS(SystemTable.SCHEMA_COLUMNS_CF).truncate();
             }});
 
-            if (logger.isDebugEnabled())
-                logger.debug("Clearing local schema keyspace definitions...");
+            logger.debug("Clearing local schema keyspace definitions...");
 
             Schema.instance.clear();
 
             Set<InetAddress> liveEndpoints = Gossiper.instance.getLiveMembers();
             liveEndpoints.remove(FBUtilities.getBroadcastAddress());
 
-            // force migration is there are nodes around, first of all
-            // check if there are nodes with versions >= 1.1.7 to request migrations from,
-            // because migration format of the nodes with versions < 1.1 is incompatible with older versions
-            // and due to broken timestamps in versions prior to 1.1.7
+            // force migration if there are nodes around
             for (InetAddress node : liveEndpoints)
             {
-                if (MessagingService.instance().getVersion(node) >= MessagingService.VERSION_117)
+                if (shouldPullSchemaFrom(node))
                 {
-                    if (logger.isDebugEnabled())
-                        logger.debug("Requesting schema from " + node);
-
-                    FBUtilities.waitOnFuture(StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(node)));
+                    logger.debug("Requesting schema from {}", node);
+                    FBUtilities.waitOnFuture(submitMigrationTask(node));
                     break;
                 }
             }