You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2013/08/04 01:22:55 UTC

[1/2] git commit: Future-proof inter-major-version schema migrations

Updated Branches:
  refs/heads/trunk 4f2fdc2e7 -> 25c9293f2


Future-proof inter-major-version schema migrations

patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for
CASSANDRA-5845


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/65daaf9d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/65daaf9d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/65daaf9d

Branch: refs/heads/trunk
Commit: 65daaf9d6ae90c1cd01cdbe23e36998411f0f2ed
Parents: 88bc4f2
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Sun Aug 4 00:58:28 2013 +0200
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Aug 4 00:58:28 2013 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/service/MigrationManager.java     | 50 ++++++++++----------
 2 files changed, 25 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/65daaf9d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8eff1fd..960a8c2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@
  * (Hadoop) fix support for Thrift tables in CqlPagingRecordReader 
    (CASSANDRA-5752)
  * add "all time blocked" to StatusLogger output (CASSANDRA-5825)
+ * Future-proof inter-major-version schema migrations (CASSANDRA-5845)
 
 
 1.2.8

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65daaf9d/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index de34785..9aa6f22 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -110,14 +110,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
      */
     private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint)
     {
-        // Can't request migrations from nodes with versions younger than 1.1.7
-        if (MessagingService.instance().getVersion(endpoint) < MessagingService.VERSION_117)
-            return;
-
-        if (Gossiper.instance.isFatClient(endpoint))
-            return;
-
-        if (Schema.instance.getVersion().equals(theirVersion))
+        if (Schema.instance.getVersion().equals(theirVersion) || !shouldPullSchemaFrom(endpoint))
             return;
 
         if (Schema.emptyVersion.equals(Schema.instance.getVersion()) || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
@@ -146,13 +139,25 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
         }
     }
 
-    private static void submitMigrationTask(InetAddress endpoint)
+    private static Future<?> submitMigrationTask(InetAddress endpoint)
     {
         /*
          * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are
          * running in the gossip stage.
          */
-        StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
+        return StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
+    }
+
+    private static boolean shouldPullSchemaFrom(InetAddress endpoint)
+    {
+        /*
+         * Don't request schema from nodes with versions older than 1.1.7 (timestamps in versions prior to 1.1.7 are broken)
+         * Don't request schema from nodes with a higher major (may have incompatible schema)
+         * Don't request schema from fat clients
+         */
+        return MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_117
+            && MessagingService.instance().getVersion(endpoint) <= MessagingService.current_version
+            && !Gossiper.instance.isFatClient(endpoint);
     }
 
     public static boolean isReadyForBootstrap()
@@ -303,10 +308,10 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
         for (InetAddress endpoint : Gossiper.instance.getLiveMembers())
         {
             if (endpoint.equals(FBUtilities.getBroadcastAddress()))
-                continue; // we've delt with localhost already
+                continue; // we've dealt with localhost already
 
-            // don't send migrations to the nodes with the versions older than < 1.2
-            if (MessagingService.instance().getVersion(endpoint) < MessagingService.VERSION_12)
+            // don't send schema to the nodes with the versions older than current major
+            if (MessagingService.instance().getVersion(endpoint) < MessagingService.current_version)
                 continue;
 
             pushSchemaMutation(endpoint, schema);
@@ -338,8 +343,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
 
         try
         {
-            if (logger.isDebugEnabled())
-                logger.debug("Truncating schema tables...");
+            logger.debug("Truncating schema tables...");
 
             // truncate schema tables
             FBUtilities.waitOnFutures(new ArrayList<Future<?>>(3)
@@ -349,26 +353,20 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
                 SystemTable.schemaCFS(SystemTable.SCHEMA_COLUMNS_CF).truncate();
             }});
 
-            if (logger.isDebugEnabled())
-                logger.debug("Clearing local schema keyspace definitions...");
+            logger.debug("Clearing local schema keyspace definitions...");
 
             Schema.instance.clear();
 
             Set<InetAddress> liveEndpoints = Gossiper.instance.getLiveMembers();
             liveEndpoints.remove(FBUtilities.getBroadcastAddress());
 
-            // force migration is there are nodes around, first of all
-            // check if there are nodes with versions >= 1.1.7 to request migrations from,
-            // because migration format of the nodes with versions < 1.1 is incompatible with older versions
-            // and due to broken timestamps in versions prior to 1.1.7
+            // force migration if there are nodes around
             for (InetAddress node : liveEndpoints)
             {
-                if (MessagingService.instance().getVersion(node) >= MessagingService.VERSION_117)
+                if (shouldPullSchemaFrom(node))
                 {
-                    if (logger.isDebugEnabled())
-                        logger.debug("Requesting schema from " + node);
-
-                    FBUtilities.waitOnFuture(StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(node)));
+                    logger.debug("Requesting schema from {}", node);
+                    FBUtilities.waitOnFuture(submitMigrationTask(node));
                     break;
                 }
             }


