You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by mc...@apache.org on 2022/12/18 02:35:03 UTC

[cassandra] branch cassandra-4.0 updated: Restore custom param types over messaging system

This is an automated email from the ASF dual-hosted git repository.

mck pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
     new f01d2b4a3c Restore custom param types over messaging system
f01d2b4a3c is described below

commit f01d2b4a3ca114ac58d95befcab3dbbad9d960aa
Author: Mick Semb Wever <mc...@apache.org>
AuthorDate: Sat Nov 12 12:02:35 2022 +0100

    Restore custom param types over messaging system
    
     patch by Mick Semb Wever; reviewed by Aleksey Yeschenko for CASSANDRA-17981
---
 CHANGES.txt                                        |  1 +
 .../cassandra/net/CustomParamsSerializer.java      | 73 ++++++++++++++++++++++
 src/java/org/apache/cassandra/net/Message.java     | 18 ++++++
 src/java/org/apache/cassandra/net/ParamType.java   |  5 +-
 .../org/apache/cassandra/utils/ByteArrayUtil.java  | 24 +++++++
 .../unit/org/apache/cassandra/net/MessageTest.java | 36 +++++++++++
 6 files changed, 156 insertions(+), 1 deletion(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 4dfc269f6a..fee031d8f5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0.8
+ * Restore internode custom tracing on 4.0's new messaging system (CASSANDRA-17981)
  * Harden parsing of boolean values in CQL in PropertyDefinitions (CASSANDRA-17878)
  * Fix error message about type hints (CASSANDRA-17915)
  * Fix possible race condition on repair snapshots (CASSANDRA-17955)
diff --git a/src/java/org/apache/cassandra/net/CustomParamsSerializer.java b/src/java/org/apache/cassandra/net/CustomParamsSerializer.java
new file mode 100644
index 0000000000..a866651ee1
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/CustomParamsSerializer.java
@@ -0,0 +1,73 @@
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.ByteArrayUtil;
+
+
+/** Wire format: unsigned-vint entry count, then per entry a writeUTF key and an unsigned-vint-length-prefixed value. */
+class CustomParamsSerializer implements IVersionedSerializer<Map<String,byte[]>>
+{
+    public static final CustomParamsSerializer serializer = new CustomParamsSerializer();
+
+    @Override
+    public void serialize(Map<String, byte[]> t, DataOutputPlus out, int version) throws IOException
+    {
+        out.writeUnsignedVInt(t.size());
+        for (Map.Entry<String, byte[]> e : t.entrySet())
+        {
+            out.writeUTF(e.getKey());
+            ByteArrayUtil.writeWithVIntLength(e.getValue(), out);
+        }
+    }
+
+    @Override
+    public long serializedSize(Map<String, byte[]> t, int version)
+    {
+        long size = TypeSizes.sizeofUnsignedVInt(t.size()); // long accumulator: matches the return type, avoids int overflow
+        for (Map.Entry<String,byte[]> e : t.entrySet())
+        {
+            size += TypeSizes.sizeof(e.getKey());
+            size += ByteArrayUtil.serializedSizeWithVIntLength(e.getValue());
+        }
+        return size;
+    }
+
+    @Override
+    public Map<String, byte[]> deserialize(DataInputPlus in, int version) throws IOException
+    {
+        long entries = in.readUnsignedVInt();
+        if (entries < 0 || entries > Integer.MAX_VALUE) // validate before narrowing; an (int) cast would silently truncate
+            throw new IOException("Corrupt custom params entry count encountered: " + entries);
+        Map<String, byte[]> customParams = Maps.newHashMapWithExpectedSize((int) entries);
+        for (int i = 0 ; i < entries ; ++i)
+            customParams.put(in.readUTF(), ByteArrayUtil.readWithVIntLength(in));
+        return customParams;
+    }
+}
diff --git a/src/java/org/apache/cassandra/net/Message.java b/src/java/org/apache/cassandra/net/Message.java
index 2640d8f2b8..bc176bdb86 100644
--- a/src/java/org/apache/cassandra/net/Message.java
+++ b/src/java/org/apache/cassandra/net/Message.java
@@ -19,7 +19,9 @@ package org.apache.cassandra.net;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.EnumMap;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -49,6 +51,7 @@ import org.apache.cassandra.utils.NoSpamLogger;
 
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
 import static org.apache.cassandra.db.TypeSizes.sizeof;
 import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
 import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAddressAndPortSerializer;
@@ -423,6 +426,12 @@ public class Message<T>
         {
             return (TraceType) params.getOrDefault(ParamType.TRACE_TYPE, TraceType.QUERY);
         }
+
+        @Nullable
+        public Map<String,byte[]> customParams()
+        {
+            return (Map<String,byte[]>) params.get(ParamType.CUSTOM_MAP); // null when no CUSTOM_MAP param is present
+        }
     }
 
     @SuppressWarnings("WeakerAccess")
@@ -473,6 +482,15 @@ public class Message<T>
             return this;
         }
 
