You are viewing a plain-text version of this content. The link to the canonical version ("here") was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by br...@apache.org on 2015/03/30 16:19:02 UTC

cassandra git commit: Delay "node up" and "node added" notifications until native protocol server is started

Repository: cassandra
Updated Branches:
  refs/heads/trunk cc0247b12 -> ff5ed7a03


Delay "node up" and "node added" notifications until native protocol server is started

Patch by brandonwilliams and stefania, reviewed by brandonwilliams for
CASSANDRA-8236


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ff5ed7a0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ff5ed7a0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ff5ed7a0

Branch: refs/heads/trunk
Commit: ff5ed7a03f7b9968c0156b05226af67882e5670e
Parents: cc0247b
Author: Brandon Williams <br...@apache.org>
Authored: Mon Mar 30 09:18:26 2015 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Mar 30 09:18:26 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/gms/ApplicationState.java  |  1 +
 .../org/apache/cassandra/gms/EndpointState.java | 17 ++++
 src/java/org/apache/cassandra/gms/Gossiper.java | 15 ++--
 .../apache/cassandra/gms/VersionedValue.java    |  5 ++
 .../cassandra/service/CassandraDaemon.java      |  2 +
 .../cassandra/service/StorageService.java       | 83 +++++++++++++++---
 .../org/apache/cassandra/transport/Server.java  | 88 +++++++++++++++++---
 8 files changed, 186 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff5ed7a0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e66b724..8b95fb3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Delay "node up" and "node added" notifications until native protocol server is started (CASSANDRA-8236)
  * Compressed Commit Log (CASSANDRA-6809)
  * Optimise IntervalTree (CASSANDRA-8988)
  * Add a key-value payload for third party usage (CASSANDRA-8553)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff5ed7a0/src/java/org/apache/cassandra/gms/ApplicationState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/ApplicationState.java b/src/java/org/apache/cassandra/gms/ApplicationState.java
index 777dfc5..ade9208 100644
--- a/src/java/org/apache/cassandra/gms/ApplicationState.java
+++ b/src/java/org/apache/cassandra/gms/ApplicationState.java
@@ -33,6 +33,7 @@ public enum ApplicationState
     NET_VERSION,
     HOST_ID,
     TOKENS,
+    RPC_READY,
     // pad to allow adding new states to existing cluster
     X1,
     X2,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff5ed7a0/src/java/org/apache/cassandra/gms/EndpointState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java
index 1029374..0e6985a 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -114,6 +114,23 @@ public class EndpointState
         isAlive = false;
     }
 
+    public boolean isRpcReady()
+    {
+        VersionedValue rpcState = getApplicationState(ApplicationState.RPC_READY);
+        return rpcState != null && Boolean.parseBoolean(rpcState.value);
+    }
+
+    public String getStatus()
+    {
+        VersionedValue status = getApplicationState(ApplicationState.STATUS);
+        if (status == null)
+            return "";
+
+        String[] pieces = status.value.split(VersionedValue.DELIMITER_STR, -1);
+        assert (pieces.length > 0);
+        return pieces[0];
+    }
+
     public String toString()
     {
         return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff5ed7a0/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index ff1240a..07f2615 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -979,14 +979,19 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
             subscriber.onJoin(ep, epState);
     }
 
+    public boolean isAlive(InetAddress endpoint)
+    {
+        EndpointState epState = getEndpointStateForEndpoint(endpoint);
+        if (epState == null)
+            return false;
+        return epState.isAlive() && !isDeadState(epState);
+    }
+
     public boolean isDeadState(EndpointState epState)
     {
-        if (epState.getApplicationState(ApplicationState.STATUS) == null)
+        String state = epState.getStatus();
+        if (state.isEmpty())
             return false;
-        String value = epState.getApplicationState(ApplicationState.STATUS).value;
-        String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1);
-        assert (pieces.length > 0);
-        String state = pieces[0];
         for (String deadstate : DEAD_STATES)
         {
             if (state.equals(deadstate))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff5ed7a0/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java
index e8cf748..203f3a7 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -212,6 +212,11 @@ public class VersionedValue implements Comparable<VersionedValue>
             return new VersionedValue(VersionedValue.HIBERNATE + VersionedValue.DELIMITER + value);
         }
 
+        public VersionedValue rpcReady(boolean value)
+        {
+            return new VersionedValue(String.valueOf(value));
+        }
+
         public VersionedValue datacenter(String dcId)
         {
             return new VersionedValue(dcId);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff5ed7a0/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 74e356d..d6b2d24 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -425,7 +425,9 @@ public class CassandraDaemon
     {
         String nativeFlag = System.getProperty("cassandra.start_native_transport");
         if ((nativeFlag != null && Boolean.parseBoolean(nativeFlag)) || (nativeFlag == null && DatabaseDescriptor.startNativeTransport()))
+        {
             nativeServer.start();
+        }
         else
             logger.info("Not starting native transport as requested. Use JMX (StorageService->startNativeTransport()) or nodetool (enablebinary) to start it");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff5ed7a0/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index a75c08c..40686e5 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1541,6 +1541,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 case HOST_ID:
                     SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value.value));
                     break;
