You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2018/05/17 17:40:12 UTC

[1/6] cassandra git commit: Cleanup StartupClusterConnectivityChecker and PING Verb

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 6efa99c7a -> 06b3521ac
  refs/heads/cassandra-3.11 ab8348c57 -> 0d4aacc83
  refs/heads/trunk e210a05a5 -> d585410cc


Cleanup StartupClusterConnectivityChecker and PING Verb

patch by jasobrown; reviewed by Ariel Weisberg for CASSANDRA-14447


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/06b3521a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/06b3521a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/06b3521a

Branch: refs/heads/cassandra-3.0
Commit: 06b3521acdb21dd3d85902d59146b9d08ad7d752
Parents: 6efa99c
Author: Jason Brown <ja...@gmail.com>
Authored: Fri Apr 6 11:47:35 2018 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Thu May 17 10:34:31 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/net/MessagingService.java  | 15 ++++-
 .../org/apache/cassandra/net/PingMessage.java   | 60 ++++++++++++++++++++
 3 files changed, 73 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/06b3521a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3da808a..1293bd4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.17
+ * Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447)
  * Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121)
  * Cassandra not starting when using enhanced startup scripts in windows (CASSANDRA-14418)
  * Fix progress stats and units in compactionstats (CASSANDRA-12244)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06b3521a/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index e0f77b7..047f51f 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -147,13 +147,20 @@ public final class MessagingService implements MessagingServiceMBean
         PAXOS_PROPOSE,
         PAXOS_COMMIT,
         @Deprecated PAGED_RANGE,
-        // remember to add new verbs at the end, since we serialize by ordinal
-        UNUSED_1,
+        PING,
+
+        // UNUSED verbs were used as padding for backward/forward compatibility before 4.0,
+        // but it wasn't quite as bullet/future proof as needed. We still need to keep these entries
+        // around, at least for a major rev or two (post-4.0). see CASSANDRA-13993 for a discussion.
+        // For now, though, the UNUSED are legacy values (placeholders, basically) that should only be used
+        // for correctly adding VERBs that need to be emergency additions to 3.0/3.11.
+        // We can reclaim them (their id's, to be correct) in future versions, if desired, though.
         UNUSED_2,
         UNUSED_3,
         UNUSED_4,
         UNUSED_5,
         ;
+        // remember to add new verbs at the end, since we serialize by ordinal
 
         // This is to support a "late" choice of the verb based on the messaging service version.
         // See CASSANDRA-12249 for more details.
@@ -210,9 +217,10 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.SNAPSHOT, Stage.MISC);
         put(Verb.ECHO, Stage.GOSSIP);
 
-        put(Verb.UNUSED_1, Stage.INTERNAL_RESPONSE);
         put(Verb.UNUSED_2, Stage.INTERNAL_RESPONSE);
         put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE);
+
+        put(Verb.PING, Stage.READ);
     }};
 
     /**
@@ -251,6 +259,7 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.HINT, HintMessage.serializer);
         put(Verb.BATCH_STORE, Batch.serializer);
         put(Verb.BATCH_REMOVE, UUIDSerializer.serializer);
+        put(Verb.PING, PingMessage.serializer);
     }};
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06b3521a/src/java/org/apache/cassandra/net/PingMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/PingMessage.java b/src/java/org/apache/cassandra/net/PingMessage.java
new file mode 100644
index 0000000..8eaf23e
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/PingMessage.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * A backport of the version from 4.0, intentionally added as versions 4.0 or greater aren't guaranteed
+ * to know the c* versions they communicate with before they connect.
+ *
+ * It is intentional that no {@link IVerbHandler} is provided as we do not want to process the message;
+ * the intent is to not break the stream by leaving it in an unclean state, with unconsumed bytes.
+ * We do, however, assign a {@link org.apache.cassandra.concurrent.StageManager} stage
+ * to maintain proper message flow.
+ * See CASSANDRA-13993 for a discussion.
+ */
+public class PingMessage
+{
+    public static IVersionedSerializer<PingMessage> serializer = new PingMessageSerializer();
+
+    public static class PingMessageSerializer implements IVersionedSerializer<PingMessage>
+    {
+        public void serialize(PingMessage t, DataOutputPlus out, int version)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public PingMessage deserialize(DataInputPlus in, int version) throws IOException
+        {
+            // throw away the one byte of the payload
+            in.readByte();
+            return new PingMessage();
+        }
+
+        public long serializedSize(PingMessage t, int version)
+        {
+            return 1;
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[4/6] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by ja...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0d4aacc8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0d4aacc8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0d4aacc8

Branch: refs/heads/trunk
Commit: 0d4aacc832b1ffa428970d5c0e235d54585e22c2
Parents: ab8348c 06b3521
Author: Jason Brown <ja...@gmail.com>
Authored: Thu May 17 10:34:56 2018 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Thu May 17 10:35:36 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/net/MessagingService.java  | 14 ++++-
 .../org/apache/cassandra/net/PingMessage.java   | 60 ++++++++++++++++++++
 3 files changed, 72 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d4aacc8/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index d70b381,1293bd4..87e7c24
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,20 -1,5 +1,21 @@@
 -3.0.17
 +3.11.3
 + * Fix New SASI view creation during Index Redistribution (CASSANDRA-14055)
 + * Remove string formatting lines from BufferPool hot path (CASSANDRA-14416)
 + * Update metrics to 3.1.5 (CASSANDRA-12924)
 + * Detect OpenJDK jvm type and architecture (CASSANDRA-12793)
 + * Don't use guava collections in the non-system keyspace jmx attributes (CASSANDRA-12271)
 + * Allow existing nodes to use all peers in shadow round (CASSANDRA-13851)
 + * Fix cqlsh to read connection.ssl cqlshrc option again (CASSANDRA-14299)
 + * Downgrade log level to trace for CommitLogSegmentManager (CASSANDRA-14370)
 + * CQL fromJson(null) throws NullPointerException (CASSANDRA-13891)
 + * Serialize empty buffer as empty string for json output format (CASSANDRA-14245)
 + * Allow logging implementation to be interchanged for embedded testing (CASSANDRA-13396)
 + * SASI tokenizer for simple delimiter based entries (CASSANDRA-14247)
 + * Fix Loss of digits when doing CAST from varint/bigint to decimal (CASSANDRA-14170)
 + * RateBasedBackPressure unnecessarily invokes a lock on the Guava RateLimiter (CASSANDRA-14163)
 + * Fix wildcard GROUP BY queries (CASSANDRA-14209)
 +Merged from 3.0:
+  * Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447)
   * Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121)
   * Cassandra not starting when using enhanced startup scripts in windows (CASSANDRA-14418)
   * Fix progress stats and units in compactionstats (CASSANDRA-12244)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d4aacc8/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index 027e218,047f51f..59ed8f3
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -206,36 -143,18 +206,41 @@@ public final class MessagingService imp
          _TRACE, // dummy verb so we can use MS.droppedMessagesMap
          ECHO,
          REPAIR_MESSAGE,
 -        PAXOS_PREPARE,
 -        PAXOS_PROPOSE,
 -        PAXOS_COMMIT,
 -        @Deprecated PAGED_RANGE,
 +        PAXOS_PREPARE
 +        {
 +            public long getTimeout()
 +            {
 +                return DatabaseDescriptor.getWriteRpcTimeout();
 +            }
 +        },
 +        PAXOS_PROPOSE
 +        {
 +            public long getTimeout()
 +            {
 +                return DatabaseDescriptor.getWriteRpcTimeout();
 +            }
 +        },
 +        PAXOS_COMMIT
 +        {
 +            public long getTimeout()
 +            {
 +                return DatabaseDescriptor.getWriteRpcTimeout();
 +            }
 +        },
 +        @Deprecated PAGED_RANGE
 +        {
 +            public long getTimeout()
 +            {
 +                return DatabaseDescriptor.getRangeRpcTimeout();
 +            }
 +        },
-         // remember to add new verbs at the end, since we serialize by ordinal
-         UNUSED_1,
+         PING,
 -
+         // UNUSED verbs were used as padding for backward/forward compatibility before 4.0,
+         // but it wasn't quite as bullet/future proof as needed. We still need to keep these entries
+         // around, at least for a major rev or two (post-4.0). see CASSANDRA-13993 for a discussion.
+         // For now, though, the UNUSED are legacy values (placeholders, basically) that should only be used
+         // for correctly adding VERBs that need to be emergency additions to 3.0/3.11.
+         // We can reclaim them (their id's, to be correct) in future versions, if desired, though.
          UNUSED_2,
          UNUSED_3,
          UNUSED_4,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[3/6] cassandra git commit: Cleanup StartupClusterConnectivityChecker and PING Verb

Posted by ja...@apache.org.
Cleanup StartupClusterConnectivityChecker and PING Verb

patch by jasobrown; reviewed by Ariel Weisberg for CASSANDRA-14447


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/06b3521a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/06b3521a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/06b3521a

Branch: refs/heads/trunk
Commit: 06b3521acdb21dd3d85902d59146b9d08ad7d752
Parents: 6efa99c
Author: Jason Brown <ja...@gmail.com>
Authored: Fri Apr 6 11:47:35 2018 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Thu May 17 10:34:31 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/net/MessagingService.java  | 15 ++++-
 .../org/apache/cassandra/net/PingMessage.java   | 60 ++++++++++++++++++++
 3 files changed, 73 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/06b3521a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3da808a..1293bd4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.17
+ * Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447)
  * Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121)
  * Cassandra not starting when using enhanced startup scripts in windows (CASSANDRA-14418)
  * Fix progress stats and units in compactionstats (CASSANDRA-12244)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06b3521a/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index e0f77b7..047f51f 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -147,13 +147,20 @@ public final class MessagingService implements MessagingServiceMBean
         PAXOS_PROPOSE,
         PAXOS_COMMIT,
         @Deprecated PAGED_RANGE,