[2/2] git commit: Merge branch 'cassandra-1.2' into trunk

Posted by al...@apache.org.
Merge branch 'cassandra-1.2' into trunk

Conflicts:
	src/java/org/apache/cassandra/service/MigrationManager.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/25c9293f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/25c9293f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/25c9293f

Branch: refs/heads/trunk
Commit: 25c9293f200a83028597c691a4fd13d3f7d06850
Parents: 4f2fdc2 65daaf9
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Sun Aug 4 01:22:44 2013 +0200
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Aug 4 01:22:44 2013 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 NEWS.txt                                        |  4 +-
 .../db/MigrationRequestVerbHandler.java         |  9 ++--
 .../cassandra/service/MigrationManager.java     | 51 +++++++++++---------
 .../apache/cassandra/service/MigrationTask.java |  2 +-
 5 files changed, 36 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/25c9293f/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25c9293f/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index ff8e387,11281ee..9b2c172
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -13,72 -13,6 +13,72 @@@ restore snapshots created with the prev
  'sstableloader' tool. You can upgrade the file format of your snapshots
  using the provided 'sstableupgrade' tool.
  
 +2.0.0
 +=====
 +
 +Upgrading
 +---------
 +    - CAS and new features in CQL such as DROP COLUMN assume that cell
 +      timestamps are microseconds-since-epoch.  Do not use these
 +      features if you are using client-specified timestamps with some
 +      other source.
-     - Upgrading is ONLY supported from Cassandra 1.2.7 or later.  This
++    - Upgrading is ONLY supported from Cassandra 1.2.9 or later. This
 +      goes for sstable compatibility as well as network.  When
