You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/12/06 23:15:42 UTC
[9/10] git commit: Improve schema propagation performance; patch by
jbellis; reviewed by Chris Herron and xedin for CASSANDRA-5025
Improve schema propagation performance
patch by jbellis; reviewed by Chris Herron and xedin for CASSANDRA-5025
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8d55474f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8d55474f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8d55474f
Branch: refs/heads/cassandra-1.2.0
Commit: 8d55474febf31980f2eb158ae0ff9717b0761f1e
Parents: 5abeecc
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Dec 6 16:12:31 2012 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Dec 6 16:12:38 2012 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/service/MigrationManager.java | 39 ++++++++++-----
2 files changed, 28 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8d55474f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a0e3eb7..5d1fd8d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.1.8
+ * Improve schema propagation performance (CASSANDRA-5025)
* Fall back to old describe_splits if d_s_ex is not available (CASSANDRA-4803)
* Improve error reporting when streaming ranges fail (CASSANDRA-5009)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8d55474f/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 94c0dcc..102ea12 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -25,12 +25,12 @@ import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.ArrayList;
import java.util.Collection;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,7 +72,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
if (state != ApplicationState.SCHEMA || endpoint.equals(FBUtilities.getBroadcastAddress()))
return;
- rectifySchema(UUID.fromString(value.value), endpoint);
+ maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint);
}
public void onAlive(InetAddress endpoint, EndpointState state)
@@ -80,7 +80,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
VersionedValue value = state.getApplicationState(ApplicationState.SCHEMA);
if (value != null)
- rectifySchema(UUID.fromString(value.value), endpoint);
+ maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint);
}
public void onDead(InetAddress endpoint, EndpointState state)
@@ -92,7 +92,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
public void onRemove(InetAddress endpoint)
{}
- private static void rectifySchema(UUID theirVersion, final InetAddress endpoint)
+ private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint)
{
// Can't request migrations from nodes with versions younger than 1.1.7
if (Gossiper.instance.getVersion(endpoint) < MessagingService.VERSION_117)
@@ -101,14 +101,29 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
if (Schema.instance.getVersion().equals(theirVersion))
return;
- /**
- * if versions differ this node sends request with local migration list to the endpoint
- * and expecting to receive a list of migrations to apply locally.
- *
- * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are
- * running in the gossip stage.
- */
- StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
+ // check our schema vs theirs, after a delay to make sure we have a chance to apply any changes
+ // being pushed out simultaneously. See CASSANDRA-5025
+ Runnable runnable = new Runnable()
+ {
+ public void run()
+ {
+ // grab the latest version of the schema since it may have changed again since the initial scheduling
+ VersionedValue value = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA);
+ UUID currentVersion = UUID.fromString(value.value);
+ if (Schema.instance.getVersion().equals(currentVersion))
+ return;
+
+ /**
+ * if versions differ this node sends request with local migration list to the endpoint
+ * and expecting to receive a list of migrations to apply locally.
+ *
+ * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are
+ * running in the gossip stage.
+ */
+ StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
+ }
+ };
+ StorageService.optionalTasks.schedule(runnable, 1, TimeUnit.MINUTES);
}
public static boolean isReadyForBootstrap()