-        // remember to add new verbs at the end, since we serialize by ordinal
-        UNUSED_1,
+        PING,
+
+        // UNUSED verbs were used as padding for backward/forward compatibility before 4.0,
+        // but it wasn't quite as bullet/future proof as needed. We still need to keep these entries
+        // around, at least for a major rev or two (post-4.0). see CASSANDRA-13993 for a discussion.
+        // For now, though, the UNUSED are legacy values (placeholders, basically) that should only be used
+        // for correctly adding VERBs that need to be emergency additions to 3.0/3.11.
+        // We can reclaim them (their id's, to be correct) in future versions, if desired, though.
         UNUSED_2,
         UNUSED_3,
         UNUSED_4,
         UNUSED_5,
         ;
+        // remember to add new verbs at the end, since we serialize by ordinal
 
         // This is to support a "late" choice of the verb based on the messaging service version.
         // See CASSANDRA-12249 for more details.
@@ -210,9 +217,10 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.SNAPSHOT, Stage.MISC);
         put(Verb.ECHO, Stage.GOSSIP);
 
-        put(Verb.UNUSED_1, Stage.INTERNAL_RESPONSE);
         put(Verb.UNUSED_2, Stage.INTERNAL_RESPONSE);
         put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE);
+
+        put(Verb.PING, Stage.READ);
     }};
 
     /**
@@ -251,6 +259,7 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.HINT, HintMessage.serializer);
         put(Verb.BATCH_STORE, Batch.serializer);
         put(Verb.BATCH_REMOVE, UUIDSerializer.serializer);
+        put(Verb.PING, PingMessage.serializer);
     }};
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06b3521a/src/java/org/apache/cassandra/net/PingMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/PingMessage.java b/src/java/org/apache/cassandra/net/PingMessage.java
new file mode 100644
index 0000000..8eaf23e
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/PingMessage.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * A backport of the version from 4.0, intentionally added as versions 4.0 or greater aren't guaranteed
+ * to know the c* versions they communicate with before they connect.
+ *
+ * It is intentional that no {@link IVerbHandler} is provided as we do not want to process the message;
+ * the intent is to not break the stream by leaving it in an unclean state, with unconsumed bytes.
+ * We do, however, assign a {@link org.apache.cassandra.concurrent.StageManager} stage
+ * to maintain proper message flow.
+ * See CASSANDRA-13993 for a discussion.
+ */
+public class PingMessage
+{
+    public static IVersionedSerializer<PingMessage> serializer = new PingMessageSerializer();
+
+    public static class PingMessageSerializer implements IVersionedSerializer<PingMessage>
+    {
+        public void serialize(PingMessage t, DataOutputPlus out, int version)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public PingMessage deserialize(DataInputPlus in, int version) throws IOException
+        {
+            // throw away the one byte of the payload
+            in.readByte();
+            return new PingMessage();
+        }
+
+        public long serializedSize(PingMessage t, int version)
+        {
+            return 1;
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[6/6] cassandra git commit: Merge branch 'cassandra-3.11' into trunk

Posted by ja...@apache.org.
Merge branch 'cassandra-3.11' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d585410c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d585410c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d585410c

Branch: refs/heads/trunk
Commit: d585410ccb42011ca71441471d1e2949e5ddedb5
Parents: e210a05 0d4aacc
Author: Jason Brown <ja...@gmail.com>
Authored: Thu May 17 10:35:51 2018 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Thu May 17 10:39:37 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/config/DatabaseDescriptor.java    |   6 +
 .../org/apache/cassandra/gms/EndpointState.java |   2 +-
 src/java/org/apache/cassandra/gms/Gossiper.java |  15 +-
 .../apache/cassandra/gms/HeartBeatState.java    |   4 +-
 .../cassandra/net/MessageDeliveryTask.java      |  25 +-
 .../org/apache/cassandra/net/MessageIn.java     |   6 +
 .../apache/cassandra/net/MessagingService.java  |  36 ++-
 .../net/StartupClusterConnectivityChecker.java  | 231 +++++++++++--------
 .../cassandra/service/CassandraDaemon.java      |  11 +-
 .../cassandra/net/MessageDeliveryTaskTest.java  | 121 ++++++++++
 .../org/apache/cassandra/net/MessageInTest.java |  66 ++++++
 .../StartupClusterConnectivityCheckerTest.java  | 160 +++++++------
 13 files changed, 497 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index cfe56c6,87e7c24..097db1d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -256,6 -15,8 +256,7 @@@
   * RateBasedBackPressure unnecessarily invokes a lock on the Guava RateLimiter (CASSANDRA-14163)
   * Fix wildcard GROUP BY queries (CASSANDRA-14209)
  Merged from 3.0:
+  * Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447)
 - * Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121)
   * Cassandra not starting when using enhanced startup scripts in windows (CASSANDRA-14418)
   * Fix progress stats and units in compactionstats (CASSANDRA-12244)
   * Better handle missing partition columns in system_schema.columns (CASSANDRA-14379)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 699148d,8b92c5a..592b96e
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -26,9 -26,8 +26,10 @@@ import java.nio.file.NoSuchFileExceptio
  import java.nio.file.Path;
  import java.nio.file.Paths;
  import java.util.*;
++import java.util.concurrent.TimeUnit;
  
  import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Preconditions;
  import com.google.common.collect.ImmutableSet;
  import com.google.common.primitives.Ints;
  import com.google.common.primitives.Longs;
@@@ -1487,6 -1491,6 +1488,11 @@@ public class DatabaseDescripto
                           getTruncateRpcTimeout());
      }
  
