You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/05/26 11:51:43 UTC

[cassandra] branch trunk updated: Add support for adding custom Verbs

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 19a8f4e  Add support for adding custom Verbs
19a8f4e is described below

commit 19a8f4ea13eb844bc0387637f82da1da62991107
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Thu Apr 2 08:23:33 2020 +0200

    Add support for adding custom Verbs
    
    Patch by marcuse; reviewed by Benedict Elliott Smith and David Capwell for CASSANDRA-15725
---
 CHANGES.txt                                      |   1 +
 src/java/org/apache/cassandra/net/Verb.java      | 106 +++++++++++++++++++++--
 test/unit/org/apache/cassandra/net/VerbTest.java |  33 +++++++
 3 files changed, 133 insertions(+), 7 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 1be10dd..2a35318 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha5
+ * Add support for adding custom Verbs (CASSANDRA-15725)
  * Speed up entire-file-streaming file containment check and allow entire-file-streaming for all compaction strategies (CASSANDRA-15657,CASSANDRA-15783)
  * Provide ability to configure IAuditLogger (CASSANDRA-15748)
  * Fix nodetool enablefullquerylog blocking param parsing (CASSANDRA-15819)
diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java
index 67d847e..6ba9ab8 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -91,6 +91,7 @@ import static org.apache.cassandra.concurrent.Stage.*;
 import static org.apache.cassandra.concurrent.Stage.INTERNAL_RESPONSE;
 import static org.apache.cassandra.concurrent.Stage.MISC;
 import static org.apache.cassandra.net.VerbTimeouts.*;
+import static org.apache.cassandra.net.Verb.Kind.*;
 import static org.apache.cassandra.net.Verb.Priority.*;
 import static org.apache.cassandra.schema.MigrationManager.MigrationsSerializer;
 
@@ -185,6 +186,11 @@ public enum Verb
     INTERNAL_RSP           (23,  P1, rpcTimeout,      INTERNAL_RESPONSE, () -> null,                                 () -> ResponseVerbHandler.instance                             ),
 
     // largest used ID: 116
+
+    // CUSTOM VERBS
+    UNUSED_CUSTOM_VERB     (CUSTOM,
+                            0,   P1, rpcTimeout,      INTERNAL_RESPONSE, () -> null,                                 () -> null                                                     ),
+
     ;
 
     public static final List<Verb> VERBS = ImmutableList.copyOf(Verb.values());
@@ -198,9 +204,16 @@ public enum Verb
         P4
     }
 
+    public enum Kind
+    {
+        NORMAL,
+        CUSTOM
+    }
+
     public final int id;
     public final Priority priority;
     public final Stage stage;
+    public final Kind kind;
 
     /**
      * Messages we receive from peers have a Verb that tells us what kind of message it is.
@@ -233,16 +246,38 @@ public enum Verb
 
     Verb(int id, Priority priority, ToLongFunction<TimeUnit> expiration, Stage stage, Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> serializer, Supplier<? extends IVerbHandler<?>> handler, Verb responseVerb)
     {
+        this(NORMAL, id, priority, expiration, stage, serializer, handler, responseVerb);
+    }
+
+    Verb(Kind kind, int id, Priority priority, ToLongFunction<TimeUnit> expiration, Stage stage, Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> serializer, Supplier<? extends IVerbHandler<?>> handler)
+    {
+        this(kind, id, priority, expiration, stage, serializer, handler, null);
+    }
+
+    Verb(Kind kind, int id, Priority priority, ToLongFunction<TimeUnit> expiration, Stage stage, Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> serializer, Supplier<? extends IVerbHandler<?>> handler, Verb responseVerb)
+    {
         this.stage = stage;
         if (id < 0)
             throw new IllegalArgumentException("Verb id must be non-negative, got " + id + " for verb " + name());
 
-        this.id = id;
+        if (kind == CUSTOM)
+        {
+            if (id > MAX_CUSTOM_VERB_ID)
+                throw new AssertionError("Invalid custom verb id " + id + " - we only allow custom ids between 0 and " + MAX_CUSTOM_VERB_ID);
+            this.id = idForCustomVerb(id);
+        }
+        else
+        {
+            if (id > CUSTOM_VERB_START - MAX_CUSTOM_VERB_ID)
+                throw new AssertionError("Invalid verb id " + id + " - we only allow ids between 0 and " + (CUSTOM_VERB_START - MAX_CUSTOM_VERB_ID));
+            this.id = id;
+        }
         this.priority = priority;
         this.serializer = serializer;
         this.handler = handler;
         this.responseVerb = responseVerb;
         this.expiration = expiration;
+        this.kind = kind;
     }
 
     public <In, Out> IVersionedAsymmetricSerializer<In, Out> serializer()
@@ -319,33 +354,90 @@ public enum Verb
         return original;
     }
 
+    // This is the largest number we can store in 2 bytes using VIntCoding (1 bit per byte is used to indicate if there is more data coming).
+    // When generating ids we count *down* from this number
+    private static final int CUSTOM_VERB_START = (1 << (7 * 2)) - 1;
+
+    // Sanity check for the custom verb ids - avoids someone mistakenly adding a custom verb id close to the normal verbs which
+    // could cause a conflict later when new normal verbs are added.
+    private static final int MAX_CUSTOM_VERB_ID = 1000;
+
     private static final Verb[] idToVerbMap;
+    private static final Verb[] idToCustomVerbMap;
+    private static final int minCustomId;
 
     static
     {
         Verb[] verbs = values();
         int max = -1;
+        int minCustom = Integer.MAX_VALUE;
         for (Verb v : verbs)
-            max = Math.max(v.id, max);
+        {
+            switch (v.kind)
+            {
+                case NORMAL:
+                    max = Math.max(v.id, max);
+                    break;
+                case CUSTOM:
+                    minCustom = Math.min(v.id, minCustom);
+                    break;
+                default:
+                    throw new AssertionError("Unsupported Verb Kind: " + v.kind + " for verb " + v);
+            }
+        }
+        minCustomId = minCustom;
+
+        if (minCustom <= max)
+            throw new IllegalStateException("Overlapping verb ids are not allowed");
 
         Verb[] idMap = new Verb[max + 1];
+        int customCount = minCustom < Integer.MAX_VALUE ? CUSTOM_VERB_START - minCustom : 0;
+        Verb[] customIdMap = new Verb[customCount + 1];
         for (Verb v : verbs)
         {
-            if (idMap[v.id] != null)
-                throw new IllegalArgumentException("cannot have two verbs that map to the same id: " + v + " and " + idMap[v.id]);
-            idMap[v.id] = v;
+            switch (v.kind)
+            {
+                case NORMAL:
+                    if (idMap[v.id] != null)
+                        throw new IllegalArgumentException("cannot have two verbs that map to the same id: " + v + " and " + idMap[v.id]);
+                    idMap[v.id] = v;
+                    break;
+                case CUSTOM:
+                    int relativeId = idForCustomVerb(v.id);
+                    if (customIdMap[relativeId] != null)
+                        throw new IllegalArgumentException("cannot have two custom verbs that map to the same id: " + v + " and " + customIdMap[relativeId]);
+                    customIdMap[relativeId] = v;
+                    break;
+                default:
+                    throw new AssertionError("Unsupported Verb Kind: " + v.kind + " for verb " + v);
+            }
         }
 
         idToVerbMap = idMap;
+        idToCustomVerbMap = customIdMap;
     }
 
-    static Verb fromId(int id)
+    public static Verb fromId(int id)
     {
-        Verb verb = id >= 0 && id < idToVerbMap.length ? idToVerbMap[id] : null;
+        Verb[] verbs = idToVerbMap;
+        if (id >= minCustomId)
+        {
+            id = idForCustomVerb(id);
+            verbs = idToCustomVerbMap;
+        }
+        Verb verb = id >= 0 && id < verbs.length ? verbs[id] : null;
         if (verb == null)
             throw new IllegalArgumentException("Unknown verb id " + id);
         return verb;
     }
+
+    /**
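+     * Translates between the id declared for a custom verb and the value stored in its {@code id} field by subtracting from CUSTOM_VERB_START; applying it twice returns the original value.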
+     */
+    private static int idForCustomVerb(int id)
+    {
+        return CUSTOM_VERB_START - id;
+    }
 }
 
 @SuppressWarnings("unused")
diff --git a/test/unit/org/apache/cassandra/net/VerbTest.java b/test/unit/org/apache/cassandra/net/VerbTest.java
new file mode 100644
index 0000000..8f20567
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/VerbTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class VerbTest
+{
+    @Test
+    public void idsMatch()
+    {
+        for (Verb v : Verb.values())
+            assertEquals(v, Verb.fromId(v.id));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org