You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/12/07 22:44:14 UTC
[7/14] git commit: Pull schema immediately when bootstrapping. Patch
by Chris Herron and brandonwilliams, reviewed by jbellis for CASSANDRA-5025
Pull schema immediately when bootstrapping.
Patch by Chris Herron and brandonwilliams, reviewed by jbellis for
CASSANDRA-5025
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/442a7b3a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/442a7b3a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/442a7b3a
Branch: refs/heads/trunk
Commit: 442a7b3a60c5cc765221cc6f07add3546a8e9c3d
Parents: 8d55474
Author: Brandon Williams <br...@apache.org>
Authored: Fri Dec 7 15:31:25 2012 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Dec 7 15:31:25 2012 -0600
----------------------------------------------------------------------
.../apache/cassandra/service/MigrationManager.java | 57 +++++++++------
.../apache/cassandra/service/StorageService.java | 7 ++-
2 files changed, 41 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/442a7b3a/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 102ea12..72a9a84 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.ArrayList;
@@ -92,6 +93,10 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
public void onRemove(InetAddress endpoint)
{}
+ /**
+ * If versions differ, this node sends a request with the local migration list to the endpoint
+ * and expects to receive a list of migrations to apply locally.
+ */
private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint)
{
// Can't request migrations from nodes with versions younger than 1.1.7
@@ -101,29 +106,39 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
if (Schema.instance.getVersion().equals(theirVersion))
return;
- // check our schema vs theirs, after a delay to make sure we have a chance to apply any changes
- // being pushed out simultaneously. See CASSANDRA-5025
- Runnable runnable = new Runnable()
+ if (Schema.emptyVersion.equals(Schema.instance.getVersion()))
{
- public void run()
+ // If we think we may be bootstrapping, submit MigrationTask immediately
+ submitMigrationTask(endpoint);
+ }
+ else
+ {
+ // Include a delay to make sure we have a chance to apply any changes being
+ // pushed out simultaneously. See CASSANDRA-5025
+ Runnable runnable = new Runnable()
{
- // grab the latest version of the schema since it may have changed again since the initial scheduling
- VersionedValue value = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA);
- UUID currentVersion = UUID.fromString(value.value);
- if (Schema.instance.getVersion().equals(currentVersion))
- return;
-
- /**
- * if versions differ this node sends request with local migration list to the endpoint
- * and expecting to receive a list of migrations to apply locally.
- *
- * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are
- * running in the gossip stage.
- */
- StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
- }
- };
- StorageService.optionalTasks.schedule(runnable, 1, TimeUnit.MINUTES);
+ public void run()
+ {
+ // grab the latest version of the schema since it may have changed again since the initial scheduling
+ VersionedValue value = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA);
+ UUID currentVersion = UUID.fromString(value.value);
+ if (Schema.instance.getVersion().equals(currentVersion))
+ return;
+
+ submitMigrationTask(endpoint);
+ }
+ };
+ StorageService.optionalTasks.schedule(runnable, 1, TimeUnit.MINUTES);
+ }
+ }
+
+ private static void submitMigrationTask(InetAddress endpoint)
+ {
+ /*
+ * Do not de-ref the future: we are running in the gossip stage, and doing so causes
+ * distributed deadlock (CASSANDRA-3832).
+ */
+ StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
}
public static boolean isReadyForBootstrap()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/442a7b3a/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 30da45c..d041279 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -544,8 +544,11 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
Gossiper.instance.register(this);
Gossiper.instance.register(migrationManager);
Gossiper.instance.start(SystemTable.incrementAndGetGeneration()); // needed for node-ring gathering.
- // gossip schema version when gossiper is running
- Schema.instance.updateVersionAndAnnounce();
+
+ // gossip Schema.emptyVersion forcing immediate check for schema updates (see MigrationManager#maybeScheduleSchemaPull)
+ Schema.instance.updateVersion(); // Ensure we know our own actual Schema UUID in preparation for updates
+ MigrationManager.passiveAnnounce(Schema.emptyVersion);
+
// add rpc listening info
Gossiper.instance.addLocalApplicationState(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(DatabaseDescriptor.getRpcAddress()));
if (null != DatabaseDescriptor.getReplaceToken())