++    public static long getPingTimeout()
++    {
++        return TimeUnit.SECONDS.toMillis(getBlockForPeersTimeoutInSeconds());
++    }
++
      public static double getPhiConvictThreshold()
      {
          return conf.phi_convict_threshold;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/src/java/org/apache/cassandra/gms/EndpointState.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/gms/EndpointState.java
index 1085447,674b597..5646bf6
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@@ -50,7 -54,7 +50,7 @@@ public class EndpointStat
      private volatile long updateTimestamp;
      private volatile boolean isAlive;
  
--    EndpointState(HeartBeatState initialHbState)
++    public EndpointState(HeartBeatState initialHbState)
      {
          this(initialHbState, new EnumMap<ApplicationState, VersionedValue>(ApplicationState.class));
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index 4c66669,ea05525..3975187
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -32,13 -31,7 +32,13 @@@ import javax.management.ObjectName
  import com.google.common.annotations.VisibleForTesting;
  import com.google.common.collect.ImmutableList;
  import com.google.common.collect.ImmutableMap;
 +import com.google.common.collect.Iterables;
++import com.google.common.collect.ImmutableSet;
  import com.google.common.util.concurrent.Uninterruptibles;
 +
 +import org.apache.cassandra.locator.InetAddressAndPort;
- import org.apache.cassandra.locator.SeedProvider;
 +import org.apache.cassandra.utils.CassandraVersion;
 +import org.apache.cassandra.utils.Pair;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -878,12 -854,12 +878,17 @@@ public class Gossiper implements IFailu
          return endpointStateMap.get(ep);
      }
  
-     public Set<Entry<InetAddressAndPort, EndpointState>> getEndpointStates()
 -    public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
++    public ImmutableSet<InetAddressAndPort> getEndpoints()
 +    {
-         return endpointStateMap.entrySet();
++        return ImmutableSet.copyOf(endpointStateMap.keySet());
++    }
++
++    public int getEndpointCount()
+     {
 -        return endpointStateMap.entrySet();
++        return endpointStateMap.size();
      }
  
 -    public UUID getHostId(InetAddress endpoint)
 +    public UUID getHostId(InetAddressAndPort endpoint)
      {
          return getHostId(endpoint, endpointStateMap);
      }
@@@ -1798,11 -1693,11 +1803,11 @@@
          Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_MIN_WAIT_MS, TimeUnit.MILLISECONDS);
          int totalPolls = 0;
          int numOkay = 0;
--        int epSize = Gossiper.instance.getEndpointStates().size();
++        int epSize = Gossiper.instance.getEndpointCount();
          while (numOkay < GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED)
          {
              Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
--            int currentSize = Gossiper.instance.getEndpointStates().size();
++            int currentSize = Gossiper.instance.getEndpointCount();
              totalPolls++;
              if (currentSize == epSize)
              {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/src/java/org/apache/cassandra/gms/HeartBeatState.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/gms/HeartBeatState.java
index 13e1ace,13e1ace..2abd5d7
--- a/src/java/org/apache/cassandra/gms/HeartBeatState.java
+++ b/src/java/org/apache/cassandra/gms/HeartBeatState.java
@@@ -27,7 -27,7 +27,7 @@@ import org.apache.cassandra.io.util.Dat
  /**
   * HeartBeat State associated with any given endpoint.
   */
--class HeartBeatState
++public class HeartBeatState
  {
      public static final IVersionedSerializer<HeartBeatState> serializer = new HeartBeatStateSerializer();
  
@@@ -39,7 -39,7 +39,7 @@@
          this(gen, 0);
      }
  
--    HeartBeatState(int gen, int ver)
++    public HeartBeatState(int gen, int ver)
      {
          generation = gen;
          version = ver;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index 6e132a8,c91e9da..1b9090c
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@@ -20,7 -20,6 +20,8 @@@ package org.apache.cassandra.net
  import java.io.IOException;
  import java.util.EnumSet;
  
++import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.primitives.Shorts;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -50,23 -45,20 +51,40 @@@ public class MessageDeliveryTask implem
  
      public void run()
      {
++        process();
++    }
++
++    /**
++     * A helper function for making unit testing reasonable.
++     *
++     * @return true if the message was processed; else false.
++     */
++    @VisibleForTesting
++    boolean process()
++    {
          MessagingService.Verb verb = message.verb;
++        if (verb == null)
++        {
++            logger.trace("Unknown verb {}", verb);
++            return false;
++        }
++
 +        MessagingService.instance().metrics.addQueueWaitTime(verb.toString(),
 +                                                             ApproximateTime.currentTimeMillis() - enqueueTime);
 +
          long timeTaken = message.getLifetimeInMS();
          if (MessagingService.DROPPABLE_VERBS.contains(verb)
              && timeTaken > message.getTimeout())
          {
              MessagingService.instance().incrementDroppedMessages(message, timeTaken);
--            return;
++            return false;
          }
  
          IVerbHandler verbHandler = MessagingService.instance().getVerbHandler(verb);
          if (verbHandler == null)
          {
--            logger.trace("Unknown verb {}", verb);
--            return;
++            logger.trace("No handler for verb {}", verb);
++            return false;
          }
  
          try
@@@ -91,6 -83,6 +109,7 @@@
  
          if (GOSSIP_VERBS.contains(message.verb))
              Gossiper.instance.setLastProcessedMessageAt(message.constructionTime);
++        return true;
      }
  
      private void handleFailure(Throwable t)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/src/java/org/apache/cassandra/net/MessageIn.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessageIn.java
index 7fb866f,d06d515..1cd39f3
--- a/src/java/org/apache/cassandra/net/MessageIn.java
+++ b/src/java/org/apache/cassandra/net/MessageIn.java
@@@ -146,8 -117,8 +146,14 @@@ public class MessageIn<T
              }
              serializer = (IVersionedSerializer<T2>) callback.serializer;
          }
++
          if (payloadSize == 0 || serializer == null)
++        {
++            // if there's no deserializer for the verb, skip the payload bytes to leave
++            // the stream in a clean state (for the next message)
++            in.skipBytesFully(payloadSize);
              return create(from, null, parameters, verb, version, constructionTime);
++        }
  
          T2 payload = serializer.deserialize(in, version);
          return MessageIn.create(from, payload, parameters, verb, version, constructionTime);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index a590723,59ed8f3..f5051fb
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -254,57 -234,37 +254,67 @@@ public final class MessagingService imp
                  return DatabaseDescriptor.getRangeRpcTimeout();
              }
          },
-         PING(),
 -        PING,
++        PING
++        {
++            public long getTimeout()
++            {
++                return DatabaseDescriptor.getPingTimeout();
++            }
++        },
 +
-         // add new verbs after the existing verbs, but *before* the UNUSED verbs, since we serialize by ordinal.
-         // UNUSED verbs serve as padding for backwards compatability where a previous version needs to validate a verb from the future.
-         UNUSED_1,
+         // UNUSED verbs were used as padding for backward/forward compatibility before 4.0,
+         // but it wasn't quite as bullet/future proof as needed. We still need to keep these entries
+         // around, at least for a major rev or two (post-4.0). see CASSANDRA-13993 for a discussion.
+         // For now, though, the UNUSED are legacy values (placeholders, basically) that should only be used
+         // for correctly adding VERBs that need to be emergency additions to 3.0/3.11.
 -        // We can reclaim them (their id's, to be correct) in future versions, if desired, though.
++        // We can reclaim them (their id's, to be correct) in future versions, if desired, though.
          UNUSED_2,
          UNUSED_3,
          UNUSED_4,
          UNUSED_5,
          ;
 -        // remember to add new verbs at the end, since we serialize by ordinal
