You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/05/26 11:51:43 UTC
[cassandra] branch trunk updated: Add support for adding custom Verbs
This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 19a8f4e Add support for adding custom Verbs
19a8f4e is described below
commit 19a8f4ea13eb844bc0387637f82da1da62991107
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Thu Apr 2 08:23:33 2020 +0200
Add support for adding custom Verbs
Patch by marcuse; reviewed by Benedict Elliott Smith and David Capwell for CASSANDRA-15725
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/net/Verb.java | 106 +++++++++++++++++++++--
test/unit/org/apache/cassandra/net/VerbTest.java | 33 +++++++
3 files changed, 133 insertions(+), 7 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 1be10dd..2a35318 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha5
+ * Add support for adding custom Verbs (CASSANDRA-15725)
* Speed up entire-file-streaming file containment check and allow entire-file-streaming for all compaction strategies (CASSANDRA-15657,CASSANDRA-15783)
* Provide ability to configure IAuditLogger (CASSANDRA-15748)
* Fix nodetool enablefullquerylog blocking param parsing (CASSANDRA-15819)
diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java
index 67d847e..6ba9ab8 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -91,6 +91,7 @@ import static org.apache.cassandra.concurrent.Stage.*;
import static org.apache.cassandra.concurrent.Stage.INTERNAL_RESPONSE;
import static org.apache.cassandra.concurrent.Stage.MISC;
import static org.apache.cassandra.net.VerbTimeouts.*;
+import static org.apache.cassandra.net.Verb.Kind.*;
import static org.apache.cassandra.net.Verb.Priority.*;
import static org.apache.cassandra.schema.MigrationManager.MigrationsSerializer;
@@ -185,6 +186,11 @@ public enum Verb
INTERNAL_RSP (23, P1, rpcTimeout, INTERNAL_RESPONSE, () -> null, () -> ResponseVerbHandler.instance ),
// largest used ID: 116
+
+ // CUSTOM VERBS
+ UNUSED_CUSTOM_VERB (CUSTOM,
+ 0, P1, rpcTimeout, INTERNAL_RESPONSE, () -> null, () -> null ),
+
;
public static final List<Verb> VERBS = ImmutableList.copyOf(Verb.values());
@@ -198,9 +204,16 @@ public enum Verb
P4
}
+ public enum Kind
+ {
+ NORMAL,
+ CUSTOM
+ }
+
public final int id;
public final Priority priority;
public final Stage stage;
+ public final Kind kind;
/**
* Messages we receive from peers have a Verb that tells us what kind of message it is.
@@ -233,16 +246,38 @@ public enum Verb
Verb(int id, Priority priority, ToLongFunction<TimeUnit> expiration, Stage stage, Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> serializer, Supplier<? extends IVerbHandler<?>> handler, Verb responseVerb)
{
+ this(NORMAL, id, priority, expiration, stage, serializer, handler, responseVerb);
+ }
+
+ Verb(Kind kind, int id, Priority priority, ToLongFunction<TimeUnit> expiration, Stage stage, Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> serializer, Supplier<? extends IVerbHandler<?>> handler)
+ {
+ this(kind, id, priority, expiration, stage, serializer, handler, null);
+ }
+
+ Verb(Kind kind, int id, Priority priority, ToLongFunction<TimeUnit> expiration, Stage stage, Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> serializer, Supplier<? extends IVerbHandler<?>> handler, Verb responseVerb)
+ {
this.stage = stage;
if (id < 0)
throw new IllegalArgumentException("Verb id must be non-negative, got " + id + " for verb " + name());
- this.id = id;
+ if (kind == CUSTOM)
+ {
+ if (id > MAX_CUSTOM_VERB_ID)
+ throw new AssertionError("Invalid custom verb id " + id + " - we only allow custom ids between 0 and " + MAX_CUSTOM_VERB_ID);
+ this.id = idForCustomVerb(id);
+ }
+ else
+ {
+ if (id > CUSTOM_VERB_START - MAX_CUSTOM_VERB_ID)
+ throw new AssertionError("Invalid verb id " + id + " - we only allow ids between 0 and " + (CUSTOM_VERB_START - MAX_CUSTOM_VERB_ID));
+ this.id = id;
+ }
this.priority = priority;
this.serializer = serializer;
this.handler = handler;
this.responseVerb = responseVerb;
this.expiration = expiration;
+ this.kind = kind;
}
public <In, Out> IVersionedAsymmetricSerializer<In, Out> serializer()
@@ -319,33 +354,90 @@ public enum Verb
return original;
}
+ // This is the largest number we can store in 2 bytes using VIntCoding (1 bit per byte is used to indicate if there is more data coming).
+ // When generating ids we count *down* from this number
+ private static final int CUSTOM_VERB_START = (1 << (7 * 2)) - 1;
+
+ // Sanity check for the custom verb ids - avoids someone mistakenly adding a custom verb id close to the normal verbs which
+ // could cause a conflict later when new normal verbs are added.
+ private static final int MAX_CUSTOM_VERB_ID = 1000;
+
private static final Verb[] idToVerbMap;
+ private static final Verb[] idToCustomVerbMap;
+ private static final int minCustomId;
static
{
Verb[] verbs = values();
int max = -1;
+ int minCustom = Integer.MAX_VALUE;
for (Verb v : verbs)
- max = Math.max(v.id, max);
+ {
+ switch (v.kind)
+ {
+ case NORMAL:
+ max = Math.max(v.id, max);
+ break;
+ case CUSTOM:
+ minCustom = Math.min(v.id, minCustom);
+ break;
+ default:
+ throw new AssertionError("Unsupported Verb Kind: " + v.kind + " for verb " + v);
+ }
+ }
+ minCustomId = minCustom;
+
+ if (minCustom <= max)
+ throw new IllegalStateException("Overlapping verb ids are not allowed");
Verb[] idMap = new Verb[max + 1];
+ int customCount = minCustom < Integer.MAX_VALUE ? CUSTOM_VERB_START - minCustom : 0;
+ Verb[] customIdMap = new Verb[customCount + 1];
for (Verb v : verbs)
{
- if (idMap[v.id] != null)
- throw new IllegalArgumentException("cannot have two verbs that map to the same id: " + v + " and " + idMap[v.id]);
- idMap[v.id] = v;
+ switch (v.kind)
+ {
+ case NORMAL:
+ if (idMap[v.id] != null)
+ throw new IllegalArgumentException("cannot have two verbs that map to the same id: " + v + " and " + idMap[v.id]);
+ idMap[v.id] = v;
+ break;
+ case CUSTOM:
+ int relativeId = idForCustomVerb(v.id);
+ if (customIdMap[relativeId] != null)
+ throw new IllegalArgumentException("cannot have two custom verbs that map to the same id: " + v + " and " + customIdMap[relativeId]);
+ customIdMap[relativeId] = v;
+ break;
+ default:
+ throw new AssertionError("Unsupported Verb Kind: " + v.kind + " for verb " + v);
+ }
}
idToVerbMap = idMap;
+ idToCustomVerbMap = customIdMap;
}
- static Verb fromId(int id)
+ public static Verb fromId(int id)
{
- Verb verb = id >= 0 && id < idToVerbMap.length ? idToVerbMap[id] : null;
+ Verb[] verbs = idToVerbMap;
+ if (id >= minCustomId)
+ {
+ id = idForCustomVerb(id);
+ verbs = idToCustomVerbMap;
+ }
+ Verb verb = id >= 0 && id < verbs.length ? verbs[id] : null;
if (verb == null)
throw new IllegalArgumentException("Unknown verb id " + id);
return verb;
}
+
+ /**
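+ * Translates between a custom verb's relative id and its actual id by counting down from CUSTOM_VERB_START; the mapping is its own inverse.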
+ */
+ private static int idForCustomVerb(int id)
+ {
+ return CUSTOM_VERB_START - id;
+ }
}
@SuppressWarnings("unused")
diff --git a/test/unit/org/apache/cassandra/net/VerbTest.java b/test/unit/org/apache/cassandra/net/VerbTest.java
new file mode 100644
index 0000000..8f20567
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/VerbTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class VerbTest
+{
+ @Test
+ public void idsMatch()
+ {
+ for (Verb v : Verb.values())
+ assertEquals(v, Verb.fromId(v.id));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org