You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2018/05/17 17:40:12 UTC
[1/6] cassandra git commit: Cleanup StartupClusterConnectivityChecker
and PING Verb
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 6efa99c7a -> 06b3521ac
refs/heads/cassandra-3.11 ab8348c57 -> 0d4aacc83
refs/heads/trunk e210a05a5 -> d585410cc
Cleanup StartupClusterConnectivityChecker and PING Verb
patch by jasobrown; reviewed by Ariel Weisberg for CASSANDRA-14447
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/06b3521a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/06b3521a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/06b3521a
Branch: refs/heads/cassandra-3.0
Commit: 06b3521acdb21dd3d85902d59146b9d08ad7d752
Parents: 6efa99c
Author: Jason Brown <ja...@gmail.com>
Authored: Fri Apr 6 11:47:35 2018 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Thu May 17 10:34:31 2018 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/net/MessagingService.java | 15 ++++-
.../org/apache/cassandra/net/PingMessage.java | 60 ++++++++++++++++++++
3 files changed, 73 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/06b3521a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3da808a..1293bd4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.17
+ * Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447)
* Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121)
* Cassandra not starting when using enhanced startup scripts in windows (CASSANDRA-14418)
* Fix progress stats and units in compactionstats (CASSANDRA-12244)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/06b3521a/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index e0f77b7..047f51f 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -147,13 +147,20 @@ public final class MessagingService implements MessagingServiceMBean
PAXOS_PROPOSE,
PAXOS_COMMIT,
@Deprecated PAGED_RANGE,
- // remember to add new verbs at the end, since we serialize by ordinal
- UNUSED_1,
+ PING,
+
+ // UNUSED verbs were used as padding for backward/forward compatibility before 4.0,
+ // but it wasn't quite as bullet/future proof as needed. We still need to keep these entries
+ // around, at least for a major rev or two (post-4.0). see CASSANDRA-13993 for a discussion.
+ // For now, though, the UNUSED are legacy values (placeholders, basically) that should only be used
+ // for correctly adding VERBs that need to be emergency additions to 3.0/3.11.
+ // We can reclaim them (their id's, to be correct) in future versions, if desired, though.
UNUSED_2,
UNUSED_3,
UNUSED_4,
UNUSED_5,
;
+ // remember to add new verbs at the end, since we serialize by ordinal
// This is to support a "late" choice of the verb based on the messaging service version.
// See CASSANDRA-12249 for more details.
@@ -210,9 +217,10 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.SNAPSHOT, Stage.MISC);
put(Verb.ECHO, Stage.GOSSIP);
- put(Verb.UNUSED_1, Stage.INTERNAL_RESPONSE);
put(Verb.UNUSED_2, Stage.INTERNAL_RESPONSE);
put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE);
+
+ put(Verb.PING, Stage.READ);
}};
/**
@@ -251,6 +259,7 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.HINT, HintMessage.serializer);
put(Verb.BATCH_STORE, Batch.serializer);
put(Verb.BATCH_REMOVE, UUIDSerializer.serializer);
+ put(Verb.PING, PingMessage.serializer);
}};
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/06b3521a/src/java/org/apache/cassandra/net/PingMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/PingMessage.java b/src/java/org/apache/cassandra/net/PingMessage.java
new file mode 100644
index 0000000..8eaf23e
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/PingMessage.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * A backport of the version from 4.0, intentionally added as versions 4.0 or greater aren't guaranteed
+ * to know the c* versions they communicate with before they connect.
+ *
+ * It is intentional that no {@link IVerbHandler} is provided as we do not want to process the message;
+ * the intent is to not break the stream by leaving it in an unclean state, with unconsumed bytes.
+ * We do, however, assign a {@link org.apache.cassandra.concurrent.StageManager} stage
+ * to maintain proper message flow.
+ * See CASSANDRA-13993 for a discussion.
+ */
+public class PingMessage
+{
+ public static IVersionedSerializer<PingMessage> serializer = new PingMessageSerializer();
+
+ public static class PingMessageSerializer implements IVersionedSerializer<PingMessage>
+ {
+ public void serialize(PingMessage t, DataOutputPlus out, int version)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public PingMessage deserialize(DataInputPlus in, int version) throws IOException
+ {
+ // throw away the one byte of the payload
+ in.readByte();
+ return new PingMessage();
+ }
+
+ public long serializedSize(PingMessage t, int version)
+ {
+ return 1;
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[4/6] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.11
Posted by ja...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0d4aacc8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0d4aacc8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0d4aacc8
Branch: refs/heads/trunk
Commit: 0d4aacc832b1ffa428970d5c0e235d54585e22c2
Parents: ab8348c 06b3521
Author: Jason Brown <ja...@gmail.com>
Authored: Thu May 17 10:34:56 2018 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Thu May 17 10:35:36 2018 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/net/MessagingService.java | 14 ++++-
.../org/apache/cassandra/net/PingMessage.java | 60 ++++++++++++++++++++
3 files changed, 72 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d4aacc8/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index d70b381,1293bd4..87e7c24
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,20 -1,5 +1,21 @@@
-3.0.17
+3.11.3
+ * Fix New SASI view creation during Index Redistribution (CASSANDRA-14055)
+ * Remove string formatting lines from BufferPool hot path (CASSANDRA-14416)
+ * Update metrics to 3.1.5 (CASSANDRA-12924)
+ * Detect OpenJDK jvm type and architecture (CASSANDRA-12793)
+ * Don't use guava collections in the non-system keyspace jmx attributes (CASSANDRA-12271)
+ * Allow existing nodes to use all peers in shadow round (CASSANDRA-13851)
+ * Fix cqlsh to read connection.ssl cqlshrc option again (CASSANDRA-14299)
+ * Downgrade log level to trace for CommitLogSegmentManager (CASSANDRA-14370)
+ * CQL fromJson(null) throws NullPointerException (CASSANDRA-13891)
+ * Serialize empty buffer as empty string for json output format (CASSANDRA-14245)
+ * Allow logging implementation to be interchanged for embedded testing (CASSANDRA-13396)
+ * SASI tokenizer for simple delimiter based entries (CASSANDRA-14247)
+ * Fix Loss of digits when doing CAST from varint/bigint to decimal (CASSANDRA-14170)
+ * RateBasedBackPressure unnecessarily invokes a lock on the Guava RateLimiter (CASSANDRA-14163)
+ * Fix wildcard GROUP BY queries (CASSANDRA-14209)
+Merged from 3.0:
+ * Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447)
* Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121)
* Cassandra not starting when using enhanced startup scripts in windows (CASSANDRA-14418)
* Fix progress stats and units in compactionstats (CASSANDRA-12244)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d4aacc8/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index 027e218,047f51f..59ed8f3
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -206,36 -143,18 +206,41 @@@ public final class MessagingService imp
_TRACE, // dummy verb so we can use MS.droppedMessagesMap
ECHO,
REPAIR_MESSAGE,
- PAXOS_PREPARE,
- PAXOS_PROPOSE,
- PAXOS_COMMIT,
- @Deprecated PAGED_RANGE,
+ PAXOS_PREPARE
+ {
+ public long getTimeout()
+ {
+ return DatabaseDescriptor.getWriteRpcTimeout();
+ }
+ },
+ PAXOS_PROPOSE
+ {
+ public long getTimeout()
+ {
+ return DatabaseDescriptor.getWriteRpcTimeout();
+ }
+ },
+ PAXOS_COMMIT
+ {
+ public long getTimeout()
+ {
+ return DatabaseDescriptor.getWriteRpcTimeout();
+ }
+ },
+ @Deprecated PAGED_RANGE
+ {
+ public long getTimeout()
+ {
+ return DatabaseDescriptor.getRangeRpcTimeout();
+ }
+ },
- // remember to add new verbs at the end, since we serialize by ordinal
- UNUSED_1,
+ PING,
-
+ // UNUSED verbs were used as padding for backward/forward compatibility before 4.0,
+ // but it wasn't quite as bullet/future proof as needed. We still need to keep these entries
+ // around, at least for a major rev or two (post-4.0). see CASSANDRA-13993 for a discussion.
+ // For now, though, the UNUSED are legacy values (placeholders, basically) that should only be used
+ // for correctly adding VERBs that need to be emergency additions to 3.0/3.11.
+ // We can reclaim them (their id's, to be correct) in future versions, if desired, though.
UNUSED_2,
UNUSED_3,
UNUSED_4,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[3/6] cassandra git commit: Cleanup StartupClusterConnectivityChecker
and PING Verb
Posted by ja...@apache.org.
Cleanup StartupClusterConnectivityChecker and PING Verb
patch by jasobrown; reviewed by Ariel Weisberg for CASSANDRA-14447
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/06b3521a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/06b3521a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/06b3521a
Branch: refs/heads/trunk
Commit: 06b3521acdb21dd3d85902d59146b9d08ad7d752
Parents: 6efa99c
Author: Jason Brown <ja...@gmail.com>
Authored: Fri Apr 6 11:47:35 2018 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Thu May 17 10:34:31 2018 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/net/MessagingService.java | 15 ++++-
.../org/apache/cassandra/net/PingMessage.java | 60 ++++++++++++++++++++
3 files changed, 73 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/06b3521a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3da808a..1293bd4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.17
+ * Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447)
* Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121)
* Cassandra not starting when using enhanced startup scripts in windows (CASSANDRA-14418)
* Fix progress stats and units in compactionstats (CASSANDRA-12244)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/06b3521a/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index e0f77b7..047f51f 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -147,13 +147,20 @@ public final class MessagingService implements MessagingServiceMBean
PAXOS_PROPOSE,
PAXOS_COMMIT,
@Deprecated PAGED_RANGE,
- // remember to add new verbs at the end, since we serialize by ordinal
- UNUSED_1,
+ PING,
+
+ // UNUSED verbs were used as padding for backward/forward compatibility before 4.0,
+ // but it wasn't quite as bullet/future proof as needed. We still need to keep these entries
+ // around, at least for a major rev or two (post-4.0). see CASSANDRA-13993 for a discussion.
+ // For now, though, the UNUSED are legacy values (placeholders, basically) that should only be used
+ // for correctly adding VERBs that need to be emergency additions to 3.0/3.11.
+ // We can reclaim them (their id's, to be correct) in future versions, if desired, though.
UNUSED_2,
UNUSED_3,
UNUSED_4,
UNUSED_5,
;
+ // remember to add new verbs at the end, since we serialize by ordinal
// This is to support a "late" choice of the verb based on the messaging service version.
// See CASSANDRA-12249 for more details.
@@ -210,9 +217,10 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.SNAPSHOT, Stage.MISC);
put(Verb.ECHO, Stage.GOSSIP);
- put(Verb.UNUSED_1, Stage.INTERNAL_RESPONSE);
put(Verb.UNUSED_2, Stage.INTERNAL_RESPONSE);
put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE);
+
+ put(Verb.PING, Stage.READ);
}};
/**
@@ -251,6 +259,7 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.HINT, HintMessage.serializer);
put(Verb.BATCH_STORE, Batch.serializer);
put(Verb.BATCH_REMOVE, UUIDSerializer.serializer);
+ put(Verb.PING, PingMessage.serializer);
}};
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/06b3521a/src/java/org/apache/cassandra/net/PingMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/PingMessage.java b/src/java/org/apache/cassandra/net/PingMessage.java
new file mode 100644
index 0000000..8eaf23e
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/PingMessage.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * A backport of the version from 4.0, intentionally added as versions 4.0 or greater aren't guaranteed
+ * to know the c* versions they communicate with before they connect.
+ *
+ * It is intentional that no {@link IVerbHandler} is provided as we do not want to process the message;
+ * the intent is to not break the stream by leaving it in an unclean state, with unconsumed bytes.
+ * We do, however, assign a {@link org.apache.cassandra.concurrent.StageManager} stage
+ * to maintain proper message flow.
+ * See CASSANDRA-13993 for a discussion.
+ */
+public class PingMessage
+{
+ public static IVersionedSerializer<PingMessage> serializer = new PingMessageSerializer();
+
+ public static class PingMessageSerializer implements IVersionedSerializer<PingMessage>
+ {
+ public void serialize(PingMessage t, DataOutputPlus out, int version)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public PingMessage deserialize(DataInputPlus in, int version) throws IOException
+ {
+ // throw away the one byte of the payload
+ in.readByte();
+ return new PingMessage();
+ }
+
+ public long serializedSize(PingMessage t, int version)
+ {
+ return 1;
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[6/6] cassandra git commit: Merge branch 'cassandra-3.11' into trunk
Posted by ja...@apache.org.
Merge branch 'cassandra-3.11' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d585410c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d585410c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d585410c
Branch: refs/heads/trunk
Commit: d585410ccb42011ca71441471d1e2949e5ddedb5
Parents: e210a05 0d4aacc
Author: Jason Brown <ja...@gmail.com>
Authored: Thu May 17 10:35:51 2018 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Thu May 17 10:39:37 2018 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/config/DatabaseDescriptor.java | 6 +
.../org/apache/cassandra/gms/EndpointState.java | 2 +-
src/java/org/apache/cassandra/gms/Gossiper.java | 15 +-
.../apache/cassandra/gms/HeartBeatState.java | 4 +-
.../cassandra/net/MessageDeliveryTask.java | 25 +-
.../org/apache/cassandra/net/MessageIn.java | 6 +
.../apache/cassandra/net/MessagingService.java | 36 ++-
.../net/StartupClusterConnectivityChecker.java | 231 +++++++++++--------
.../cassandra/service/CassandraDaemon.java | 11 +-
.../cassandra/net/MessageDeliveryTaskTest.java | 121 ++++++++++
.../org/apache/cassandra/net/MessageInTest.java | 66 ++++++
.../StartupClusterConnectivityCheckerTest.java | 160 +++++++------
13 files changed, 497 insertions(+), 187 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index cfe56c6,87e7c24..097db1d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -256,6 -15,8 +256,7 @@@
* RateBasedBackPressure unnecessarily invokes a lock on the Guava RateLimiter (CASSANDRA-14163)
* Fix wildcard GROUP BY queries (CASSANDRA-14209)
Merged from 3.0:
+ * Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447)
- * Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121)
* Cassandra not starting when using enhanced startup scripts in windows (CASSANDRA-14418)
* Fix progress stats and units in compactionstats (CASSANDRA-12244)
* Better handle missing partition columns in system_schema.columns (CASSANDRA-14379)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 699148d,8b92c5a..592b96e
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -26,9 -26,8 +26,10 @@@ import java.nio.file.NoSuchFileExceptio
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
++import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
@@@ -1487,6 -1491,6 +1488,11 @@@ public class DatabaseDescripto
getTruncateRpcTimeout());
}
++ public static long getPingTimeout()
++ {
++ return TimeUnit.SECONDS.toMillis(getBlockForPeersTimeoutInSeconds());
++ }
++
public static double getPhiConvictThreshold()
{
return conf.phi_convict_threshold;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/src/java/org/apache/cassandra/gms/EndpointState.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/gms/EndpointState.java
index 1085447,674b597..5646bf6
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@@ -50,7 -54,7 +50,7 @@@ public class EndpointStat
private volatile long updateTimestamp;
private volatile boolean isAlive;
-- EndpointState(HeartBeatState initialHbState)
++ public EndpointState(HeartBeatState initialHbState)
{
this(initialHbState, new EnumMap<ApplicationState, VersionedValue>(ApplicationState.class));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index 4c66669,ea05525..3975187
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -32,13 -31,7 +32,13 @@@ import javax.management.ObjectName
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
++import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Uninterruptibles;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
- import org.apache.cassandra.locator.SeedProvider;
+import org.apache.cassandra.utils.CassandraVersion;
+import org.apache.cassandra.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@@ -878,12 -854,12 +878,17 @@@ public class Gossiper implements IFailu
return endpointStateMap.get(ep);
}
- public Set<Entry<InetAddressAndPort, EndpointState>> getEndpointStates()
- public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
++ public ImmutableSet<InetAddressAndPort> getEndpoints()
+ {
- return endpointStateMap.entrySet();
++ return ImmutableSet.copyOf(endpointStateMap.keySet());
++ }
++
++ public int getEndpointCount()
+ {
- return endpointStateMap.entrySet();
++ return endpointStateMap.size();
}
- public UUID getHostId(InetAddress endpoint)
+ public UUID getHostId(InetAddressAndPort endpoint)
{
return getHostId(endpoint, endpointStateMap);
}
@@@ -1798,11 -1693,11 +1803,11 @@@
Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_MIN_WAIT_MS, TimeUnit.MILLISECONDS);
int totalPolls = 0;
int numOkay = 0;
-- int epSize = Gossiper.instance.getEndpointStates().size();
++ int epSize = Gossiper.instance.getEndpointCount();
while (numOkay < GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED)
{
Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
-- int currentSize = Gossiper.instance.getEndpointStates().size();
++ int currentSize = Gossiper.instance.getEndpointCount();
totalPolls++;
if (currentSize == epSize)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/src/java/org/apache/cassandra/gms/HeartBeatState.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/gms/HeartBeatState.java
index 13e1ace,13e1ace..2abd5d7
--- a/src/java/org/apache/cassandra/gms/HeartBeatState.java
+++ b/src/java/org/apache/cassandra/gms/HeartBeatState.java
@@@ -27,7 -27,7 +27,7 @@@ import org.apache.cassandra.io.util.Dat
/**
* HeartBeat State associated with any given endpoint.
*/
--class HeartBeatState
++public class HeartBeatState
{
public static final IVersionedSerializer<HeartBeatState> serializer = new HeartBeatStateSerializer();
@@@ -39,7 -39,7 +39,7 @@@
this(gen, 0);
}
-- HeartBeatState(int gen, int ver)
++ public HeartBeatState(int gen, int ver)
{
generation = gen;
version = ver;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index 6e132a8,c91e9da..1b9090c
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@@ -20,7 -20,6 +20,8 @@@ package org.apache.cassandra.net
import java.io.IOException;
import java.util.EnumSet;
++import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Shorts;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@@ -50,23 -45,20 +51,40 @@@ public class MessageDeliveryTask implem
public void run()
{
++ process();
++ }
++
++ /**
++ * A helper function for making unit testing reasonable.
++ *
++ * @return true if the message was processed; else false.
++ */
++ @VisibleForTesting
++ boolean process()
++ {
MessagingService.Verb verb = message.verb;
++ if (verb == null)
++ {
++ logger.trace("Unknown verb {}", verb);
++ return false;
++ }
++
+ MessagingService.instance().metrics.addQueueWaitTime(verb.toString(),
+ ApproximateTime.currentTimeMillis() - enqueueTime);
+
long timeTaken = message.getLifetimeInMS();
if (MessagingService.DROPPABLE_VERBS.contains(verb)
&& timeTaken > message.getTimeout())
{
MessagingService.instance().incrementDroppedMessages(message, timeTaken);
-- return;
++ return false;
}
IVerbHandler verbHandler = MessagingService.instance().getVerbHandler(verb);
if (verbHandler == null)
{
-- logger.trace("Unknown verb {}", verb);
-- return;
++ logger.trace("No handler for verb {}", verb);
++ return false;
}
try
@@@ -91,6 -83,6 +109,7 @@@
if (GOSSIP_VERBS.contains(message.verb))
Gossiper.instance.setLastProcessedMessageAt(message.constructionTime);
++ return true;
}
private void handleFailure(Throwable t)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/src/java/org/apache/cassandra/net/MessageIn.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessageIn.java
index 7fb866f,d06d515..1cd39f3
--- a/src/java/org/apache/cassandra/net/MessageIn.java
+++ b/src/java/org/apache/cassandra/net/MessageIn.java
@@@ -146,8 -117,8 +146,14 @@@ public class MessageIn<T
}
serializer = (IVersionedSerializer<T2>) callback.serializer;
}
++
if (payloadSize == 0 || serializer == null)
++ {
++ // if there's no deserializer for the verb, skip the payload bytes to leave
++ // the stream in a clean state (for the next message)
++ in.skipBytesFully(payloadSize);
return create(from, null, parameters, verb, version, constructionTime);
++ }
T2 payload = serializer.deserialize(in, version);
return MessageIn.create(from, payload, parameters, verb, version, constructionTime);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index a590723,59ed8f3..f5051fb
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -254,57 -234,37 +254,67 @@@ public final class MessagingService imp
return DatabaseDescriptor.getRangeRpcTimeout();
}
},
- PING(),
- PING,
++ PING
++ {
++ public long getTimeout()
++ {
++ return DatabaseDescriptor.getPingTimeout();
++ }
++ },
+
- // add new verbs after the existing verbs, but *before* the UNUSED verbs, since we serialize by ordinal.
- // UNUSED verbs serve as padding for backwards compatability where a previous version needs to validate a verb from the future.
- UNUSED_1,
+ // UNUSED verbs were used as padding for backward/forward compatibility before 4.0,
+ // but it wasn't quite as bullet/future proof as needed. We still need to keep these entries
+ // around, at least for a major rev or two (post-4.0). see CASSANDRA-13993 for a discussion.
+ // For now, though, the UNUSED are legacy values (placeholders, basically) that should only be used
+ // for correctly adding VERBs that need to be emergency additions to 3.0/3.11.
- // We can reclaim them (their id's, to be correct) in future versions, if desired, though.
++ // We can reclaim them (their id's, to be correct) in future versions, if desired, though.
UNUSED_2,
UNUSED_3,
UNUSED_4,
UNUSED_5,
;
- // remember to add new verbs at the end, since we serialize by ordinal
++ // add new verbs after the existing verbs, since we serialize by ordinal.
- // This is to support a "late" choice of the verb based on the messaging service version.
- // See CASSANDRA-12249 for more details.
- public static Verb convertForMessagingServiceVersion(Verb verb, int version)
+ private final int id;
+ Verb()
{
- if (verb == PAGED_RANGE && version >= VERSION_30)
- return RANGE_SLICE;
+ id = ordinal();
+ }
- return verb;
+ /**
+ * Unused, but it is an extension point for adding custom verbs
+ * @param id
+ */
+ Verb(int id)
+ {
+ this.id = id;
}
public long getTimeout()
{
return DatabaseDescriptor.getRpcTimeout();
}
- }
- public static final Verb[] verbValues = Verb.values();
+ public int getId()
+ {
+ return id;
+ }
+ private static final IntObjectMap<Verb> idToVerbMap = new IntObjectOpenHashMap<>(values().length);
+ static
+ {
+ for (Verb v : values())
+ {
- if (idToVerbMap.containsKey(v.getId()))
- throw new IllegalArgumentException("cannot have two verbs that map to the same id: " + v + " and " + v.getId());
- idToVerbMap.put(v.getId(), v);
++ Verb existing = idToVerbMap.put(v.getId(), v);
++ if (existing != null)
++ throw new IllegalArgumentException("cannot have two verbs that map to the same id: " + v + " and " + existing);
+ }
+ }
+
+ public static Verb fromId(int id)
+ {
+ return idToVerbMap.get(id);
+ }
+ }
public static final EnumMap<MessagingService.Verb, Stage> verbStages = new EnumMap<MessagingService.Verb, Stage>(MessagingService.Verb.class)
{{
@@@ -350,9 -310,8 +360,10 @@@
put(Verb.SNAPSHOT, Stage.MISC);
put(Verb.ECHO, Stage.GOSSIP);
- put(Verb.UNUSED_1, Stage.INTERNAL_RESPONSE);
put(Verb.UNUSED_2, Stage.INTERNAL_RESPONSE);
put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE);
++ put(Verb.UNUSED_4, Stage.INTERNAL_RESPONSE);
++ put(Verb.UNUSED_5, Stage.INTERNAL_RESPONSE);
put(Verb.PING, Stage.READ);
}};
@@@ -929,6 -827,6 +939,14 @@@
}
/**
++ * SHOULD ONLY BE USED FOR TESTING!!
++ */
++ public void removeVerbHandler(Verb verb)
++ {
++ verbHandlers.remove(verb);
++ }
++
++ /**
* This method returns the verb handler associated with the registered
* verb. If no handler has been registered then null is returned.
*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
index f22ab48,0000000..db04ca3
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
+++ b/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
@@@ -1,171 -1,0 +1,216 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
-
+package org.apache.cassandra.net;
+
++import java.util.HashSet;
++import java.util.Map;
+import java.util.Set;
++import java.util.concurrent.ConcurrentHashMap;
++import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
- import java.util.function.Predicate;
- import java.util.stream.Collectors;
+
++import com.google.common.annotations.VisibleForTesting;
++import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
++import org.apache.cassandra.gms.ApplicationState;
++import org.apache.cassandra.gms.EndpointState;
++import org.apache.cassandra.gms.Gossiper;
++import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
++import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.locator.InetAddressAndPort;
- import org.apache.cassandra.net.async.OutboundConnectionIdentifier;
++import org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.net.MessagingService.Verb.PING;
++import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE;
++import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE;
+
+public class StartupClusterConnectivityChecker
+{
+ private static final Logger logger = LoggerFactory.getLogger(StartupClusterConnectivityChecker.class);
+
- enum State { CONTINUE, FINISH_SUCCESS, FINISH_TIMEOUT }
-
+ private final int targetPercent;
- private final int timeoutSecs;
- private final Predicate<InetAddressAndPort> gossipStatus;
++ private final long timeoutNanos;
+
- public StartupClusterConnectivityChecker(int targetPercent, int timeoutSecs, Predicate<InetAddressAndPort> gossipStatus)
++ public static StartupClusterConnectivityChecker create(int targetPercent, int timeoutSecs)
+ {
- if (targetPercent < 0)
- {
- targetPercent = 0;
- }
- else if (targetPercent > 100)
- {
- targetPercent = 100;
- }
- this.targetPercent = targetPercent;
-
- if (timeoutSecs < 0)
- {
- timeoutSecs = 1;
- }
- else if (timeoutSecs > 100)
- {
++ timeoutSecs = Math.max(1, timeoutSecs);
++ if (timeoutSecs > 100)
+ logger.warn("setting the block-for-peers timeout (in seconds) to {} might be a bit excessive, but using it nonetheless", timeoutSecs);
- }
- this.timeoutSecs = timeoutSecs;
++ long timeoutNanos = TimeUnit.SECONDS.toNanos(timeoutSecs);
+
- this.gossipStatus = gossipStatus;
++ return new StartupClusterConnectivityChecker(targetPercent, timeoutNanos);
+ }
+
- public void execute(Set<InetAddressAndPort> peers)
++ @VisibleForTesting
++ StartupClusterConnectivityChecker(int targetPercent, long timeoutNanos)
+ {
- if (peers == null || targetPercent == 0)
- return;
++ this.targetPercent = Math.min(100, Math.max(0, targetPercent));
++ this.timeoutNanos = timeoutNanos;
++ }
+
- // remove current node from the set
- peers = peers.stream()
- .filter(peer -> !peer.equals(FBUtilities.getBroadcastAddressAndPort()))
- .collect(Collectors.toSet());
++ /**
++ * @param peers The currently known peers in the cluster; argument is not modified.
++ * @return true if the requested percentage of peers are marked ALIVE in gossip and have their connections opened;
++ * else false.
++ */
++ public boolean execute(Set<InetAddressAndPort> peers)
++ {
++ if (targetPercent == 0 || peers == null)
++ return true;
+
- // don't block if there's no other nodes in the cluster (or we don't know about them)
- if (peers.size() <= 0)
- return;
++ // make a copy of the set, to avoid mucking with the input (in case it's a sensitive collection)
++ peers = new HashSet<>(peers);
++ peers.remove(FBUtilities.getBroadcastAddressAndPort());
+
- logger.info("choosing to block until {}% of peers are marked alive and connections are established; max time to wait = {} seconds",
- targetPercent, timeoutSecs);
++ if (peers.isEmpty())
++ return true;
+
- // first, send out a ping message to open up the non-gossip connections
- final AtomicInteger connectedCount = sendPingMessages(peers);
++ logger.info("choosing to block until {}% of the {} known peers are marked alive and connections are established; max time to wait = {} seconds",
++ targetPercent, peers.size(), TimeUnit.NANOSECONDS.toSeconds(timeoutNanos));
+
- final long startNanos = System.nanoTime();
- final long expirationNanos = startNanos + TimeUnit.SECONDS.toNanos(timeoutSecs);
- int completedRounds = 0;
- while (checkStatus(peers, connectedCount, startNanos, expirationNanos < System.nanoTime(), completedRounds) == State.CONTINUE)
- {
- completedRounds++;
- Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MICROSECONDS);
- }
- }
++ long startNanos = System.nanoTime();
+
- State checkStatus(Set<InetAddressAndPort> peers, AtomicInteger connectedCount, final long startNanos, boolean beyondExpiration, final int completedRounds)
- {
- long currentAlive = peers.stream().filter(gossipStatus).count();
- float currentAlivePercent = ((float) currentAlive / (float) peers.size()) * 100;
++ AckMap acks = new AckMap(3);
++ int target = (int) ((targetPercent / 100.0) * peers.size());
++ CountDownLatch latch = new CountDownLatch(target);
+
- // assume two connections to remote host that we care to track here (small msg & large msg)
- final int totalConnectionsSize = peers.size() * 2;
- final int connectionsCount = connectedCount.get();
- float currentConnectedPercent = ((float) connectionsCount / (float) totalConnectionsSize) * 100;
++ // set up a listener to react to new nodes becoming alive (in gossip), and account for all the nodes that are already alive
++ Set<InetAddressAndPort> alivePeers = Sets.newSetFromMap(new ConcurrentHashMap<>());
++ AliveListener listener = new AliveListener(alivePeers, latch, acks);
++ Gossiper.instance.register(listener);
+
- if (currentAlivePercent >= targetPercent && currentConnectedPercent >= targetPercent)
- {
- logger.info("after {} milliseconds, found {}% ({} / {}) of peers as marked alive, " +
- "and {}% ({} / {}) of peers as connected, " +
- "both of which are above the desired threshold of {}%",
- TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos),
- currentAlivePercent, currentAlive, peers.size(),
- currentConnectedPercent, connectionsCount, totalConnectionsSize,
- targetPercent);
- return State.FINISH_SUCCESS;
- }
++ // send out a ping message to open up the non-gossip connections
++ sendPingMessages(peers, latch, acks);
+
- // perform at least two rounds of checking, else this is kinda useless (and the operator set the aliveTimeoutSecs too low)
- if (completedRounds >= 2 && beyondExpiration)
- {
- logger.info("after {} milliseconds, found {}% ({} / {}) of peers as marked alive, " +
- "and {}% ({} / {}) of peers as connected, " +
- "one or both of which is below the desired threshold of {}%",
- TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos),
- currentAlivePercent, currentAlive, peers.size(),
- currentConnectedPercent, connectionsCount, totalConnectionsSize,
- targetPercent);
- return State.FINISH_TIMEOUT;
- }
- return State.CONTINUE;
++ for (InetAddressAndPort peer : peers)
++ if (Gossiper.instance.isAlive(peer) && alivePeers.add(peer) && acks.incrementAndCheck(peer))
++ latch.countDown();
++
++ boolean succeeded = Uninterruptibles.awaitUninterruptibly(latch, timeoutNanos, TimeUnit.NANOSECONDS);
++ Gossiper.instance.unregister(listener);
++
++ int connected = peers.size() - (int) latch.getCount();
++ logger.info("After waiting/processing for {} milliseconds, {} out of {} peers ({}%) have been marked alive and had connections established",
++ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos),
++ connected,
++ peers.size(),
++ connected / (peers.size()) * 100.0);
++ return succeeded;
+ }
+
+ /**
- * Sends a "connection warmup" message to each peer in the collection, on every {@link OutboundConnectionIdentifier.ConnectionType}
- * used for internode messaging.
++ * Sends a "connection warmup" message to each peer in the collection, on every {@link ConnectionType}
++ * used for internode messaging (that is not gossip).
+ */
- private AtomicInteger sendPingMessages(Set<InetAddressAndPort> peers)
++ private void sendPingMessages(Set<InetAddressAndPort> peers, CountDownLatch latch, AckMap acks)
+ {
- AtomicInteger connectedCount = new AtomicInteger(0);
+ IAsyncCallback responseHandler = new IAsyncCallback()
+ {
- @Override
+ public boolean isLatencyForSnitch()
+ {
+ return false;
+ }
+
- @Override
+ public void response(MessageIn msg)
+ {
- connectedCount.incrementAndGet();
++ if (acks.incrementAndCheck(msg.from))
++ latch.countDown();
+ }
+ };
+
- MessageOut<PingMessage> smallChannelMessageOut = new MessageOut<>(PING, PingMessage.smallChannelMessage, PingMessage.serializer);
- MessageOut<PingMessage> largeChannelMessageOut = new MessageOut<>(PING, PingMessage.largeChannelMessage, PingMessage.serializer);
++ MessageOut<PingMessage> smallChannelMessageOut = new MessageOut<>(PING, PingMessage.smallChannelMessage,
++ PingMessage.serializer, SMALL_MESSAGE);
++ MessageOut<PingMessage> largeChannelMessageOut = new MessageOut<>(PING, PingMessage.largeChannelMessage,
++ PingMessage.serializer, LARGE_MESSAGE);
+ for (InetAddressAndPort peer : peers)
+ {
+ MessagingService.instance().sendRR(smallChannelMessageOut, peer, responseHandler);
+ MessagingService.instance().sendRR(largeChannelMessageOut, peer, responseHandler);
+ }
++ }
+
- return connectedCount;
++ /**
++ * A trivial implementation of {@link IEndpointStateChangeSubscriber} that really only cares about
++ * {@link #onAlive(InetAddressAndPort, EndpointState)} invocations.
++ */
++ private static final class AliveListener implements IEndpointStateChangeSubscriber
++ {
++ private final CountDownLatch latch;
++ private final Set<InetAddressAndPort> livePeers;
++ private final AckMap acks;
++
++ AliveListener(Set<InetAddressAndPort> livePeers, CountDownLatch latch, AckMap acks)
++ {
++ this.latch = latch;
++ this.livePeers = livePeers;
++ this.acks = acks;
++ }
++
++ public void onJoin(InetAddressAndPort endpoint, EndpointState epState)
++ {
++ }
++
++ public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue)
++ {
++ }
++
++ public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value)
++ {
++ }
++
++ public void onAlive(InetAddressAndPort endpoint, EndpointState state)
++ {
++ if (livePeers.add(endpoint) && acks.incrementAndCheck(endpoint))
++ latch.countDown();
++ }
++
++ public void onDead(InetAddressAndPort endpoint, EndpointState state)
++ {
++ }
++
++ public void onRemove(InetAddressAndPort endpoint)
++ {
++ }
++
++ public void onRestart(InetAddressAndPort endpoint, EndpointState state)
++ {
++ }
++ }
++
++ private static final class AckMap
++ {
++ private final int threshold;
++ private final Map<InetAddressAndPort, AtomicInteger> acks;
++
++ AckMap(int threshold)
++ {
++ this.threshold = threshold;
++ acks = new ConcurrentHashMap<>();
++ }
++
++ boolean incrementAndCheck(InetAddressAndPort address)
++ {
++ return acks.computeIfAbsent(address, addr -> new AtomicInteger(0)).incrementAndGet() == threshold;
++ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/CassandraDaemon.java
index 80b8b7b,d9bd5c3..6e0b92b
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@@ -503,12 -513,6 +500,10 @@@ public class CassandraDaemo
*/
public void start()
{
- StartupClusterConnectivityChecker connectivityChecker = new StartupClusterConnectivityChecker(DatabaseDescriptor.getBlockForPeersPercentage(),
- DatabaseDescriptor.getBlockForPeersTimeoutInSeconds(),
- Gossiper.instance::isAlive);
- Set<InetAddressAndPort> peers = Gossiper.instance.getEndpointStates().stream().map(Map.Entry::getKey).collect(Collectors.toSet());
- connectivityChecker.execute(peers);
++ StartupClusterConnectivityChecker connectivityChecker = StartupClusterConnectivityChecker.create(DatabaseDescriptor.getBlockForPeersPercentage(),
++ DatabaseDescriptor.getBlockForPeersTimeoutInSeconds());
++ connectivityChecker.execute(Gossiper.instance.getEndpoints());
+
String nativeFlag = System.getProperty("cassandra.start_native_transport");
if ((nativeFlag != null && Boolean.parseBoolean(nativeFlag)) || (nativeFlag == null && DatabaseDescriptor.startNativeTransport()))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/test/unit/org/apache/cassandra/net/MessageDeliveryTaskTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/net/MessageDeliveryTaskTest.java
index 0000000,0000000..db38efb
new file mode 100644
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/MessageDeliveryTaskTest.java
@@@ -1,0 -1,0 +1,121 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.cassandra.net;
++
++import java.net.UnknownHostException;
++import java.util.Collections;
++
++import org.junit.AfterClass;
++import org.junit.Assert;
++import org.junit.Before;
++import org.junit.BeforeClass;
++import org.junit.Test;
++
++import org.apache.cassandra.config.DatabaseDescriptor;
++import org.apache.cassandra.locator.InetAddressAndPort;
++
++public class MessageDeliveryTaskTest
++{
++ private static final MockVerbHandler VERB_HANDLER = new MockVerbHandler();
++
++ @BeforeClass
++ public static void before()
++ {
++ DatabaseDescriptor.daemonInitialization();
++ MessagingService.instance().registerVerbHandlers(MessagingService.Verb.UNUSED_2, VERB_HANDLER);
++ }
++
++ @AfterClass
++ public static void after()
++ {
++ MessagingService.instance().removeVerbHandler(MessagingService.Verb.UNUSED_2);
++ }
++
++ @Before
++ public void setUp()
++ {
++ VERB_HANDLER.reset();
++ }
++
++ @Test
++ public void process_HappyPath() throws UnknownHostException
++ {
++ InetAddressAndPort addr = InetAddressAndPort.getByName("127.0.0.1");
++ MessageIn msg = MessageIn.create(addr, null, Collections.emptyMap(), MessagingService.Verb.UNUSED_2, 1);
++ MessageDeliveryTask task = new MessageDeliveryTask(msg, 42);
++ Assert.assertTrue(task.process());
++ Assert.assertEquals(1, VERB_HANDLER.invocationCount);
++ }
++
++ @Test
++ public void process_NullVerb() throws UnknownHostException
++ {
++ InetAddressAndPort addr = InetAddressAndPort.getByName("127.0.0.1");
++ MessageIn msg = MessageIn.create(addr, null, Collections.emptyMap(), null, 1);
++ MessageDeliveryTask task = new MessageDeliveryTask(msg, 42);
++ Assert.assertFalse(task.process());
++ }
++
++ @Test
++ public void process_NoHandler() throws UnknownHostException
++ {
++ InetAddressAndPort addr = InetAddressAndPort.getByName("127.0.0.1");
++ MessageIn msg = MessageIn.create(addr, null, Collections.emptyMap(), MessagingService.Verb.UNUSED_5, 1);
++ MessageDeliveryTask task = new MessageDeliveryTask(msg, 42);
++ Assert.assertFalse(task.process());
++ }
++
++ @Test
++ public void process_ExpiredDroppableMessage() throws UnknownHostException
++ {
++ InetAddressAndPort addr = InetAddressAndPort.getByName("127.0.0.1");
++
++ // we need any droppable verb, so just grab it from the enum itself rather than hard code a value
++ MessageIn msg = MessageIn.create(addr, null, Collections.emptyMap(), MessagingService.DROPPABLE_VERBS.iterator().next(), 1, 0);
++ MessageDeliveryTask task = new MessageDeliveryTask(msg, 42);
++ Assert.assertFalse(task.process());
++ }
++
++ // non-droppable messages should still be processed even if they are expired
++ @Test
++ public void process_ExpiredMessage() throws UnknownHostException
++ {
++ InetAddressAndPort addr = InetAddressAndPort.getByName("127.0.0.1");
++ MessageIn msg = MessageIn.create(addr, null, Collections.emptyMap(), MessagingService.Verb.UNUSED_2, 1, 0);
++ MessageDeliveryTask task = new MessageDeliveryTask(msg, 42);
++ Assert.assertTrue(task.process());
++ Assert.assertEquals(1, VERB_HANDLER.invocationCount);
++ }
++
++ private static class MockVerbHandler implements IVerbHandler<Object>
++ {
++ private int invocationCount;
++
++ @Override
++ public void doVerb(MessageIn<Object> message, int id)
++ {
++ invocationCount++;
++ }
++
++ void reset()
++ {
++ invocationCount = 0;
++ }
++ }
++}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/test/unit/org/apache/cassandra/net/MessageInTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/net/MessageInTest.java
index 0000000,0000000..b9ea7da
new file mode 100644
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/MessageInTest.java
@@@ -1,0 -1,0 +1,66 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.cassandra.net;
++
++import java.io.IOException;
++import java.nio.ByteBuffer;
++import java.util.Collections;
++
++import org.junit.Assert;
++import org.junit.BeforeClass;
++import org.junit.Test;
++
++import org.apache.cassandra.config.DatabaseDescriptor;
++import org.apache.cassandra.io.util.DataInputBuffer;
++import org.apache.cassandra.io.util.DataInputPlus;
++import org.apache.cassandra.locator.InetAddressAndPort;
++
++public class MessageInTest
++{
++ @BeforeClass
++ public static void before()
++ {
++ DatabaseDescriptor.daemonInitialization();
++ }
++
++ // make sure deserializing a message doesn't crash with an unknown verb
++ @Test
++ public void read_NullVerb() throws IOException
++ {
++ read(null);
++ }
++
++ @Test
++ public void read_NoSerializer() throws IOException
++ {
++ read(MessagingService.Verb.UNUSED_5);
++ }
++
++ private void read(MessagingService.Verb verb) throws IOException
++ {
++ InetAddressAndPort addr = InetAddressAndPort.getByName("127.0.0.1");
++ ByteBuffer buf = ByteBuffer.allocate(64);
++ buf.limit(buf.capacity());
++ DataInputPlus dataInputBuffer = new DataInputBuffer(buf, false);
++ int payloadSize = 27;
++ Assert.assertEquals(0, buf.position());
++ Assert.assertNotNull(MessageIn.read(dataInputBuffer, 1, 42, 0, addr, payloadSize, verb, Collections.emptyMap()));
++ Assert.assertEquals(payloadSize, buf.position());
++ }
++}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d585410c/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java
index 12f54c6,0000000..4eeb314
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java
+++ b/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java
@@@ -1,129 -1,0 +1,157 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.net.UnknownHostException;
++import java.util.Collections;
++import java.util.HashMap;
+import java.util.HashSet;
++import java.util.Map;
+import java.util.Set;
- import java.util.concurrent.atomic.AtomicInteger;
- import java.util.function.Predicate;
+
- import com.google.common.net.InetAddresses;
++import org.junit.After;
+import org.junit.Assert;
++import org.junit.Before;
++import org.junit.BeforeClass;
+import org.junit.Test;
+
++import org.apache.cassandra.config.DatabaseDescriptor;
++import org.apache.cassandra.gms.EndpointState;
++import org.apache.cassandra.gms.Gossiper;
++import org.apache.cassandra.gms.HeartBeatState;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
++import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE;
++
+public class StartupClusterConnectivityCheckerTest
+{
- @Test
- public void testConnectivity_SimpleHappyPath() throws UnknownHostException
++ private StartupClusterConnectivityChecker connectivityChecker;
++ private Set<InetAddressAndPort> peers;
++
++ @BeforeClass
++ public static void before()
+ {
- StartupClusterConnectivityChecker connectivityChecker = new StartupClusterConnectivityChecker(70, 10, addr -> true);
- int count = 10;
- Set<InetAddressAndPort> peers = createNodes(count);
- Assert.assertEquals(StartupClusterConnectivityChecker.State.FINISH_SUCCESS,
- connectivityChecker.checkStatus(peers, new AtomicInteger(count * 2), System.nanoTime(), false, 0));
++ DatabaseDescriptor.daemonInitialization();
+ }
+
- @Test
- public void testConnectivity_SimpleContinue() throws UnknownHostException
++ @Before
++ public void setUp() throws UnknownHostException
++ {
++ connectivityChecker = new StartupClusterConnectivityChecker(70, 10);
++ peers = new HashSet<>();
++ peers.add(InetAddressAndPort.getByName("127.0.1.0"));
++ peers.add(InetAddressAndPort.getByName("127.0.1.1"));
++ peers.add(InetAddressAndPort.getByName("127.0.1.2"));
++ }
++
++ @After
++ public void tearDown()
+ {
- StartupClusterConnectivityChecker connectivityChecker = new StartupClusterConnectivityChecker(70, 10, addr -> true);
- int count = 10;
- Set<InetAddressAndPort> peers = createNodes(count);
- Assert.assertEquals(StartupClusterConnectivityChecker.State.CONTINUE,
- connectivityChecker.checkStatus(peers, new AtomicInteger(0), System.nanoTime(), false, 0));
++ MessagingService.instance().clearMessageSinks();
+ }
+
+ @Test
- public void testConnectivity_Timeout() throws UnknownHostException
++ public void execute_HappyPath()
+ {
- StartupClusterConnectivityChecker connectivityChecker = new StartupClusterConnectivityChecker(70, 10, addr -> true);
- int count = 10;
- Set<InetAddressAndPort> peers = createNodes(count);
- Assert.assertEquals(StartupClusterConnectivityChecker.State.CONTINUE,
- connectivityChecker.checkStatus(peers, new AtomicInteger(0), System.nanoTime(), false, 4));
- Assert.assertEquals(StartupClusterConnectivityChecker.State.FINISH_TIMEOUT,
- connectivityChecker.checkStatus(peers, new AtomicInteger(0), System.nanoTime(), true, 5));
++ Sink sink = new Sink(true, true);
++ MessagingService.instance().addMessageSink(sink);
++ Assert.assertTrue(connectivityChecker.execute(peers));
++ checkAllConnectionTypesSeen(sink);
+ }
+
+ @Test
- public void testConnectivity_SimpleUpdating() throws UnknownHostException
++ public void execute_NotAlive()
+ {
- UpdatablePredicate predicate = new UpdatablePredicate();
- final int count = 100;
- final int thresholdPercentage = 70;
- StartupClusterConnectivityChecker connectivityChecker = new StartupClusterConnectivityChecker(thresholdPercentage, 10, predicate);
- Set<InetAddressAndPort> peers = createNodes(count);
++ Sink sink = new Sink(false, true);
++ MessagingService.instance().addMessageSink(sink);
++ Assert.assertFalse(connectivityChecker.execute(peers));
++ checkAllConnectionTypesSeen(sink);
++ }
+
- AtomicInteger connectedCount = new AtomicInteger();
++ @Test
++ public void execute_NoConnectionsAcks()
++ {
++ Sink sink = new Sink(true, false);
++ MessagingService.instance().addMessageSink(sink);
++ Assert.assertFalse(connectivityChecker.execute(peers));
++ }
+
- for (int i = 0; i < count; i++)
++ private void checkAllConnectionTypesSeen(Sink sink)
++ {
++ for (InetAddressAndPort peer : peers)
+ {
- predicate.reset(i);
- connectedCount.set(i * 2);
- StartupClusterConnectivityChecker.State expectedState = i < thresholdPercentage ?
- StartupClusterConnectivityChecker.State.CONTINUE :
- StartupClusterConnectivityChecker.State.FINISH_SUCCESS;
- Assert.assertEquals("failed on iteration " + i,
- expectedState, connectivityChecker.checkStatus(peers, connectedCount, System.nanoTime(), false, i));
++ ConnectionTypeRecorder recorder = sink.seenConnectionRequests.get(peer);
++ Assert.assertNotNull(recorder);
++ Assert.assertTrue(recorder.seenSmallMessageRequest);
++ Assert.assertTrue(recorder.seenLargeMessageRequest);
+ }
+ }
+
- /**
- * returns true until index = threshold, then returns false.
- */
- private class UpdatablePredicate implements Predicate<InetAddressAndPort>
++ private static class Sink implements IMessageSink
+ {
- int index;
- int threshold;
++ private final boolean markAliveInGossip;
++ private final boolean processConnectAck;
++ private final Map<InetAddressAndPort, ConnectionTypeRecorder> seenConnectionRequests;
+
- void reset(int threshold)
++ Sink(boolean markAliveInGossip, boolean processConnectAck)
+ {
- index = 0;
- this.threshold = threshold;
++ this.markAliveInGossip = markAliveInGossip;
++ this.processConnectAck = processConnectAck;
++ seenConnectionRequests = new HashMap<>();
+ }
+
+ @Override
- public boolean test(InetAddressAndPort inetAddressAndPort)
++ public boolean allowOutgoingMessage(MessageOut message, int id, InetAddressAndPort to)
+ {
- index++;
- return index <= threshold;
++ ConnectionTypeRecorder recorder = seenConnectionRequests.computeIfAbsent(to, inetAddress -> new ConnectionTypeRecorder());
++ if (message.connectionType == SMALL_MESSAGE)
++ {
++ Assert.assertFalse(recorder.seenSmallMessageRequest);
++ recorder.seenSmallMessageRequest = true;
++ }
++ else
++ {
++ Assert.assertFalse(recorder.seenLargeMessageRequest);
++ recorder.seenLargeMessageRequest = true;
++ }
++
++ if (processConnectAck)
++ {
++ MessageIn msgIn = MessageIn.create(to, message.payload, Collections.emptyMap(), MessagingService.Verb.REQUEST_RESPONSE, 1);
++ MessagingService.instance().getRegisteredCallback(id).callback.response(msgIn);
++ }
++
++ if (markAliveInGossip)
++ Gossiper.instance.realMarkAlive(to, new EndpointState(new HeartBeatState(1, 1)));
++ return false;
+ }
- }
-
- private static Set<InetAddressAndPort> createNodes(int count) throws UnknownHostException
- {
- Set<InetAddressAndPort> nodes = new HashSet<>();
+
- if (count < 1)
- Assert.fail("need at least *one* node in the set!");
-
- InetAddressAndPort node = InetAddressAndPort.getByName("127.0.0.1");
- nodes.add(node);
- for (int i = 1; i < count; i++)
++ @Override
++ public boolean allowIncomingMessage(MessageIn message, int id)
+ {
- node = InetAddressAndPort.getByAddress(InetAddresses.increment(node.address));
- nodes.add(node);
++ return false;
+ }
- return nodes;
+ }
+
++ private static class ConnectionTypeRecorder
++ {
++ boolean seenSmallMessageRequest;
++ boolean seenLargeMessageRequest;
++ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[5/6] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.11
Posted by ja...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0d4aacc8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0d4aacc8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0d4aacc8
Branch: refs/heads/cassandra-3.11
Commit: 0d4aacc832b1ffa428970d5c0e235d54585e22c2
Parents: ab8348c 06b3521
Author: Jason Brown <ja...@gmail.com>
Authored: Thu May 17 10:34:56 2018 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Thu May 17 10:35:36 2018 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/net/MessagingService.java | 14 ++++-
.../org/apache/cassandra/net/PingMessage.java | 60 ++++++++++++++++++++
3 files changed, 72 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d4aacc8/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index d70b381,1293bd4..87e7c24
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,20 -1,5 +1,21 @@@
-3.0.17
+3.11.3
+ * Fix New SASI view creation during Index Redistribution (CASSANDRA-14055)
+ * Remove string formatting lines from BufferPool hot path (CASSANDRA-14416)
+ * Update metrics to 3.1.5 (CASSANDRA-12924)
+ * Detect OpenJDK jvm type and architecture (CASSANDRA-12793)
+ * Don't use guava collections in the non-system keyspace jmx attributes (CASSANDRA-12271)
+ * Allow existing nodes to use all peers in shadow round (CASSANDRA-13851)
+ * Fix cqlsh to read connection.ssl cqlshrc option again (CASSANDRA-14299)
+ * Downgrade log level to trace for CommitLogSegmentManager (CASSANDRA-14370)
+ * CQL fromJson(null) throws NullPointerException (CASSANDRA-13891)
+ * Serialize empty buffer as empty string for json output format (CASSANDRA-14245)
+ * Allow logging implementation to be interchanged for embedded testing (CASSANDRA-13396)
+ * SASI tokenizer for simple delimiter based entries (CASSANDRA-14247)
+ * Fix Loss of digits when doing CAST from varint/bigint to decimal (CASSANDRA-14170)
+ * RateBasedBackPressure unnecessarily invokes a lock on the Guava RateLimiter (CASSANDRA-14163)
+ * Fix wildcard GROUP BY queries (CASSANDRA-14209)
+Merged from 3.0:
+ * Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447)
* Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121)
* Cassandra not starting when using enhanced startup scripts in windows (CASSANDRA-14418)
* Fix progress stats and units in compactionstats (CASSANDRA-12244)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d4aacc8/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index 027e218,047f51f..59ed8f3
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -206,36 -143,18 +206,41 @@@ public final class MessagingService imp
_TRACE, // dummy verb so we can use MS.droppedMessagesMap
ECHO,
REPAIR_MESSAGE,
- PAXOS_PREPARE,
- PAXOS_PROPOSE,
- PAXOS_COMMIT,
- @Deprecated PAGED_RANGE,
+ PAXOS_PREPARE
+ {
+ public long getTimeout()
+ {
+ return DatabaseDescriptor.getWriteRpcTimeout();
+ }
+ },
+ PAXOS_PROPOSE
+ {
+ public long getTimeout()
+ {
+ return DatabaseDescriptor.getWriteRpcTimeout();
+ }
+ },
+ PAXOS_COMMIT
+ {
+ public long getTimeout()
+ {
+ return DatabaseDescriptor.getWriteRpcTimeout();
+ }
+ },
+ @Deprecated PAGED_RANGE
+ {
+ public long getTimeout()
+ {
+ return DatabaseDescriptor.getRangeRpcTimeout();
+ }
+ },
- // remember to add new verbs at the end, since we serialize by ordinal
- UNUSED_1,
+ PING,
-
+ // UNUSED verbs were used as padding for backward/forward compatibility before 4.0,
+ // but it wasn't quite as bullet/future proof as needed. We still need to keep these entries
+ // around, at least for a major rev or two (post-4.0). see CASSANDRA-13993 for a discussion.
+ // For now, though, the UNUSED are legacy values (placeholders, basically) that should only be used
+ // for correctly adding VERBs that need to be emergency additions to 3.0/3.11.
+ // We can reclaim them (their id's, to be correct) in future versions, if desired, though.
UNUSED_2,
UNUSED_3,
UNUSED_4,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[2/6] cassandra git commit: Cleanup StartupClusterConnectivityChecker
and PING Verb
Posted by ja...@apache.org.
Cleanup StartupClusterConnectivityChecker and PING Verb
patch by jasobrown; reviewed by Ariel Weisberg for CASSANDRA-14447
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/06b3521a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/06b3521a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/06b3521a
Branch: refs/heads/cassandra-3.11
Commit: 06b3521acdb21dd3d85902d59146b9d08ad7d752
Parents: 6efa99c
Author: Jason Brown <ja...@gmail.com>
Authored: Fri Apr 6 11:47:35 2018 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Thu May 17 10:34:31 2018 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/net/MessagingService.java | 15 ++++-
.../org/apache/cassandra/net/PingMessage.java | 60 ++++++++++++++++++++
3 files changed, 73 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/06b3521a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3da808a..1293bd4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.17
+ * Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447)
* Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121)
* Cassandra not starting when using enhanced startup scripts in windows (CASSANDRA-14418)
* Fix progress stats and units in compactionstats (CASSANDRA-12244)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/06b3521a/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index e0f77b7..047f51f 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -147,13 +147,20 @@ public final class MessagingService implements MessagingServiceMBean
PAXOS_PROPOSE,
PAXOS_COMMIT,
@Deprecated PAGED_RANGE,
- // remember to add new verbs at the end, since we serialize by ordinal
- UNUSED_1,
+ PING,
+
+ // UNUSED verbs were used as padding for backward/forward compatibility before 4.0,
+ // but it wasn't quite as bullet/future proof as needed. We still need to keep these entries
+ // around, at least for a major rev or two (post-4.0). see CASSANDRA-13993 for a discussion.
+ // For now, though, the UNUSED are legacy values (placeholders, basically) that should only be used
+ // for correctly adding VERBs that need to be emergency additions to 3.0/3.11.
+ // We can reclaim them (their id's, to be correct) in future versions, if desired, though.
UNUSED_2,
UNUSED_3,
UNUSED_4,
UNUSED_5,
;
+ // remember to add new verbs at the end, since we serialize by ordinal
// This is to support a "late" choice of the verb based on the messaging service version.
// See CASSANDRA-12249 for more details.
@@ -210,9 +217,10 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.SNAPSHOT, Stage.MISC);
put(Verb.ECHO, Stage.GOSSIP);
- put(Verb.UNUSED_1, Stage.INTERNAL_RESPONSE);
put(Verb.UNUSED_2, Stage.INTERNAL_RESPONSE);
put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE);
+
+ put(Verb.PING, Stage.READ);
}};
/**
@@ -251,6 +259,7 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.HINT, HintMessage.serializer);
put(Verb.BATCH_STORE, Batch.serializer);
put(Verb.BATCH_REMOVE, UUIDSerializer.serializer);
+ put(Verb.PING, PingMessage.serializer);
}};
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/06b3521a/src/java/org/apache/cassandra/net/PingMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/PingMessage.java b/src/java/org/apache/cassandra/net/PingMessage.java
new file mode 100644
index 0000000..8eaf23e
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/PingMessage.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * A backport of the version from 4.0, intentionally added as versions 4.0 or greater aren't guaranteed
+ * to know the c* versions they communicate with before they connect.
+ *
+ * It is intentional that no {@link IVerbHandler} is provided as we do not want to process the message;
+ * the intent is to not break the stream by leaving it in an unclean state, with unconsumed bytes.
+ * We do, however, assign a {@link org.apache.cassandra.concurrent.StageManager} stage
+ * to maintain proper message flow.
+ * See CASSANDRA-13993 for a discussion.
+ */
+public class PingMessage
+{
+ public static IVersionedSerializer<PingMessage> serializer = new PingMessageSerializer();
+
+ public static class PingMessageSerializer implements IVersionedSerializer<PingMessage>
+ {
+ public void serialize(PingMessage t, DataOutputPlus out, int version)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public PingMessage deserialize(DataInputPlus in, int version) throws IOException
+ {
+ // throw away the one byte of the payload
+ in.readByte();
+ return new PingMessage();
+ }
+
+ public long serializedSize(PingMessage t, int version)
+ {
+ return 1;
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org