You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2018/02/26 14:47:17 UTC

cassandra git commit: Add optional startup delay to wait until peers are ready

Repository: cassandra
Updated Branches:
  refs/heads/trunk 1b82de8c9 -> b86801e95


Add optional startup delay to wait until peers are ready

patch by jasobrown; reviewed by Ariel Weisberg for CASSANDRA-13993


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b86801e9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b86801e9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b86801e9

Branch: refs/heads/trunk
Commit: b86801e95a58c5f1a9c779b21fa57136e0225d61
Parents: 1b82de8
Author: Jason Brown <ja...@gmail.com>
Authored: Mon Feb 26 06:38:33 2018 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Mon Feb 26 06:40:18 2018 -0800

----------------------------------------------------------------------
 .circleci/config.yml                            |  12 +-
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/config/Config.java     |   5 +
 .../cassandra/config/DatabaseDescriptor.java    |  10 ++
 .../org/apache/cassandra/net/MessageOut.java    |  29 +++-
 .../apache/cassandra/net/MessagingService.java  |  16 +-
 .../org/apache/cassandra/net/PingMessage.java   |  82 +++++++++
 .../apache/cassandra/net/PingVerbHandler.java   |  31 ++++
 .../org/apache/cassandra/net/PongMessage.java   |  50 ++++++
 .../net/StartupClusterConnectivityChecker.java  | 171 +++++++++++++++++++
 .../net/async/OutboundConnectionIdentifier.java |  34 +++-
 .../net/async/OutboundMessagingConnection.java  |   2 +-
 .../net/async/OutboundMessagingPool.java        |  33 ++--
 .../cassandra/service/CassandraDaemon.java      |  10 ++
 .../cassandra/service/EchoVerbHandler.java      |   6 +-
 .../cassandra/service/StorageService.java       |   3 +
 .../StartupClusterConnectivityCheckerTest.java  | 129 ++++++++++++++
 .../apache/cassandra/service/RemoveTest.java    |   2 +-
 18 files changed, 587 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/.circleci/config.yml
----------------------------------------------------------------------
diff --git a/.circleci/config.yml b/.circleci/config.yml
index f881b70..13bc11d 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -58,16 +58,16 @@ with_dtest_jobs_only: &with_dtest_jobs_only
                       - build
 # Set env_settings, env_vars, and workflows/build_and_run_tests based on environment
 env_settings: &env_settings
-    <<: *default_env_settings
-    #<<: *high_capacity_env_settings
+    #<<: *default_env_settings
+    <<: *high_capacity_env_settings
 env_vars: &env_vars
-    <<: *resource_constrained_env_vars
-    #<<: *high_capacity_env_vars
+    #<<: *resource_constrained_env_vars
+    <<: *high_capacity_env_vars
 workflows:
     version: 2
-    build_and_run_tests: *default_jobs
+    #build_and_run_tests: *default_jobs
     #build_and_run_tests: *with_dtest_jobs_only
-    #build_and_run_tests: *with_dtest_jobs
+    build_and_run_tests: *with_dtest_jobs
 docker_image: &docker_image kjellman/cassandra-test:0.4.3
 version: 2
 jobs:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 40b18ae..9e7a599 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Add optional startup delay to wait until peers are ready (CASSANDRA-13993)
  * Add a few options to nodetool verify (CASSANDRA-14201)
  * CVE-2017-5929 Security vulnerability and redefine default log rotation policy (CASSANDRA-14183)
  * Use JVM default SSL validation algorithm instead of custom default (CASSANDRA-13259)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 875751b..ad91a9b 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -373,6 +373,11 @@ public class Config
 
     public String full_query_log_dir = null;
 
