You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2015/03/13 14:06:22 UTC
[2/6] cassandra git commit: Fix duplicate up/down messages sent to native clients
Fix duplicate up/down messages sent to native clients
Patch by Stefania, reviewed by brandonwilliams for CASSANDRA-7816
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2199a87a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2199a87a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2199a87a
Branch: refs/heads/cassandra-2.1
Commit: 2199a87aab8322c41f1b590c0fd8f08f448952ca
Parents: 77c66bf
Author: Brandon Williams <br...@apache.org>
Authored: Fri Mar 13 08:02:12 2015 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Mar 13 08:02:12 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/gms/EndpointState.java | 12 ++++++++++
src/java/org/apache/cassandra/gms/Gossiper.java | 25 +++++++++++++++-----
3 files changed, 32 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2199a87a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 382b3dd..8843908 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.14:
+ * Fix duplicate up/down messages sent to native clients (CASSANDRA-7816)
* Expose commit log archive status via JMX (CASSANDRA-8734)
* Provide better exceptions for invalid replication strategy parameters
(CASSANDRA-8909)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2199a87a/src/java/org/apache/cassandra/gms/EndpointState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java
index 3df9155..518e575 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -46,12 +46,14 @@ public class EndpointState
/* fields below do not get serialized */
private volatile long updateTimestamp;
private volatile boolean isAlive;
+ private volatile boolean hasPendingEcho;
EndpointState(HeartBeatState initialHbState)
{
hbState = initialHbState;
updateTimestamp = System.nanoTime();
isAlive = true;
+ hasPendingEcho = false;
}
HeartBeatState getHeartBeatState()
@@ -113,6 +115,16 @@ public class EndpointState
isAlive = false;
}
+ public boolean hasPendingEcho()
+ {
+ return hasPendingEcho;
+ }
+
+ public void markPendingEcho(boolean val)
+ {
+ hasPendingEcho = val;
+ }
+
public String toString()
{
return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2199a87a/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index a478405..97dc506 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -29,6 +29,7 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.cassandra.utils.Pair;
@@ -48,8 +49,6 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
-import com.google.common.collect.ImmutableList;
-
/**
* This module is responsible for Gossiping information for the local endpoint. This abstraction
* maintains the list of live and dead endpoints. Periodically i.e. every 1 second this module
@@ -878,6 +877,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
return;
}
+ if (localState.hasPendingEcho())
+ {
+ logger.debug("{} has already a pending echo, skipping it", localState);
+ return;
+ }
+
localState.markDead();
MessageOut<EchoMessage> echoMessage = new MessageOut<EchoMessage>(MessagingService.Verb.ECHO, new EchoMessage(), EchoMessage.serializer);
@@ -891,9 +896,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
public void response(MessageIn msg)
{
+ localState.markPendingEcho(false);
realMarkAlive(addr, localState);
}
};
+
+ localState.markPendingEcho(true);
MessagingService.instance().sendRR(echoMessage, addr, echoHandler);
}
@@ -936,9 +944,10 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
*/
private void handleMajorStateChange(InetAddress ep, EndpointState epState)
{
+ EndpointState localEpState = endpointStateMap.get(ep);
if (!isDeadState(epState))
{
- if (endpointStateMap.get(ep) != null)
+ if (localEpState != null)
logger.info("Node {} has restarted, now UP", ep);
else
logger.info("Node {} is now part of the cluster", ep);
@@ -947,9 +956,11 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
logger.trace("Adding endpoint state for " + ep);
endpointStateMap.put(ep, epState);
- // the node restarted: it is up to the subscriber to take whatever action is necessary
- for (IEndpointStateChangeSubscriber subscriber : subscribers)
- subscriber.onRestart(ep, epState);
+ if (localEpState != null)
+ { // the node restarted: it is up to the subscriber to take whatever action is necessary
+ for (IEndpointStateChangeSubscriber subscriber : subscribers)
+ subscriber.onRestart(ep, localEpState);
+ }
if (!isDeadState(epState))
markAlive(ep, epState);
@@ -994,6 +1005,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
EndpointState localEpStatePtr = endpointStateMap.get(ep);
EndpointState remoteState = entry.getValue();
+
/*
If state does not exist just add it. If it does then add it if the remote generation is greater.
If there is a generation tie, attempt to break it by heartbeat version.
@@ -1024,6 +1036,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
}
else if (logger.isTraceEnabled())
logger.trace("Ignoring remote version " + remoteMaxVersion + " <= " + localMaxVersion + " for " + ep);
+
if (!localEpStatePtr.isAlive() && !isDeadState(localEpStatePtr)) // unless of course, it was dead
markAlive(ep, localEpStatePtr);
}