You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2013/08/04 01:01:10 UTC
git commit: Future-proof inter-major-version schema migrations
Updated Branches:
refs/heads/cassandra-1.2 88bc4f237 -> 65daaf9d6
Future-proof inter-major-version schema migrations
patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for
CASSANDRA-5845
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/65daaf9d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/65daaf9d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/65daaf9d
Branch: refs/heads/cassandra-1.2
Commit: 65daaf9d6ae90c1cd01cdbe23e36998411f0f2ed
Parents: 88bc4f2
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Sun Aug 4 00:58:28 2013 +0200
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Aug 4 00:58:28 2013 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/service/MigrationManager.java | 50 ++++++++++----------
2 files changed, 25 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65daaf9d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8eff1fd..960a8c2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@
* (Hadoop) fix support for Thrift tables in CqlPagingRecordReader
(CASSANDRA-5752)
* add "all time blocked" to StatusLogger output (CASSANDRA-5825)
+ * Future-proof inter-major-version schema migrations (CASSANDRA-5845)
1.2.8
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65daaf9d/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index de34785..9aa6f22 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -110,14 +110,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
*/
private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint)
{
- // Can't request migrations from nodes with versions younger than 1.1.7
- if (MessagingService.instance().getVersion(endpoint) < MessagingService.VERSION_117)
- return;
-
- if (Gossiper.instance.isFatClient(endpoint))
- return;
-
- if (Schema.instance.getVersion().equals(theirVersion))
+ if (Schema.instance.getVersion().equals(theirVersion) || !shouldPullSchemaFrom(endpoint))
return;
if (Schema.emptyVersion.equals(Schema.instance.getVersion()) || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
@@ -146,13 +139,25 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
}
}
- private static void submitMigrationTask(InetAddress endpoint)
+ private static Future<?> submitMigrationTask(InetAddress endpoint)
{
/*
* Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are
* running in the gossip stage.
*/
- StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
+ return StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
+ }
+
+ private static boolean shouldPullSchemaFrom(InetAddress endpoint)
+ {
+ /*
+ * Don't request schema from nodes with versions older than 1.1.7 (timestamps in versions prior to 1.1.7 are broken)
+ * Don't request schema from nodes with a higher major (may have incompatible schema)
+ * Don't request schema from fat clients
+ */
+ return MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_117
+ && MessagingService.instance().getVersion(endpoint) <= MessagingService.current_version
+ && !Gossiper.instance.isFatClient(endpoint);
}
public static boolean isReadyForBootstrap()
@@ -303,10 +308,10 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
for (InetAddress endpoint : Gossiper.instance.getLiveMembers())
{
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
- continue; // we've delt with localhost already
+ continue; // we've dealt with localhost already
- // don't send migrations to the nodes with the versions older than < 1.2
- if (MessagingService.instance().getVersion(endpoint) < MessagingService.VERSION_12)
+ // don't send schema to the nodes with the versions older than current major
+ if (MessagingService.instance().getVersion(endpoint) < MessagingService.current_version)
continue;
pushSchemaMutation(endpoint, schema);
@@ -338,8 +343,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
try
{
- if (logger.isDebugEnabled())
- logger.debug("Truncating schema tables...");
+ logger.debug("Truncating schema tables...");
// truncate schema tables
FBUtilities.waitOnFutures(new ArrayList<Future<?>>(3)
@@ -349,26 +353,20 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
SystemTable.schemaCFS(SystemTable.SCHEMA_COLUMNS_CF).truncate();
}});
- if (logger.isDebugEnabled())
- logger.debug("Clearing local schema keyspace definitions...");
+ logger.debug("Clearing local schema keyspace definitions...");
Schema.instance.clear();
Set<InetAddress> liveEndpoints = Gossiper.instance.getLiveMembers();
liveEndpoints.remove(FBUtilities.getBroadcastAddress());
- // force migration is there are nodes around, first of all
- // check if there are nodes with versions >= 1.1.7 to request migrations from,
- // because migration format of the nodes with versions < 1.1 is incompatible with older versions
- // and due to broken timestamps in versions prior to 1.1.7
+ // force migration if there are nodes around
for (InetAddress node : liveEndpoints)
{
- if (MessagingService.instance().getVersion(node) >= MessagingService.VERSION_117)
+ if (shouldPullSchemaFrom(node))
{
- if (logger.isDebugEnabled())
- logger.debug("Requesting schema from " + node);
-
- FBUtilities.waitOnFuture(StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(node)));
+ logger.debug("Requesting schema from {}", node);
+ FBUtilities.waitOnFuture(submitMigrationTask(node));
break;
}
}