You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/10/30 16:12:37 UTC

git commit: Fix binary protocol NEW_NODE event

Updated Branches:
  refs/heads/trunk 7aee6602d -> 63a8bfe03


Fix binary protocol NEW_NODE event

patch by slebresne; reviewed by yukim for CASSANDRA-4679


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/63a8bfe0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/63a8bfe0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/63a8bfe0

Branch: refs/heads/trunk
Commit: 63a8bfe030ece2497c839bb34148b73e416466f4
Parents: 7aee660
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Oct 30 16:11:27 2012 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Oct 30 16:11:27 2012 +0100

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 doc/native_protocol.spec                           |    7 ++
 .../gms/IEndpointStateChangeSubscriber.java        |    1 -
 .../apache/cassandra/service/CassandraDaemon.java  |   22 +++--
 .../service/IEndpointLifecycleSubscriber.java      |   67 +++++++++++++++
 .../apache/cassandra/service/StorageService.java   |   63 +++++++++++++-
 src/java/org/apache/cassandra/transport/Event.java |    7 ++-
 .../org/apache/cassandra/transport/Server.java     |   28 +++----
 .../service/AntiEntropyServiceTestAbstract.java    |    1 -
 9 files changed, 167 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/63a8bfe0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2b87c85..e25bfbf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -39,6 +39,7 @@
  * Fix list prepend logic (CQL3) (CASSANDRA-4835)
  * Add booleans as literals in CQL3 (CASSANDRA-4776)
  * Allow renaming PK columns in CQL3 (CASSANDRA-4822)
+ * Fix binary protocol NEW_NODE event (CASSANDRA-4679)
 Merged from 1.1:
  * fix get_paged_slice to wrap to next row correctly (CASSANDRA-4816)
  * fix indexing empty column values (CASSANDRA-4832)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63a8bfe0/doc/native_protocol.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol.spec b/doc/native_protocol.spec
index 5c84f71..f534cde 100644
--- a/doc/native_protocol.spec
+++ b/doc/native_protocol.spec
@@ -481,6 +481,13 @@ Table of Contents
 
   All EVENT message have a streamId of -1 (Section 2.3).
 
+  Please note that "NEW_NODE" and "UP" events are sent based on internal Gossip
+  communication and, as such, may be sent shortly before the binary protocol
+  server on the newly up node is fully started. Clients are thus advised to
+  wait a short time before trying to connect to the node (1 second should be
+  enough); otherwise they may experience a refused connection on their first
+  attempts.
+
 
 5. Compression
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63a8bfe0/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java b/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
index fff613b..2abee01 100644
--- a/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
+++ b/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
@@ -27,7 +27,6 @@ import java.net.InetAddress;
  * instance to decide what he does with this change. Not all modules maybe interested
  * in all state changes.
  */
