You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2014/02/05 01:36:37 UTC

[6/8] git commit: Move handling of migration event source to solve bootstrap race. Patch by Sergio Bossa and brandonwilliams, reviewed by Tyler Hobbs for CASSANDRA-6648

Move handling of migration event source to solve bootstrap race.
Patch by Sergio Bossa and brandonwilliams, reviewed by Tyler Hobbs for
CASSANDRA-6648


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/93bd89fb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/93bd89fb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/93bd89fb

Branch: refs/heads/trunk
Commit: 93bd89fbba7d76821d3d85ae371cb2e665176b6a
Parents: 34112ef
Author: Brandon Williams <br...@apache.org>
Authored: Tue Feb 4 18:33:44 2014 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Feb 4 18:33:44 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../cassandra/service/MigrationManager.java     | 33 +++-----------------
 .../cassandra/service/StorageService.java       |  7 ++---
 3 files changed, 9 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/93bd89fb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b1fade1..d3a78f7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -2,6 +2,8 @@
  * Fix direct Memory on architectures that do not support unaligned long access
    (CASSANDRA-6628)
  * Let scrub optionally skip broken counter partitions (CASSANDRA-5930)
+Merged from 1.2:
+ * Move handling of migration event source to solve bootstrap race. (CASSANDRA-6648)
 
 
 2.0.5

http://git-wip-us.apache.org/repos/asf/cassandra/blob/93bd89fb/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 0ffc7c4..b463116 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -50,7 +50,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
 
-public class MigrationManager implements IEndpointStateChangeSubscriber
+public class MigrationManager
 {
     private static final Logger logger = LoggerFactory.getLogger(MigrationManager.class);
 
@@ -63,7 +63,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
     public static final int MIGRATION_DELAY_IN_MS = 60000;
 
     private final List<IMigrationListener> listeners = new CopyOnWriteArrayList<IMigrationListener>();
-
+    
     private MigrationManager() {}
 
     public void register(IMigrationListener listener)
@@ -76,37 +76,14 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
         listeners.remove(listener);
     }
 
-    public void onJoin(InetAddress endpoint, EndpointState epState)
-    {}
-    
-    public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) 
-    {}
-
-    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
-    {
-        if (state != ApplicationState.SCHEMA || endpoint.equals(FBUtilities.getBroadcastAddress()))
-            return;
-
-        maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint);
-    }
-
-    public void onAlive(InetAddress endpoint, EndpointState state)
+    public void scheduleSchemaPull(InetAddress endpoint, EndpointState state)
     {
         VersionedValue value = state.getApplicationState(ApplicationState.SCHEMA);
 
-        if (value != null)
+        if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && value != null)
             maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint);
     }
 
-    public void onDead(InetAddress endpoint, EndpointState state)
-    {}
-
-    public void onRestart(InetAddress endpoint, EndpointState state)
-    {}
-
-    public void onRemove(InetAddress endpoint)
-    {}
-
     /**
      * If versions differ this node sends request with local migration list to the endpoint
      * and expecting to receive a list of migrations to apply locally.
@@ -166,7 +143,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
 
     public static boolean isReadyForBootstrap()
     {
-        return ((ThreadPoolExecutor) StageManager.getStage(Stage.MIGRATION)).getActiveCount() == 0;
+        return Schema.instance.getVersion() != null && !Schema.emptyVersion.equals(Schema.instance.getVersion());
     }
 
     public void notifyCreateKeyspace(KSMetaData ksm)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/93bd89fb/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 5f9657d..c222570 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -179,8 +179,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     private static enum Mode { STARTING, NORMAL, CLIENT, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED, RELOCATING }
     private Mode operationMode = Mode.STARTING;
 
-    private final MigrationManager migrationManager = MigrationManager.instance;
-
     /* Used for tracking drain progress */
     private volatile int totalCFs, remainingCFs;
 
@@ -367,7 +365,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public void stopClient()
     {
-        Gossiper.instance.unregister(migrationManager);
         Gossiper.instance.unregister(this);
         Gossiper.instance.stop();
         MessagingService.instance().shutdown();
@@ -473,7 +470,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         logger.info("Starting up client gossip");
         setMode(Mode.CLIENT, false);
         Gossiper.instance.register(this);
-        Gossiper.instance.register(migrationManager);
         Gossiper.instance.start((int) (System.currentTimeMillis() / 1000)); // needed for node-ring gathering.
         Gossiper.instance.addLocalApplicationState(ApplicationState.NET_VERSION, valueFactory.networkVersion());
 
@@ -630,7 +626,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         appStates.put(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion());
         logger.info("Starting up server gossip");
         Gossiper.instance.register(this);
-        Gossiper.instance.register(migrationManager);
         Gossiper.instance.start(SystemKeyspace.incrementAndGetGeneration(), appStates); // needed for node-ring gathering.
         // gossip snitch infos (local DC and rack)
         gossipSnitchInfo();
@@ -1964,6 +1959,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         {
             onChange(endpoint, entry.getKey(), entry.getValue());
         }
+        MigrationManager.instance.scheduleSchemaPull(endpoint, epState);
     }
 
     public void onAlive(InetAddress endpoint, EndpointState state)
@@ -1982,6 +1978,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
                 subscriber.onJoinCluster(endpoint);
         }
+        MigrationManager.instance.scheduleSchemaPull(endpoint, state);
     }
 
     public void onRemove(InetAddress endpoint)