You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2014/02/05 01:36:37 UTC
[6/8] git commit: Move handling of migration event source to solve
bootstrap race. Patch by Sergio Bossa and brandonwilliams,
reviewed by Tyler Hobbs for CASSANDRA-6648
Move handling of migration event source to solve bootstrap race.
Patch by Sergio Bossa and brandonwilliams, reviewed by Tyler Hobbs for
CASSANDRA-6648
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/93bd89fb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/93bd89fb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/93bd89fb
Branch: refs/heads/trunk
Commit: 93bd89fbba7d76821d3d85ae371cb2e665176b6a
Parents: 34112ef
Author: Brandon Williams <br...@apache.org>
Authored: Tue Feb 4 18:33:44 2014 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Feb 4 18:33:44 2014 -0600
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../cassandra/service/MigrationManager.java | 33 +++-----------------
.../cassandra/service/StorageService.java | 7 ++---
3 files changed, 9 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/93bd89fb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b1fade1..d3a78f7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -2,6 +2,8 @@
* Fix direct Memory on architectures that do not support unaligned long access
(CASSANDRA-6628)
* Let scrub optionally skip broken counter partitions (CASSANDRA-5930)
+Merged from 1.2:
+ * Move handling of migration event source to solve bootstrap race. (CASSANDRA-6648)
2.0.5
http://git-wip-us.apache.org/repos/asf/cassandra/blob/93bd89fb/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 0ffc7c4..b463116 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -50,7 +50,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
-public class MigrationManager implements IEndpointStateChangeSubscriber
+public class MigrationManager
{
private static final Logger logger = LoggerFactory.getLogger(MigrationManager.class);
@@ -63,7 +63,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
public static final int MIGRATION_DELAY_IN_MS = 60000;
private final List<IMigrationListener> listeners = new CopyOnWriteArrayList<IMigrationListener>();
-
+
private MigrationManager() {}
public void register(IMigrationListener listener)
@@ -76,37 +76,14 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
listeners.remove(listener);
}
- public void onJoin(InetAddress endpoint, EndpointState epState)
- {}
-
- public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue)
- {}
-
- public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
- {
- if (state != ApplicationState.SCHEMA || endpoint.equals(FBUtilities.getBroadcastAddress()))
- return;
-
- maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint);
- }
-
- public void onAlive(InetAddress endpoint, EndpointState state)
+ public void scheduleSchemaPull(InetAddress endpoint, EndpointState state)
{
VersionedValue value = state.getApplicationState(ApplicationState.SCHEMA);
- if (value != null)
+ if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && value != null)
maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint);
}
- public void onDead(InetAddress endpoint, EndpointState state)
- {}
-
- public void onRestart(InetAddress endpoint, EndpointState state)
- {}
-
- public void onRemove(InetAddress endpoint)
- {}
-
/**
* If versions differ, this node sends a request with its local migration list to the endpoint
* and expects to receive a list of migrations to apply locally.
@@ -166,7 +143,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
public static boolean isReadyForBootstrap()
{
- return ((ThreadPoolExecutor) StageManager.getStage(Stage.MIGRATION)).getActiveCount() == 0;
+ return Schema.instance.getVersion() != null && !Schema.emptyVersion.equals(Schema.instance.getVersion());
}
public void notifyCreateKeyspace(KSMetaData ksm)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/93bd89fb/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 5f9657d..c222570 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -179,8 +179,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private static enum Mode { STARTING, NORMAL, CLIENT, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED, RELOCATING }
private Mode operationMode = Mode.STARTING;
- private final MigrationManager migrationManager = MigrationManager.instance;
-
/* Used for tracking drain progress */
private volatile int totalCFs, remainingCFs;
@@ -367,7 +365,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public void stopClient()
{
- Gossiper.instance.unregister(migrationManager);
Gossiper.instance.unregister(this);
Gossiper.instance.stop();
MessagingService.instance().shutdown();
@@ -473,7 +470,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
logger.info("Starting up client gossip");
setMode(Mode.CLIENT, false);
Gossiper.instance.register(this);
- Gossiper.instance.register(migrationManager);
Gossiper.instance.start((int) (System.currentTimeMillis() / 1000)); // needed for node-ring gathering.
Gossiper.instance.addLocalApplicationState(ApplicationState.NET_VERSION, valueFactory.networkVersion());
@@ -630,7 +626,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
appStates.put(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion());
logger.info("Starting up server gossip");
Gossiper.instance.register(this);
- Gossiper.instance.register(migrationManager);
Gossiper.instance.start(SystemKeyspace.incrementAndGetGeneration(), appStates); // needed for node-ring gathering.
// gossip snitch infos (local DC and rack)
gossipSnitchInfo();
@@ -1964,6 +1959,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
onChange(endpoint, entry.getKey(), entry.getValue());
}
+ MigrationManager.instance.scheduleSchemaPull(endpoint, epState);
}
public void onAlive(InetAddress endpoint, EndpointState state)
@@ -1982,6 +1978,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
subscriber.onJoinCluster(endpoint);
}
+ MigrationManager.instance.scheduleSchemaPull(endpoint, state);
}
public void onRemove(InetAddress endpoint)