You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by br...@apache.org on 2015/03/30 16:19:02 UTC
cassandra git commit: Delay "node up" and "node added" notifications
until native protocol server is started
Repository: cassandra
Updated Branches:
refs/heads/trunk cc0247b12 -> ff5ed7a03
Delay "node up" and "node added" notifications until native protocol server is started
Patch by brandonwilliams and stefania, reviewed by brandonwilliams for
CASSANDRA-8236
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ff5ed7a0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ff5ed7a0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ff5ed7a0
Branch: refs/heads/trunk
Commit: ff5ed7a03f7b9968c0156b05226af67882e5670e
Parents: cc0247b
Author: Brandon Williams <br...@apache.org>
Authored: Mon Mar 30 09:18:26 2015 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Mar 30 09:18:26 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/gms/ApplicationState.java | 1 +
.../org/apache/cassandra/gms/EndpointState.java | 17 ++++
src/java/org/apache/cassandra/gms/Gossiper.java | 15 ++--
.../apache/cassandra/gms/VersionedValue.java | 5 ++
.../cassandra/service/CassandraDaemon.java | 2 +
.../cassandra/service/StorageService.java | 83 +++++++++++++++---
.../org/apache/cassandra/transport/Server.java | 88 +++++++++++++++++---
8 files changed, 186 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff5ed7a0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e66b724..8b95fb3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0
+ * Delay "node up" and "node added" notifications until native protocol server is started (CASSANDRA-8236)
* Compressed Commit Log (CASSANDRA-6809)
* Optimise IntervalTree (CASSANDRA-8988)
* Add a key-value payload for third party usage (CASSANDRA-8553)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff5ed7a0/src/java/org/apache/cassandra/gms/ApplicationState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/ApplicationState.java b/src/java/org/apache/cassandra/gms/ApplicationState.java
index 777dfc5..ade9208 100644
--- a/src/java/org/apache/cassandra/gms/ApplicationState.java
+++ b/src/java/org/apache/cassandra/gms/ApplicationState.java
@@ -33,6 +33,7 @@ public enum ApplicationState
NET_VERSION,
HOST_ID,
TOKENS,
+ RPC_READY,
// pad to allow adding new states to existing cluster
X1,
X2,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff5ed7a0/src/java/org/apache/cassandra/gms/EndpointState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java
index 1029374..0e6985a 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -114,6 +114,23 @@ public class EndpointState
isAlive = false;
}
+ public boolean isRpcReady()
+ {
+ VersionedValue rpcState = getApplicationState(ApplicationState.RPC_READY);
+ return rpcState != null && Boolean.parseBoolean(rpcState.value);
+ }
+
+ public String getStatus()
+ {
+ VersionedValue status = getApplicationState(ApplicationState.STATUS);
+ if (status == null)
+ return "";
+
+ String[] pieces = status.value.split(VersionedValue.DELIMITER_STR, -1);
+ assert (pieces.length > 0);
+ return pieces[0];
+ }
+
public String toString()
{
return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff5ed7a0/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index ff1240a..07f2615 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -979,14 +979,19 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
subscriber.onJoin(ep, epState);
}
+ public boolean isAlive(InetAddress endpoint)
+ {
+ EndpointState epState = getEndpointStateForEndpoint(endpoint);
+ if (epState == null)
+ return false;
+ return epState.isAlive() && !isDeadState(epState);
+ }
+
public boolean isDeadState(EndpointState epState)
{
- if (epState.getApplicationState(ApplicationState.STATUS) == null)
+ String state = epState.getStatus();
+ if (state.isEmpty())
return false;
- String value = epState.getApplicationState(ApplicationState.STATUS).value;
- String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1);
- assert (pieces.length > 0);
- String state = pieces[0];
for (String deadstate : DEAD_STATES)
{
if (state.equals(deadstate))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff5ed7a0/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java
index e8cf748..203f3a7 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -212,6 +212,11 @@ public class VersionedValue implements Comparable<VersionedValue>
return new VersionedValue(VersionedValue.HIBERNATE + VersionedValue.DELIMITER + value);
}
+ public VersionedValue rpcReady(boolean value)
+ {
+ return new VersionedValue(String.valueOf(value));
+ }
+
public VersionedValue datacenter(String dcId)
{
return new VersionedValue(dcId);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff5ed7a0/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 74e356d..d6b2d24 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -425,7 +425,9 @@ public class CassandraDaemon
{
String nativeFlag = System.getProperty("cassandra.start_native_transport");
if ((nativeFlag != null && Boolean.parseBoolean(nativeFlag)) || (nativeFlag == null && DatabaseDescriptor.startNativeTransport()))
+ {
nativeServer.start();
+ }
else
logger.info("Not starting native transport as requested. Use JMX (StorageService->startNativeTransport()) or nodetool (enablebinary) to start it");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff5ed7a0/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index a75c08c..40686e5 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1541,6 +1541,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
case HOST_ID:
SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value.value));
break;
+ case RPC_READY:
+ notifyRpcChange(endpoint, epState.isRpcReady());
+ break;
}
}
}
@@ -1587,6 +1590,71 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return vvalue.getBytes(ISO_8859_1);
}
+ private void notifyRpcChange(InetAddress endpoint, boolean ready)
+ {
+ if (ready)
+ {
+ notifyUp(endpoint);
+ notifyJoined(endpoint);
+ }
+ else
+ {
+ notifyDown(endpoint);
+ }
+ }
+
+ private void notifyUp(InetAddress endpoint)
+ {
+ if (!isRpcReady(endpoint) || !Gossiper.instance.isAlive(endpoint))
+ return;
+
+ for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
+ subscriber.onUp(endpoint);
+ }
+
+ private void notifyDown(InetAddress endpoint)
+ {
+ for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
+ subscriber.onDown(endpoint);
+ }
+
+ private void notifyJoined(InetAddress endpoint)
+ {
+ if (!isRpcReady(endpoint) || !isStatus(endpoint, VersionedValue.STATUS_NORMAL))
+ return;
+
+ for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
+ subscriber.onJoinCluster(endpoint);
+ }
+
+ private void notifyMoved(InetAddress endpoint)
+ {
+ for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
+ subscriber.onMove(endpoint);
+ }
+
+ private void notifyLeft(InetAddress endpoint)
+ {
+ for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
+ subscriber.onLeaveCluster(endpoint);
+ }
+
+ private boolean isStatus(InetAddress endpoint, String status)
+ {
+ return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getStatus().equals(status);
+ }
+
+ private boolean isRpcReady(InetAddress endpoint)
+ {
+ return MessagingService.instance().getVersion(endpoint) < MessagingService.VERSION_30 ||
+ Gossiper.instance.getEndpointStateForEndpoint(endpoint).isRpcReady();
+ }
+
+ public void setRpcReady(boolean value)
+ {
+ Gossiper.instance.addLocalApplicationState(ApplicationState.RPC_READY, valueFactory.rpcReady(value));
+ }
+
private Collection<Token> getTokensFor(InetAddress endpoint)
{
try
@@ -1756,13 +1824,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (isMoving || operationMode == Mode.MOVING)
{
tokenMetadata.removeFromMoving(endpoint);
- for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
- subscriber.onMove(endpoint);
+ notifyMoved(endpoint);
}
else
{
- for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
- subscriber.onJoinCluster(endpoint);
+ notifyJoined(endpoint);
}
PendingRangeCalculatorService.instance.update();
@@ -1902,8 +1968,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
tokenMetadata.removeEndpoint(endpoint);
tokenMetadata.removeBootstrapTokens(tokens);
- for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
- subscriber.onLeaveCluster(endpoint);
+ notifyLeft(endpoint);
PendingRangeCalculatorService.instance.update();
}
@@ -2118,8 +2183,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (tokenMetadata.isMember(endpoint))
{
HintedHandOffManager.instance.scheduleHintDelivery(endpoint, true);
- for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
- subscriber.onUp(endpoint);
+ notifyUp(endpoint);
}
}
@@ -2132,8 +2196,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public void onDead(InetAddress endpoint, EndpointState state)
{
MessagingService.instance().convict(endpoint);
- for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
- subscriber.onDown(endpoint);
+ notifyDown(endpoint);
}
public void onRestart(InetAddress endpoint, EndpointState state)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff5ed7a0/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 3ef7162..c7c1bdb 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -180,6 +180,8 @@ public class Server implements CassandraDaemon.Server
connectionTracker.allChannels.add(bindFuture.channel());
isRunning.set(true);
+
+ StorageService.instance.setRpcReady(true);
}
private void registerMetrics()
@@ -204,6 +206,8 @@ public class Server implements CassandraDaemon.Server
eventExecutorGroup.shutdown();
eventExecutorGroup = null;
logger.info("Stop listening for CQL clients");
+
+ StorageService.instance.setRpcReady(false);
}
@@ -211,7 +215,7 @@ public class Server implements CassandraDaemon.Server
{
// TODO: should we be using the GlobalEventExecutor or defining our own?
public final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
- private final EnumMap<Event.Type, ChannelGroup> groups = new EnumMap<Event.Type, ChannelGroup>(Event.Type.class);
+ private final EnumMap<Event.Type, ChannelGroup> groups = new EnumMap<>(Event.Type.class);
public ConnectionTracker()
{
@@ -333,10 +337,48 @@ public class Server implements CassandraDaemon.Server
}
}
+ private static class LatestEvent
+ {
+ public final Event.StatusChange.Status status;
+ public final Event.TopologyChange.Change topology;
+
+ private LatestEvent(Event.StatusChange.Status status, Event.TopologyChange.Change topology)
+ {
+ this.status = status;
+ this.topology = topology;
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("Status %s, Topology %s", status, topology);
+ }
+
+ public static LatestEvent forStatusChange(Event.StatusChange.Status status, LatestEvent prev)
+ {
+ return new LatestEvent(status,
+ prev == null ?
+ null :
+ prev.topology);
+ }
+
+ public static LatestEvent forTopologyChange(Event.TopologyChange.Change change, LatestEvent prev)
+ {
+ return new LatestEvent(prev == null ?
+ null :
+ prev.status,
+ change);
+ }
+ }
+
private static class EventNotifier extends MigrationListener implements IEndpointLifecycleSubscriber
{
private final Server server;
- private final Map<InetAddress, Event.StatusChange.Status> lastStatusChange = new ConcurrentHashMap<>();
+
+ // We keep track of the latest events we have sent to avoid sending duplicates
+ // since StorageService may send duplicate notifications (CASSANDRA-7816, CASSANDRA-8236)
+ private final Map<InetAddress, LatestEvent> latestEvents = new ConcurrentHashMap<>();
+
private static final InetAddress bindAll;
static {
try
@@ -376,31 +418,55 @@ public class Server implements CassandraDaemon.Server
public void onJoinCluster(InetAddress endpoint)
{
- server.connectionTracker.send(Event.TopologyChange.newNode(getRpcAddress(endpoint), server.socket.getPort()));
+ onTopologyChange(endpoint, Event.TopologyChange.newNode(getRpcAddress(endpoint), server.socket.getPort()));
}
public void onLeaveCluster(InetAddress endpoint)
{
- server.connectionTracker.send(Event.TopologyChange.removedNode(getRpcAddress(endpoint), server.socket.getPort()));
+ onTopologyChange(endpoint, Event.TopologyChange.removedNode(getRpcAddress(endpoint), server.socket.getPort()));
}
public void onMove(InetAddress endpoint)
{
- server.connectionTracker.send(Event.TopologyChange.movedNode(getRpcAddress(endpoint), server.socket.getPort()));
+ onTopologyChange(endpoint, Event.TopologyChange.movedNode(getRpcAddress(endpoint), server.socket.getPort()));
}
public void onUp(InetAddress endpoint)
{
- Event.StatusChange.Status prev = lastStatusChange.put(endpoint, Event.StatusChange.Status.UP);
- if (prev == null || prev != Event.StatusChange.Status.UP)
- server.connectionTracker.send(Event.StatusChange.nodeUp(getRpcAddress(endpoint), server.socket.getPort()));
+ onStatusChange(endpoint, Event.StatusChange.nodeUp(getRpcAddress(endpoint), server.socket.getPort()));
}
public void onDown(InetAddress endpoint)
{
- Event.StatusChange.Status prev = lastStatusChange.put(endpoint, Event.StatusChange.Status.DOWN);
- if (prev == null || prev != Event.StatusChange.Status.DOWN)
- server.connectionTracker.send(Event.StatusChange.nodeDown(getRpcAddress(endpoint), server.socket.getPort()));
+ onStatusChange(endpoint, Event.StatusChange.nodeDown(getRpcAddress(endpoint), server.socket.getPort()));
+ }
+
+ private void onTopologyChange(InetAddress endpoint, Event.TopologyChange event)
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("Topology changed event : {}, {}", endpoint, event.change);
+
+ LatestEvent prev = latestEvents.get(endpoint);
+ if (prev == null || prev.topology != event.change)
+ {
+ LatestEvent ret = latestEvents.put(endpoint, LatestEvent.forTopologyChange(event.change, prev));
+ if (ret == prev)
+ server.connectionTracker.send(event);
+ }
+ }
+
+ private void onStatusChange(InetAddress endpoint, Event.StatusChange event)
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("Status changed event : {}, {}", endpoint, event.status);
+
+ LatestEvent prev = latestEvents.get(endpoint);
+ if (prev == null || prev.status != event.status)
+ {
+ LatestEvent ret = latestEvents.put(endpoint, LatestEvent.forStatusChange(event.status, prev));
+ if (ret == prev)
+ server.connectionTracker.send(event);
+ }
}
public void onCreateKeyspace(String ksName)