You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2014/09/18 23:38:03 UTC

[2/6] git commit: Allow propagating multiple gossip states atomically

Allow propagating multiple gossip states atomically

Patch by brandonwilliams, reviewed by jasobrown for CASSANDRA-6125


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3f79af23
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3f79af23
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3f79af23

Branch: refs/heads/cassandra-2.1
Commit: 3f79af23e735571bf1a14fd73c81942ef2b6203a
Parents: cbc705d
Author: Brandon Williams <br...@apache.org>
Authored: Thu Sep 18 14:36:31 2014 +0000
Committer: Brandon Williams <br...@apache.org>
Committed: Thu Sep 18 14:36:31 2014 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/gms/Gossiper.java | 26 ++++++++++++++++++++
 .../cassandra/service/StorageService.java       | 23 +++++++++--------
 3 files changed, 40 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f79af23/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c9d507a..fcf229d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.11:
+ * Allow propagating multiple gossip states atomically (CASSANDRA-6125)
  * Log exceptions related to unclean native protocol client disconnects
    at DEBUG or INFO (CASSANDRA-7849)
  * Allow permissions cache to be set via JMX (CASSANDRA-7698)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f79af23/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 4514da7..eb0cf39 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -23,6 +23,7 @@ import java.net.UnknownHostException;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.*;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -30,6 +31,7 @@ import javax.management.ObjectName;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.Uninterruptibles;
 
+import org.apache.cassandra.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,6 +74,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
                                                           VersionedValue.STATUS_LEFT, VersionedValue.HIBERNATE);
 
     private ScheduledFuture<?> scheduledGossipTask;
+    private static final ReentrantLock taskLock = new ReentrantLock();
     public final static int intervalInMillis = 1000;
     public final static int QUARANTINE_DELAY = StorageService.RING_DELAY * 2;
     private static final Logger logger = LoggerFactory.getLogger(Gossiper.class);
@@ -124,6 +127,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
                 //wait on messaging service to start listening
                 MessagingService.instance().waitUntilListening();
 
+                taskLock.lock();
+
                 /* Update the local heartbeat counter. */
                 endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().updateHeartBeat();
                 if (logger.isTraceEnabled())
@@ -171,6 +176,10 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
             {
                 logger.error("Gossip error", e);
             }
+            finally
+            {
+                taskLock.unlock();
+            }
         }
     }
 
@@ -1274,6 +1283,23 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         doOnChangeNotifications(epAddr, state, value);
     }
 
+    public void addLocalApplicationStates(List<Pair<ApplicationState, VersionedValue>> states)
+    {
+        taskLock.lock();
+        try
+        {
+            for (Pair<ApplicationState, VersionedValue> pair : states)
+            {
+               addLocalApplicationState(pair.left, pair.right);
+            }
+        }
+        finally
+        {
+            taskLock.unlock();
+        }
+
+    }
+
     public void stop()
     {
     	if (scheduledGossipTask != null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f79af23/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 7f92034..f693a0b 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -203,10 +203,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             logger.debug("Setting tokens to {}", tokens);
         SystemKeyspace.updateTokens(tokens);
         tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
-        // order is important here, the gossiper can fire in between adding these two states.  It's ok to send TOKENS without STATUS, but *not* vice versa.
         Collection<Token> localTokens = getLocalTokens();
-        Gossiper.instance.addLocalApplicationState(ApplicationState.TOKENS, valueFactory.tokens(localTokens));
-        Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.normal(localTokens));
+        List<Pair<ApplicationState, VersionedValue>> states = new ArrayList<Pair<ApplicationState, VersionedValue>>();
+        states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(localTokens)));
+        states.add(Pair.create(ApplicationState.STATUS, valueFactory.normal(localTokens)));
+        Gossiper.instance.addLocalApplicationStates(states);
         setMode(Mode.NORMAL, false);
     }
 
@@ -619,8 +620,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             {
                 tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
                 // order is important here, the gossiper can fire in between adding these two states.  It's ok to send TOKENS without STATUS, but *not* vice versa.
-                Gossiper.instance.addLocalApplicationState(ApplicationState.TOKENS, valueFactory.tokens(tokens));
-                Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.hibernate(true));
+                List<Pair<ApplicationState, VersionedValue>> states = new ArrayList<Pair<ApplicationState, VersionedValue>>();
+                states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(tokens)));
+                states.add(Pair.create(ApplicationState.STATUS, valueFactory.hibernate(true)));
+                Gossiper.instance.addLocalApplicationStates(states);
             }
             logger.info("Not joining ring as requested. Use JMX (StorageService->joinRing()) to initiate ring joining");
         }
@@ -648,8 +651,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 if (!DatabaseDescriptor.isAutoBootstrap())
                     throw new RuntimeException("Trying to replace_address with auto_bootstrap disabled will not work, check your configuration");
                 bootstrapTokens = prepareReplacementInfo();
-                appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
                 appStates.put(ApplicationState.TOKENS, valueFactory.tokens(bootstrapTokens));
+                appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
             }
             else if (shouldBootstrap())
             {
@@ -976,10 +979,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         if (!DatabaseDescriptor.isReplacing())
         {
             // if not an existing token then bootstrap
-            // order is important here, the gossiper can fire in between adding these two states.  It's ok to send TOKENS without STATUS, but *not* vice versa.
-            Gossiper.instance.addLocalApplicationState(ApplicationState.TOKENS, valueFactory.tokens(tokens));
-            Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS,
-                                                       valueFactory.bootstrapping(tokens));
+            List<Pair<ApplicationState, VersionedValue>> states = new ArrayList<Pair<ApplicationState, VersionedValue>>();
+            states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(tokens)));
+            states.add(Pair.create(ApplicationState.STATUS, valueFactory.bootstrapping(tokens)));
+            Gossiper.instance.addLocalApplicationStates(states);
             setMode(Mode.JOINING, "sleeping " + RING_DELAY + " ms for pending range setup", true);
             Uninterruptibles.sleepUninterruptibly(RING_DELAY, TimeUnit.MILLISECONDS);
         }