+    // parameters to adjust how long to delay startup until a certain percentage of the cluster is connected to and marked alive
+    public int block_for_peers_percentage = 70;
+    public int block_for_peers_timeout_in_secs = 10;
+
+
     /**
      * @deprecated migrate to {@link DatabaseDescriptor#isClientInitialized()}
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index ccb0a30..2e772c5 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2522,4 +2522,14 @@ public class DatabaseDescriptor
     {
         return  conf.full_query_log_dir;
     }
+
+    public static int getBlockForPeersPercentage()
+    {
+        return conf.block_for_peers_percentage; // percentage of peers to wait for at startup; returned as configured, not range-checked here
+    }
+
+    public static int getBlockForPeersTimeoutInSeconds()
+    {
+        return conf.block_for_peers_timeout_in_secs; // max seconds to wait for peers at startup; returned as configured, not range-checked here
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/net/MessageOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java
index 7d3c0af..236a770 100644
--- a/src/java/org/apache/cassandra/net/MessageOut.java
+++ b/src/java/org/apache/cassandra/net/MessageOut.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
@@ -97,6 +98,11 @@ public class MessageOut<T>
     public final List<Object> parameters;
 
     /**
+     * Allows sender to explicitly state which connection type the message should be sent on.
+     */
+    public final ConnectionType connectionType;
+
+    /**
      * Memoization of the serialized size of the just the payload.
      */
     private int payloadSerializedSize = -1;
@@ -122,24 +128,33 @@ public class MessageOut<T>
         this(verb,
              payload,
              serializer,
-             isTracing()
-                 ? Tracing.instance.getTraceHeaders()
-                 : ImmutableList.of());
+             isTracing() ? Tracing.instance.getTraceHeaders() : ImmutableList.of(),
+             null);
+    }
+
+    public MessageOut(MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, ConnectionType connectionType)
+    {
+        this(verb,
+             payload,
+             serializer,
+             isTracing() ? Tracing.instance.getTraceHeaders() : ImmutableList.of(),
+             connectionType); // pins the message to this connection type; null lets the outbound pool choose
     }
 
-    private MessageOut(MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, List<Object> parameters)
+    private MessageOut(MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, List<Object> parameters, ConnectionType connectionType)
     {
-        this(FBUtilities.getBroadcastAddressAndPort(), verb, payload, serializer, parameters);
+        this(FBUtilities.getBroadcastAddressAndPort(), verb, payload, serializer, parameters, connectionType); // sent from this node's broadcast address
     }
 
     @VisibleForTesting
-    public MessageOut(InetAddressAndPort from, MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, List<Object> parameters)
+    public MessageOut(InetAddressAndPort from, MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, List<Object> parameters, ConnectionType connectionType)
     {
         this.from = from;
         this.verb = verb;
         this.payload = payload;
         this.serializer = serializer;
         this.parameters = parameters;
+        this.connectionType = connectionType; // may be null: OutboundMessagingPool then picks the connection by stage and size
     }
 
     public <VT> MessageOut<T> withParameter(ParameterType type, VT value)
@@ -148,7 +163,7 @@ public class MessageOut<T>
         newParameters.addAll(parameters);
         newParameters.add(type);
         newParameters.add(value);
-        return new MessageOut<T>(verb, payload, serializer, newParameters);
+        return new MessageOut<T>(verb, payload, serializer, newParameters, connectionType);
     }
 
     public Stage getStage()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 8fdb395..573cf7d 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -35,7 +35,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
@@ -255,7 +254,10 @@ public final class MessagingService implements MessagingServiceMBean
                 return DatabaseDescriptor.getRangeRpcTimeout();
             }
         },
-        // remember to add new verbs at the end, since we serialize by ordinal
+        PING(),
+
+        // add new verbs after the existing verbs, but *before* the UNUSED verbs, since we serialize by ordinal.
+        // UNUSED verbs serve as padding for backwards compatibility, so that a previous version can still validate a verb id introduced by a newer version.
         UNUSED_1,
         UNUSED_2,
         UNUSED_3,
@@ -263,7 +265,7 @@ public final class MessagingService implements MessagingServiceMBean
         UNUSED_5,
         ;
 