++        // add new verbs after the existing verbs, since we serialize by ordinal.
  
 -        // This is to support a "late" choice of the verb based on the messaging service version.
 -        // See CASSANDRA-12249 for more details.
 -        public static Verb convertForMessagingServiceVersion(Verb verb, int version)
 +        private final int id;
 +        Verb()
          {
 -            if (verb == PAGED_RANGE && version >= VERSION_30)
 -                return RANGE_SLICE;
 +            id = ordinal();
 +        }
  
 -            return verb;
 +        /**
 +         * Unused, but it is an extension point for adding custom verbs
 +         * @param id
 +         */
 +        Verb(int id)
 +        {
 +            this.id = id;
          }
  
          public long getTimeout()
          {
              return DatabaseDescriptor.getRpcTimeout();
          }
 -    }
  
 -    public static final Verb[] verbValues = Verb.values();
 +        public int getId()
 +        {
 +            return id;
 +        }
 +        private static final IntObjectMap<Verb> idToVerbMap = new IntObjectOpenHashMap<>(values().length);
 +        static
 +        {
 +            for (Verb v : values())
 +            {
-                 if (idToVerbMap.containsKey(v.getId()))
-                     throw new IllegalArgumentException("cannot have two verbs that map to the same id: " + v + " and " + v.getId());
-                 idToVerbMap.put(v.getId(), v);
++                Verb existing = idToVerbMap.put(v.getId(), v);
++                if (existing != null)
++                    throw new IllegalArgumentException("cannot have two verbs that map to the same id: " + v + " and " + existing);
 +            }
 +        }
 +
 +        public static Verb fromId(int id)
 +        {
 +            return idToVerbMap.get(id);
 +        }
 +    }
  
      public static final EnumMap<MessagingService.Verb, Stage> verbStages = new EnumMap<MessagingService.Verb, Stage>(MessagingService.Verb.class)
      {{
@@@ -350,9 -310,8 +360,10 @@@
          put(Verb.SNAPSHOT, Stage.MISC);
          put(Verb.ECHO, Stage.GOSSIP);
  
-         put(Verb.UNUSED_1, Stage.INTERNAL_RESPONSE);
          put(Verb.UNUSED_2, Stage.INTERNAL_RESPONSE);
          put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE);
++        put(Verb.UNUSED_4, Stage.INTERNAL_RESPONSE);
++        put(Verb.UNUSED_5, Stage.INTERNAL_RESPONSE);
  
          put(Verb.PING, Stage.READ);
      }};
@@@ -929,6 -827,6 +939,14 @@@
      }
  
      /**
++     * SHOULD ONLY BE USED FOR TESTING!!
++     */
++    public void removeVerbHandler(Verb verb)
++    {
++        verbHandlers.remove(verb);
++    }
++
++    /**
       * This method returns the verb handler associated with the registered
       * verb. If no handler has been registered then null is returned.
       *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
index f22ab48,0000000..db04ca3
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
+++ b/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
@@@ -1,171 -1,0 +1,216 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
- 
 +package org.apache.cassandra.net;
 +
++import java.util.HashSet;
++import java.util.Map;
 +import java.util.Set;
++import java.util.concurrent.ConcurrentHashMap;
++import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicInteger;
- import java.util.function.Predicate;
- import java.util.stream.Collectors;
 +
++import com.google.common.annotations.VisibleForTesting;
++import com.google.common.collect.Sets;
 +import com.google.common.util.concurrent.Uninterruptibles;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
++import org.apache.cassandra.gms.ApplicationState;
++import org.apache.cassandra.gms.EndpointState;
++import org.apache.cassandra.gms.Gossiper;
++import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
++import org.apache.cassandra.gms.VersionedValue;
 +import org.apache.cassandra.locator.InetAddressAndPort;
- import org.apache.cassandra.net.async.OutboundConnectionIdentifier;
++import org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +import static org.apache.cassandra.net.MessagingService.Verb.PING;
++import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE;
++import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE;
 +
 +public class StartupClusterConnectivityChecker
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(StartupClusterConnectivityChecker.class);
 +
-     enum State { CONTINUE, FINISH_SUCCESS, FINISH_TIMEOUT }
- 
 +    private final int targetPercent;
-     private final int timeoutSecs;
-     private final Predicate<InetAddressAndPort> gossipStatus;
++    private final long timeoutNanos;
 +
-     public StartupClusterConnectivityChecker(int targetPercent, int timeoutSecs, Predicate<InetAddressAndPort> gossipStatus)
++    public static StartupClusterConnectivityChecker create(int targetPercent, int timeoutSecs)
 +    {
-         if (targetPercent < 0)
-         {
-             targetPercent = 0;
-         }
-         else if (targetPercent > 100)
-         {
-             targetPercent = 100;
-         }
-         this.targetPercent = targetPercent;
- 
-         if (timeoutSecs < 0)
-         {
-             timeoutSecs = 1;
-         }
-         else if (timeoutSecs > 100)
-         {
++        timeoutSecs = Math.max(1, timeoutSecs);
++        if (timeoutSecs > 100)
 +            logger.warn("setting the block-for-peers timeout (in seconds) to {} might be a bit excessive, but using it nonetheless", timeoutSecs);
-         }
-         this.timeoutSecs = timeoutSecs;
++        long timeoutNanos = TimeUnit.SECONDS.toNanos(timeoutSecs);
 +
-         this.gossipStatus = gossipStatus;
++        return new StartupClusterConnectivityChecker(targetPercent, timeoutNanos);
 +    }
 +
-     public void execute(Set<InetAddressAndPort> peers)
++    @VisibleForTesting
++    StartupClusterConnectivityChecker(int targetPercent, long timeoutNanos)
 +    {
-         if (peers == null || targetPercent == 0)
-             return;
++        this.targetPercent = Math.min(100, Math.max(0, targetPercent));
++        this.timeoutNanos = timeoutNanos;
++    }
 +
-         // remove current node from the set
-         peers = peers.stream()
-                      .filter(peer -> !peer.equals(FBUtilities.getBroadcastAddressAndPort()))
-                      .collect(Collectors.toSet());
++    /**
++     * @param peers The currently known peers in the cluster; argument is not modified.
++     * @return true if the requested percentage of peers are marked ALIVE in gossip and have their connections opened;
++     * else false.
++     */
++    public boolean execute(Set<InetAddressAndPort> peers)
++    {
++        if (targetPercent == 0 || peers == null)
++            return true;
 +
-         // don't block if there's no other nodes in the cluster (or we don't know about them)
-         if (peers.size() <= 0)
-             return;
++        // make a copy of the set, to avoid mucking with the input (in case it's a sensitive collection)
++        peers = new HashSet<>(peers);
++        peers.remove(FBUtilities.getBroadcastAddressAndPort());
 +
-         logger.info("choosing to block until {}% of peers are marked alive and connections are established; max time to wait = {} seconds",
-                     targetPercent, timeoutSecs);
++        if (peers.isEmpty())
++            return true;
 +
-         // first, send out a ping message to open up the non-gossip connections
-         final AtomicInteger connectedCount = sendPingMessages(peers);
++        logger.info("choosing to block until {}% of the {} known peers are marked alive and connections are established; max time to wait = {} seconds",
++                    targetPercent, peers.size(), TimeUnit.NANOSECONDS.toSeconds(timeoutNanos));
 +
-         final long startNanos = System.nanoTime();
-         final long expirationNanos = startNanos + TimeUnit.SECONDS.toNanos(timeoutSecs);
-         int completedRounds = 0;
-         while (checkStatus(peers, connectedCount, startNanos, expirationNanos < System.nanoTime(), completedRounds) == State.CONTINUE)
-         {
-             completedRounds++;
-             Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MICROSECONDS);
-         }
-     }
++        long startNanos = System.nanoTime();
 +