-       upgrading from an earlier release, upgrade to 1.2.7 first and
++      upgrading from an earlier release, upgrade to 1.2.9 first and
 +      run upgradesstables before proceeding to 2.0.
 +    - Replication and strategy options do not accept unknown options anymore.
 +      This was already the case for CQL3 in 1.2 but this is now the case for
 +      thrift too.
 +    - auto_bootstrap of a single-token node with no initial_token will
 +      now pick a random token instead of bisecting an existing token
 +      range.  We recommend upgrading to vnodes; failing that, we
 +      recommend specifying initial_token.
 +    - reduce_cache_sizes_at, reduce_cache_capacity_to, and
 +      flush_largest_memtables_at options have been removed from cassandra.yaml.
 +    - CacheServiceMBean.reduceCacheSizes() has been removed.
 +      Use CacheServiceMBean.set{Key,Row}CacheCapacityInMB() instead.
 +    - authority option in cassandra.yaml has been deprecated since 1.2.0,
 +      but it has been completely removed in 2.0. Please use 'authorizer' option.
 +    - ASSUME command has been removed from cqlsh. Use CQL3 blobAsType() and
 +      typeAsBlob() conversion functions instead.
 +      See https://cassandra.apache.org/doc/cql3/CQL.html#blobFun for details.
 +    - Inputting blobs as string constants is now fully deprecated in
 +      favor of blob constants. Make sure to update your applications to use
 +      the new syntax while you are still on 1.2 (which supports both string
 +      and blob constants for blob input) before upgrading to 2.0.
 +    - index_interval is now moved to ColumnFamily property. You can change value
 +      with ALTER TABLE ... WITH statement and SSTables written after that will
 +      have new value. When upgrading, Cassandra will pick up the value defined in
 +      cassandra.yaml as the default for existing ColumnFamilies, until you explicitly
 +      set the value for those.
 +
 +Operations
 +----------
 +    - Major compactions, cleanup, scrub, and upgradesstables will interrupt 
 +      any in-progress compactions (but not repair validations) when invoked.
 +    - Disabling autocompactions by setting min/max compaction threshold to 0
 +      has been deprecated; instead, use the nodetool commands 'disableautocompaction'
 +      and 'enableautocompaction' or set the compaction strategy option enabled = false
 +    - ALTER TABLE DROP has been reenabled for CQL3 tables and has new semantics now.
 +      See https://cassandra.apache.org/doc/cql3/CQL.html#alterTableStmt and
 +      https://issues.apache.org/jira/browse/CASSANDRA-3919 for details.
 +    - CAS uses gc_grace_seconds to determine how long to keep unused paxos
 +      state around for, or a minimum of three hours.
 +    - A new hints created metric is tracked per target, replacing countPendingHints
 +    - After performance testing for CASSANDRA-5727, the default LCS filesize
 +      has been changed from 5MB to 160MB.
 +
 +Features
 +--------
 +    - Alias support has been added to CQL3 SELECT statement. Refer to
 +      CQL3 documentation (http://cassandra.apache.org/doc/cql3/CQL.html) for details.
 +    - JEMalloc support (see memory_allocator in cassandra.yaml)
 +    - Experimental triggers support.  See examples/ for how to use.  "Experimental"
 +      means "tied closely to internal data structures; we plan to decouple this in
 +      the future, which will probably break triggers written against this initial
 +      API."
 +
 +
  1.2.9
  =====
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25c9293f/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
index e593a98,1992c01..31a64a9
--- a/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
@@@ -36,15 -37,20 +36,12 @@@ public class MigrationRequestVerbHandle
  {
      private static final Logger logger = LoggerFactory.getLogger(MigrationRequestVerbHandler.class);
  
 -    public void doVerb(MessageIn message, String id)
 +    public void doVerb(MessageIn message, int id)
      {
          logger.debug("Received migration request from {}.", message.from);
--
-         Collection<RowMutation> schema = SystemKeyspace.serializeSchema();
- 
-         MessageOut<Collection<RowMutation>> response = new MessageOut<Collection<RowMutation>>(MessagingService.Verb.INTERNAL_RESPONSE,
-                                                                                                schema,
-                                                                                                MigrationManager.MigrationsSerializer.instance);
 -        if (message.version < MessagingService.VERSION_12)
 -            logger.debug("Returning empty response to the migration request from {} (version < 1.2).", message.from);
 -
 -        Collection<RowMutation> schema = message.version < MessagingService.VERSION_12
 -                                         ? Collections.EMPTY_SET
 -                                         : SystemTable.serializeSchema();
 -
 -        MessageOut<Collection<RowMutation>> response = new MessageOut<Collection<RowMutation>>(MessagingService.Verb.INTERNAL_RESPONSE,
 -                                                                                               schema,
 -                                                                                               MigrationManager.MigrationsSerializer.instance);
++        MessageOut<Collection<RowMutation>> response = new MessageOut<>(MessagingService.Verb.INTERNAL_RESPONSE,
++                                                                        SystemKeyspace.serializeSchema(),
++                                                                        MigrationManager.MigrationsSerializer.instance);
          MessagingService.instance().sendReply(response, id, message.from);
      }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25c9293f/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/MigrationManager.java
index 8c31632,9aa6f22..a59cffc
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@@ -148,7 -145,19 +145,17 @@@ public class MigrationManager implement
           * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are
           * running in the gossip stage.
           */
-         StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
+         return StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
+     }
+ 
+     private static boolean shouldPullSchemaFrom(InetAddress endpoint)
+     {
+         /*
 -         * Don't request schema from nodes with versions younger than 1.1.7 (timestamps in versions prior to 1.1.7 are broken)
+          * Don't request schema from nodes with a higher major (may have incompatible schema)
+          * Don't request schema from fat clients
+          */
 -        return MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_117
 -            && MessagingService.instance().getVersion(endpoint) <= MessagingService.current_version
++        return MessagingService.instance().getVersion(endpoint) <= MessagingService.current_version
+             && !Gossiper.instance.isFatClient(endpoint);
      }
  
      public static boolean isReadyForBootstrap()
@@@ -279,9 -288,9 +286,9 @@@
  
      private static void pushSchemaMutation(InetAddress endpoint, Collection<RowMutation> schema)
      {
--        MessageOut<Collection<RowMutation>> msg = new MessageOut<Collection<RowMutation>>(MessagingService.Verb.DEFINITIONS_UPDATE,
--                                                                                          schema,
--                                                                                          MigrationsSerializer.instance);
++        MessageOut<Collection<RowMutation>> msg = new MessageOut<>(MessagingService.Verb.DEFINITIONS_UPDATE,
++                                                                   schema,
++                                                                   MigrationsSerializer.instance);
          MessagingService.instance().sendOneWay(msg, endpoint);
      }
  
@@@ -328,37 -341,64 +339,33 @@@
      {
          logger.info("Starting local schema reset...");
  
-         if (logger.isDebugEnabled())
-             logger.debug("Truncating schema tables...");
 -        try
 -        {
 -            logger.debug("Truncating schema tables...");
++        logger.debug("Truncating schema tables...");
  
 -            // truncate schema tables
 -            FBUtilities.waitOnFutures(new ArrayList<Future<?>>(3)
 -            {{
 -                SystemTable.schemaCFS(SystemTable.SCHEMA_KEYSPACES_CF).truncate();
 -                SystemTable.schemaCFS(SystemTable.SCHEMA_COLUMNFAMILIES_CF).truncate();
 -                SystemTable.schemaCFS(SystemTable.SCHEMA_COLUMNS_CF).truncate();
 -            }});
 +        // truncate schema tables
 +        SystemKeyspace.schemaCFS(SystemKeyspace.SCHEMA_KEYSPACES_CF).truncateBlocking();
 +        SystemKeyspace.schemaCFS(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF).truncateBlocking();
 +        SystemKeyspace.schemaCFS(SystemKeyspace.SCHEMA_COLUMNS_CF).truncateBlocking();
 +        SystemKeyspace.schemaCFS(SystemKeyspace.SCHEMA_TRIGGERS_CF).truncateBlocking();
  
-         if (logger.isDebugEnabled())
--            logger.debug("Clearing local schema keyspace definitions...");
++        logger.debug("Clearing local schema keyspace definitions...");
  
 -            Schema.instance.clear();
 +        Schema.instance.clear();
  
 -            Set<InetAddress> liveEndpoints = Gossiper.instance.getLiveMembers();
 -            liveEndpoints.remove(FBUtilities.getBroadcastAddress());
 +        Set<InetAddress> liveEndpoints = Gossiper.instance.getLiveMembers();
 +        liveEndpoints.remove(FBUtilities.getBroadcastAddress());
  
-         // force migration is there are nodes around, first of all
-         // check if there are nodes with versions >= 1.1.7 to request migrations from,
-         // because migration format of the nodes with versions < 1.1 is incompatible with older versions
-         // and due to broken timestamps in versions prior to 1.1.7
 -            // force migration if there are nodes around
 -            for (InetAddress node : liveEndpoints)
++        // force migration if there are nodes around
 +        for (InetAddress node : liveEndpoints)
 +        {
-             if (logger.isDebugEnabled())
-                 logger.debug("Requesting schema from " + node);
- 
-             FBUtilities.waitOnFuture(StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(node)));
-             break;
++            if (shouldPullSchemaFrom(node))
+             {
 -                if (shouldPullSchemaFrom(node))
 -                {
 -                    logger.debug("Requesting schema from {}", node);
 -                    FBUtilities.waitOnFuture(submitMigrationTask(node));
 -                    break;
 -                }
++                logger.debug("Requesting schema from {}", node);
++                FBUtilities.waitOnFuture(submitMigrationTask(node));
++                break;
+             }
 -
 -            logger.info("Local schema reset is complete.");
 -        }
 -        catch (InterruptedException e)
 -        {
 -            throw new RuntimeException(e);
          }
 -        catch (ExecutionException e)
 -        {
 -            throw new RuntimeException(e);
 -        }
 -    }
  
 -    /**
 -     * Used only in case node has old style migration schema (newly updated)
 -     * @return the UUID identifying version of the last applied migration
 -     */
 -    @Deprecated
 -    public static UUID getLastMigrationId()
 -    {
 -        DecoratedKey dkey = StorageService.getPartitioner().decorateKey(LAST_MIGRATION_KEY);
 -        Table defs = Table.open(Table.SYSTEM_KS);
 -        ColumnFamilyStore cfStore = defs.getColumnFamilyStore(DefsTable.OLD_SCHEMA_CF);
 -        QueryFilter filter = QueryFilter.getNamesFilter(dkey, new QueryPath(DefsTable.OLD_SCHEMA_CF), LAST_MIGRATION_KEY);
 -        ColumnFamily cf = cfStore.getColumnFamily(filter);
 -        if (cf == null || cf.getColumnNames().size() == 0)
 -            return null;
 -        else
 -            return UUIDGen.getUUID(cf.getColumn(LAST_MIGRATION_KEY).value());
 +        logger.info("Local schema reset is complete.");
      }
  
      public static class MigrationsSerializer implements IVersionedSerializer<Collection<RowMutation>>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25c9293f/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/MigrationTask.java
index df216ea,ac7c6be..93572f0
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ b/src/java/org/apache/cassandra/service/MigrationTask.java
@@@ -48,7 -48,7 +48,7 @@@ class MigrationTask extends WrappedRunn
  
      public void runMayThrow() throws Exception
      {
--        MessageOut message = new MessageOut(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
++        MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
  
          if (!FailureDetector.instance.isAlive(endpoint))
          {