You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2021/07/21 17:17:56 UTC
[cassandra] 01/02: Fix pre-4.0 FWD_FRM parameter serializer
This is an automated email from the ASF dual-hosted git repository.
maedhroz pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 4d4e1e88d095e10d53b59bf004a59709c3cee186
Author: Jon Meredith <jm...@apple.com>
AuthorDate: Fri Jul 16 22:08:23 2021 -0600
Fix pre-4.0 FWD_FRM parameter serializer
patch by Jon Meredith; reviewed by Caleb Rackliffe and Brandon Williams for CASSANDRA-16808
---
CHANGES.txt | 1 +
.../apache/cassandra/db/MutationVerbHandler.java | 2 +-
.../cassandra/locator/InetAddressAndPort.java | 85 ++++++++++++++++-
.../org/apache/cassandra/net/ForwardingInfo.java | 8 +-
src/java/org/apache/cassandra/net/Message.java | 14 ++-
src/java/org/apache/cassandra/net/ParamType.java | 5 +-
.../cassandra/distributed/UpgradeableCluster.java | 11 ++-
.../upgrade/MixedModeMessageForwardTest.java | 104 +++++++++++++++++++++
.../distributed/upgrade/UpgradeTestBase.java | 9 +-
9 files changed, 223 insertions(+), 16 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 48a500c..b158970 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0.0
+ * Fix pre-4.0 FWD_FRM parameter serializer (CASSANDRA-16808)
* Fix fwd to/from headers in DC write forwarding (CASSANDRA-16797)
* Fix CassandraVersion::compareTo (CASSANDRA-16794)
* BinLog does not close chronicle queue leaving this to GC to cleanup (CASSANDRA-16774)
diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
index bcb9cc7..1d4f868 100644
--- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
@@ -73,7 +73,7 @@ public class MutationVerbHandler implements IVerbHandler<Mutation>
.withParam(ParamType.RESPOND_TO, originalMessage.from())
.withoutParam(ParamType.FORWARD_TO);
- boolean useSameMessageID = forwardTo.useSameMessageID();
+ boolean useSameMessageID = forwardTo.useSameMessageID(originalMessage.id());
// reuse the same Message if all ids are identical (as they will be for 4.0+ node originated messages)
Message<Mutation> message = useSameMessageID ? builder.build() : null;
diff --git a/src/java/org/apache/cassandra/locator/InetAddressAndPort.java b/src/java/org/apache/cassandra/locator/InetAddressAndPort.java
index 06c4ea6..6e67a23 100644
--- a/src/java/org/apache/cassandra/locator/InetAddressAndPort.java
+++ b/src/java/org/apache/cassandra/locator/InetAddressAndPort.java
@@ -24,7 +24,6 @@ import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import java.util.Objects;
import java.util.regex.Pattern;
import com.google.common.base.Preconditions;
@@ -278,8 +277,10 @@ public final class InetAddressAndPort implements Comparable<InetAddressAndPort>,
return defaultPort;
}
- /*
+ /**
* As of version 4.0 the endpoint description includes a port number as an unsigned short
+ * This serializer matches the 3.0 CompactEndpointSerializationHelper, encoding the number of address bytes
+ * in a single byte before the address itself.
*/
public static final class Serializer implements IVersionedSerializer<InetAddressAndPort>
{
@@ -382,4 +383,84 @@ public final class InetAddressAndPort implements Comparable<InetAddressAndPort>,
}
}
}
+
+ /** Serializer for handling FWD_FRM message parameters. Pre-4.0 deserialization is a special
+ * case in the message
+ */
+ public static final class FwdFrmSerializer implements IVersionedSerializer<InetAddressAndPort>
+ {
+ public static final FwdFrmSerializer fwdFrmSerializer = new FwdFrmSerializer();
+ private FwdFrmSerializer() { }
+
+ public void serialize(InetAddressAndPort endpoint, DataOutputPlus out, int version) throws IOException
+ {
+ byte[] buf = endpoint.addressBytes;
+
+ if (version >= MessagingService.VERSION_40)
+ {
+ out.writeByte(buf.length + 2);
+ out.write(buf);
+ out.writeShort(endpoint.port);
+ }
+ else
+ {
+ out.write(buf);
+ }
+ }
+
+ public long serializedSize(InetAddressAndPort from, int version)
+ {
+ //4.0 includes a port number
+ if (version >= MessagingService.VERSION_40)
+ {
+ if (from.address instanceof Inet4Address)
+ return 1 + 4 + 2;
+ assert from.address instanceof Inet6Address;
+ return 1 + 16 + 2;
+ }
+ else
+ {
+ if (from.address instanceof Inet4Address)
+ return 4;
+ assert from.address instanceof Inet6Address;
+ return 16;
+ }
+ }
+
+ @Override
+ public InetAddressAndPort deserialize(DataInputPlus in, int version) throws IOException
+ {
+ if (version >= MessagingService.VERSION_40)
+ {
+ int size = in.readByte() & 0xFF;
+ switch (size)
+ {
+ //Address and one port
+ case 6:
+ case 18:
+ {
+ byte[] bytes = new byte[size - 2];
+ in.readFully(bytes);
+
+ int port = in.readShort() & 0xFFFF;
+ return getByAddressOverrideDefaults(InetAddress.getByAddress(bytes), bytes, port);
+ }
+ default:
+ throw new AssertionError("Unexpected size " + size);
+ }
+ }
+ else
+ {
+ throw new IllegalStateException("FWD_FRM deserializations should be special-cased pre-4.0");
+ }
+ }
+
+ public InetAddressAndPort pre40DeserializeWithLength(DataInputPlus in, int version, int length) throws IOException
+ {
+ assert length == 4 || length == 16 : "unexpected length " + length;
+ byte[] from = new byte[length];
+ in.readFully(from, 0, length);
+ return InetAddressAndPort.getByAddress(from);
+ }
+ }
}
diff --git a/src/java/org/apache/cassandra/net/ForwardingInfo.java b/src/java/org/apache/cassandra/net/ForwardingInfo.java
index 737da48..76e2a75 100644
--- a/src/java/org/apache/cassandra/net/ForwardingInfo.java
+++ b/src/java/org/apache/cassandra/net/ForwardingInfo.java
@@ -58,13 +58,9 @@ public final class ForwardingInfo implements Serializable
* @return {@code true} if all host are to use the same message id, {@code false} otherwise. Starting with 4.0 and
* above, we should be reusing the same id, always, but it won't always be true until 3.0/3.11 are phased out.
*/
- public boolean useSameMessageID()
+ public boolean useSameMessageID(long id)
{
- if (messageIds.length < 2)
- return true;
-
- long id = messageIds[0];
- for (int i = 1; i < messageIds.length; i++)
+ for (int i = 0; i < messageIds.length; i++)
if (id != messageIds[i])
return false;
diff --git a/src/java/org/apache/cassandra/net/Message.java b/src/java/org/apache/cassandra/net/Message.java
index 214c5c0..ca74012 100644
--- a/src/java/org/apache/cassandra/net/Message.java
+++ b/src/java/org/apache/cassandra/net/Message.java
@@ -1151,9 +1151,21 @@ public class Message<T>
: in.readInt();
if (null != type)
- params.put(type, type.serializer.deserialize(in, version));
+ {
+ // Have to special case deserializer as pre-4.0 needs length to decode correctly
+ if (version < VERSION_40 && type == ParamType.RESPOND_TO)
+ {
+ params.put(type, InetAddressAndPort.FwdFrmSerializer.fwdFrmSerializer.pre40DeserializeWithLength(in, version, length));
+ }
+ else
+ {
+ params.put(type, type.serializer.deserialize(in, version));
+ }
+ }
else
+ {
in.skipBytesFully(length); // forward compatibiliy with minor version changes
+ }
}
return params;
diff --git a/src/java/org/apache/cassandra/net/ParamType.java b/src/java/org/apache/cassandra/net/ParamType.java
index d82689d..5d4b982 100644
--- a/src/java/org/apache/cassandra/net/ParamType.java
+++ b/src/java/org/apache/cassandra/net/ParamType.java
@@ -24,12 +24,11 @@ import javax.annotation.Nullable;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.UUIDSerializer;
import static java.lang.Math.max;
-import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAddressAndPortSerializer;
+import static org.apache.cassandra.locator.InetAddressAndPort.FwdFrmSerializer.fwdFrmSerializer;
/**
* Type names and serializers for various parameters that can be put in {@link Message} params map.
@@ -42,7 +41,7 @@ import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAdd
public enum ParamType
{
FORWARD_TO (0, "FWD_TO", ForwardingInfo.serializer),
- RESPOND_TO (1, "FWD_FRM", inetAddressAndPortSerializer),
+ RESPOND_TO (1, "FWD_FRM", fwdFrmSerializer),
@Deprecated
FAILURE_RESPONSE (2, "FAIL", LegacyFlag.serializer),
diff --git a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
index 9223c3d..7a4d2bb 100644
--- a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
@@ -60,10 +60,17 @@ public class UpgradeableCluster extends AbstractCluster<IUpgradeableInstance> im
{
return build(nodeCount).start();
}
-
public static UpgradeableCluster create(int nodeCount, Versions.Version version, Consumer<IInstanceConfig> configUpdater) throws IOException
{
- return build(nodeCount).withConfig(configUpdater).withVersion(version).start();
+ return create(nodeCount, version, configUpdater, null);
+ }
+
+ public static UpgradeableCluster create(int nodeCount, Versions.Version version, Consumer<IInstanceConfig> configUpdater, Consumer<Builder> builderUpdater) throws IOException
+ {
+ Builder builder = build(nodeCount).withConfig(configUpdater).withVersion(version);
+ if (builderUpdater != null)
+ builderUpdater.accept(builder);
+ return builder.start();
}
public static UpgradeableCluster create(int nodeCount, Versions.Version version) throws Throwable
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java
new file mode 100644
index 0000000..e82e96f
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.shared.Shared;
+import org.apache.cassandra.distributed.shared.Versions;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.cassandra.distributed.shared.AssertUtils.*;
+
+@Shared
+public class MixedModeMessageForwardTest extends UpgradeTestBase
+{
+ private static final Logger logger = LoggerFactory.getLogger(MixedModeMessageForwardTest.class);
+ private static int nextKey = 1;
+ private static String TABLE = "tbl";
+ private static String INSERT_QUERY = String.format("INSERT INTO %s.%s(pk) VALUES (?)", KEYSPACE, TABLE);
+ private static String CHECK_QUERY = String.format("SELECT pk FROM %s.%s WHERE pk = ?", KEYSPACE, TABLE);
+
+ private void writeReadTest(UpgradeableCluster cluster)
+ {
+ // Coordinate a write from each node and then check present on all replicas
+ int readKey = nextKey;
+ for (int coordId = 1; coordId <= cluster.size(); coordId++)
+ {
+ logger.info("Coordinating CL.ALL Insert from node{} ", coordId);
+ cluster.get(coordId).coordinator().execute(INSERT_QUERY, ConsistencyLevel.ALL, nextKey++);
+ }
+
+ for (int coordId = 1; coordId <= cluster.size(); coordId++)
+ {
+ for (int nodeId = 1; nodeId <= cluster.size(); nodeId++) {
+ Object[][] results = cluster.get(nodeId).executeInternal(CHECK_QUERY, readKey);
+ assertRows(results, row(readKey));
+ }
+ readKey++;
+ }
+ }
+
+ /* Verify that messages sent with sendToHintedReplicas to non-local DCs
+ * are forwarded on to the hosts there.
+ *
+ * 1) creates a mixed cluster with multiple datacenters and a keyspace
+ * configured to write to all replicas in the datacenter
+ * 2) check the original single-version cluster by issuing an INSERT
+ * mutation from a coordinator on each node, then check that value
+ * has locally been written to each of the nodes.
+ * 3) Upgrade nodes one at a time, rechecking that all writes are forwarded.
+ */
+ @Test
+ public void checkWritesForwardedToOtherDcTest() throws Throwable
+ {
+ int numDCs = 2;
+ int nodesPerDc = 2;
+ String ntsArgs = IntStream.range(1, numDCs + 1)
+ .mapToObj(dc -> String.format("'datacenter%d' : %d", dc, nodesPerDc))
+ .collect(Collectors.joining(","));
+
+ new TestCase()
+ .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK).set("request_timeout_in_ms", 30000))
+ .withBuilder(b -> b.withRacks(numDCs, 1, nodesPerDc))
+ .nodes(numDCs * nodesPerDc)
+ .upgrade(Versions.Major.v30, Versions.Major.v4)
+ .setup(cluster -> {
+ cluster.schemaChange("ALTER KEYSPACE " + KEYSPACE +
+ " WITH replication = {'class': 'NetworkTopologyStrategy', " + ntsArgs + " };");
+
+ cluster.schemaChange("CREATE TABLE "+ KEYSPACE + "." + TABLE + " (pk int, PRIMARY KEY(pk))");
+
+ logger.info("Testing after setup, all nodes running {}", cluster.get(1).getReleaseVersionString());
+ writeReadTest(cluster);
+ })
+ .runAfterNodeUpgrade((UpgradeableCluster cluster, int nodeId) -> {
+ // Should be able to coordinate a write to any node and have a copy appear locally on all others
+ logger.info("Testing after upgrading node{} to {}", nodeId, cluster.get(nodeId).getReleaseVersionString());
+ writeReadTest(cluster);
+ })
+ .run();
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
index 90254d1..45fc197 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
@@ -100,6 +100,7 @@ public class UpgradeTestBase extends DistributedTestBase
private RunOnCluster runAfterClusterUpgrade;
private final Set<Integer> nodesToUpgrade = new LinkedHashSet<>();
private Consumer<IInstanceConfig> configConsumer;
+ private Consumer<UpgradeableCluster.Builder> builderConsumer;
public TestCase()
{
@@ -162,6 +163,12 @@ public class UpgradeTestBase extends DistributedTestBase
return this;
}
+ public TestCase withBuilder(Consumer<UpgradeableCluster.Builder> builder)
+ {
+ this.builderConsumer = builder;
+ return this;
+ }
+
public void run() throws Throwable
{
if (setup == null)
@@ -182,7 +189,7 @@ public class UpgradeTestBase extends DistributedTestBase
for (TestVersions upgrade : this.upgrade)
{
- try (UpgradeableCluster cluster = init(UpgradeableCluster.create(nodeCount, upgrade.initial, configConsumer)))
+ try (UpgradeableCluster cluster = init(UpgradeableCluster.create(nodeCount, upgrade.initial, configConsumer, builderConsumer)))
{
setup.run(cluster);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org