-        private int id;
+        private final int id;
         Verb()
         {
             id = ordinal();
@@ -291,7 +293,11 @@ public final class MessagingService implements MessagingServiceMBean
         static
         {
             for (Verb v : values())
+            {
+                if (idToVerbMap.containsKey(v.getId())) // ids go on the wire, so each id must map to exactly one verb
+                    throw new IllegalArgumentException("cannot have two verbs that map to the same id: " + idToVerbMap.get(v.getId()) + " and " + v);
                 idToVerbMap.put(v.getId(), v);
+            }
         }
 
         public static Verb fromId(int id)
@@ -347,6 +353,8 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.UNUSED_1, Stage.INTERNAL_RESPONSE);
         put(Verb.UNUSED_2, Stage.INTERNAL_RESPONSE);
         put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE);
+
+        put(Verb.PING, Stage.READ);
     }};
 
     /**
@@ -385,6 +393,8 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.HINT, HintMessage.serializer);
         put(Verb.BATCH_STORE, Batch.serializer);
         put(Verb.BATCH_REMOVE, UUIDSerializer.serializer);
+
+        put(Verb.PING, PingMessage.serializer);
     }};
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/net/PingMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/PingMessage.java b/src/java/org/apache/cassandra/net/PingMessage.java
new file mode 100644
index 0000000..4a19f22
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/PingMessage.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+
+import org.apache.cassandra.hints.HintResponse;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.async.OutboundConnectionIdentifier;
+import org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType;
+
+/**
+ * Conceptually the same as {@link org.apache.cassandra.gms.EchoMessage}, but indicates to the recipient which
+ * {@link ConnectionType} should be used for the response.
+ */
+public class PingMessage
+{
+    public static final IVersionedSerializer<PingMessage> serializer = new PingMessageSerializer();
+
+    public static final PingMessage smallChannelMessage = new PingMessage(ConnectionType.SMALL_MESSAGE);
+    public static final PingMessage largeChannelMessage = new PingMessage(ConnectionType.LARGE_MESSAGE);
+    public static final PingMessage gossipChannelMessage = new PingMessage(ConnectionType.GOSSIP);
+
+    public final ConnectionType connectionType; // connection type the recipient should use for its reply
+
+    public PingMessage(ConnectionType connectionType)
+    {
+        this.connectionType = connectionType;
+    }
+
+    public static class PingMessageSerializer implements IVersionedSerializer<PingMessage>
+    {
+        public void serialize(PingMessage t, DataOutputPlus out, int version) throws IOException
+        {
+            out.writeByte(t.connectionType.getId());
+        }
+
+        public PingMessage deserialize(DataInputPlus in, int version) throws IOException
+        {
+            ConnectionType connectionType = ConnectionType.fromId(in.readByte());
+
+            // if we ever create a new connection type, then during a rolling upgrade, the old nodes won't know about
+            // the new connection type (as it won't recognize the id), so just default to the small message type.
+            if (connectionType == null)
+                connectionType = ConnectionType.SMALL_MESSAGE;
+
+            switch (connectionType)
+            {
+                case LARGE_MESSAGE:
+                    return largeChannelMessage;
+                case GOSSIP:
+                    return gossipChannelMessage;
+                case SMALL_MESSAGE:
+                default:
+                    return smallChannelMessage;
+            }
+        }
+
+        public long serializedSize(PingMessage t, int version)
+        {
+            return 1; // a single byte: the connection type id written by serialize()
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/net/PingVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/PingVerbHandler.java b/src/java/org/apache/cassandra/net/PingVerbHandler.java
new file mode 100644
index 0000000..d959b91
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/PingVerbHandler.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+public class PingVerbHandler implements IVerbHandler<PingMessage>
+{
+    @Override
+    public void doVerb(MessageIn<PingMessage> message, int id)
+    {
+        // answer with an empty pong, on the connection type that the ping asked for
+        PingMessage ping = message.payload;
+        MessageOut<PongMessage> pong = new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, PongMessage.instance, PongMessage.serializer, ping.connectionType);
+        MessagingService.instance().sendReply(pong, id, message.from);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/net/PongMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/PongMessage.java b/src/java/org/apache/cassandra/net/PongMessage.java
new file mode 100644
index 0000000..bb89cdf
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/PongMessage.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/** Empty reply payload that {@link PingVerbHandler} sends back in response to a {@link PingMessage}. */
+public final class PongMessage
+{
+    public static final PongMessage instance = new PongMessage();
+    public static final IVersionedSerializer<PongMessage> serializer = new PongMessage.PongMessageSerializer();
+
+    private PongMessage() {}
+
+    public static class PongMessageSerializer implements IVersionedSerializer<PongMessage>
+    {
+        public void serialize(PongMessage t, DataOutputPlus out, int version) throws IOException
+        {   } // no payload, so nothing is written
+
+        public PongMessage deserialize(DataInputPlus in, int version) throws IOException
+        {
+            return instance;
+        }
+
+        public long serializedSize(PongMessage t, int version)
+        {
+            return 0;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java b/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
new file mode 100644
index 0000000..f22ab48
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.async.OutboundConnectionIdentifier;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.net.MessagingService.Verb.PING;
+
+/**
+ * Optionally delays startup until a target percentage of the known peers are marked alive and have answered a
+ * {@link PingMessage} on both the small and large message connections, or until a timeout elapses.
+ */
+public class StartupClusterConnectivityChecker
+{
+    private static final Logger logger = LoggerFactory.getLogger(StartupClusterConnectivityChecker.class);
+
+    // pause between status checks so that waiting for peers does not spin on the CPU
+    private static final long CHECK_INTERVAL_MILLIS = 100;
+
+    enum State { CONTINUE, FINISH_SUCCESS, FINISH_TIMEOUT }
+
+    private final int targetPercent;
+    private final int timeoutSecs;
+    private final Predicate<InetAddressAndPort> gossipStatus;
+
+    /**
+     * @param targetPercent percentage of peers that must be alive and connected, clamped to [0, 100]; 0 disables the wait
+     * @param timeoutSecs maximum number of seconds to wait; negative values are replaced with 1
+     * @param gossipStatus predicate deciding whether a peer counts as marked alive
+     */
+    public StartupClusterConnectivityChecker(int targetPercent, int timeoutSecs, Predicate<InetAddressAndPort> gossipStatus)
+    {
+        this.targetPercent = Math.max(0, Math.min(100, targetPercent));
+        if (timeoutSecs < 0)
+        {
+            timeoutSecs = 1;
+        }
+        else if (timeoutSecs > 100)
+        {
+            logger.warn("setting the block-for-peers timeout (in seconds) to {} might be a bit excessive, but using it nonetheless", timeoutSecs);
+        }
+        this.timeoutSecs = timeoutSecs;
+        this.gossipStatus = gossipStatus;
+    }
+
+    /**
+     * Blocks until {@code targetPercent} of the peers (other than this node) are alive and connected, or the timeout expires.
+     */
+    public void execute(Set<InetAddressAndPort> peers)
+    {
+        if (peers == null || targetPercent == 0)
+            return;
+
+        // remove current node from the set
+        peers = peers.stream()
+                     .filter(peer -> !peer.equals(FBUtilities.getBroadcastAddressAndPort()))
+                     .collect(Collectors.toSet());
+
+        // don't block if there are no other nodes in the cluster (or we don't know about them)
+        if (peers.isEmpty())
+            return;
+
+        logger.info("choosing to block until {}% of peers are marked alive and connections are established; max time to wait = {} seconds",
+                    targetPercent, timeoutSecs);
+
+        // first, send out a ping message to open up the non-gossip connections
+        final AtomicInteger connectedCount = sendPingMessages(peers);
+
+        final long startNanos = System.nanoTime();
+        final long expirationNanos = startNanos + TimeUnit.SECONDS.toNanos(timeoutSecs);
+        int completedRounds = 0;
+        // compare by subtraction so the expiration test stays correct even if System.nanoTime() wraps around
+        while (checkStatus(peers, connectedCount, startNanos, System.nanoTime() - expirationNanos > 0, completedRounds) == State.CONTINUE)
+        {
+            completedRounds++;
+            Uninterruptibles.sleepUninterruptibly(CHECK_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /** Evaluates one round of the wait loop, logging the outcome once waiting is over. */
+    State checkStatus(Set<InetAddressAndPort> peers, AtomicInteger connectedCount, final long startNanos, boolean beyondExpiration, final int completedRounds)
+    {
+        long currentAlive = peers.stream().filter(gossipStatus).count();
+        float currentAlivePercent = ((float) currentAlive / (float) peers.size()) * 100;
+
+        // assume two connections to remote host that we care to track here (small msg & large msg)
+        final int totalConnectionsSize = peers.size() * 2;
+        final int connectionsCount = connectedCount.get();
+        float currentConnectedPercent = ((float) connectionsCount / (float) totalConnectionsSize) * 100;
+
+        State state;
+        if (currentAlivePercent >= targetPercent && currentConnectedPercent >= targetPercent)
+            state = State.FINISH_SUCCESS;
+        // perform at least two rounds of checking, else this is kinda useless (and the operator set timeoutSecs too low)
+        else if (completedRounds >= 2 && beyondExpiration)
+            state = State.FINISH_TIMEOUT;
+        else
+            return State.CONTINUE;
+
+        logger.info("after {} milliseconds, found {}% ({} / {}) of peers as marked alive, " +
+                    "and {}% ({} / {}) of peers as connected, " +
+                    (state == State.FINISH_SUCCESS ? "both of which are above the desired threshold of {}%"
+                                                   : "one or both of which is below the desired threshold of {}%"),
+                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos),
+                    currentAlivePercent, currentAlive, peers.size(),
+                    currentConnectedPercent, connectionsCount, totalConnectionsSize,
+                    targetPercent);
+        return state;
+    }
+
+    /**
+     * Sends a "connection warmup" message to each peer in the collection, on both the small and large message
+     * {@link OutboundConnectionIdentifier.ConnectionType}s, counting every response in the returned counter.
+     */
+    private AtomicInteger sendPingMessages(Set<InetAddressAndPort> peers)
+    {
+        AtomicInteger connectedCount = new AtomicInteger(0);
+        IAsyncCallback responseHandler = new IAsyncCallback()
+        {
+            @Override
+            public boolean isLatencyForSnitch()
+            {
+                return false;
+            }
+
+            @Override
+            public void response(MessageIn msg)
+            {
+                connectedCount.incrementAndGet();
+            }
+        };
+
+        MessageOut<PingMessage> smallChannelMessageOut = new MessageOut<>(PING, PingMessage.smallChannelMessage, PingMessage.serializer);
+        MessageOut<PingMessage> largeChannelMessageOut = new MessageOut<>(PING, PingMessage.largeChannelMessage, PingMessage.serializer);
+        for (InetAddressAndPort peer : peers)
+        {
+            MessagingService.instance().sendRR(smallChannelMessageOut, peer, responseHandler);
+            MessagingService.instance().sendRR(largeChannelMessageOut, peer, responseHandler);
+        }
+
+        return connectedCount;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
index f3cb554..e309065 100644
--- a/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
+++ b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
@@ -18,9 +18,8 @@
 
 package org.apache.cassandra.net.async;
 
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
+import com.carrotsearch.hppc.IntObjectMap;
+import com.carrotsearch.hppc.IntObjectOpenHashMap;
 import org.apache.cassandra.locator.InetAddressAndPort;
 
 /**
@@ -32,9 +31,34 @@ import org.apache.cassandra.locator.InetAddressAndPort;
  */
 public class OutboundConnectionIdentifier
 {
-    enum ConnectionType
+    public enum ConnectionType
     {
-        GOSSIP, LARGE_MESSAGE, SMALL_MESSAGE, STREAM
+        GOSSIP (0), LARGE_MESSAGE (1), SMALL_MESSAGE (2), STREAM (3);
+
+        private final int id;
+
+        ConnectionType(int id)
+        {
+            this.id = id;
+        }
+
+        public int getId()
+        {
+            return id;
+        }
+
+        private static final IntObjectMap<ConnectionType> idMap = new IntObjectOpenHashMap<>(values().length);
+        static
+        {
+            for (ConnectionType type : values())
+                if (idMap.put(type.id, type) != null)
+                    throw new IllegalArgumentException("cannot have two connection types that map to the same id: " + type.id);
+        }
+        // returns null for an id this version does not know about (e.g. a type introduced by a newer version)
+        public static ConnectionType fromId(int id)
+        {
+            return idMap.get(id);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java b/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
index 28775ef..064131b 100644
--- a/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
+++ b/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
@@ -479,7 +479,7 @@ public class OutboundMessagingConnection
         {
             case SUCCESS:
                 assert result.channelWriter != null;
-                logger.debug("successfully connected to {}, conmpress = {}, coalescing = {}", connectionId,
+                logger.debug("successfully connected to {}, compress = {}, coalescing = {}", connectionId,
                              shouldCompressConnection(connectionId.local(), connectionId.remote()),
                              coalescingStrategy.isPresent() ? coalescingStrategy.get() : CoalescingStrategies.Strategy.DISABLED);
                 if (state.get() == State.CLOSED)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java b/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java
index c701229..14650a7 100644
--- a/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java
+++ b/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.net.async;
 
-import java.net.InetSocketAddress;
 import java.util.Optional;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -101,14 +100,21 @@ public class OutboundMessagingPool
     @VisibleForTesting
     public OutboundMessagingConnection getConnection(MessageOut msg)
     {
-        // optimize for the common path (the small message channel)
-        if (Stage.GOSSIP != msg.getStage())
+        if (msg.connectionType == null) // sender did not pin a connection type: choose by stage and serialized size
         {
-            return msg.serializedSize(smallMessageChannel.getTargetVersion()) < LARGE_MESSAGE_THRESHOLD
-            ? smallMessageChannel
-            : largeMessageChannel;
+            // optimize for the common path (the small message channel)
+            if (Stage.GOSSIP != msg.getStage())
+            {
+                return msg.serializedSize(smallMessageChannel.getTargetVersion()) < LARGE_MESSAGE_THRESHOLD
+                       ? smallMessageChannel
+                       : largeMessageChannel;
+            }
+            return gossipChannel;
+        }
+        else
+        {
+            return getConnection(msg.connectionType); // honor the connection type the sender explicitly requested
         }
-        return gossipChannel;
     }
 
     /**
@@ -138,20 +144,17 @@ public class OutboundMessagingPool
         smallMessageChannel.close(softClose);
     }
 
-    /**
-     * For testing purposes only.
-     */
     @VisibleForTesting
-    OutboundMessagingConnection getConnection(ConnectionType connectionType)
+    final OutboundMessagingConnection getConnection(ConnectionType connectionType)
     {
         switch (connectionType)
         {
-            case GOSSIP:
-                return gossipChannel;
-            case LARGE_MESSAGE:
-                return largeMessageChannel;
             case SMALL_MESSAGE:
                 return smallMessageChannel;
+            case LARGE_MESSAGE:
+                return largeMessageChannel;
+            case GOSSIP:
+                return gossipChannel;
             default:
                 throw new IllegalArgumentException("unsupported connection type: " + connectionType);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 130f3fd..295a33b 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -25,7 +25,10 @@ import java.net.InetAddress;
 import java.net.URL;
 import java.net.UnknownHostException;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
@@ -47,6 +50,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.StartupClusterConnectivityChecker;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.schema.Schema;
@@ -491,6 +495,12 @@ public class CassandraDaemon
      */
     public void start()
     {
+        StartupClusterConnectivityChecker connectivityChecker = new StartupClusterConnectivityChecker(DatabaseDescriptor.getBlockForPeersPercentage(),
+                                                                                                      DatabaseDescriptor.getBlockForPeersTimeoutInSeconds(),
+                                                                                                      Gossiper.instance::isAlive);
+        Set<InetAddressAndPort> peers = Gossiper.instance.getEndpointStates().stream().map(Map.Entry::getKey).collect(Collectors.toSet());
+        connectivityChecker.execute(peers);
+
         String nativeFlag = System.getProperty("cassandra.start_native_transport");
         if ((nativeFlag != null && Boolean.parseBoolean(nativeFlag)) || (nativeFlag == null && DatabaseDescriptor.startNativeTransport()))
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/service/EchoVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/EchoVerbHandler.java b/src/java/org/apache/cassandra/service/EchoVerbHandler.java
index d0c435e..1cc52e9 100644
--- a/src/java/org/apache/cassandra/service/EchoVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/EchoVerbHandler.java
@@ -26,16 +26,20 @@ import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType;
+
 public class EchoVerbHandler implements IVerbHandler<EchoMessage>
 {
     private static final Logger logger = LoggerFactory.getLogger(EchoVerbHandler.class);
 
     public void doVerb(MessageIn<EchoMessage> message, int id)
     {
-        MessageOut<EchoMessage> echoMessage = new MessageOut<EchoMessage>(MessagingService.Verb.REQUEST_RESPONSE, EchoMessage.instance, EchoMessage.serializer);
+        MessageOut<EchoMessage> echoMessage = new MessageOut<EchoMessage>(MessagingService.Verb.REQUEST_RESPONSE, EchoMessage.instance,
+                                                                          EchoMessage.serializer, ConnectionType.GOSSIP); // explicitly use the GOSSIP connection instead of the pool's stage/size-based choice
         logger.trace("Sending a EchoMessage reply {}", message.from);
         MessagingService.instance().sendReply(echoMessage, id, message.from);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index d465431..51b77a6 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -92,6 +92,7 @@ import org.apache.cassandra.schema.SchemaVersionVerbHandler;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.schema.ViewMetadata;
+import org.apache.cassandra.repair.RepairMessageVerbHandler;
 import org.apache.cassandra.service.paxos.CommitVerbHandler;
 import org.apache.cassandra.service.paxos.PrepareVerbHandler;
 import org.apache.cassandra.service.paxos.ProposeVerbHandler;
@@ -301,6 +302,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCH_STORE, new BatchStoreVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCH_REMOVE, new BatchRemoveVerbHandler());
+
+        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PING, new PingVerbHandler());
     }
 
     public void registerDaemon(CassandraDaemon daemon)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java b/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java
new file mode 100644
index 0000000..12f54c6
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.net.UnknownHostException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+
+import com.google.common.net.InetAddresses;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+public class StartupClusterConnectivityCheckerTest
+{
+    @Test
+    public void testConnectivity_SimpleHappyPath() throws UnknownHostException
+    {
+        StartupClusterConnectivityChecker connectivityChecker = new StartupClusterConnectivityChecker(70, 10, addr -> true);
+        int count = 10;
+        Set<InetAddressAndPort> peers = createNodes(count);
+        Assert.assertEquals(StartupClusterConnectivityChecker.State.FINISH_SUCCESS,
+                            connectivityChecker.checkStatus(peers, new AtomicInteger(count * 2), System.nanoTime(), false, 0));
+    }
+
+    @Test
+    public void testConnectivity_SimpleContinue() throws UnknownHostException
+    {
+        StartupClusterConnectivityChecker connectivityChecker = new StartupClusterConnectivityChecker(70, 10, addr -> true);
+        int count = 10;
+        Set<InetAddressAndPort> peers = createNodes(count);
+        Assert.assertEquals(StartupClusterConnectivityChecker.State.CONTINUE,
+                            connectivityChecker.checkStatus(peers, new AtomicInteger(0), System.nanoTime(), false, 0));
+    }
+
+    @Test
+    public void testConnectivity_Timeout() throws UnknownHostException
+    {
+        StartupClusterConnectivityChecker connectivityChecker = new StartupClusterConnectivityChecker(70, 10, addr -> true);
+        int count = 10;
+        Set<InetAddressAndPort> peers = createNodes(count);
+        Assert.assertEquals(StartupClusterConnectivityChecker.State.CONTINUE,
+                            connectivityChecker.checkStatus(peers, new AtomicInteger(0), System.nanoTime(), false, 4));
+        Assert.assertEquals(StartupClusterConnectivityChecker.State.FINISH_TIMEOUT,
+                            connectivityChecker.checkStatus(peers, new AtomicInteger(0), System.nanoTime(), true, 5));
+    }
+
+    @Test
+    public void testConnectivity_SimpleUpdating() throws UnknownHostException
+    {
+        UpdatablePredicate predicate = new UpdatablePredicate();
+        final int count = 100;
+        final int thresholdPercentage = 70;
+        StartupClusterConnectivityChecker connectivityChecker = new StartupClusterConnectivityChecker(thresholdPercentage, 10, predicate);
+        Set<InetAddressAndPort> peers = createNodes(count);
+
+        AtomicInteger connectedCount = new AtomicInteger();
+
+        for (int i = 0; i < count; i++)
+        {
+            predicate.reset(i);
+            connectedCount.set(i * 2);
+            StartupClusterConnectivityChecker.State expectedState = i < thresholdPercentage ?
+                                                                    StartupClusterConnectivityChecker.State.CONTINUE :
+                                                                    StartupClusterConnectivityChecker.State.FINISH_SUCCESS;
+            Assert.assertEquals("failed on iteration " + i,
+                                expectedState, connectivityChecker.checkStatus(peers, connectedCount, System.nanoTime(), false, i));
+        }
+    }
+
+    /**
+     * Returns true for the first {@code threshold} calls after {@code reset(threshold)}, then false.
+     */
+    private static class UpdatablePredicate implements Predicate<InetAddressAndPort>
+    {
+        private int index;
+        private int threshold;
+
+        void reset(int threshold)
+        {
+            index = 0;
+            this.threshold = threshold;
+        }
+
+        @Override
+        public boolean test(InetAddressAndPort inetAddressAndPort)
+        {
+            index++;
+            return index <= threshold;
+        }
+    }
+
+    private static Set<InetAddressAndPort> createNodes(int count) throws UnknownHostException
+    {
+        Set<InetAddressAndPort> nodes = new HashSet<>();
+
+        if (count < 1)
+            Assert.fail("need at least *one* node in the set!");
+
+        InetAddressAndPort node = InetAddressAndPort.getByName("127.0.0.1");
+        nodes.add(node);
+        for (int i = 1; i < count; i++)
+        {
+            node = InetAddressAndPort.getByAddress(InetAddresses.increment(node.address));
+            nodes.add(node);
+        }
+        return nodes;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b86801e9/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index 6714a83..0d39322 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -161,7 +161,7 @@ public class RemoveTest
 
         for (InetAddressAndPort host : hosts)
         {
-            MessageOut msg = new MessageOut(host, MessagingService.Verb.REPLICATION_FINISHED, null, null, Collections.<Object>emptyList());
+            MessageOut msg = new MessageOut(host, MessagingService.Verb.REPLICATION_FINISHED, null, null, Collections.<Object>emptyList(), null);
             MessagingService.instance().sendRR(msg, FBUtilities.getBroadcastAddressAndPort());
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org