You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2013/08/04 01:22:55 UTC
[1/2] git commit: Future-proof inter-major-version schema migrations
Updated Branches:
refs/heads/trunk 4f2fdc2e7 -> 25c9293f2
Future-proof inter-major-version schema migrations
patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for
CASSANDRA-5845
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/65daaf9d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/65daaf9d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/65daaf9d
Branch: refs/heads/trunk
Commit: 65daaf9d6ae90c1cd01cdbe23e36998411f0f2ed
Parents: 88bc4f2
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Sun Aug 4 00:58:28 2013 +0200
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Aug 4 00:58:28 2013 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/service/MigrationManager.java | 50 ++++++++++----------
2 files changed, 25 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65daaf9d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8eff1fd..960a8c2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@
* (Hadoop) fix support for Thrift tables in CqlPagingRecordReader
(CASSANDRA-5752)
* add "all time blocked" to StatusLogger output (CASSANDRA-5825)
+ * Future-proof inter-major-version schema migrations (CASSANDRA-5845)
1.2.8
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65daaf9d/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index de34785..9aa6f22 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -110,14 +110,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
*/
private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint)
{
- // Can't request migrations from nodes with versions younger than 1.1.7
- if (MessagingService.instance().getVersion(endpoint) < MessagingService.VERSION_117)
- return;
-
- if (Gossiper.instance.isFatClient(endpoint))
- return;
-
- if (Schema.instance.getVersion().equals(theirVersion))
+ if (Schema.instance.getVersion().equals(theirVersion) || !shouldPullSchemaFrom(endpoint))
return;
if (Schema.emptyVersion.equals(Schema.instance.getVersion()) || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
@@ -146,13 +139,25 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
}
}
- private static void submitMigrationTask(InetAddress endpoint)
+ private static Future<?> submitMigrationTask(InetAddress endpoint)
{
/*
* Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are
* running in the gossip stage.
*/
- StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
+ return StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
+ }
+
+ private static boolean shouldPullSchemaFrom(InetAddress endpoint)
+ {
+ /*
+ * Don't request schema from nodes with versions younger than 1.1.7 (timestamps in versions prior to 1.1.7 are broken)
+ * Don't request schema from nodes with a higher major (may have incompatible schema)
+ * Don't request schema from fat clients
+ */
+ return MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_117
+ && MessagingService.instance().getVersion(endpoint) <= MessagingService.current_version
+ && !Gossiper.instance.isFatClient(endpoint);
}
public static boolean isReadyForBootstrap()
@@ -303,10 +308,10 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
for (InetAddress endpoint : Gossiper.instance.getLiveMembers())
{
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
- continue; // we've delt with localhost already
+ continue; // we've dealt with localhost already
- // don't send migrations to the nodes with the versions older than < 1.2
- if (MessagingService.instance().getVersion(endpoint) < MessagingService.VERSION_12)
+ // don't send schema to the nodes with the versions older than current major
+ if (MessagingService.instance().getVersion(endpoint) < MessagingService.current_version)
continue;
pushSchemaMutation(endpoint, schema);
@@ -338,8 +343,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
try
{
- if (logger.isDebugEnabled())
- logger.debug("Truncating schema tables...");
+ logger.debug("Truncating schema tables...");
// truncate schema tables
FBUtilities.waitOnFutures(new ArrayList<Future<?>>(3)
@@ -349,26 +353,20 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
SystemTable.schemaCFS(SystemTable.SCHEMA_COLUMNS_CF).truncate();
}});
- if (logger.isDebugEnabled())
- logger.debug("Clearing local schema keyspace definitions...");
+ logger.debug("Clearing local schema keyspace definitions...");
Schema.instance.clear();
Set<InetAddress> liveEndpoints = Gossiper.instance.getLiveMembers();
liveEndpoints.remove(FBUtilities.getBroadcastAddress());
- // force migration is there are nodes around, first of all
- // check if there are nodes with versions >= 1.1.7 to request migrations from,
- // because migration format of the nodes with versions < 1.1 is incompatible with older versions
- // and due to broken timestamps in versions prior to 1.1.7
+ // force migration if there are nodes around
for (InetAddress node : liveEndpoints)
{
- if (MessagingService.instance().getVersion(node) >= MessagingService.VERSION_117)
+ if (shouldPullSchemaFrom(node))
{
- if (logger.isDebugEnabled())
- logger.debug("Requesting schema from " + node);
-
- FBUtilities.waitOnFuture(StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(node)));
+ logger.debug("Requesting schema from {}", node);
+ FBUtilities.waitOnFuture(submitMigrationTask(node));
break;
}
}
[2/2] git commit: Merge branch 'cassandra-1.2' into trunk
Posted by al...@apache.org.
Merge branch 'cassandra-1.2' into trunk
Conflicts:
src/java/org/apache/cassandra/service/MigrationManager.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/25c9293f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/25c9293f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/25c9293f
Branch: refs/heads/trunk
Commit: 25c9293f200a83028597c691a4fd13d3f7d06850
Parents: 4f2fdc2 65daaf9
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Sun Aug 4 01:22:44 2013 +0200
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Aug 4 01:22:44 2013 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 4 +-
.../db/MigrationRequestVerbHandler.java | 9 ++--
.../cassandra/service/MigrationManager.java | 51 +++++++++++---------
.../apache/cassandra/service/MigrationTask.java | 2 +-
5 files changed, 36 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25c9293f/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25c9293f/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index ff8e387,11281ee..9b2c172
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -13,72 -13,6 +13,72 @@@ restore snapshots created with the prev
'sstableloader' tool. You can upgrade the file format of your snapshots
using the provided 'sstableupgrade' tool.
+2.0.0
+=====
+
+Upgrading
+---------
+ - CAS and new features in CQL such as DROP COLUMN assume that cell
+ timestamps are microseconds-since-epoch. Do not use these
+ features if you are using client-specified timestamps with some
+ other source.
- - Upgrading is ONLY supported from Cassandra 1.2.7 or later. This
++ - Upgrading is ONLY supported from Cassandra 1.2.9 or later. This
+ goes for sstable compatibility as well as network. When
- upgrading from an earlier release, upgrade to 1.2.7 first and
++ upgrading from an earlier release, upgrade to 1.2.9 first and
+ run upgradesstables before proceeding to 2.0.
+ - Replication and strategy options do not accept unknown options anymore.
+ This was already the case for CQL3 in 1.2 but this is now the case for
+ thrift too.
+ - auto_bootstrap of a single-token node with no initial_token will
+ now pick a random token instead of bisecting an existing token
+ range. We recommend upgrading to vnodes; failing that, we
+ recommend specifying initial_token.
+ - reduce_cache_sizes_at, reduce_cache_capacity_to, and
+ flush_largest_memtables_at options have been removed from cassandra.yaml.
+ - CacheServiceMBean.reduceCacheSizes() has been removed.
+ Use CacheServiceMBean.set{Key,Row}CacheCapacityInMB() instead.
+ - authority option in cassandra.yaml has been deprecated since 1.2.0,
+ but it has been completely removed in 2.0. Please use 'authorizer' option.
+ - ASSUME command has been removed from cqlsh. Use CQL3 blobAsType() and
+ typeAsBlob() conversion functions instead.
+ See https://cassandra.apache.org/doc/cql3/CQL.html#blobFun for details.
+ - Inputting blobs as string constants is now fully deprecated in
+ favor of blob constants. Make sure to update your applications to use
+ the new syntax while you are still on 1.2 (which supports both string
+ and blob constants for blob input) before upgrading to 2.0.
+ - index_interval is now moved to ColumnFamily property. You can change value
+ with ALTER TABLE ... WITH statement and SSTables written after that will
+ have new value. When upgrading, Cassandra will pick up the value defined in
+ cassanda.yaml as the default for existing ColumnFamilies, until you explicitly
+ set the value for those.
+
+Operations
+----------
+ - Major compactions, cleanup, scrub, and upgradesstables will interrupt
+ any in-progress compactions (but not repair validations) when invoked.
+ - Disabling autocompactions by setting min/max compaction threshold to 0
+ has been deprecated, instead, use the nodetool commands 'disableautocompaction'
+ and 'enableautocompaction' or set the compaction strategy option enabled = false
+ - ALTER TABLE DROP has been reenabled for CQL3 tables and has new semantics now.
+ See https://cassandra.apache.org/doc/cql3/CQL.html#alterTableStmt and
+ https://issues.apache.org/jira/browse/CASSANDRA-3919 for details.
+ - CAS uses gc_grace_seconds to determine how long to keep unused paxos
+ state around for, or a minimum of three hours.
+ - A new hints created metric is tracked per target, replacing countPendingHints
+ - After performance testing for CASSANDRA-5727, the default LCS filesize
+ has been changed from 5MB to 160MB.
+
+Features
+--------
+ - Alias support has been added to CQL3 SELECT statement. Refer to
+ CQL3 documentation (http://cassandra.apache.org/doc/cql3/CQL.html) for details.
+ - JEMalloc support (see memory_allocator in cassandra.yaml)
+ - Experimental triggers support. See examples/ for how to use. "Experimental"
+ means "tied closely to internal data structures; we plan to decouple this in
+ the future, which will probably break triggers written against this initial
+ API."
+
+
1.2.9
=====
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25c9293f/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
index e593a98,1992c01..31a64a9
--- a/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
@@@ -36,15 -37,20 +36,12 @@@ public class MigrationRequestVerbHandle
{
private static final Logger logger = LoggerFactory.getLogger(MigrationRequestVerbHandler.class);
- public void doVerb(MessageIn message, String id)
+ public void doVerb(MessageIn message, int id)
{
logger.debug("Received migration request from {}.", message.from);
--
- Collection<RowMutation> schema = SystemKeyspace.serializeSchema();
-
- MessageOut<Collection<RowMutation>> response = new MessageOut<Collection<RowMutation>>(MessagingService.Verb.INTERNAL_RESPONSE,
- schema,
- MigrationManager.MigrationsSerializer.instance);
- if (message.version < MessagingService.VERSION_12)
- logger.debug("Returning empty response to the migration request from {} (version < 1.2).", message.from);
-
- Collection<RowMutation> schema = message.version < MessagingService.VERSION_12
- ? Collections.EMPTY_SET
- : SystemTable.serializeSchema();
-
- MessageOut<Collection<RowMutation>> response = new MessageOut<Collection<RowMutation>>(MessagingService.Verb.INTERNAL_RESPONSE,
- schema,
- MigrationManager.MigrationsSerializer.instance);
++ MessageOut<Collection<RowMutation>> response = new MessageOut<>(MessagingService.Verb.INTERNAL_RESPONSE,
++ SystemKeyspace.serializeSchema(),
++ MigrationManager.MigrationsSerializer.instance);
MessagingService.instance().sendReply(response, id, message.from);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25c9293f/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/MigrationManager.java
index 8c31632,9aa6f22..a59cffc
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@@ -148,7 -145,19 +145,17 @@@ public class MigrationManager implement
* Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are
* running in the gossip stage.
*/
- StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
+ return StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
+ }
+
+ private static boolean shouldPullSchemaFrom(InetAddress endpoint)
+ {
+ /*
- * Don't request schema from nodes with versions younger than 1.1.7 (timestamps in versions prior to 1.1.7 are broken)
+ * Don't request schema from nodes with a higher major (may have incompatible schema)
+ * Don't request schema from fat clients
+ */
- return MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_117
- && MessagingService.instance().getVersion(endpoint) <= MessagingService.current_version
++ return MessagingService.instance().getVersion(endpoint) <= MessagingService.current_version
+ && !Gossiper.instance.isFatClient(endpoint);
}
public static boolean isReadyForBootstrap()
@@@ -279,9 -288,9 +286,9 @@@
private static void pushSchemaMutation(InetAddress endpoint, Collection<RowMutation> schema)
{
-- MessageOut<Collection<RowMutation>> msg = new MessageOut<Collection<RowMutation>>(MessagingService.Verb.DEFINITIONS_UPDATE,
-- schema,
-- MigrationsSerializer.instance);
++ MessageOut<Collection<RowMutation>> msg = new MessageOut<>(MessagingService.Verb.DEFINITIONS_UPDATE,
++ schema,
++ MigrationsSerializer.instance);
MessagingService.instance().sendOneWay(msg, endpoint);
}
@@@ -328,37 -341,64 +339,33 @@@
{
logger.info("Starting local schema reset...");
- if (logger.isDebugEnabled())
- logger.debug("Truncating schema tables...");
- try
- {
- logger.debug("Truncating schema tables...");
++ logger.debug("Truncating schema tables...");
- // truncate schema tables
- FBUtilities.waitOnFutures(new ArrayList<Future<?>>(3)
- {{
- SystemTable.schemaCFS(SystemTable.SCHEMA_KEYSPACES_CF).truncate();
- SystemTable.schemaCFS(SystemTable.SCHEMA_COLUMNFAMILIES_CF).truncate();
- SystemTable.schemaCFS(SystemTable.SCHEMA_COLUMNS_CF).truncate();
- }});
+ // truncate schema tables
+ SystemKeyspace.schemaCFS(SystemKeyspace.SCHEMA_KEYSPACES_CF).truncateBlocking();
+ SystemKeyspace.schemaCFS(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF).truncateBlocking();
+ SystemKeyspace.schemaCFS(SystemKeyspace.SCHEMA_COLUMNS_CF).truncateBlocking();
+ SystemKeyspace.schemaCFS(SystemKeyspace.SCHEMA_TRIGGERS_CF).truncateBlocking();
- if (logger.isDebugEnabled())
-- logger.debug("Clearing local schema keyspace definitions...");
++ logger.debug("Clearing local schema keyspace definitions...");
- Schema.instance.clear();
+ Schema.instance.clear();
- Set<InetAddress> liveEndpoints = Gossiper.instance.getLiveMembers();
- liveEndpoints.remove(FBUtilities.getBroadcastAddress());
+ Set<InetAddress> liveEndpoints = Gossiper.instance.getLiveMembers();
+ liveEndpoints.remove(FBUtilities.getBroadcastAddress());
- // force migration is there are nodes around, first of all
- // check if there are nodes with versions >= 1.1.7 to request migrations from,
- // because migration format of the nodes with versions < 1.1 is incompatible with older versions
- // and due to broken timestamps in versions prior to 1.1.7
- // force migration if there are nodes around
- for (InetAddress node : liveEndpoints)
++ // force migration if there are nodes around
+ for (InetAddress node : liveEndpoints)
+ {
- if (logger.isDebugEnabled())
- logger.debug("Requesting schema from " + node);
-
- FBUtilities.waitOnFuture(StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(node)));
- break;
++ if (shouldPullSchemaFrom(node))
+ {
- if (shouldPullSchemaFrom(node))
- {
- logger.debug("Requesting schema from {}", node);
- FBUtilities.waitOnFuture(submitMigrationTask(node));
- break;
- }
++ logger.debug("Requesting schema from {}", node);
++ FBUtilities.waitOnFuture(submitMigrationTask(node));
++ break;
+ }
-
- logger.info("Local schema reset is complete.");
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
}
- catch (ExecutionException e)
- {
- throw new RuntimeException(e);
- }
- }
- /**
- * Used only in case node has old style migration schema (newly updated)
- * @return the UUID identifying version of the last applied migration
- */
- @Deprecated
- public static UUID getLastMigrationId()
- {
- DecoratedKey dkey = StorageService.getPartitioner().decorateKey(LAST_MIGRATION_KEY);
- Table defs = Table.open(Table.SYSTEM_KS);
- ColumnFamilyStore cfStore = defs.getColumnFamilyStore(DefsTable.OLD_SCHEMA_CF);
- QueryFilter filter = QueryFilter.getNamesFilter(dkey, new QueryPath(DefsTable.OLD_SCHEMA_CF), LAST_MIGRATION_KEY);
- ColumnFamily cf = cfStore.getColumnFamily(filter);
- if (cf == null || cf.getColumnNames().size() == 0)
- return null;
- else
- return UUIDGen.getUUID(cf.getColumn(LAST_MIGRATION_KEY).value());
+ logger.info("Local schema reset is complete.");
}
public static class MigrationsSerializer implements IVersionedSerializer<Collection<RowMutation>>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25c9293f/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/MigrationTask.java
index df216ea,ac7c6be..93572f0
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ b/src/java/org/apache/cassandra/service/MigrationTask.java
@@@ -48,7 -48,7 +48,7 @@@ class MigrationTask extends WrappedRunn
public void runMayThrow() throws Exception
{
-- MessageOut message = new MessageOut(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
++ MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
if (!FailureDetector.instance.isAlive(endpoint))
{