You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2014/09/18 23:38:03 UTC
[2/6] git commit: Allow propagating multiple gossip states atomically
Allow propagating multiple gossip states atomically
Patch by brandonwilliams, reviewed by jasobrown for CASSANDRA-6125
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3f79af23
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3f79af23
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3f79af23
Branch: refs/heads/cassandra-2.1
Commit: 3f79af23e735571bf1a14fd73c81942ef2b6203a
Parents: cbc705d
Author: Brandon Williams <br...@apache.org>
Authored: Thu Sep 18 14:36:31 2014 +0000
Committer: Brandon Williams <br...@apache.org>
Committed: Thu Sep 18 14:36:31 2014 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/gms/Gossiper.java | 26 ++++++++++++++++++++
.../cassandra/service/StorageService.java | 23 +++++++++--------
3 files changed, 40 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f79af23/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c9d507a..fcf229d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.11:
+ * Allow propagating multiple gossip states atomically (CASSANDRA-6125)
* Log exceptions related to unclean native protocol client disconnects
at DEBUG or INFO (CASSANDRA-7849)
* Allow permissions cache to be set via JMX (CASSANDRA-7698)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f79af23/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 4514da7..eb0cf39 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -23,6 +23,7 @@ import java.net.UnknownHostException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.*;
+import java.util.concurrent.locks.ReentrantLock;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -30,6 +31,7 @@ import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.cassandra.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,6 +74,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
VersionedValue.STATUS_LEFT, VersionedValue.HIBERNATE);
private ScheduledFuture<?> scheduledGossipTask;
+ private static final ReentrantLock taskLock = new ReentrantLock();
public final static int intervalInMillis = 1000;
public final static int QUARANTINE_DELAY = StorageService.RING_DELAY * 2;
private static final Logger logger = LoggerFactory.getLogger(Gossiper.class);
@@ -124,6 +127,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
//wait on messaging service to start listening
MessagingService.instance().waitUntilListening();
+ taskLock.lock();
+
/* Update the local heartbeat counter. */
endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().updateHeartBeat();
if (logger.isTraceEnabled())
@@ -171,6 +176,10 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
{
logger.error("Gossip error", e);
}
+ finally
+ {
+ taskLock.unlock();
+ }
}
}
@@ -1274,6 +1283,23 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
doOnChangeNotifications(epAddr, state, value);
}
+ public void addLocalApplicationStates(List<Pair<ApplicationState, VersionedValue>> states)
+ {
+ taskLock.lock();
+ try
+ {
+ for (Pair<ApplicationState, VersionedValue> pair : states)
+ {
+ addLocalApplicationState(pair.left, pair.right);
+ }
+ }
+ finally
+ {
+ taskLock.unlock();
+ }
+
+ }
+
public void stop()
{
if (scheduledGossipTask != null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f79af23/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 7f92034..f693a0b 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -203,10 +203,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
logger.debug("Setting tokens to {}", tokens);
SystemKeyspace.updateTokens(tokens);
tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
- // order is important here, the gossiper can fire in between adding these two states. It's ok to send TOKENS without STATUS, but *not* vice versa.
Collection<Token> localTokens = getLocalTokens();
- Gossiper.instance.addLocalApplicationState(ApplicationState.TOKENS, valueFactory.tokens(localTokens));
- Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.normal(localTokens));
+ List<Pair<ApplicationState, VersionedValue>> states = new ArrayList<Pair<ApplicationState, VersionedValue>>();
+ states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(localTokens)));
+ states.add(Pair.create(ApplicationState.STATUS, valueFactory.normal(localTokens)));
+ Gossiper.instance.addLocalApplicationStates(states);
setMode(Mode.NORMAL, false);
}
@@ -619,8 +620,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
// order is important here, the gossiper can fire in between adding these two states. It's ok to send TOKENS without STATUS, but *not* vice versa.
- Gossiper.instance.addLocalApplicationState(ApplicationState.TOKENS, valueFactory.tokens(tokens));
- Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.hibernate(true));
+ List<Pair<ApplicationState, VersionedValue>> states = new ArrayList<Pair<ApplicationState, VersionedValue>>();
+ states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(tokens)));
+ states.add(Pair.create(ApplicationState.STATUS, valueFactory.hibernate(true)));
+ Gossiper.instance.addLocalApplicationStates(states);
}
logger.info("Not joining ring as requested. Use JMX (StorageService->joinRing()) to initiate ring joining");
}
@@ -648,8 +651,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (!DatabaseDescriptor.isAutoBootstrap())
throw new RuntimeException("Trying to replace_address with auto_bootstrap disabled will not work, check your configuration");
bootstrapTokens = prepareReplacementInfo();
- appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
appStates.put(ApplicationState.TOKENS, valueFactory.tokens(bootstrapTokens));
+ appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
}
else if (shouldBootstrap())
{
@@ -976,10 +979,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (!DatabaseDescriptor.isReplacing())
{
// if not an existing token then bootstrap
- // order is important here, the gossiper can fire in between adding these two states. It's ok to send TOKENS without STATUS, but *not* vice versa.
- Gossiper.instance.addLocalApplicationState(ApplicationState.TOKENS, valueFactory.tokens(tokens));
- Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS,
- valueFactory.bootstrapping(tokens));
+ List<Pair<ApplicationState, VersionedValue>> states = new ArrayList<Pair<ApplicationState, VersionedValue>>();
+ states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(tokens)));
+ states.add(Pair.create(ApplicationState.STATUS, valueFactory.bootstrapping(tokens)));
+ Gossiper.instance.addLocalApplicationStates(states);
setMode(Mode.JOINING, "sleeping " + RING_DELAY + " ms for pending range setup", true);
Uninterruptibles.sleepUninterruptibly(RING_DELAY, TimeUnit.MILLISECONDS);
}