-     State checkStatus(Set<InetAddressAndPort> peers, AtomicInteger connectedCount, final long startNanos, boolean beyondExpiration, final int completedRounds)
-     {
-         long currentAlive = peers.stream().filter(gossipStatus).count();
-         float currentAlivePercent = ((float) currentAlive / (float) peers.size()) * 100;
++        AckMap acks = new AckMap(3);
++        int target = (int) ((targetPercent / 100.0) * peers.size());
++        CountDownLatch latch = new CountDownLatch(target);
 +
-         // assume two connections to remote host that we care to track here (small msg & large msg)
-         final int totalConnectionsSize = peers.size() * 2;
-         final int connectionsCount = connectedCount.get();
-         float currentConnectedPercent = ((float) connectionsCount / (float) totalConnectionsSize) * 100;
++        // set up a listener to react to new nodes becoming alive (in gossip), and account for all the nodes that are already alive
++        Set<InetAddressAndPort> alivePeers = Sets.newSetFromMap(new ConcurrentHashMap<>());
++        AliveListener listener = new AliveListener(alivePeers, latch, acks);
++        Gossiper.instance.register(listener);
 +
-         if (currentAlivePercent >= targetPercent && currentConnectedPercent >= targetPercent)
-         {
-             logger.info("after {} milliseconds, found {}% ({} / {}) of peers as marked alive, " +
-                         "and {}% ({} / {}) of peers as connected, " +
-                         "both of which are above the desired threshold of {}%",
-                         TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos),
-                         currentAlivePercent, currentAlive, peers.size(),
-                         currentConnectedPercent, connectionsCount, totalConnectionsSize,
-                         targetPercent);
-             return State.FINISH_SUCCESS;
-         }
++        // send out a ping message to open up the non-gossip connections
++        sendPingMessages(peers, latch, acks);
 +
-         // perform at least two rounds of checking, else this is kinda useless (and the operator set the aliveTimeoutSecs too low)
-         if (completedRounds >= 2 && beyondExpiration)
-         {
-             logger.info("after {} milliseconds, found {}% ({} / {}) of peers as marked alive, " +
-                         "and {}% ({} / {}) of peers as connected, " +
-                         "one or both of which is below the desired threshold of {}%",
-                         TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos),
-                         currentAlivePercent, currentAlive, peers.size(),
-                         currentConnectedPercent, connectionsCount, totalConnectionsSize,
-                         targetPercent);
-             return State.FINISH_TIMEOUT;
-         }
-         return State.CONTINUE;
++        for (InetAddressAndPort peer : peers)
++            if (Gossiper.instance.isAlive(peer) && alivePeers.add(peer) && acks.incrementAndCheck(peer))
++                latch.countDown();
++
++        boolean succeeded = Uninterruptibles.awaitUninterruptibly(latch, timeoutNanos, TimeUnit.NANOSECONDS);
++        Gossiper.instance.unregister(listener);
++
++        int connected = peers.size() - (int) latch.getCount();
++        logger.info("After waiting/processing for {} milliseconds, {} out of {} peers ({}%) have been marked alive and had connections established",
++                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos),
++                    connected,
++                    peers.size(),
++                    connected / (peers.size()) * 100.0);
++        return succeeded;
 +    }
 +
 +    /**
-      * Sends a "connection warmup" message to each peer in the collection, on every {@link OutboundConnectionIdentifier.ConnectionType}
-      * used for internode messaging.
++     * Sends a "connection warmup" message to each peer in the collection, on every {@link ConnectionType}
++     * used for internode messaging (that is not gossip).
 +     */
-     private AtomicInteger sendPingMessages(Set<InetAddressAndPort> peers)
++    private void sendPingMessages(Set<InetAddressAndPort> peers, CountDownLatch latch, AckMap acks)
 +    {
-         AtomicInteger connectedCount = new AtomicInteger(0);
 +        IAsyncCallback responseHandler = new IAsyncCallback()
 +        {
-             @Override
 +            public boolean isLatencyForSnitch()
 +            {
 +                return false;
 +            }
 +
-             @Override
 +            public void response(MessageIn msg)
 +            {
-                 connectedCount.incrementAndGet();
++                if (acks.incrementAndCheck(msg.from))
++                    latch.countDown();
 +            }
 +        };
 +
-         MessageOut<PingMessage> smallChannelMessageOut = new MessageOut<>(PING, PingMessage.smallChannelMessage, PingMessage.serializer);
-         MessageOut<PingMessage> largeChannelMessageOut = new MessageOut<>(PING, PingMessage.largeChannelMessage, PingMessage.serializer);
++        MessageOut<PingMessage> smallChannelMessageOut = new MessageOut<>(PING, PingMessage.smallChannelMessage,
++                                                                          PingMessage.serializer, SMALL_MESSAGE);
++        MessageOut<PingMessage> largeChannelMessageOut = new MessageOut<>(PING, PingMessage.largeChannelMessage,
++                                                                          PingMessage.serializer, LARGE_MESSAGE);
 +        for (InetAddressAndPort peer : peers)
 +        {
 +            MessagingService.instance().sendRR(smallChannelMessageOut, peer, responseHandler);
 +            MessagingService.instance().sendRR(largeChannelMessageOut, peer, responseHandler);
 +        }
++    }
 +
