You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/10/30 16:12:37 UTC
git commit: Fix binary protocol NEW_NODE event
Updated Branches:
refs/heads/trunk 7aee6602d -> 63a8bfe03
Fix binary protocol NEW_NODE event
patch by slebresne; reviewed by yukim for CASSANDRA-4679
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/63a8bfe0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/63a8bfe0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/63a8bfe0
Branch: refs/heads/trunk
Commit: 63a8bfe030ece2497c839bb34148b73e416466f4
Parents: 7aee660
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Oct 30 16:11:27 2012 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Oct 30 16:11:27 2012 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
doc/native_protocol.spec | 7 ++
.../gms/IEndpointStateChangeSubscriber.java | 1 -
.../apache/cassandra/service/CassandraDaemon.java | 22 +++--
.../service/IEndpointLifecycleSubscriber.java | 67 +++++++++++++++
.../apache/cassandra/service/StorageService.java | 63 +++++++++++++-
src/java/org/apache/cassandra/transport/Event.java | 7 ++-
.../org/apache/cassandra/transport/Server.java | 28 +++----
.../service/AntiEntropyServiceTestAbstract.java | 1 -
9 files changed, 167 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/63a8bfe0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2b87c85..e25bfbf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -39,6 +39,7 @@
* Fix list prepend logic (CQL3) (CASSANDRA-4835)
* Add booleans as literals in CQL3 (CASSANDRA-4776)
* Allow renaming PK columns in CQL3 (CASSANDRA-4822)
+ * Fix binary protocol NEW_NODE event (CASSANDRA-4679)
Merged from 1.1:
* fix get_paged_slice to wrap to next row correctly (CASSANDRA-4816)
* fix indexing empty column values (CASSANDRA-4832)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/63a8bfe0/doc/native_protocol.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol.spec b/doc/native_protocol.spec
index 5c84f71..f534cde 100644
--- a/doc/native_protocol.spec
+++ b/doc/native_protocol.spec
@@ -481,6 +481,13 @@ Table of Contents
All EVENT message have a streamId of -1 (Section 2.3).
+ Please note that "NEW_NODE" and "UP" events are sent based on internal Gossip
+ communication and as such may be sent shortly before the binary protocol
+ server on the newly up node is fully started. Clients are thus advised to
+ wait a short time before trying to connect to the node (1 second should
+ be enough); otherwise they may experience a connection refusal at
+ first.
+
5. Compression
http://git-wip-us.apache.org/repos/asf/cassandra/blob/63a8bfe0/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java b/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
index fff613b..2abee01 100644
--- a/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
+++ b/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
@@ -27,7 +27,6 @@ import java.net.InetAddress;
* instance to decide what he does with this change. Not all modules maybe interested
* in all state changes.
*/
-
public interface IEndpointStateChangeSubscriber
{
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/63a8bfe0/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 03da7e3..7b8e7d8 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -300,16 +300,7 @@ public class CassandraDaemon
// start server internals
StorageService.instance.registerDaemon(this);
- try
- {
- StorageService.instance.initServer();
- }
- catch (ConfigurationException e)
- {
- logger.error("Fatal configuration error", e);
- System.err.println(e.getMessage() + "\nFatal configuration error; unable to start server. See log for stacktrace.");
- System.exit(1);
- }
+ StorageService.instance.initServerLocally();
Mx4jTool.maybeLoad();
@@ -357,6 +348,17 @@ public class CassandraDaemon
nativeServer.start();
else
logger.info("Not starting native transport as requested. Use JMX (StorageService->startNativeTransport()) to start it");
+
+ try
+ {
+ StorageService.instance.maybeJoinRing(StorageService.RING_DELAY);
+ }
+ catch (ConfigurationException e)
+ {
+ logger.error("Fatal configuration error", e);
+ System.err.println(e.getMessage() + "\nFatal configuration error; unable to start server. See log for stacktrace.");
+ System.exit(1);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/63a8bfe0/src/java/org/apache/cassandra/service/IEndpointLifecycleSubscriber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IEndpointLifecycleSubscriber.java b/src/java/org/apache/cassandra/service/IEndpointLifecycleSubscriber.java
new file mode 100644
index 0000000..24cb3d7
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/IEndpointLifecycleSubscriber.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+
+/**
+ * Interface on which interested parties can be notified of high level endpoint
+ * state changes.
+ *
+ * Note that while IEndpointStateChangeSubscriber notifies about gossip-related
+ * changes (IEndpointStateChangeSubscriber.onJoin() is called when a node joins
+ * gossip), this interface allows subscribers to be notified about higher-level events.
+ */
+public interface IEndpointLifecycleSubscriber
+{
+ /**
+ * Called when a new node joins the cluster, i.e. either has just been
+ * bootstrapped or "instajoins".
+ *
+ * @param endpoint the newly added endpoint.
+ */
+ public void onJoinCluster(InetAddress endpoint);
+
+ /**
+ * Called when a node leaves the cluster (decommission or removeToken).
+ *
+ * @param endpoint the endpoint that is leaving.
+ */
+ public void onLeaveCluster(InetAddress endpoint);
+
+ /**
+ * Called when a node is marked UP.
+ *
+ * @param endpoint the endpoint marked UP.
+ */
+ public void onUp(InetAddress endpoint);
+
+ /**
+ * Called when a node is marked DOWN.
+ *
+ * @param endpoint the endpoint marked DOWN.
+ */
+ public void onDown(InetAddress endpoint);
+
+ /**
+ * Called when a node has moved (to a new token).
+ *
+ * @param endpoint the endpoint that has moved.
+ */
+ public void onMove(InetAddress endpoint);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/63a8bfe0/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 58ce112..ad4a534 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -105,7 +105,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
* This pool is used by tasks that can have longer execution times, and usually are non periodic.
*/
public static final DebuggableScheduledThreadPoolExecutor tasks = new DebuggableScheduledThreadPoolExecutor("NonPeriodicTasks");
-/**
+ /**
* tasks that do not need to be waited for on shutdown/drain
*/
public static final DebuggableScheduledThreadPoolExecutor optionalTasks = new DebuggableScheduledThreadPoolExecutor("OptionalTasks");
@@ -181,6 +181,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
private static ScheduledRangeTransferExecutorService rangeXferExecutor = new ScheduledRangeTransferExecutorService();
+ private final List<IEndpointLifecycleSubscriber> lifecycleSubscribers = new CopyOnWriteArrayList<IEndpointLifecycleSubscriber>();
+
public void finishBootstrapping()
{
isBootstrapMode = false;
@@ -253,6 +255,16 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
this.daemon = daemon;
}
+ public void register(IEndpointLifecycleSubscriber subscriber)
+ {
+ lifecycleSubscribers.add(subscriber);
+ }
+
+ public void unregister(IEndpointLifecycleSubscriber subscriber)
+ {
+ lifecycleSubscribers.remove(subscriber);
+ }
+
// should only be called via JMX
public void stopGossiping()
{
@@ -388,6 +400,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
public synchronized void initServer(int delay) throws ConfigurationException
{
+ initServerLocally();
+ maybeJoinRing(delay);
+ }
+
+ public void initServerLocally()
+ {
logger.info("Cassandra version: " + FBUtilities.getReleaseVersionString());
logger.info("Thrift API version: " + Constants.VERSION);
logger.info("CQL supported versions: " + StringUtils.join(ClientState.getCQLSupportedVersion(), ",") + " (default: " + ClientState.DEFAULT_CQL_VERSION + ")");
@@ -484,6 +502,18 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
}
}, "StorageServiceShutdownHook");
Runtime.getRuntime().addShutdownHook(drainOnShutdown);
+ }
+
+ public synchronized void maybeJoinRing(int delay) throws ConfigurationException
+ {
+ // This method should only be called as part of the server initialization, so if initialized == true, we've already gone
+ // through that. If the ring must be joined after the server initialization, use joinTokenRing() directly.
+ if (initialized)
+ {
+ if (isClientMode)
+ throw new UnsupportedOperationException("StorageService does not support switching modes.");
+ return;
+ }
if (Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true")))
{
@@ -1249,8 +1279,21 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
logger.debug("Node " + endpoint + " state normal, token " + tokens);
if (tokenMetadata.isMember(endpoint))
+ {
logger.info("Node " + endpoint + " state jump to normal");
+ if (!isClientMode)
+ {
+ for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
+ subscriber.onUp(endpoint);
+ }
+ }
+ else if (!isClientMode)
+ {
+ for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
+ subscriber.onJoinCluster(endpoint);
+ }
+
// Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300).
if (Gossiper.instance.usesHostId(endpoint))
tokenMetadata.updateHostId(Gossiper.instance.getHostId(endpoint), endpoint);
@@ -1344,8 +1387,16 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
SystemTable.updateLocalTokens(Collections.<Token>emptyList(), localTokensToRemove);
if (tokenMetadata.isMoving(endpoint)) // if endpoint was moving to a new token
+ {
tokenMetadata.removeFromMoving(endpoint);
+ if (!isClientMode)
+ {
+ for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
+ subscriber.onMove(endpoint);
+ }
+ }
+
calculatePendingRanges();
}
@@ -1504,6 +1555,11 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
Gossiper.instance.removeEndpoint(endpoint);
tokenMetadata.removeEndpoint(endpoint);
tokenMetadata.removeBootstrapTokens(tokens);
+ if (!isClientMode)
+ {
+ for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
+ subscriber.onLeaveCluster(endpoint);
+ }
calculatePendingRanges();
if (!isClientMode)
{
@@ -1856,6 +1912,11 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
public void onDead(InetAddress endpoint, EndpointState state)
{
MessagingService.instance().convict(endpoint);
+ if (!isClientMode)
+ {
+ for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
+ subscriber.onDown(endpoint);
+ }
}
public void onRestart(InetAddress endpoint, EndpointState state)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/63a8bfe0/src/java/org/apache/cassandra/transport/Event.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java
index 855049d..33b08f4 100644
--- a/src/java/org/apache/cassandra/transport/Event.java
+++ b/src/java/org/apache/cassandra/transport/Event.java
@@ -58,7 +58,7 @@ public abstract class Event
public static class TopologyChange extends Event
{
- public enum Change { NEW_NODE, REMOVED_NODE }
+ public enum Change { NEW_NODE, REMOVED_NODE, MOVED_NODE }
public final Change change;
public final InetSocketAddress node;
@@ -80,6 +80,11 @@ public abstract class Event
return new TopologyChange(Change.REMOVED_NODE, new InetSocketAddress(host, port));
}
+ public static TopologyChange movedNode(InetAddress host, int port)
+ {
+ return new TopologyChange(Change.MOVED_NODE, new InetSocketAddress(host, port));
+ }
+
// Assumes the type has already by been deserialized
private static TopologyChange deserializeEvent(ChannelBuffer cb)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/63a8bfe0/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index e820554..e629ca4 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -35,10 +35,10 @@ import org.jboss.netty.logging.Slf4JLoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.gms.*;
import org.apache.cassandra.service.CassandraDaemon;
import org.apache.cassandra.service.IMigrationListener;
import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.service.IEndpointLifecycleSubscriber;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.transport.messages.EventMessage;
@@ -63,7 +63,7 @@ public class Server implements CassandraDaemon.Server
{
this.socket = socket;
EventNotifier notifier = new EventNotifier(this);
- Gossiper.instance.register(notifier);
+ StorageService.instance.register(notifier);
MigrationManager.instance.register(notifier);
}
@@ -203,7 +203,7 @@ public class Server implements CassandraDaemon.Server
}
}
- private static class EventNotifier implements IEndpointStateChangeSubscriber, IMigrationListener
+ private static class EventNotifier implements IEndpointLifecycleSubscriber, IMigrationListener
{
private final Server server;
@@ -227,33 +227,29 @@ public class Server implements CassandraDaemon.Server
}
}
- public void onJoin(InetAddress endpoint, EndpointState epState)
+ public void onJoinCluster(InetAddress endpoint)
{
server.connectionTracker.send(Event.TopologyChange.newNode(getRpcAddress(endpoint), server.socket.getPort()));
}
- public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
+ public void onLeaveCluster(InetAddress endpoint)
{
+ server.connectionTracker.send(Event.TopologyChange.removedNode(getRpcAddress(endpoint), server.socket.getPort()));
}
- public void onAlive(InetAddress endpoint, EndpointState state)
- {
- server.connectionTracker.send(Event.StatusChange.nodeUp(getRpcAddress(endpoint), server.socket.getPort()));
- }
-
- public void onDead(InetAddress endpoint, EndpointState state)
+ public void onMove(InetAddress endpoint)
{
- server.connectionTracker.send(Event.StatusChange.nodeDown(getRpcAddress(endpoint), server.socket.getPort()));
+ server.connectionTracker.send(Event.TopologyChange.movedNode(getRpcAddress(endpoint), server.socket.getPort()));
}
- public void onRemove(InetAddress endpoint)
+ public void onUp(InetAddress endpoint)
{
- server.connectionTracker.send(Event.TopologyChange.removedNode(getRpcAddress(endpoint), server.socket.getPort()));
+ server.connectionTracker.send(Event.StatusChange.nodeUp(getRpcAddress(endpoint), server.socket.getPort()));
}
- public void onRestart(InetAddress endpoint, EndpointState state)
+ public void onDown(InetAddress endpoint)
{
- server.connectionTracker.send(Event.StatusChange.nodeUp(getRpcAddress(endpoint), server.socket.getPort()));
+ server.connectionTracker.send(Event.StatusChange.nodeDown(getRpcAddress(endpoint), server.socket.getPort()));
}
public void onCreateKeyspace(String ksName)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/63a8bfe0/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index 87cc910..ef7a2ab 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -79,7 +79,6 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
init();
LOCAL = FBUtilities.getBroadcastAddress();
- StorageService.instance.initServer(0);
// generate a fake endpoint for which we can spoof receiving/sending trees
REMOTE = InetAddress.getByName("127.0.0.2");
store = null;