+                case RPC_READY:
+                    notifyRpcChange(endpoint, epState.isRpcReady());
+                    break;
             }
         }
     }
@@ -1587,6 +1590,71 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return vvalue.getBytes(ISO_8859_1);
     }
 
+    private void notifyRpcChange(InetAddress endpoint, boolean ready)
+    {
+        if (ready)
+        {
+            notifyUp(endpoint);
+            notifyJoined(endpoint);
+        }
+        else
+        {
+            notifyDown(endpoint);
+        }
+    }
+
+    private void notifyUp(InetAddress endpoint)
+    {
+        if (!isRpcReady(endpoint) || !Gossiper.instance.isAlive(endpoint))
+            return;
+
+        for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
+            subscriber.onUp(endpoint);
+    }
+
+    private void notifyDown(InetAddress endpoint)
+    {
+        for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
+            subscriber.onDown(endpoint);
+    }
+
+    private void notifyJoined(InetAddress endpoint)
+    {
+        if (!isRpcReady(endpoint) || !isStatus(endpoint, VersionedValue.STATUS_NORMAL))
+            return;
+
+        for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
+            subscriber.onJoinCluster(endpoint);
+    }
+
+    private void notifyMoved(InetAddress endpoint)
+    {
+        for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
+            subscriber.onMove(endpoint);
+    }
+
+    private void notifyLeft(InetAddress endpoint)
+    {
+        for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
+            subscriber.onLeaveCluster(endpoint);
+    }
+
+    private boolean isStatus(InetAddress endpoint, String status)
+    {
+        return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getStatus().equals(status);
+    }
+
+    private boolean isRpcReady(InetAddress endpoint)
+    {
+        return MessagingService.instance().getVersion(endpoint) < MessagingService.VERSION_30 ||
+                Gossiper.instance.getEndpointStateForEndpoint(endpoint).isRpcReady();
+    }
+
+    public void setRpcReady(boolean value)
+    {
+        Gossiper.instance.addLocalApplicationState(ApplicationState.RPC_READY, valueFactory.rpcReady(value));
+    }
+
     private Collection<Token> getTokensFor(InetAddress endpoint)
     {
         try
@@ -1756,13 +1824,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         if (isMoving || operationMode == Mode.MOVING)
         {
             tokenMetadata.removeFromMoving(endpoint);
-            for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
-                subscriber.onMove(endpoint);
+            notifyMoved(endpoint);
         }
         else
         {
-            for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
-                subscriber.onJoinCluster(endpoint);
+            notifyJoined(endpoint);
         }
 
         PendingRangeCalculatorService.instance.update();
@@ -1902,8 +1968,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         tokenMetadata.removeEndpoint(endpoint);
         tokenMetadata.removeBootstrapTokens(tokens);
 
-        for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
-            subscriber.onLeaveCluster(endpoint);
+        notifyLeft(endpoint);
         PendingRangeCalculatorService.instance.update();
     }
 
@@ -2118,8 +2183,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         if (tokenMetadata.isMember(endpoint))
         {
             HintedHandOffManager.instance.scheduleHintDelivery(endpoint, true);
-            for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
-                subscriber.onUp(endpoint);
+            notifyUp(endpoint);
         }
     }
 
@@ -2132,8 +2196,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     public void onDead(InetAddress endpoint, EndpointState state)
     {
         MessagingService.instance().convict(endpoint);
-        for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
-            subscriber.onDown(endpoint);
+        notifyDown(endpoint);
     }
 
     public void onRestart(InetAddress endpoint, EndpointState state)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff5ed7a0/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 3ef7162..c7c1bdb 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -180,6 +180,8 @@ public class Server implements CassandraDaemon.Server
 
         connectionTracker.allChannels.add(bindFuture.channel());
         isRunning.set(true);
+
+        StorageService.instance.setRpcReady(true);
     }
 
     private void registerMetrics()
@@ -204,6 +206,8 @@ public class Server implements CassandraDaemon.Server
         eventExecutorGroup.shutdown();
         eventExecutorGroup = null;
         logger.info("Stop listening for CQL clients");
+
+        StorageService.instance.setRpcReady(false);
     }
 
 
@@ -211,7 +215,7 @@ public class Server implements CassandraDaemon.Server
     {
         // TODO: should we be using the GlobalEventExecutor or defining our own?
         public final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
-        private final EnumMap<Event.Type, ChannelGroup> groups = new EnumMap<Event.Type, ChannelGroup>(Event.Type.class);
+        private final EnumMap<Event.Type, ChannelGroup> groups = new EnumMap<>(Event.Type.class);
 
         public ConnectionTracker()
         {
@@ -333,10 +337,48 @@ public class Server implements CassandraDaemon.Server
         }
     }
 