-         return connectedCount;
++    /**
++     * A trivial implementation of {@link IEndpointStateChangeSubscriber} that really only cares about
++     * {@link #onAlive(InetAddressAndPort, EndpointState)} invocations.
++     */
++    private static final class AliveListener implements IEndpointStateChangeSubscriber
++    {
++        private final CountDownLatch latch;
++        private final Set<InetAddressAndPort> livePeers;
++        private final AckMap acks;
++
++        AliveListener(Set<InetAddressAndPort> livePeers, CountDownLatch latch, AckMap acks)
++        {
++            this.latch = latch;
++            this.livePeers = livePeers;
++            this.acks = acks;
++        }
++
++        public void onJoin(InetAddressAndPort endpoint, EndpointState epState)
++        {
++        }
++
++        public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue)
++        {
++        }
++
++        public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value)
++        {
++        }
++
++        public void onAlive(InetAddressAndPort endpoint, EndpointState state)
++        {
++            if (livePeers.add(endpoint) && acks.incrementAndCheck(endpoint))
++                latch.countDown();
++        }
++
++        public void onDead(InetAddressAndPort endpoint, EndpointState state)
++        {
++        }
++
++        public void onRemove(InetAddressAndPort endpoint)
++        {
++        }
++
++        public void onRestart(InetAddressAndPort endpoint, EndpointState state)
++        {
++        }
++    }
++
++    private static final class AckMap
++    {
++        private final int threshold;
++        private final Map<InetAddressAndPort, AtomicInteger> acks;
++
++        AckMap(int threshold)
++        {
++            this.threshold = threshold;
++            acks = new ConcurrentHashMap<>();
++        }
++
++        boolean incrementAndCheck(InetAddressAndPort address)
++        {
++            return acks.computeIfAbsent(address, addr -> new AtomicInteger(0)).incrementAndGet() == threshold;
++        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/CassandraDaemon.java
index 80b8b7b,d9bd5c3..6e0b92b
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@@ -503,12 -513,6 +500,10 @@@ public class CassandraDaemo
       */
      public void start()
      {
-         StartupClusterConnectivityChecker connectivityChecker = new StartupClusterConnectivityChecker(DatabaseDescriptor.getBlockForPeersPercentage(),
-                                                                                                       DatabaseDescriptor.getBlockForPeersTimeoutInSeconds(),
-                                                                                                       Gossiper.instance::isAlive);
-         Set<InetAddressAndPort> peers = Gossiper.instance.getEndpointStates().stream().map(Map.Entry::getKey).collect(Collectors.toSet());
-         connectivityChecker.execute(peers);
++        StartupClusterConnectivityChecker connectivityChecker = StartupClusterConnectivityChecker.create(DatabaseDescriptor.getBlockForPeersPercentage(),
++                                                                                                         DatabaseDescriptor.getBlockForPeersTimeoutInSeconds());
++        connectivityChecker.execute(Gossiper.instance.getEndpoints());
 +
          String nativeFlag = System.getProperty("cassandra.start_native_transport");
          if ((nativeFlag != null && Boolean.parseBoolean(nativeFlag)) || (nativeFlag == null && DatabaseDescriptor.startNativeTransport()))
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/test/unit/org/apache/cassandra/net/MessageDeliveryTaskTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/net/MessageDeliveryTaskTest.java
index 0000000,0000000..db38efb
new file mode 100644
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/MessageDeliveryTaskTest.java
@@@ -1,0 -1,0 +1,121 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.cassandra.net;
++
++import java.net.UnknownHostException;
++import java.util.Collections;
++
++import org.junit.AfterClass;
++import org.junit.Assert;
++import org.junit.Before;
++import org.junit.BeforeClass;
++import org.junit.Test;
++
++import org.apache.cassandra.config.DatabaseDescriptor;
++import org.apache.cassandra.locator.InetAddressAndPort;
++
++public class MessageDeliveryTaskTest
++{
++    private static final MockVerbHandler VERB_HANDLER = new MockVerbHandler();
++
++    @BeforeClass
++    public static void before()
++    {
++        DatabaseDescriptor.daemonInitialization();
++        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.UNUSED_2, VERB_HANDLER);
++    }
++
++    @AfterClass
++    public static void after()
++    {
++        MessagingService.instance().removeVerbHandler(MessagingService.Verb.UNUSED_2);
++    }
++
++    @Before
++    public void setUp()
++    {
++        VERB_HANDLER.reset();
++    }
++
++    @Test
++    public void process_HappyPath() throws UnknownHostException
++    {
++        InetAddressAndPort addr = InetAddressAndPort.getByName("127.0.0.1");
++        MessageIn msg = MessageIn.create(addr, null, Collections.emptyMap(), MessagingService.Verb.UNUSED_2, 1);
++        MessageDeliveryTask task = new MessageDeliveryTask(msg, 42);
++        Assert.assertTrue(task.process());
++        Assert.assertEquals(1, VERB_HANDLER.invocationCount);
++    }
++
++    @Test
++    public void process_NullVerb() throws UnknownHostException
++    {
++        InetAddressAndPort addr = InetAddressAndPort.getByName("127.0.0.1");
++        MessageIn msg = MessageIn.create(addr, null, Collections.emptyMap(), null, 1);
++        MessageDeliveryTask task = new MessageDeliveryTask(msg, 42);
++        Assert.assertFalse(task.process());
++    }
++
++    @Test
++    public void process_NoHandler() throws UnknownHostException
++    {
++        InetAddressAndPort addr = InetAddressAndPort.getByName("127.0.0.1");
++        MessageIn msg = MessageIn.create(addr, null, Collections.emptyMap(), MessagingService.Verb.UNUSED_5, 1);
++        MessageDeliveryTask task = new MessageDeliveryTask(msg, 42);
++        Assert.assertFalse(task.process());
++    }
++
++    @Test
++    public void process_ExpiredDroppableMessage() throws UnknownHostException
++    {
++        InetAddressAndPort addr = InetAddressAndPort.getByName("127.0.0.1");
++
++        // we need any droppable verb, so just grab it from the enum itself rather than hard code a value
++        MessageIn msg = MessageIn.create(addr, null, Collections.emptyMap(), MessagingService.DROPPABLE_VERBS.iterator().next(), 1, 0);
++        MessageDeliveryTask task = new MessageDeliveryTask(msg, 42);
++        Assert.assertFalse(task.process());
++    }
++
++    // non-droppable messages should still be processed even if they are expired
++    @Test
++    public void process_ExpiredMessage() throws UnknownHostException
++    {
++        InetAddressAndPort addr = InetAddressAndPort.getByName("127.0.0.1");
++        MessageIn msg = MessageIn.create(addr, null, Collections.emptyMap(), MessagingService.Verb.UNUSED_2, 1, 0);
++        MessageDeliveryTask task = new MessageDeliveryTask(msg, 42);
++        Assert.assertTrue(task.process());
++        Assert.assertEquals(1, VERB_HANDLER.invocationCount);
++    }
++
++    private static class MockVerbHandler implements IVerbHandler<Object>
++    {
++        private int invocationCount;
++
++        @Override
++        public void doVerb(MessageIn<Object> message, int id)
++        {
++            invocationCount++;
++        }
++
++        void reset()
++        {
++            invocationCount = 0;
++        }
++    }
++}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/test/unit/org/apache/cassandra/net/MessageInTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/net/MessageInTest.java
index 0000000,0000000..b9ea7da
new file mode 100644
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/MessageInTest.java
@@@ -1,0 -1,0 +1,66 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.cassandra.net;
++
++import java.io.IOException;
++import java.nio.ByteBuffer;
++import java.util.Collections;
++
++import org.junit.Assert;
++import org.junit.BeforeClass;
++import org.junit.Test;
++
++import org.apache.cassandra.config.DatabaseDescriptor;
++import org.apache.cassandra.io.util.DataInputBuffer;
++import org.apache.cassandra.io.util.DataInputPlus;
++import org.apache.cassandra.locator.InetAddressAndPort;
++
++public class MessageInTest
++{
++    @BeforeClass
++    public static void before()
++    {
++        DatabaseDescriptor.daemonInitialization();
++    }
++
++    // make sure deserializing message doesn't crash with an unknown verb
++    @Test
++    public void read_NullVerb() throws IOException
++    {
++        read(null);
++    }
++
++    @Test
++    public void read_NoSerializer() throws IOException
++    {
++        read(MessagingService.Verb.UNUSED_5);
++    }
++
++    private void read(MessagingService.Verb verb) throws IOException
++    {
++        InetAddressAndPort addr = InetAddressAndPort.getByName("127.0.0.1");
++        ByteBuffer buf = ByteBuffer.allocate(64);
++        buf.limit(buf.capacity());
++        DataInputPlus dataInputBuffer = new DataInputBuffer(buf, false);
++        int payloadSize = 27;
++        Assert.assertEquals(0, buf.position());
++        Assert.assertNotNull(MessageIn.read(dataInputBuffer, 1, 42, 0, addr, payloadSize, verb, Collections.emptyMap()));
++        Assert.assertEquals(payloadSize, buf.position());
++    }
++}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java
index 12f54c6,0000000..4eeb314
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java
+++ b/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java
@@@ -1,129 -1,0 +1,157 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.net;
 +
 +import java.net.UnknownHostException;
++import java.util.Collections;
++import java.util.HashMap;
 +import java.util.HashSet;
++import java.util.Map;
 +import java.util.Set;
- import java.util.concurrent.atomic.AtomicInteger;
- import java.util.function.Predicate;
 +
- import com.google.common.net.InetAddresses;
++import org.junit.After;
 +import org.junit.Assert;
++import org.junit.Before;
++import org.junit.BeforeClass;
 +import org.junit.Test;
 +
++import org.apache.cassandra.config.DatabaseDescriptor;
++import org.apache.cassandra.gms.EndpointState;
++import org.apache.cassandra.gms.Gossiper;
++import org.apache.cassandra.gms.HeartBeatState;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +
++import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE;
++
 +public class StartupClusterConnectivityCheckerTest
 +{
-     @Test
-     public void testConnectivity_SimpleHappyPath() throws UnknownHostException
++    private StartupClusterConnectivityChecker connectivityChecker;
++    private Set<InetAddressAndPort> peers;
++
++    @BeforeClass
++    public static void before()
 +    {
-         StartupClusterConnectivityChecker connectivityChecker = new StartupClusterConnectivityChecker(70, 10, addr -> true);
-         int count = 10;
-         Set<InetAddressAndPort> peers = createNodes(count);
-         Assert.assertEquals(StartupClusterConnectivityChecker.State.FINISH_SUCCESS,
-                             connectivityChecker.checkStatus(peers, new AtomicInteger(count * 2), System.nanoTime(), false, 0));
++        DatabaseDescriptor.daemonInitialization();
 +    }
 +
-     @Test
-     public void testConnectivity_SimpleContinue() throws UnknownHostException
++    @Before
++    public void setUp() throws UnknownHostException
++    {
++        connectivityChecker = new StartupClusterConnectivityChecker(70, 10);
++        peers = new HashSet<>();
++        peers.add(InetAddressAndPort.getByName("127.0.1.0"));
++        peers.add(InetAddressAndPort.getByName("127.0.1.1"));
++        peers.add(InetAddressAndPort.getByName("127.0.1.2"));
++    }
++
++    @After
++    public void tearDown()
 +    {
-         StartupClusterConnectivityChecker connectivityChecker = new StartupClusterConnectivityChecker(70, 10, addr -> true);
-         int count = 10;
-         Set<InetAddressAndPort> peers = createNodes(count);
-         Assert.assertEquals(StartupClusterConnectivityChecker.State.CONTINUE,
-                             connectivityChecker.checkStatus(peers, new AtomicInteger(0), System.nanoTime(), false, 0));
++        MessagingService.instance().clearMessageSinks();
 +    }
 +
 +    @Test
-     public void testConnectivity_Timeout() throws UnknownHostException
++    public void execute_HappyPath()
 +    {
-         StartupClusterConnectivityChecker connectivityChecker = new StartupClusterConnectivityChecker(70, 10, addr -> true);
-         int count = 10;
-         Set<InetAddressAndPort> peers = createNodes(count);
-         Assert.assertEquals(StartupClusterConnectivityChecker.State.CONTINUE,
-                             connectivityChecker.checkStatus(peers, new AtomicInteger(0), System.nanoTime(), false, 4));
-         Assert.assertEquals(StartupClusterConnectivityChecker.State.FINISH_TIMEOUT,
-                             connectivityChecker.checkStatus(peers, new AtomicInteger(0), System.nanoTime(), true, 5));
++        Sink sink = new Sink(true, true);
++        MessagingService.instance().addMessageSink(sink);
++        Assert.assertTrue(connectivityChecker.execute(peers));
++        checkAllConnectionTypesSeen(sink);
 +    }
 +
 +    @Test
-     public void testConnectivity_SimpleUpdating() throws UnknownHostException
++    public void execute_NotAlive()
 +    {
-         UpdatablePredicate predicate = new UpdatablePredicate();
-         final int count = 100;
-         final int thresholdPercentage = 70;
-         StartupClusterConnectivityChecker connectivityChecker = new StartupClusterConnectivityChecker(thresholdPercentage, 10, predicate);
-         Set<InetAddressAndPort> peers = createNodes(count);
++        Sink sink = new Sink(false, true);
++        MessagingService.instance().addMessageSink(sink);
++        Assert.assertFalse(connectivityChecker.execute(peers));
++        checkAllConnectionTypesSeen(sink);
++    }
 +
-         AtomicInteger connectedCount = new AtomicInteger();
++    @Test
++    public void execute_NoConnectionsAcks()
++    {
++        Sink sink = new Sink(true, false);
++        MessagingService.instance().addMessageSink(sink);
++        Assert.assertFalse(connectivityChecker.execute(peers));
++    }
 +
-         for (int i = 0; i < count; i++)
++    private void checkAllConnectionTypesSeen(Sink sink)
++    {
++        for (InetAddressAndPort peer : peers)
 +        {
-             predicate.reset(i);
-             connectedCount.set(i * 2);
-             StartupClusterConnectivityChecker.State expectedState = i < thresholdPercentage ?
-                                                                     StartupClusterConnectivityChecker.State.CONTINUE :
-                                                                     StartupClusterConnectivityChecker.State.FINISH_SUCCESS;
-             Assert.assertEquals("failed on iteration " + i,
-                                 expectedState, connectivityChecker.checkStatus(peers, connectedCount, System.nanoTime(), false, i));
++            ConnectionTypeRecorder recorder = sink.seenConnectionRequests.get(peer);
++            Assert.assertNotNull(recorder);
++            Assert.assertTrue(recorder.seenSmallMessageRequest);
++            Assert.assertTrue(recorder.seenLargeMessageRequest);
 +        }
 +    }
 +
-     /**
-      * returns true until index = threshold, then returns false.
-      */
-     private class UpdatablePredicate implements Predicate<InetAddressAndPort>
++    private static class Sink implements IMessageSink
 +    {
-         int index;
-         int threshold;
++        private final boolean markAliveInGossip;
++        private final boolean processConnectAck;
++        private final Map<InetAddressAndPort, ConnectionTypeRecorder> seenConnectionRequests;
 +
-         void reset(int threshold)
++        Sink(boolean markAliveInGossip, boolean processConnectAck)
 +        {
-             index = 0;
-             this.threshold = threshold;
++            this.markAliveInGossip = markAliveInGossip;
++            this.processConnectAck = processConnectAck;
++            seenConnectionRequests = new HashMap<>();
 +        }
 +
 +        @Override
-         public boolean test(InetAddressAndPort inetAddressAndPort)
++        public boolean allowOutgoingMessage(MessageOut message, int id, InetAddressAndPort to)
 +        {
-             index++;
-             return index <= threshold;
++            ConnectionTypeRecorder recorder = seenConnectionRequests.computeIfAbsent(to, inetAddress ->  new ConnectionTypeRecorder());
++            if (message.connectionType == SMALL_MESSAGE)
++            {
++                Assert.assertFalse(recorder.seenSmallMessageRequest);
++                recorder.seenSmallMessageRequest = true;
++            }
++            else
++            {
++                Assert.assertFalse(recorder.seenLargeMessageRequest);
++                recorder.seenLargeMessageRequest = true;
++            }
++
++            if (processConnectAck)
++            {
++                MessageIn msgIn = MessageIn.create(to, message.payload, Collections.emptyMap(), MessagingService.Verb.REQUEST_RESPONSE, 1);
++                MessagingService.instance().getRegisteredCallback(id).callback.response(msgIn);
++            }
++
++            if (markAliveInGossip)
++                Gossiper.instance.realMarkAlive(to, new EndpointState(new HeartBeatState(1, 1)));
++            return false;
 +        }
-     }
- 
-     private static Set<InetAddressAndPort> createNodes(int count) throws UnknownHostException
-     {
-         Set<InetAddressAndPort> nodes = new HashSet<>();
 +
-         if (count < 1)
-             Assert.fail("need at least *one* node in the set!");
- 
-         InetAddressAndPort node = InetAddressAndPort.getByName("127.0.0.1");
-         nodes.add(node);
-         for (int i = 1; i < count; i++)
++        @Override
++        public boolean allowIncomingMessage(MessageIn message, int id)
 +        {
-             node = InetAddressAndPort.getByAddress(InetAddresses.increment(node.address));
-             nodes.add(node);
++            return false;
 +        }
-         return nodes;
 +    }
 +
++    private static class ConnectionTypeRecorder
++    {
++        boolean seenSmallMessageRequest;
++        boolean seenLargeMessageRequest;
++    }
 +}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[5/6] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by ja...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0d4aacc8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0d4aacc8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0d4aacc8

Branch: refs/heads/cassandra-3.11
Commit: 0d4aacc832b1ffa428970d5c0e235d54585e22c2
Parents: ab8348c 06b3521
Author: Jason Brown <ja...@gmail.com>
Authored: Thu May 17 10:34:56 2018 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Thu May 17 10:35:36 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/net/MessagingService.java  | 14 ++++-
 .../org/apache/cassandra/net/PingMessage.java   | 60 ++++++++++++++++++++
 3 files changed, 72 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d4aacc8/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index d70b381,1293bd4..87e7c24
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,20 -1,5 +1,21 @@@
 -3.0.17
 +3.11.3
 + * Fix New SASI view creation during Index Redistribution (CASSANDRA-14055)
 + * Remove string formatting lines from BufferPool hot path (CASSANDRA-14416)
 + * Update metrics to 3.1.5 (CASSANDRA-12924)
 + * Detect OpenJDK jvm type and architecture (CASSANDRA-12793)
 + * Don't use guava collections in the non-system keyspace jmx attributes (CASSANDRA-12271)
 + * Allow existing nodes to use all peers in shadow round (CASSANDRA-13851)
 + * Fix cqlsh to read connection.ssl cqlshrc option again (CASSANDRA-14299)
 + * Downgrade log level to trace for CommitLogSegmentManager (CASSANDRA-14370)
 + * CQL fromJson(null) throws NullPointerException (CASSANDRA-13891)
 + * Serialize empty buffer as empty string for json output format (CASSANDRA-14245)
 + * Allow logging implementation to be interchanged for embedded testing (CASSANDRA-13396)
 + * SASI tokenizer for simple delimiter based entries (CASSANDRA-14247)
 + * Fix Loss of digits when doing CAST from varint/bigint to decimal (CASSANDRA-14170)
 + * RateBasedBackPressure unnecessarily invokes a lock on the Guava RateLimiter (CASSANDRA-14163)
 + * Fix wildcard GROUP BY queries (CASSANDRA-14209)
 +Merged from 3.0:
+  * Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447)
   * Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121)
   * Cassandra not starting when using enhanced startup scripts in windows (CASSANDRA-14418)
   * Fix progress stats and units in compactionstats (CASSANDRA-12244)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d4aacc8/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index 027e218,047f51f..59ed8f3
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -206,36 -143,18 +206,41 @@@ public final class MessagingService imp
          _TRACE, // dummy verb so we can use MS.droppedMessagesMap
          ECHO,
          REPAIR_MESSAGE,
 -        PAXOS_PREPARE,
 -        PAXOS_PROPOSE,
 -        PAXOS_COMMIT,
 -        @Deprecated PAGED_RANGE,
 +        PAXOS_PREPARE
 +        {
 +            public long getTimeout()
 +            {
 +                return DatabaseDescriptor.getWriteRpcTimeout();
 +            }
 +        },
 +        PAXOS_PROPOSE
 +        {
 +            public long getTimeout()
 +            {
 +                return DatabaseDescriptor.getWriteRpcTimeout();
 +            }
 +        },
 +        PAXOS_COMMIT
 +        {
 +            public long getTimeout()
 +            {
 +                return DatabaseDescriptor.getWriteRpcTimeout();
 +            }
 +        },
 +        @Deprecated PAGED_RANGE
 +        {
 +            public long getTimeout()
 +            {
 +                return DatabaseDescriptor.getRangeRpcTimeout();
 +            }
 +        },