+        public Builder<T> withCustomParam(String name, byte[] value)
+        {
+            Map<String,byte[]> customParams  = (Map<String,byte[]>) // every custom param lives in one map keyed by CUSTOM_MAP
+                    params.computeIfAbsent(ParamType.CUSTOM_MAP, (t) -> new HashMap<String,byte[]>());
+
+            customParams.put(name, value); // stores the array reference as-is; a repeated name replaces the earlier value
+            return this;
+        }
+
         /**
          * A shortcut to add tracing params.
          * Effectively, it is the same as calling {@link #withParam(ParamType, Object)} with tracing params
diff --git a/src/java/org/apache/cassandra/net/ParamType.java b/src/java/org/apache/cassandra/net/ParamType.java
index 5d4b982ce5..007605b260 100644
--- a/src/java/org/apache/cassandra/net/ParamType.java
+++ b/src/java/org/apache/cassandra/net/ParamType.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.UUIDSerializer;
 
 import static java.lang.Math.max;
+
 import static org.apache.cassandra.locator.InetAddressAndPort.FwdFrmSerializer.fwdFrmSerializer;
 
 /**
@@ -54,7 +55,9 @@ public enum ParamType
     TRACE_TYPE          (6, "TraceType",     Tracing.traceTypeSerializer),
 
     @Deprecated
-    TRACK_REPAIRED_DATA (7, "TrackRepaired", LegacyFlag.serializer);
+    TRACK_REPAIRED_DATA (7, "TrackRepaired", LegacyFlag.serializer),
+
+    CUSTOM_MAP          (14, "CUSTOM",       CustomParamsSerializer.serializer);
 
     final int id;
     @Deprecated final String legacyAlias; // pre-4.0 we used to serialize entire param name string
diff --git a/src/java/org/apache/cassandra/utils/ByteArrayUtil.java b/src/java/org/apache/cassandra/utils/ByteArrayUtil.java
index 75734ada96..58229c0863 100644
--- a/src/java/org/apache/cassandra/utils/ByteArrayUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteArrayUtil.java
@@ -26,6 +26,8 @@ import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
 
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
 
 // mostly copied from java.io.Bits
 public class ByteArrayUtil
@@ -230,6 +232,17 @@ public class ByteArrayUtil
         out.write(buffer);
     }
 
+    public static void writeWithVIntLength(byte[] bytes, DataOutputPlus out) throws IOException
+    {
+        out.writeUnsignedVInt(bytes.length); // length prefix, encoded as an unsigned vint
+        out.write(bytes);                    // followed by the raw bytes
+    }
+
+    public static int serializedSizeWithVIntLength(byte[] bytes)
+    {
+        return TypeSizes.sizeofUnsignedVInt(bytes.length) + bytes.length; // size of the vint length prefix plus the payload
+    }
+
     public static byte[] readWithLength(DataInput in) throws IOException
     {
         byte[] b = new byte[in.readInt()];
@@ -244,6 +257,17 @@ public class ByteArrayUtil
         return b;
     }
 
+    public static byte[] readWithVIntLength(DataInputPlus in) throws IOException
+    {
+        long length = in.readUnsignedVInt(); // keep as long: narrowing first would let lengths >= 2^32 wrap to a valid int
+        if (length < 0 || length > Integer.MAX_VALUE)
+            throw new IOException("Corrupt (negative or oversized) value length encountered");
+
+        byte[] bytes = new byte[(int) length];
+        in.readFully(bytes);
+        return bytes;
+    }
+
     public static void copyBytes(byte[] src, int srcPos, byte[] dst, int dstPos, int length)
     {
         System.arraycopy(src, srcPos, dst, dstPos, length);
diff --git a/test/unit/org/apache/cassandra/net/MessageTest.java b/test/unit/org/apache/cassandra/net/MessageTest.java
index a0deadfd3e..d3f56177eb 100644
--- a/test/unit/org/apache/cassandra/net/MessageTest.java
+++ b/test/unit/org/apache/cassandra/net/MessageTest.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.net;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.StandardCharsets;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
@@ -37,6 +39,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.tracing.Tracing.TraceType;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.FreeRunningClock;
 
@@ -49,6 +52,7 @@ import static org.apache.cassandra.net.ParamType.RESPOND_TO;
 import static org.apache.cassandra.net.ParamType.TRACE_SESSION;
 import static org.apache.cassandra.net.ParamType.TRACE_TYPE;
 import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+
 import static org.junit.Assert.*;
 
 public class MessageTest
@@ -211,6 +215,38 @@ public class MessageTest
         assertNull(msg.header.traceSession());
     }
 
+    @Test
+    public void testCustomParams() throws CharacterCodingException, IOException
+    {
+        long id = 1;
+        InetAddressAndPort from = FBUtilities.getLocalAddressAndPort();
+
+        Message<NoPayload> msg =
+            Message.builder(Verb._TEST_1, noPayload)
+                   .withId(id) // use the same variable the assertions below compare against
+                   .from(from)
+                   .withCustomParam("custom1", "custom1value".getBytes(StandardCharsets.UTF_8))
+                   .withCustomParam("custom2", "custom2value".getBytes(StandardCharsets.UTF_8))
+                   .build();
+
+        assertEquals(id, msg.id());
+        assertEquals(from, msg.from());
+        assertEquals(2, msg.header.customParams().size());
+        assertEquals("custom1value", new String(msg.header.customParams().get("custom1"), StandardCharsets.UTF_8));
+        assertEquals("custom2value", new String(msg.header.customParams().get("custom2"), StandardCharsets.UTF_8));
+
+        DataOutputBuffer out = DataOutputBuffer.scratchBuffer.get();
+        Message.serializer.serialize(msg, out, VERSION_40); // round-trip through Message.serializer at VERSION_40
+        DataInputBuffer in = new DataInputBuffer(out.buffer(), true);
+        msg = Message.serializer.deserialize(in, from, VERSION_40);
+
+        assertEquals(id, msg.id()); // the deserialized message must carry the same id, sender and custom params
+        assertEquals(from, msg.from());
+        assertEquals(2, msg.header.customParams().size());
+        assertEquals("custom1value", new String(msg.header.customParams().get("custom1"), StandardCharsets.UTF_8));
+        assertEquals("custom2value", new String(msg.header.customParams().get("custom2"), StandardCharsets.UTF_8));
+    }
+
     private void testAddTraceHeaderWithType(TraceType traceType)
     {
         try


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org