You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by mc...@apache.org on 2022/12/18 02:35:03 UTC
[cassandra] branch cassandra-4.0 updated: Restore custom param types over messaging system
This is an automated email from the ASF dual-hosted git repository.
mck pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
new f01d2b4a3c Restore custom param types over messaging system
f01d2b4a3c is described below
commit f01d2b4a3ca114ac58d95befcab3dbbad9d960aa
Author: Mick Semb Wever <mc...@apache.org>
AuthorDate: Sat Nov 12 12:02:35 2022 +0100
Restore custom param types over messaging system
patch by Mick Semb Wever; reviewed by Aleksey Yeschenko for CASSANDRA-17981
---
CHANGES.txt | 1 +
.../cassandra/net/CustomParamsSerializer.java | 73 ++++++++++++++++++++++
src/java/org/apache/cassandra/net/Message.java | 18 ++++++
src/java/org/apache/cassandra/net/ParamType.java | 5 +-
.../org/apache/cassandra/utils/ByteArrayUtil.java | 24 +++++++
.../unit/org/apache/cassandra/net/MessageTest.java | 36 +++++++++++
6 files changed, 156 insertions(+), 1 deletion(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 4dfc269f6a..fee031d8f5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0.8
+ * Restore internode custom tracing on 4.0's new messaging system (CASSANDRA-17981)
* Harden parsing of boolean values in CQL in PropertyDefinitions (CASSANDRA-17878)
* Fix error message about type hints (CASSANDRA-17915)
* Fix possible race condition on repair snapshots (CASSANDRA-17955)
diff --git a/src/java/org/apache/cassandra/net/CustomParamsSerializer.java b/src/java/org/apache/cassandra/net/CustomParamsSerializer.java
new file mode 100644
index 0000000000..a866651ee1
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/CustomParamsSerializer.java
@@ -0,0 +1,73 @@
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.ByteArrayUtil;
+
+
+class CustomParamsSerializer implements IVersionedSerializer<Map<String,byte[]>>
+{
+    public static final CustomParamsSerializer serializer = new CustomParamsSerializer();
+    // Layout: unsigned-vint entry count, then for each entry a writeUTF key and a vint-length-prefixed byte[] value.
+    @Override
+    public void serialize(Map<String, byte[]> t, DataOutputPlus out, int version) throws IOException
+    {
+        out.writeUnsignedVInt(t.size());
+        for (Map.Entry<String, byte[]> entry : t.entrySet())
+        {
+            out.writeUTF(entry.getKey());
+            ByteArrayUtil.writeWithVIntLength(entry.getValue(), out);
+        }
+    }
+    // Sums in a long: an int accumulator with += would silently narrow a total that this method returns as long.
+    @Override
+    public long serializedSize(Map<String, byte[]> t, int version)
+    {
+        long size = TypeSizes.sizeofUnsignedVInt(t.size());
+        for (Map.Entry<String,byte[]> entry : t.entrySet())
+        {
+            size += TypeSizes.sizeof(entry.getKey());
+            size += ByteArrayUtil.serializedSizeWithVIntLength(entry.getValue());
+        }
+        return size;
+    }
+    // Range-checks the count as a long first, so a corrupt value fails with IOException instead of wrapping in an int cast.
+    @Override
+    public Map<String, byte[]> deserialize(DataInputPlus in, int version) throws IOException
+    {
+        long entries = in.readUnsignedVInt();
+        if (entries < 0 || entries > Integer.MAX_VALUE)
+            throw new IOException("Corrupt custom params entry count encountered: " + entries);
+        Map<String, byte[]> customParams = Maps.newHashMapWithExpectedSize((int) entries);
+        for (long i = 0 ; i < entries ; ++i)
+            customParams.put(in.readUTF(), ByteArrayUtil.readWithVIntLength(in));
+        return customParams;
+    }
+
+}
diff --git a/src/java/org/apache/cassandra/net/Message.java b/src/java/org/apache/cassandra/net/Message.java
index 2640d8f2b8..bc176bdb86 100644
--- a/src/java/org/apache/cassandra/net/Message.java
+++ b/src/java/org/apache/cassandra/net/Message.java
@@ -19,7 +19,9 @@ package org.apache.cassandra.net;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.EnumMap;
+import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -49,6 +51,7 @@ import org.apache.cassandra.utils.NoSpamLogger;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
import static org.apache.cassandra.db.TypeSizes.sizeof;
import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAddressAndPortSerializer;
@@ -423,6 +426,12 @@ public class Message<T>
{
return (TraceType) params.getOrDefault(ParamType.TRACE_TYPE, TraceType.QUERY);
}
+
+ @Nullable // null when nothing is stored under CUSTOM_MAP
+ public Map<String,byte[]> customParams() // custom param name -> raw value bytes, as added via withCustomParam
+ {
+ return (Map<String,byte[]>) params.get(ParamType.CUSTOM_MAP); // unchecked cast: params holds values untyped
+ }
}
@SuppressWarnings("WeakerAccess")
@@ -473,6 +482,15 @@ public class Message<T>
return this;
}
+    public Builder<T> withCustomParam(String name, byte[] value)
+    {
+        // all custom params share one map stored under CUSTOM_MAP; it is created on first use
+        Object existing = params.computeIfAbsent(ParamType.CUSTOM_MAP, (type) -> new HashMap<String,byte[]>());
+        Map<String,byte[]> customParams = (Map<String,byte[]>) existing;
+        customParams.put(name, value);
+        return this;
+    }
+
/**
* A shortcut to add tracing params.
* Effectively, it is the same as calling {@link #withParam(ParamType, Object)} with tracing params
diff --git a/src/java/org/apache/cassandra/net/ParamType.java b/src/java/org/apache/cassandra/net/ParamType.java
index 5d4b982ce5..007605b260 100644
--- a/src/java/org/apache/cassandra/net/ParamType.java
+++ b/src/java/org/apache/cassandra/net/ParamType.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.UUIDSerializer;
import static java.lang.Math.max;
+
import static org.apache.cassandra.locator.InetAddressAndPort.FwdFrmSerializer.fwdFrmSerializer;
/**
@@ -54,7 +55,9 @@ public enum ParamType
TRACE_TYPE (6, "TraceType", Tracing.traceTypeSerializer),
@Deprecated
- TRACK_REPAIRED_DATA (7, "TrackRepaired", LegacyFlag.serializer);
+ TRACK_REPAIRED_DATA (7, "TrackRepaired", LegacyFlag.serializer),
+
+ CUSTOM_MAP (14, "CUSTOM", CustomParamsSerializer.serializer);
final int id;
@Deprecated final String legacyAlias; // pre-4.0 we used to serialize entire param name string
diff --git a/src/java/org/apache/cassandra/utils/ByteArrayUtil.java b/src/java/org/apache/cassandra/utils/ByteArrayUtil.java
index 75734ada96..58229c0863 100644
--- a/src/java/org/apache/cassandra/utils/ByteArrayUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteArrayUtil.java
@@ -26,6 +26,8 @@ import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
// mostly copied from java.io.Bits
public class ByteArrayUtil
@@ -230,6 +232,17 @@ public class ByteArrayUtil
out.write(buffer);
}
+ public static void writeWithVIntLength(byte[] bytes, DataOutputPlus out) throws IOException
+ { // writes bytes.length as an unsigned vint, then the bytes themselves; readWithVIntLength reads this layout back
+ out.writeUnsignedVInt(bytes.length);
+ out.write(bytes);
+ }
+
+ public static int serializedSizeWithVIntLength(byte[] bytes)
+ { // size of the unsigned-vint length prefix plus the payload; intended to match what writeWithVIntLength writes
+ return TypeSizes.sizeofUnsignedVInt(bytes.length) + bytes.length;
+ }
+
public static byte[] readWithLength(DataInput in) throws IOException
{
byte[] b = new byte[in.readInt()];
@@ -244,6 +257,17 @@ public class ByteArrayUtil
return b;
}
+    public static byte[] readWithVIntLength(DataInputPlus in) throws IOException
+    {
+        // Reads an unsigned-vint length, then that many bytes. The range is checked before narrowing: casting to int first lets e.g. 2^32 + 5 pass as 5.
+        long length = in.readUnsignedVInt();
+        if (length < 0 || length > Integer.MAX_VALUE)
+            throw new IOException("Corrupt (negative or oversized) value length encountered: " + length);
+        byte[] bytes = new byte[(int) length];
+        in.readFully(bytes);
+        return bytes;
+    }
+
public static void copyBytes(byte[] src, int srcPos, byte[] dst, int dstPos, int length)
{
System.arraycopy(src, srcPos, dst, dstPos, length);
diff --git a/test/unit/org/apache/cassandra/net/MessageTest.java b/test/unit/org/apache/cassandra/net/MessageTest.java
index a0deadfd3e..d3f56177eb 100644
--- a/test/unit/org/apache/cassandra/net/MessageTest.java
+++ b/test/unit/org/apache/cassandra/net/MessageTest.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.net;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
@@ -37,6 +39,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.tracing.Tracing.TraceType;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.FreeRunningClock;
@@ -49,6 +52,7 @@ import static org.apache.cassandra.net.ParamType.RESPOND_TO;
import static org.apache.cassandra.net.ParamType.TRACE_SESSION;
import static org.apache.cassandra.net.ParamType.TRACE_TYPE;
import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+
import static org.junit.Assert.*;
public class MessageTest
@@ -211,6 +215,38 @@ public class MessageTest
assertNull(msg.header.traceSession());
}
+    @Test
+    public void testCustomParams() throws CharacterCodingException, IOException
+    {
+        long id = 1;
+        InetAddressAndPort from = FBUtilities.getLocalAddressAndPort();
+
+        Message<NoPayload> msg =
+            Message.builder(Verb._TEST_1, noPayload)
+                   .withId(id)
+                   .from(from)
+                   .withCustomParam("custom1", "custom1value".getBytes(StandardCharsets.UTF_8))
+                   .withCustomParam("custom2", "custom2value".getBytes(StandardCharsets.UTF_8))
+                   .build();
+        assertCustomParams(id, from, msg);
+
+        // round-trip through the 4.0 wire format and verify the custom params survive unchanged
+        DataOutputBuffer out = DataOutputBuffer.scratchBuffer.get();
+        Message.serializer.serialize(msg, out, VERSION_40);
+        DataInputBuffer in = new DataInputBuffer(out.buffer(), true);
+        assertCustomParams(id, from, Message.serializer.deserialize(in, from, VERSION_40));
+    }
+
+    // checks applied both to the freshly built message and to its deserialized copy
+    private static void assertCustomParams(long id, InetAddressAndPort from, Message<?> msg)
+    {
+        assertEquals(id, msg.id());
+        assertEquals(from, msg.from());
+        assertEquals(2, msg.header.customParams().size());
+        assertEquals("custom1value", new String(msg.header.customParams().get("custom1"), StandardCharsets.UTF_8));
+        assertEquals("custom2value", new String(msg.header.customParams().get("custom2"), StandardCharsets.UTF_8));
+    }
+
private void testAddTraceHeaderWithType(TraceType traceType)
{
try
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org