You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2018/02/26 14:47:17 UTC
cassandra git commit: Add optional startup delay to wait until peers
are ready
Repository: cassandra
Updated Branches:
refs/heads/trunk 1b82de8c9 -> b86801e95
Add optional startup delay to wait until peers are ready
patch by jasobrown; reviewed by Ariel Weisberg for CASSANDRA-13993
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b86801e9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b86801e9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b86801e9
Branch: refs/heads/trunk
Commit: b86801e95a58c5f1a9c779b21fa57136e0225d61
Parents: 1b82de8
Author: Jason Brown <ja...@gmail.com>
Authored: Mon Feb 26 06:38:33 2018 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Mon Feb 26 06:40:18 2018 -0800
----------------------------------------------------------------------
.circleci/config.yml | 12 +-
CHANGES.txt | 1 +
.../org/apache/cassandra/config/Config.java | 5 +
.../cassandra/config/DatabaseDescriptor.java | 10 ++
.../org/apache/cassandra/net/MessageOut.java | 29 +++-
.../apache/cassandra/net/MessagingService.java | 16 +-
.../org/apache/cassandra/net/PingMessage.java | 82 +++++++++
.../apache/cassandra/net/PingVerbHandler.java | 31 ++++
.../org/apache/cassandra/net/PongMessage.java | 50 ++++++
.../net/StartupClusterConnectivityChecker.java | 171 +++++++++++++++++++
.../net/async/OutboundConnectionIdentifier.java | 34 +++-
.../net/async/OutboundMessagingConnection.java | 2 +-
.../net/async/OutboundMessagingPool.java | 33 ++--
.../cassandra/service/CassandraDaemon.java | 10 ++
.../cassandra/service/EchoVerbHandler.java | 6 +-
.../cassandra/service/StorageService.java | 3 +
.../StartupClusterConnectivityCheckerTest.java | 129 ++++++++++++++
.../apache/cassandra/service/RemoveTest.java | 2 +-
18 files changed, 587 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/.circleci/config.yml
----------------------------------------------------------------------
diff --git a/.circleci/config.yml b/.circleci/config.yml
index f881b70..13bc11d 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -58,16 +58,16 @@ with_dtest_jobs_only: &with_dtest_jobs_only
- build
# Set env_settings, env_vars, and workflows/build_and_run_tests based on environment
env_settings: &env_settings
- <<: *default_env_settings
- #<<: *high_capacity_env_settings
+ #<<: *default_env_settings
+ <<: *high_capacity_env_settings
env_vars: &env_vars
- <<: *resource_constrained_env_vars
- #<<: *high_capacity_env_vars
+ #<<: *resource_constrained_env_vars
+ <<: *high_capacity_env_vars
workflows:
version: 2
- build_and_run_tests: *default_jobs
+ #build_and_run_tests: *default_jobs
#build_and_run_tests: *with_dtest_jobs_only
- #build_and_run_tests: *with_dtest_jobs
+ build_and_run_tests: *with_dtest_jobs
docker_image: &docker_image kjellman/cassandra-test:0.4.3
version: 2
jobs:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 40b18ae..9e7a599 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Add optional startup delay to wait until peers are ready (CASSANDRA-13993)
* Add a few options to nodetool verify (CASSANDRA-14201)
* CVE-2017-5929 Security vulnerability and redefine default log rotation policy (CASSANDRA-14183)
* Use JVM default SSL validation algorithm instead of custom default (CASSANDRA-13259)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 875751b..ad91a9b 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -373,6 +373,11 @@ public class Config
public String full_query_log_dir = null;
+ // parameters to adjust how much to delay startup until a certain amount of the cluster is connected to and marked alive
+ public int block_for_peers_percentage = 70;
+ public int block_for_peers_timeout_in_secs = 10;
+
+
/**
* @deprecated migrate to {@link DatabaseDescriptor#isClientInitialized()}
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index ccb0a30..2e772c5 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2522,4 +2522,14 @@ public class DatabaseDescriptor
{
return conf.full_query_log_dir;
}
+
+ /**
+ * @return the configured {@code block_for_peers_percentage} value, i.e. the share of the cluster
+ * that startup is delayed for until it is connected to and marked alive
+ */
+ public static int getBlockForPeersPercentage()
+ {
+ return conf.block_for_peers_percentage;
+ }
+
+ /**
+ * @return the configured {@code block_for_peers_timeout_in_secs} value, in seconds
+ */
+ public static int getBlockForPeersTimeoutInSeconds()
+ {
+ return conf.block_for_peers_timeout_in_secs;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/net/MessageOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java
index 7d3c0af..236a770 100644
--- a/src/java/org/apache/cassandra/net/MessageOut.java
+++ b/src/java/org/apache/cassandra/net/MessageOut.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
@@ -97,6 +98,11 @@ public class MessageOut<T>
public final List<Object> parameters;
/**
+ * Allows sender to explicitly state which connection type the message should be sent on.
+ */
+ public final ConnectionType connectionType;
+
+ /**
* Memoization of the serialized size of the just the payload.
*/
private int payloadSerializedSize = -1;
@@ -122,24 +128,33 @@ public class MessageOut<T>
this(verb,
payload,
serializer,
- isTracing()
- ? Tracing.instance.getTraceHeaders()
- : ImmutableList.of());
+ isTracing() ? Tracing.instance.getTraceHeaders() : ImmutableList.of(),
+ null);
+ }
+
+ public MessageOut(MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, ConnectionType connectionType)
+ {
+ this(verb,
+ payload,
+ serializer,
+ isTracing() ? Tracing.instance.getTraceHeaders() : ImmutableList.of(),
+ connectionType);
}
- private MessageOut(MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, List<Object> parameters)
+ private MessageOut(MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, List<Object> parameters, ConnectionType connectionType)
{
- this(FBUtilities.getBroadcastAddressAndPort(), verb, payload, serializer, parameters);
+ this(FBUtilities.getBroadcastAddressAndPort(), verb, payload, serializer, parameters, connectionType);
}
@VisibleForTesting
- public MessageOut(InetAddressAndPort from, MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, List<Object> parameters)
+ public MessageOut(InetAddressAndPort from, MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, List<Object> parameters, ConnectionType connectionType)
{
this.from = from;
this.verb = verb;
this.payload = payload;
this.serializer = serializer;
this.parameters = parameters;
+ this.connectionType = connectionType;
}
public <VT> MessageOut<T> withParameter(ParameterType type, VT value)
@@ -148,7 +163,7 @@ public class MessageOut<T>
newParameters.addAll(parameters);
newParameters.add(type);
newParameters.add(value);
- return new MessageOut<T>(verb, payload, serializer, newParameters);
+ return new MessageOut<T>(verb, payload, serializer, newParameters, connectionType);
}
public Stage getStage()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 8fdb395..573cf7d 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -35,7 +35,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -255,7 +254,10 @@ public final class MessagingService implements MessagingServiceMBean
return DatabaseDescriptor.getRangeRpcTimeout();
}
},
- // remember to add new verbs at the end, since we serialize by ordinal
+ PING(),
+
+ // add new verbs after the existing verbs, but *before* the UNUSED verbs, since we serialize by ordinal.
+ // UNUSED verbs serve as padding for backwards compatibility where a previous version needs to validate a verb from the future.
UNUSED_1,
UNUSED_2,
UNUSED_3,
@@ -263,7 +265,7 @@ public final class MessagingService implements MessagingServiceMBean
UNUSED_5,
;
- private int id;
+ private final int id;
Verb()
{
id = ordinal();
@@ -291,7 +293,11 @@ public final class MessagingService implements MessagingServiceMBean
static
{
for (Verb v : values())
+ {
+ // on a collision, name the verb already registered under this id (not the id itself) so both offenders are visible
+ if (idToVerbMap.containsKey(v.getId()))
+ throw new IllegalArgumentException("cannot have two verbs that map to the same id: " + v + " and " + idToVerbMap.get(v.getId()));
idToVerbMap.put(v.getId(), v);
+ }
}
public static Verb fromId(int id)
@@ -347,6 +353,8 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.UNUSED_1, Stage.INTERNAL_RESPONSE);
put(Verb.UNUSED_2, Stage.INTERNAL_RESPONSE);
put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE);
+
+ put(Verb.PING, Stage.READ);
}};
/**
@@ -385,6 +393,8 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.HINT, HintMessage.serializer);
put(Verb.BATCH_STORE, Batch.serializer);
put(Verb.BATCH_REMOVE, UUIDSerializer.serializer);
+
+ put(Verb.PING, PingMessage.serializer);
}};
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/net/PingMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/PingMessage.java b/src/java/org/apache/cassandra/net/PingMessage.java
new file mode 100644
index 0000000..4a19f22
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/PingMessage.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+
+import org.apache.cassandra.hints.HintResponse;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.async.OutboundConnectionIdentifier;
+import org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType;
+
+/**
+ * Conceptually the same as {@link org.apache.cassandra.gms.EchoMessage}, but indicates to the recipient which
+ * {@link ConnectionType} should be used for the response.
+ */
+public class PingMessage
+{
+    /** Serializer for {@link PingMessage}; final so the shared instance cannot be reassigned. */
+    public static final IVersionedSerializer<PingMessage> serializer = new PingMessageSerializer();
+
+    // one shared, immutable instance per connection type that a ping may ask to be answered on
+    public static final PingMessage smallChannelMessage = new PingMessage(ConnectionType.SMALL_MESSAGE);
+    public static final PingMessage largeChannelMessage = new PingMessage(ConnectionType.LARGE_MESSAGE);
+    public static final PingMessage gossipChannelMessage = new PingMessage(ConnectionType.GOSSIP);
+
+    /** The connection type the recipient should use for its response. */
+    public final ConnectionType connectionType;
+
+    /**
+     * @param connectionType the connection type the recipient should respond on
+     */
+    public PingMessage(ConnectionType connectionType)
+    {
+        this.connectionType = connectionType;
+    }
+
+    /**
+     * Serializes a {@link PingMessage} as a single byte holding the id of its {@link ConnectionType}.
+     */
+    public static class PingMessageSerializer implements IVersionedSerializer<PingMessage>
+    {
+        @Override
+        public void serialize(PingMessage t, DataOutputPlus out, int version) throws IOException
+        {
+            out.writeByte(t.connectionType.getId());
+        }
+
+        /**
+         * Reads the one-byte connection type id and returns the matching shared instance.
+         * Unknown ids and any type without a dedicated instance fall back to {@link #smallChannelMessage}.
+         */
+        @Override
+        public PingMessage deserialize(DataInputPlus in, int version) throws IOException
+        {
+            ConnectionType connectionType = ConnectionType.fromId(in.readByte());
+
+            // if we ever create a new connection type, then during a rolling upgrade, the old nodes won't know about
+            // the new connection type (as it won't recognize the id), so just default to the small message type.
+            if (connectionType == null)
+                connectionType = ConnectionType.SMALL_MESSAGE;
+
+            switch (connectionType)
+            {
+                case LARGE_MESSAGE:
+                    return largeChannelMessage;
+                case GOSSIP:
+                    return gossipChannelMessage;
+                case SMALL_MESSAGE:
+                default:
+                    return smallChannelMessage;
+            }
+        }
+
+        /**
+         * @return 1, the single byte written by {@link #serialize}
+         */
+        @Override
+        public long serializedSize(PingMessage t, int version)
+        {
+            return 1;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/net/PingVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/PingVerbHandler.java b/src/java/org/apache/cassandra/net/PingVerbHandler.java
new file mode 100644
index 0000000..d959b91
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/PingVerbHandler.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+public class PingVerbHandler implements IVerbHandler<PingMessage>
+{
+    /**
+     * Replies to a ping with the shared {@link PongMessage}, sent as a REQUEST_RESPONSE on the
+     * connection type carried in the ping's payload.
+     *
+     * @param message the incoming ping
+     * @param id the message id the reply is correlated with
+     */
+    @Override
+    public void doVerb(MessageIn<PingMessage> message, int id)
+    {
+        MessageOut<PongMessage> pong = new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE,
+                                                        PongMessage.instance,
+                                                        PongMessage.serializer,
+                                                        message.payload.connectionType);
+
+        MessagingService.instance().sendReply(pong, id, message.from);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/net/PongMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/PongMessage.java b/src/java/org/apache/cassandra/net/PongMessage.java
new file mode 100644
index 0000000..bb89cdf
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/PongMessage.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * Empty reply to a {@link PingMessage}; it carries no payload, so a single shared instance is used.
+ */
+public class PongMessage
+{
+    public static final PongMessage instance = new PongMessage();
+
+    /** Serializer for {@link PongMessage}; final so the shared instance cannot be reassigned. */
+    public static final IVersionedSerializer<PongMessage> serializer = new PongMessage.PongMessageSerializer();
+
+    // singleton: use {@link #instance}
+    private PongMessage()
+    { }
+
+    /**
+     * Zero-byte serializer: writes nothing, reads nothing, and always yields {@link PongMessage#instance}.
+     */
+    public static class PongMessageSerializer implements IVersionedSerializer<PongMessage>
+    {
+        @Override
+        public void serialize(PongMessage t, DataOutputPlus out, int version) throws IOException
+        { }
+
+        @Override
+        public PongMessage deserialize(DataInputPlus in, int version) throws IOException
+        {
+            return instance;
+        }
+
+        @Override
+        public long serializedSize(PongMessage t, int version)
+        {
+            return 0;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java b/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
new file mode 100644
index 0000000..f22ab48
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.async.OutboundConnectionIdentifier;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.net.MessagingService.Verb.PING;
+
+/**
+ * Optionally delays startup until a target percentage of peers is both reported alive by the supplied
+ * predicate and has answered the ping messages sent on the small and large message connections,
+ * or until a timeout elapses.
+ */
+public class StartupClusterConnectivityChecker
+{
+    private static final Logger logger = LoggerFactory.getLogger(StartupClusterConnectivityChecker.class);
+
+    enum State { CONTINUE, FINISH_SUCCESS, FINISH_TIMEOUT }
+
+    private final int targetPercent;
+    private final int timeoutSecs;
+    private final Predicate<InetAddressAndPort> gossipStatus;
+
+    /**
+     * @param targetPercent percentage of peers that must be alive and connected; clamped to [0, 100]
+     * @param timeoutSecs maximum number of seconds to wait; a negative value is replaced with 1
+     * @param gossipStatus predicate that reports whether a given peer is currently considered alive
+     */
+    public StartupClusterConnectivityChecker(int targetPercent, int timeoutSecs, Predicate<InetAddressAndPort> gossipStatus)
+    {
+        if (targetPercent < 0)
+        {
+            targetPercent = 0;
+        }
+        else if (targetPercent > 100)
+        {
+            targetPercent = 100;
+        }
+        this.targetPercent = targetPercent;
+
+        if (timeoutSecs < 0)
+        {
+            timeoutSecs = 1;
+        }
+        else if (timeoutSecs > 100)
+        {
+            logger.warn("setting the block-for-peers timeout (in seconds) to {} might be a bit excessive, but using it nonetheless", timeoutSecs);
+        }
+        this.timeoutSecs = timeoutSecs;
+
+        this.gossipStatus = gossipStatus;
+    }
+
+    /**
+     * Pings every peer (excluding this node) and blocks until {@link #checkStatus} stops returning
+     * {@link State#CONTINUE}. Returns immediately when {@code peers} is null, the target percentage is 0,
+     * or there are no peers other than this node.
+     *
+     * @param peers the endpoints to check; may include this node, which is filtered out
+     */
+    public void execute(Set<InetAddressAndPort> peers)
+    {
+        if (peers == null || targetPercent == 0)
+            return;
+
+        // remove current node from the set (look the local address up once rather than once per peer)
+        final InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
+        peers = peers.stream()
+                     .filter(peer -> !peer.equals(localAddress))
+                     .collect(Collectors.toSet());
+
+        // don't block if there's no other nodes in the cluster (or we don't know about them)
+        if (peers.isEmpty())
+            return;
+
+        logger.info("choosing to block until {}% of peers are marked alive and connections are established; max time to wait = {} seconds",
+                    targetPercent, timeoutSecs);
+
+        // first, send out a ping message to open up the non-gossip connections
+        final AtomicInteger connectedCount = sendPingMessages(peers);
+
+        final long startNanos = System.nanoTime();
+        final long expirationNanos = startNanos + TimeUnit.SECONDS.toNanos(timeoutSecs);
+        int completedRounds = 0;
+        // nanoTime values must be compared via their difference, as the raw values may overflow
+        while (checkStatus(peers, connectedCount, startNanos, System.nanoTime() - expirationNanos > 0, completedRounds) == State.CONTINUE)
+        {
+            completedRounds++;
+            // poll once per millisecond; a finer interval is effectively a busy spin and gains nothing
+            // against a timeout measured in seconds
+            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Evaluates one round of the wait loop.
+     *
+     * @param peers the peers being waited on; expected to be non-empty
+     * @param connectedCount number of ping responses received so far
+     * @param startNanos {@link System#nanoTime()} at the start of the wait, used for logging elapsed time
+     * @param beyondExpiration whether the timeout has elapsed
+     * @param completedRounds how many rounds have already been completed
+     * @return {@link State#FINISH_SUCCESS} when both the alive and the connected percentages reach the target,
+     *         {@link State#FINISH_TIMEOUT} when the timeout has elapsed after at least two completed rounds,
+     *         otherwise {@link State#CONTINUE}
+     */
+    State checkStatus(Set<InetAddressAndPort> peers, AtomicInteger connectedCount, final long startNanos, boolean beyondExpiration, final int completedRounds)
+    {
+        long currentAlive = peers.stream().filter(gossipStatus).count();
+        float currentAlivePercent = ((float) currentAlive / (float) peers.size()) * 100;
+
+        // assume two connections to remote host that we care to track here (small msg & large msg)
+        final int totalConnectionsSize = peers.size() * 2;
+        final int connectionsCount = connectedCount.get();
+        float currentConnectedPercent = ((float) connectionsCount / (float) totalConnectionsSize) * 100;
+
+        if (currentAlivePercent >= targetPercent && currentConnectedPercent >= targetPercent)
+        {
+            logger.info("after {} milliseconds, found {}% ({} / {}) of peers as marked alive, " +
+                        "and {}% ({} / {}) of peers as connected, " +
+                        "both of which are above the desired threshold of {}%",
+                        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos),
+                        currentAlivePercent, currentAlive, peers.size(),
+                        currentConnectedPercent, connectionsCount, totalConnectionsSize,
+                        targetPercent);
+            return State.FINISH_SUCCESS;
+        }
+
+        // perform at least two rounds of checking, else this is kinda useless (and the operator set timeoutSecs too low)
+        if (completedRounds >= 2 && beyondExpiration)
+        {
+            logger.info("after {} milliseconds, found {}% ({} / {}) of peers as marked alive, " +
+                        "and {}% ({} / {}) of peers as connected, " +
+                        "one or both of which is below the desired threshold of {}%",
+                        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos),
+                        currentAlivePercent, currentAlive, peers.size(),
+                        currentConnectedPercent, connectionsCount, totalConnectionsSize,
+                        targetPercent);
+            return State.FINISH_TIMEOUT;
+        }
+        return State.CONTINUE;
+    }
+
+    /**
+     * Sends a "connection warmup" message to each peer in the collection, on both the small and the large
+     * message {@link OutboundConnectionIdentifier.ConnectionType}.
+     *
+     * @return a counter that is incremented once for every ping response received
+     */
+    private AtomicInteger sendPingMessages(Set<InetAddressAndPort> peers)
+    {
+        AtomicInteger connectedCount = new AtomicInteger(0);
+        IAsyncCallback responseHandler = new IAsyncCallback()
+        {
+            @Override
+            public boolean isLatencyForSnitch()
+            {
+                return false;
+            }
+
+            @Override
+            public void response(MessageIn msg)
+            {
+                connectedCount.incrementAndGet();
+            }
+        };
+
+        // state the connection type explicitly: a ping is tiny, so without it the size-based routing would put
+        // the "large channel" ping on the small message connection and never open the large message connection
+        MessageOut<PingMessage> smallChannelMessageOut = new MessageOut<>(PING, PingMessage.smallChannelMessage, PingMessage.serializer,
+                                                                          OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE);
+        MessageOut<PingMessage> largeChannelMessageOut = new MessageOut<>(PING, PingMessage.largeChannelMessage, PingMessage.serializer,
+                                                                          OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE);
+        for (InetAddressAndPort peer : peers)
+        {
+            MessagingService.instance().sendRR(smallChannelMessageOut, peer, responseHandler);
+            MessagingService.instance().sendRR(largeChannelMessageOut, peer, responseHandler);
+        }
+
+        return connectedCount;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
index f3cb554..e309065 100644
--- a/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
+++ b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
@@ -18,9 +18,8 @@
package org.apache.cassandra.net.async;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
+import com.carrotsearch.hppc.IntObjectMap;
+import com.carrotsearch.hppc.IntObjectOpenHashMap;
import org.apache.cassandra.locator.InetAddressAndPort;
/**
@@ -32,9 +31,34 @@ import org.apache.cassandra.locator.InetAddressAndPort;
*/
public class OutboundConnectionIdentifier
{
- enum ConnectionType
+ public enum ConnectionType
{
- GOSSIP, LARGE_MESSAGE, SMALL_MESSAGE, STREAM
+ GOSSIP (0), LARGE_MESSAGE (1), SMALL_MESSAGE (2), STREAM (3);
+
+ private final int id;
+
+ ConnectionType(int id)
+ {
+ this.id = id;
+ }
+
+ public int getId()
+ {
+ return id;
+ }
+
+ private static final IntObjectMap<ConnectionType> idMap = new IntObjectOpenHashMap<>(values().length);
+ static
+ {
+ for (ConnectionType type : values())
+ idMap.put(type.id, type);
+ }
+
+ public static ConnectionType fromId(int id)
+ {
+ return idMap.get(id);
+ }
+
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java b/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
index 28775ef..064131b 100644
--- a/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
+++ b/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
@@ -479,7 +479,7 @@ public class OutboundMessagingConnection
{
case SUCCESS:
assert result.channelWriter != null;
- logger.debug("successfully connected to {}, conmpress = {}, coalescing = {}", connectionId,
+ logger.debug("successfully connected to {}, compress = {}, coalescing = {}", connectionId,
shouldCompressConnection(connectionId.local(), connectionId.remote()),
coalescingStrategy.isPresent() ? coalescingStrategy.get() : CoalescingStrategies.Strategy.DISABLED);
if (state.get() == State.CLOSED)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java b/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java
index c701229..14650a7 100644
--- a/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java
+++ b/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.net.async;
-import java.net.InetSocketAddress;
import java.util.Optional;
import com.google.common.annotations.VisibleForTesting;
@@ -101,14 +100,21 @@ public class OutboundMessagingPool
@VisibleForTesting
public OutboundMessagingConnection getConnection(MessageOut msg)
{
- // optimize for the common path (the small message channel)
- if (Stage.GOSSIP != msg.getStage())
+ if (msg.connectionType == null)
{
- return msg.serializedSize(smallMessageChannel.getTargetVersion()) < LARGE_MESSAGE_THRESHOLD
- ? smallMessageChannel
- : largeMessageChannel;
+ // optimize for the common path (the small message channel)
+ if (Stage.GOSSIP != msg.getStage())
+ {
+ return msg.serializedSize(smallMessageChannel.getTargetVersion()) < LARGE_MESSAGE_THRESHOLD
+ ? smallMessageChannel
+ : largeMessageChannel;
+ }
+ return gossipChannel;
+ }
+ else
+ {
+ return getConnection(msg.connectionType);
}
- return gossipChannel;
}
/**
@@ -138,20 +144,17 @@ public class OutboundMessagingPool
smallMessageChannel.close(softClose);
}
- /**
- * For testing purposes only.
- */
@VisibleForTesting
- OutboundMessagingConnection getConnection(ConnectionType connectionType)
+ final OutboundMessagingConnection getConnection(ConnectionType connectionType)
{
switch (connectionType)
{
- case GOSSIP:
- return gossipChannel;
- case LARGE_MESSAGE:
- return largeMessageChannel;
case SMALL_MESSAGE:
return smallMessageChannel;
+ case LARGE_MESSAGE:
+ return largeMessageChannel;
+ case GOSSIP:
+ return gossipChannel;
default:
throw new IllegalArgumentException("unsupported connection type: " + connectionType);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 130f3fd..295a33b 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -25,7 +25,10 @@ import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;
@@ -47,6 +50,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.StartupClusterConnectivityChecker;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.schema.Schema;
@@ -491,6 +495,12 @@ public class CassandraDaemon
*/
public void start()
{
+ StartupClusterConnectivityChecker connectivityChecker = new StartupClusterConnectivityChecker(DatabaseDescriptor.getBlockForPeersPercentage(),
+ DatabaseDescriptor.getBlockForPeersTimeoutInSeconds(),
+ Gossiper.instance::isAlive);
+ Set<InetAddressAndPort> peers = Gossiper.instance.getEndpointStates().stream().map(Map.Entry::getKey).collect(Collectors.toSet());
+ connectivityChecker.execute(peers);
+
String nativeFlag = System.getProperty("cassandra.start_native_transport");
if ((nativeFlag != null && Boolean.parseBoolean(nativeFlag)) || (nativeFlag == null && DatabaseDescriptor.startNativeTransport()))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/service/EchoVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/EchoVerbHandler.java b/src/java/org/apache/cassandra/service/EchoVerbHandler.java
index d0c435e..1cc52e9 100644
--- a/src/java/org/apache/cassandra/service/EchoVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/EchoVerbHandler.java
@@ -26,16 +26,20 @@ import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType;
+
public class EchoVerbHandler implements IVerbHandler<EchoMessage>
{
private static final Logger logger = LoggerFactory.getLogger(EchoVerbHandler.class);
public void doVerb(MessageIn<EchoMessage> message, int id)
{
- MessageOut<EchoMessage> echoMessage = new MessageOut<EchoMessage>(MessagingService.Verb.REQUEST_RESPONSE, EchoMessage.instance, EchoMessage.serializer);
+ MessageOut<EchoMessage> echoMessage = new MessageOut<EchoMessage>(MessagingService.Verb.REQUEST_RESPONSE, EchoMessage.instance,
+ EchoMessage.serializer, ConnectionType.GOSSIP);
logger.trace("Sending a EchoMessage reply {}", message.from);
MessagingService.instance().sendReply(echoMessage, id, message.from);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index d465431..51b77a6 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -92,6 +92,7 @@ import org.apache.cassandra.schema.SchemaVersionVerbHandler;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.schema.ViewMetadata;
+import org.apache.cassandra.repair.RepairMessageVerbHandler;
import org.apache.cassandra.service.paxos.CommitVerbHandler;
import org.apache.cassandra.service.paxos.PrepareVerbHandler;
import org.apache.cassandra.service.paxos.ProposeVerbHandler;
@@ -301,6 +302,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCH_STORE, new BatchStoreVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCH_REMOVE, new BatchRemoveVerbHandler());
+
+ MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PING, new PingVerbHandler());
}
public void registerDaemon(CassandraDaemon daemon)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java b/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java
new file mode 100644
index 0000000..12f54c6
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.net.UnknownHostException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+
+import com.google.common.net.InetAddresses;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+/** Checks the {@code State} returned by {@code StartupClusterConnectivityChecker.checkStatus} for hand-built inputs. */
+public class StartupClusterConnectivityCheckerTest
+{
+    /** All peers pass the liveness predicate and the connection counter is twice the peer count: FINISH_SUCCESS. */
+    @Test
+    public void testConnectivity_SimpleHappyPath() throws UnknownHostException
+    {
+        StartupClusterConnectivityChecker connectivityChecker = new StartupClusterConnectivityChecker(70, 10, addr -> true);
+        int count = 10;
+        Set<InetAddressAndPort> peers = createNodes(count);
+        Assert.assertEquals(StartupClusterConnectivityChecker.State.FINISH_SUCCESS,
+                            connectivityChecker.checkStatus(peers, new AtomicInteger(count * 2), System.nanoTime(), false, 0));
+    }
+
+    /** All peers pass the liveness predicate but the connection counter is still zero: CONTINUE. */
+    @Test
+    public void testConnectivity_SimpleContinue() throws UnknownHostException
+    {
+        StartupClusterConnectivityChecker connectivityChecker = new StartupClusterConnectivityChecker(70, 10, addr -> true);
+        int count = 10;
+        Set<InetAddressAndPort> peers = createNodes(count);
+        Assert.assertEquals(StartupClusterConnectivityChecker.State.CONTINUE,
+                            connectivityChecker.checkStatus(peers, new AtomicInteger(0), System.nanoTime(), false, 0));
+    }
+
+    /** Zero connections give CONTINUE with trailing arguments (false, 4) and FINISH_TIMEOUT with (true, 5). */
+    @Test
+    public void testConnectivity_Timeout() throws UnknownHostException
+    {
+        StartupClusterConnectivityChecker connectivityChecker = new StartupClusterConnectivityChecker(70, 10, addr -> true);
+        int count = 10;
+        Set<InetAddressAndPort> peers = createNodes(count);
+        Assert.assertEquals(StartupClusterConnectivityChecker.State.CONTINUE,
+                            connectivityChecker.checkStatus(peers, new AtomicInteger(0), System.nanoTime(), false, 4));
+        Assert.assertEquals(StartupClusterConnectivityChecker.State.FINISH_TIMEOUT,
+                            connectivityChecker.checkStatus(peers, new AtomicInteger(0), System.nanoTime(), true, 5));
+    }
+
+    /** Iteration i lets the first i predicate calls pass and sets the counter to 2*i; success is expected from i = 70 on. */
+    @Test
+    public void testConnectivity_SimpleUpdating() throws UnknownHostException
+    {
+        UpdatablePredicate predicate = new UpdatablePredicate();
+        final int count = 100;
+        final int thresholdPercentage = 70;
+        StartupClusterConnectivityChecker connectivityChecker = new StartupClusterConnectivityChecker(thresholdPercentage, 10, predicate);
+        Set<InetAddressAndPort> peers = createNodes(count);
+        AtomicInteger connectedCount = new AtomicInteger();
+        for (int i = 0; i < count; i++)
+        {
+            predicate.reset(i);
+            connectedCount.set(i * 2);
+            StartupClusterConnectivityChecker.State expectedState = i < thresholdPercentage ?
+                                                                    StartupClusterConnectivityChecker.State.CONTINUE :
+                                                                    StartupClusterConnectivityChecker.State.FINISH_SUCCESS;
+            Assert.assertEquals("failed on iteration " + i,
+                                expectedState, connectivityChecker.checkStatus(peers, connectedCount, System.nanoTime(), false, i));
+        }
+    }
+
+    /**
+     * After {@code reset(threshold)}, the first {@code threshold} calls to {@code test} return true and later calls false.
+     */
+    private static class UpdatablePredicate implements Predicate<InetAddressAndPort>
+    {
+        int index;
+        int threshold;
+        void reset(int threshold)
+        {
+            index = 0;
+            this.threshold = threshold;
+        }
+
+        @Override
+        public boolean test(InetAddressAndPort inetAddressAndPort)
+        {
+            index++;
+            return index <= threshold;
+        }
+    }
+
+    /** Builds {@code count} addresses: 127.0.0.1 followed by successive increments of it; fails the test if count < 1. */
+    private static Set<InetAddressAndPort> createNodes(int count) throws UnknownHostException
+    {
+        if (count < 1)
+            Assert.fail("need at least *one* node in the set!");
+        Set<InetAddressAndPort> nodes = new HashSet<>();
+        InetAddressAndPort node = InetAddressAndPort.getByName("127.0.0.1");
+        nodes.add(node);
+        for (int i = 1; i < count; i++)
+        {
+            node = InetAddressAndPort.getByAddress(InetAddresses.increment(node.address));
+            nodes.add(node);
+        }
+        return nodes;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index 6714a83..0d39322 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -161,7 +161,7 @@ public class RemoveTest
for (InetAddressAndPort host : hosts)
{
- MessageOut msg = new MessageOut(host, MessagingService.Verb.REPLICATION_FINISHED, null, null, Collections.<Object>emptyList());
+ MessageOut msg = new MessageOut(host, MessagingService.Verb.REPLICATION_FINISHED, null, null, Collections.<Object>emptyList(), null);
MessagingService.instance().sendRR(msg, FBUtilities.getBroadcastAddressAndPort());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org