-
 public interface IEndpointStateChangeSubscriber
 {
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63a8bfe0/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 03da7e3..7b8e7d8 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -300,16 +300,7 @@ public class CassandraDaemon
 
         // start server internals
         StorageService.instance.registerDaemon(this);
-        try
-        {
-            StorageService.instance.initServer();
-        }
-        catch (ConfigurationException e)
-        {
-            logger.error("Fatal configuration error", e);
-            System.err.println(e.getMessage() + "\nFatal configuration error; unable to start server.  See log for stacktrace.");
-            System.exit(1);
-        }
+        StorageService.instance.initServerLocally();
 
         Mx4jTool.maybeLoad();
 
@@ -357,6 +348,17 @@ public class CassandraDaemon
             nativeServer.start();
         else
             logger.info("Not starting native transport as requested. Use JMX (StorageService->startNativeTransport()) to start it");
+
+        try
+        {
+            StorageService.instance.maybeJoinRing(StorageService.RING_DELAY);
+        }
+        catch (ConfigurationException e)
+        {
+            logger.error("Fatal configuration error", e);
+            System.err.println(e.getMessage() + "\nFatal configuration error; unable to start server.  See log for stacktrace.");
+            System.exit(1);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63a8bfe0/src/java/org/apache/cassandra/service/IEndpointLifecycleSubscriber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IEndpointLifecycleSubscriber.java b/src/java/org/apache/cassandra/service/IEndpointLifecycleSubscriber.java
new file mode 100644
index 0000000..24cb3d7
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/IEndpointLifecycleSubscriber.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+
+/**
+ * Interface through which interested parties are notified of high-level
+ * endpoint lifecycle changes.
+ *
+ * Note that while IEndpointStateChangeSubscriber notifies about gossip-related
+ * changes (IEndpointStateChangeSubscriber.onJoin() is called when a node joins
+ * gossip), this interface is notified about higher-level (cluster) events.
+ */
+public interface IEndpointLifecycleSubscriber
+{
+    /**
+     * Called when a new node joins the cluster, i.e. it has either just been
+     * bootstrapped or has "instajoined".
+     *
+     * @param endpoint the newly added endpoint.
+     */
+    public void onJoinCluster(InetAddress endpoint);
+
+    /**
+     * Called when a node leaves the cluster (decommission or removeToken).
+     *
+     * @param endpoint the endpoint that is leaving.
+     */
+    public void onLeaveCluster(InetAddress endpoint);
+
+    /**
+     * Called when a node is marked UP.
+     *
+     * @param endpoint the endpoint that was marked UP.
+     */
+    public void onUp(InetAddress endpoint);
+
+    /**
+     * Called when a node is marked DOWN.
+     *
+     * @param endpoint the endpoint that was marked DOWN.
+     */
+    public void onDown(InetAddress endpoint);
+
+    /**
+     * Called when a node has moved to a new token.
+     *
+     * @param endpoint the endpoint that has moved.
+     */
+    public void onMove(InetAddress endpoint);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63a8bfe0/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 58ce112..ad4a534 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -105,7 +105,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
      * This pool is used by tasks that can have longer execution times, and usually are non periodic.
      */
     public static final DebuggableScheduledThreadPoolExecutor tasks = new DebuggableScheduledThreadPoolExecutor("NonPeriodicTasks");
-/**
+    /**
      * tasks that do not need to be waited for on shutdown/drain
      */
     public static final DebuggableScheduledThreadPoolExecutor optionalTasks = new DebuggableScheduledThreadPoolExecutor("OptionalTasks");
@@ -181,6 +181,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
 
     private static ScheduledRangeTransferExecutorService rangeXferExecutor = new ScheduledRangeTransferExecutorService();
 
+    private final List<IEndpointLifecycleSubscriber> lifecycleSubscribers = new CopyOnWriteArrayList<IEndpointLifecycleSubscriber>();
+
     public void finishBootstrapping()
     {
         isBootstrapMode = false;
@@ -253,6 +255,16 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         this.daemon = daemon;
     }
 
+    public void register(IEndpointLifecycleSubscriber subscriber)
+    {
+        lifecycleSubscribers.add(subscriber);
+    }
+
+    public void unregister(IEndpointLifecycleSubscriber subscriber)
+    {
+        lifecycleSubscribers.remove(subscriber);
+    }
+
     // should only be called via JMX
     public void stopGossiping()
     {
@@ -388,6 +400,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
 
     public synchronized void initServer(int delay) throws ConfigurationException
     {
+        initServerLocally();
+        maybeJoinRing(delay);
+    }
+
+    public void initServerLocally()
+    {
         logger.info("Cassandra version: " + FBUtilities.getReleaseVersionString());
         logger.info("Thrift API version: " + Constants.VERSION);
         logger.info("CQL supported versions: " + StringUtils.join(ClientState.getCQLSupportedVersion(), ",") + " (default: " + ClientState.DEFAULT_CQL_VERSION + ")");
@@ -484,6 +502,18 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
             }
         }, "StorageServiceShutdownHook");
         Runtime.getRuntime().addShutdownHook(drainOnShutdown);
+    }
+
+    public synchronized void maybeJoinRing(int delay) throws ConfigurationException
+    {
+        // This method should only be called as part of the server initialization, so if initialized == true, we've already gone
+        // through that. If the ring must be joined after the server initialization, use joinTokenRing() directly.
+        if (initialized)
+        {
+            if (isClientMode)
+                throw new UnsupportedOperationException("StorageService does not support switching modes.");
+            return;
+        }
 
         if (Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true")))
         {
@@ -1249,8 +1279,21 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
             logger.debug("Node " + endpoint + " state normal, token " + tokens);
 
         if (tokenMetadata.isMember(endpoint))
+        {
             logger.info("Node " + endpoint + " state jump to normal");
 
+            if (!isClientMode)
+            {
+                for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
+                    subscriber.onUp(endpoint);
+            }
+        }
+        else if (!isClientMode)
+        {
+            for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
+                subscriber.onJoinCluster(endpoint);
+        }
+
         // Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300).
         if (Gossiper.instance.usesHostId(endpoint))
             tokenMetadata.updateHostId(Gossiper.instance.getHostId(endpoint), endpoint);
@@ -1344,8 +1387,16 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         SystemTable.updateLocalTokens(Collections.<Token>emptyList(), localTokensToRemove);
 
         if (tokenMetadata.isMoving(endpoint)) // if endpoint was moving to a new token
+        {
             tokenMetadata.removeFromMoving(endpoint);
 
+            if (!isClientMode)
+            {
+                for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
+                    subscriber.onMove(endpoint);
+            }
+        }
+
         calculatePendingRanges();
     }
 
@@ -1504,6 +1555,11 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         Gossiper.instance.removeEndpoint(endpoint);
         tokenMetadata.removeEndpoint(endpoint);
         tokenMetadata.removeBootstrapTokens(tokens);
+        if (!isClientMode)
+        {
+            for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
+                subscriber.onLeaveCluster(endpoint);
+        }
         calculatePendingRanges();
         if (!isClientMode)
         {
@@ -1856,6 +1912,11 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
     public void onDead(InetAddress endpoint, EndpointState state)
     {
         MessagingService.instance().convict(endpoint);
+        if (!isClientMode)
+        {
+            for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
+                subscriber.onDown(endpoint);
+        }
     }
 
     public void onRestart(InetAddress endpoint, EndpointState state)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63a8bfe0/src/java/org/apache/cassandra/transport/Event.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java
index 855049d..33b08f4 100644
--- a/src/java/org/apache/cassandra/transport/Event.java
+++ b/src/java/org/apache/cassandra/transport/Event.java
@@ -58,7 +58,7 @@ public abstract class Event
 
     public static class TopologyChange extends Event
     {
-        public enum Change { NEW_NODE, REMOVED_NODE }
+        public enum Change { NEW_NODE, REMOVED_NODE, MOVED_NODE }
 
         public final Change change;
         public final InetSocketAddress node;
@@ -80,6 +80,11 @@ public abstract class Event
             return new TopologyChange(Change.REMOVED_NODE, new InetSocketAddress(host, port));
         }
 
+        public static TopologyChange movedNode(InetAddress host, int port)
+        {
+            return new TopologyChange(Change.MOVED_NODE, new InetSocketAddress(host, port));
+        }
+
         // Assumes the type has already by been deserialized
         private static TopologyChange deserializeEvent(ChannelBuffer cb)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63a8bfe0/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index e820554..e629ca4 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -35,10 +35,10 @@ import org.jboss.netty.logging.Slf4JLoggerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.gms.*;
 import org.apache.cassandra.service.CassandraDaemon;
 import org.apache.cassandra.service.IMigrationListener;
 import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.service.IEndpointLifecycleSubscriber;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.transport.messages.EventMessage;
 
@@ -63,7 +63,7 @@ public class Server implements CassandraDaemon.Server
     {
         this.socket = socket;
         EventNotifier notifier = new EventNotifier(this);
-        Gossiper.instance.register(notifier);
+        StorageService.instance.register(notifier);
         MigrationManager.instance.register(notifier);
     }
 
@@ -203,7 +203,7 @@ public class Server implements CassandraDaemon.Server
       }
     }
 
-    private static class EventNotifier implements IEndpointStateChangeSubscriber, IMigrationListener
+    private static class EventNotifier implements IEndpointLifecycleSubscriber, IMigrationListener
     {
         private final Server server;
 
@@ -227,33 +227,29 @@ public class Server implements CassandraDaemon.Server
             }
         }
 
-        public void onJoin(InetAddress endpoint, EndpointState epState)
+        public void onJoinCluster(InetAddress endpoint)
         {
             server.connectionTracker.send(Event.TopologyChange.newNode(getRpcAddress(endpoint), server.socket.getPort()));
         }
 
-        public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
+        public void onLeaveCluster(InetAddress endpoint)
         {
+            server.connectionTracker.send(Event.TopologyChange.removedNode(getRpcAddress(endpoint), server.socket.getPort()));
         }
 
-        public void onAlive(InetAddress endpoint, EndpointState state)
-        {
-            server.connectionTracker.send(Event.StatusChange.nodeUp(getRpcAddress(endpoint), server.socket.getPort()));
-        }
-
-        public void onDead(InetAddress endpoint, EndpointState state)
+        public void onMove(InetAddress endpoint)
         {
-            server.connectionTracker.send(Event.StatusChange.nodeDown(getRpcAddress(endpoint), server.socket.getPort()));
+            server.connectionTracker.send(Event.TopologyChange.movedNode(getRpcAddress(endpoint), server.socket.getPort()));
         }
 
-        public void onRemove(InetAddress endpoint)
+        public void onUp(InetAddress endpoint)
         {
-            server.connectionTracker.send(Event.TopologyChange.removedNode(getRpcAddress(endpoint), server.socket.getPort()));
+            server.connectionTracker.send(Event.StatusChange.nodeUp(getRpcAddress(endpoint), server.socket.getPort()));
         }
 
-        public void onRestart(InetAddress endpoint, EndpointState state)
+        public void onDown(InetAddress endpoint)
         {
-            server.connectionTracker.send(Event.StatusChange.nodeUp(getRpcAddress(endpoint), server.socket.getPort()));
+            server.connectionTracker.send(Event.StatusChange.nodeDown(getRpcAddress(endpoint), server.socket.getPort()));
         }
 
         public void onCreateKeyspace(String ksName)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63a8bfe0/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index 87cc910..ef7a2ab 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -79,7 +79,6 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
             init();
 
             LOCAL = FBUtilities.getBroadcastAddress();
-            StorageService.instance.initServer(0);
             // generate a fake endpoint for which we can spoof receiving/sending trees
             REMOTE = InetAddress.getByName("127.0.0.2");
             store = null;