+    private static class LatestEvent
+    {
+        public final Event.StatusChange.Status status;
+        public final Event.TopologyChange.Change topology;
+
+        private LatestEvent(Event.StatusChange.Status status, Event.TopologyChange.Change topology)
+        {
+            this.status = status;
+            this.topology = topology;
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("Status %s, Topology %s", status, topology);
+        }
+
+        public static LatestEvent forStatusChange(Event.StatusChange.Status status, LatestEvent prev)
+        {
+            return new LatestEvent(status,
+                                   prev == null ?
+                                           null :
+                                           prev.topology);
+        }
+
+        public static LatestEvent forTopologyChange(Event.TopologyChange.Change change, LatestEvent prev)
+        {
+            return new LatestEvent(prev == null ?
+                                           null :
+                                           prev.status,
+                                           change);
+        }
+    }
+
     private static class EventNotifier extends MigrationListener implements IEndpointLifecycleSubscriber
     {
         private final Server server;
-        private final Map<InetAddress, Event.StatusChange.Status> lastStatusChange = new ConcurrentHashMap<>();
+
+        // We keep track of the latest events we have sent to avoid sending duplicates
+        // since StorageService may send duplicate notifications (CASSANDRA-7816, CASSANDRA-8236)
+        private final Map<InetAddress, LatestEvent> latestEvents = new ConcurrentHashMap<>();
+
         private static final InetAddress bindAll;
         static {
             try
@@ -376,31 +418,55 @@ public class Server implements CassandraDaemon.Server
 
         public void onJoinCluster(InetAddress endpoint)
         {
-            server.connectionTracker.send(Event.TopologyChange.newNode(getRpcAddress(endpoint), server.socket.getPort()));
+            onTopologyChange(endpoint, Event.TopologyChange.newNode(getRpcAddress(endpoint), server.socket.getPort()));
         }
 
         public void onLeaveCluster(InetAddress endpoint)
         {
-            server.connectionTracker.send(Event.TopologyChange.removedNode(getRpcAddress(endpoint), server.socket.getPort()));
+            onTopologyChange(endpoint, Event.TopologyChange.removedNode(getRpcAddress(endpoint), server.socket.getPort()));
         }
 
         public void onMove(InetAddress endpoint)
         {
-            server.connectionTracker.send(Event.TopologyChange.movedNode(getRpcAddress(endpoint), server.socket.getPort()));
+            onTopologyChange(endpoint, Event.TopologyChange.movedNode(getRpcAddress(endpoint), server.socket.getPort()));
         }
 
         public void onUp(InetAddress endpoint)
         {
-            Event.StatusChange.Status prev = lastStatusChange.put(endpoint, Event.StatusChange.Status.UP);
-            if (prev == null || prev != Event.StatusChange.Status.UP)
-                server.connectionTracker.send(Event.StatusChange.nodeUp(getRpcAddress(endpoint), server.socket.getPort()));
+            onStatusChange(endpoint, Event.StatusChange.nodeUp(getRpcAddress(endpoint), server.socket.getPort()));
         }
 
         public void onDown(InetAddress endpoint)
         {
-            Event.StatusChange.Status prev = lastStatusChange.put(endpoint, Event.StatusChange.Status.DOWN);
-            if (prev == null || prev != Event.StatusChange.Status.DOWN)
-                server.connectionTracker.send(Event.StatusChange.nodeDown(getRpcAddress(endpoint), server.socket.getPort()));
+            onStatusChange(endpoint, Event.StatusChange.nodeDown(getRpcAddress(endpoint), server.socket.getPort()));
+        }
+
+        private void onTopologyChange(InetAddress endpoint, Event.TopologyChange event)
+        {
+            if (logger.isTraceEnabled())
+                logger.trace("Topology changed event : {}, {}", endpoint, event.change);
+
+            LatestEvent prev = latestEvents.get(endpoint);
+            if (prev == null || prev.topology != event.change)
+            {
+                LatestEvent ret = latestEvents.put(endpoint, LatestEvent.forTopologyChange(event.change, prev));
+                if (ret == prev)
+                    server.connectionTracker.send(event);
+            }
+        }
+
+        private void onStatusChange(InetAddress endpoint, Event.StatusChange event)
+        {
+            if (logger.isTraceEnabled())
+                logger.trace("Status changed event : {}, {}", endpoint, event.status);
+
+            LatestEvent prev = latestEvents.get(endpoint);
+            if (prev == null || prev.status != event.status)
+            {
+                LatestEvent ret = latestEvents.put(endpoint, LatestEvent.forStatusChange(event.status, prev));
+                if (ret == prev)
+                    server.connectionTracker.send(event);
+            }
         }
 
         public void onCreateKeyspace(String ksName)