-         // remember to add new verbs at the end, since we serialize by ordinal
-         UNUSED_1,
+         PING,
 -
+         // UNUSED verbs were used as padding for backward/forward compatibility before 4.0,
+         // but it wasn't quite as bullet/future proof as needed. We still need to keep these entries
+         // around, at least for a major rev or two (post-4.0). see CASSANDRA-13993 for a discussion.
+         // For now, though, the UNUSED are legacy values (placeholders, basically) that should only be used
+         // for correctly adding VERBs that need to be emergency additions to 3.0/3.11.
+         // We can reclaim them (their id's, to be correct) in future versions, if desired, though.
          UNUSED_2,
          UNUSED_3,
          UNUSED_4,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[2/6] cassandra git commit: Cleanup StartupClusterConnectivityChecker and PING Verb

Posted by ja...@apache.org.
Cleanup StartupClusterConnectivityChecker and PING Verb

patch by jasobrown; reviewed by Ariel Weisberg for CASSANDRA-14447


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/06b3521a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/06b3521a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/06b3521a

Branch: refs/heads/cassandra-3.11
Commit: 06b3521acdb21dd3d85902d59146b9d08ad7d752
Parents: 6efa99c
Author: Jason Brown <ja...@gmail.com>
Authored: Fri Apr 6 11:47:35 2018 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Thu May 17 10:34:31 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/net/MessagingService.java  | 15 ++++-
 .../org/apache/cassandra/net/PingMessage.java   | 60 ++++++++++++++++++++
 3 files changed, 73 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/06b3521a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3da808a..1293bd4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.17
+ * Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447)
  * Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121)
  * Cassandra not starting when using enhanced startup scripts in windows (CASSANDRA-14418)
  * Fix progress stats and units in compactionstats (CASSANDRA-12244)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06b3521a/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index e0f77b7..047f51f 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -147,13 +147,20 @@ public final class MessagingService implements MessagingServiceMBean
         PAXOS_PROPOSE,
         PAXOS_COMMIT,
         @Deprecated PAGED_RANGE,
-        // remember to add new verbs at the end, since we serialize by ordinal
-        UNUSED_1,
+        PING,
+
+        // UNUSED verbs were used as padding for backward/forward compatibility before 4.0,
+        // but it wasn't quite as bullet/future proof as needed. We still need to keep these entries
+        // around, at least for a major rev or two (post-4.0). see CASSANDRA-13993 for a discussion.
+        // For now, though, the UNUSED are legacy values (placeholders, basically) that should only be used
+        // for correctly adding VERBs that need to be emergency additions to 3.0/3.11.
+        // We can reclaim them (their id's, to be correct) in future versions, if desired, though.
         UNUSED_2,
         UNUSED_3,
         UNUSED_4,
         UNUSED_5,
         ;
+        // remember to add new verbs at the end, since we serialize by ordinal
 
         // This is to support a "late" choice of the verb based on the messaging service version.
         // See CASSANDRA-12249 for more details.
@@ -210,9 +217,10 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.SNAPSHOT, Stage.MISC);
         put(Verb.ECHO, Stage.GOSSIP);
 
-        put(Verb.UNUSED_1, Stage.INTERNAL_RESPONSE);
         put(Verb.UNUSED_2, Stage.INTERNAL_RESPONSE);
         put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE);
+
+        put(Verb.PING, Stage.READ);
     }};
 
     /**
@@ -251,6 +259,7 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.HINT, HintMessage.serializer);
         put(Verb.BATCH_STORE, Batch.serializer);
         put(Verb.BATCH_REMOVE, UUIDSerializer.serializer);
+        put(Verb.PING, PingMessage.serializer);
     }};
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06b3521a/src/java/org/apache/cassandra/net/PingMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/PingMessage.java b/src/java/org/apache/cassandra/net/PingMessage.java
new file mode 100644
index 0000000..8eaf23e
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/PingMessage.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * A backport of the version from 4.0, intentionally added as versions 4.0 or greater aren't guaranteed
+ * to know the c* versions they communicate with before they connect.
+ *
+ * It is intentional that no {@link IVerbHandler} is provided as we do not want to process the message;
+ * the intent is to not break the stream by leaving it in an unclean state, with unconsumed bytes.
+ * We do, however, assign a {@link org.apache.cassandra.concurrent.StageManager} stage
+ * to maintain proper message flow.
+ * See CASSANDRA-13993 for a discussion.
+ */
+public class PingMessage
+{
+    public static IVersionedSerializer<PingMessage> serializer = new PingMessageSerializer();
+
+    public static class PingMessageSerializer implements IVersionedSerializer<PingMessage>
+    {
+        public void serialize(PingMessage t, DataOutputPlus out, int version)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public PingMessage deserialize(DataInputPlus in, int version) throws IOException
+        {
+            // throw away the one byte of the payload
+            in.readByte();
+            return new PingMessage();
+        }
+
+        public long serializedSize(PingMessage t, int version)
+        {
+            return 1;
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org