You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2021/07/21 17:17:56 UTC

[cassandra] 01/02: Fix pre-4.0 FWD_FRM parameter serializer

This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 4d4e1e88d095e10d53b59bf004a59709c3cee186
Author: Jon Meredith <jm...@apple.com>
AuthorDate: Fri Jul 16 22:08:23 2021 -0600

    Fix pre-4.0 FWD_FRM parameter serializer
    
    patch by Jon Meredith; reviewed by Caleb Rackliffe and Brandon Williams for CASSANDRA-16808
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/db/MutationVerbHandler.java   |   2 +-
 .../cassandra/locator/InetAddressAndPort.java      |  85 ++++++++++++++++-
 .../org/apache/cassandra/net/ForwardingInfo.java   |   8 +-
 src/java/org/apache/cassandra/net/Message.java     |  14 ++-
 src/java/org/apache/cassandra/net/ParamType.java   |   5 +-
 .../cassandra/distributed/UpgradeableCluster.java  |  11 ++-
 .../upgrade/MixedModeMessageForwardTest.java       | 104 +++++++++++++++++++++
 .../distributed/upgrade/UpgradeTestBase.java       |   9 +-
 9 files changed, 223 insertions(+), 16 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 48a500c..b158970 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0.0
+ * Fix pre-4.0 FWD_FRM parameter serializer (CASSANDRA-16808)
  * Fix fwd to/from headers in DC write forwarding (CASSANDRA-16797)
  * Fix CassandraVersion::compareTo (CASSANDRA-16794)
  * BinLog does not close chronicle queue leaving this to GC to cleanup (CASSANDRA-16774)
diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
index bcb9cc7..1d4f868 100644
--- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
@@ -73,7 +73,7 @@ public class MutationVerbHandler implements IVerbHandler<Mutation>
                    .withParam(ParamType.RESPOND_TO, originalMessage.from())
                    .withoutParam(ParamType.FORWARD_TO);
 
-        boolean useSameMessageID = forwardTo.useSameMessageID();
+        boolean useSameMessageID = forwardTo.useSameMessageID(originalMessage.id());
         // reuse the same Message if all ids are identical (as they will be for 4.0+ node originated messages)
         Message<Mutation> message = useSameMessageID ? builder.build() : null;
 
diff --git a/src/java/org/apache/cassandra/locator/InetAddressAndPort.java b/src/java/org/apache/cassandra/locator/InetAddressAndPort.java
index 06c4ea6..6e67a23 100644
--- a/src/java/org/apache/cassandra/locator/InetAddressAndPort.java
+++ b/src/java/org/apache/cassandra/locator/InetAddressAndPort.java
@@ -24,7 +24,6 @@ import java.net.Inet6Address;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.util.Objects;
 import java.util.regex.Pattern;
 
 import com.google.common.base.Preconditions;
@@ -278,8 +277,10 @@ public final class InetAddressAndPort implements Comparable<InetAddressAndPort>,
         return defaultPort;
     }
 
-    /*
+    /**
      * As of version 4.0 the endpoint description includes a port number as an unsigned short
+     * This serializer matches the 3.0 CompactEndpointSerializationHelper, encoding the number of address bytes
+     * in a single byte before the address itself.
      */
     public static final class Serializer implements IVersionedSerializer<InetAddressAndPort>
     {
@@ -382,4 +383,84 @@ public final class InetAddressAndPort implements Comparable<InetAddressAndPort>,
             }
         }
     }
+
+    /** Serializer for FWD_FRM (ParamType.RESPOND_TO) message parameters. From 4.0 the value is a length byte,
+     * the address bytes and a 2-byte port; before 4.0 it is only the raw address bytes, so pre-4.0 deserialization
+     * is special-cased in Message, which passes the parameter length to {@link #pre40DeserializeWithLength}. */
+    public static final class FwdFrmSerializer implements IVersionedSerializer<InetAddressAndPort>
+    {
+        public static final FwdFrmSerializer fwdFrmSerializer = new FwdFrmSerializer(); // stateless singleton
+        private FwdFrmSerializer() { }
+
+        public void serialize(InetAddressAndPort endpoint, DataOutputPlus out, int version) throws IOException
+        {
+            byte[] buf = endpoint.addressBytes;
+
+            if (version >= MessagingService.VERSION_40)
+            {
+                out.writeByte(buf.length + 2); // address length plus the 2-byte port
+                out.write(buf);
+                out.writeShort(endpoint.port);
+            }
+            else
+            {
+                out.write(buf); // no length prefix: pre-4.0 readers take the length from the parameter framing
+            }
+        }
+
+        public long serializedSize(InetAddressAndPort from, int version)
+        {
+            // 4.0+ adds a 1-byte length prefix and a 2-byte port to the 4 (IPv4) or 16 (IPv6) address bytes
+            if (version >= MessagingService.VERSION_40)
+            {
+                if (from.address instanceof Inet4Address)
+                    return 1 + 4 + 2;
+                assert from.address instanceof Inet6Address;
+                return 1 + 16 + 2;
+            }
+            else
+            {
+                if (from.address instanceof Inet4Address)
+                    return 4;
+                assert from.address instanceof Inet6Address;
+                return 16;
+            }
+        }
+
+        @Override
+        public InetAddressAndPort deserialize(DataInputPlus in, int version) throws IOException
+        {
+            if (version >= MessagingService.VERSION_40)
+            {
+                int size = in.readByte() & 0xFF; // unsigned length of address + port
+                switch (size)
+                {
+                    // 4-byte IPv4 or 16-byte IPv6 address, each followed by a 2-byte port
+                    case 6:
+                    case 18:
+                    {
+                        byte[] bytes = new byte[size - 2];
+                        in.readFully(bytes);
+
+                        int port = in.readShort() & 0xFFFF;
+                        return getByAddressOverrideDefaults(InetAddress.getByAddress(bytes), bytes, port);
+                    }
+                    default:
+                        throw new AssertionError("Unexpected size " + size);
+                }
+            }
+            else // pre-4.0 values are decoded by pre40DeserializeWithLength, which needs the parameter length
+            {
+                throw new IllegalStateException("FWD_FRM deserializations should be special-cased pre-4.0");
+            }
+        }
+
+        public InetAddressAndPort pre40DeserializeWithLength(DataInputPlus in, int version, int length) throws IOException
+        {
+            assert length == 4 || length == 16 : "unexpected length " + length; // NOTE(review): only enforced with -ea
+            byte[] from = new byte[length];
+            in.readFully(from, 0, length);
+            return InetAddressAndPort.getByAddress(from);
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/net/ForwardingInfo.java b/src/java/org/apache/cassandra/net/ForwardingInfo.java
index 737da48..76e2a75 100644
--- a/src/java/org/apache/cassandra/net/ForwardingInfo.java
+++ b/src/java/org/apache/cassandra/net/ForwardingInfo.java
@@ -58,13 +58,9 @@ public final class ForwardingInfo implements Serializable
      * @return {@code true} if all host are to use the same message id, {@code false} otherwise. Starting with 4.0 and
      * above, we should be reusing the same id, always, but it won't always be true until 3.0/3.11 are phased out.
      */
-    public boolean useSameMessageID()
+    public boolean useSameMessageID(long id)
     {
-        if (messageIds.length < 2)
-            return true;
-
-        long id = messageIds[0];
-        for (int i = 1; i < messageIds.length; i++)
+        for (int i = 0; i < messageIds.length; i++)
             if (id != messageIds[i])
                 return false;
 
diff --git a/src/java/org/apache/cassandra/net/Message.java b/src/java/org/apache/cassandra/net/Message.java
index 214c5c0..ca74012 100644
--- a/src/java/org/apache/cassandra/net/Message.java
+++ b/src/java/org/apache/cassandra/net/Message.java
@@ -1151,9 +1151,21 @@ public class Message<T>
                     : in.readInt();
 
                 if (null != type)
-                    params.put(type, type.serializer.deserialize(in, version));
+                {
+                    // Have to special case deserializer as pre-4.0 needs length to decode correctly
+                    if (version < VERSION_40 && type == ParamType.RESPOND_TO)
+                    {
+                        params.put(type, InetAddressAndPort.FwdFrmSerializer.fwdFrmSerializer.pre40DeserializeWithLength(in, version, length));
+                    }
+                    else
+                    {
+                        params.put(type, type.serializer.deserialize(in, version));
+                    }
+                }
                 else
+                {
                     in.skipBytesFully(length); // forward compatibiliy with minor version changes
+                }
             }
 
             return params;
diff --git a/src/java/org/apache/cassandra/net/ParamType.java b/src/java/org/apache/cassandra/net/ParamType.java
index d82689d..5d4b982 100644
--- a/src/java/org/apache/cassandra/net/ParamType.java
+++ b/src/java/org/apache/cassandra/net/ParamType.java
@@ -24,12 +24,11 @@ import javax.annotation.Nullable;
 
 import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.UUIDSerializer;
 
 import static java.lang.Math.max;
-import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAddressAndPortSerializer;
+import static org.apache.cassandra.locator.InetAddressAndPort.FwdFrmSerializer.fwdFrmSerializer;
 
 /**
  * Type names and serializers for various parameters that can be put in {@link Message} params map.
@@ -42,7 +41,7 @@ import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAdd
 public enum ParamType
 {
     FORWARD_TO          (0, "FWD_TO",        ForwardingInfo.serializer),
-    RESPOND_TO          (1, "FWD_FRM",       inetAddressAndPortSerializer),
+    RESPOND_TO          (1, "FWD_FRM",       fwdFrmSerializer),
 
     @Deprecated
     FAILURE_RESPONSE    (2, "FAIL",          LegacyFlag.serializer),
diff --git a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
index 9223c3d..7a4d2bb 100644
--- a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
@@ -60,10 +60,17 @@ public class UpgradeableCluster extends AbstractCluster<IUpgradeableInstance> im
     {
         return build(nodeCount).start();
     }
-
     public static UpgradeableCluster create(int nodeCount, Versions.Version version, Consumer<IInstanceConfig> configUpdater) throws IOException
     {
-        return build(nodeCount).withConfig(configUpdater).withVersion(version).start();
+        return create(nodeCount, version, configUpdater, null); // no extra Builder customization
+    }
+
+    public static UpgradeableCluster create(int nodeCount, Versions.Version version, Consumer<IInstanceConfig> configUpdater, Consumer<Builder> builderUpdater) throws IOException
+    {
+        Builder builder = build(nodeCount).withConfig(configUpdater).withVersion(version);
+        if (builderUpdater != null) // optional hook, e.g. to configure racks/DCs before start
+            builderUpdater.accept(builder);
+        return builder.start();
     }
 
     public static UpgradeableCluster create(int nodeCount, Versions.Version version) throws Throwable
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java
new file mode 100644
index 0000000..e82e96f
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.shared.Shared;
+import org.apache.cassandra.distributed.shared.Versions;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.cassandra.distributed.shared.AssertUtils.*;
+
+@Shared
+public class MixedModeMessageForwardTest extends UpgradeTestBase
+{
+    private static final Logger logger = LoggerFactory.getLogger(MixedModeMessageForwardTest.class);
+    private static int nextKey = 1; // next unused primary key, shared by successive writeReadTest calls
+    private static final String TABLE = "tbl";
+    private static final String INSERT_QUERY = String.format("INSERT INTO %s.%s(pk) VALUES (?)", KEYSPACE, TABLE);
+    private static final String CHECK_QUERY = String.format("SELECT pk FROM %s.%s WHERE pk = ?", KEYSPACE, TABLE);
+
+    private void writeReadTest(UpgradeableCluster cluster)
+    {
+        // Coordinate a CL.ALL write from each node, then check each new key is present locally on every node
+        int firstKey = nextKey;
+        for (int coordId = 1; coordId <= cluster.size(); coordId++)
+        {
+            logger.info("Coordinating CL.ALL Insert from node{} ", coordId);
+            cluster.get(coordId).coordinator().execute(INSERT_QUERY, ConsistencyLevel.ALL, nextKey++);
+        }
+
+        for (int key = firstKey; key < nextKey; key++)
+        {
+            for (int nodeId = 1; nodeId <= cluster.size(); nodeId++)
+            {
+                Object[][] results = cluster.get(nodeId).executeInternal(CHECK_QUERY, key);
+                assertRows(results, row(key));
+            }
+        }
+    }
+
+    /**
+     * Verify that messages sent with sendToHintedReplicas to non-local DCs are forwarded on to the hosts there.
+     *
+     * 1) creates a multi-datacenter cluster on the initial version, with a keyspace using
+     *    NetworkTopologyStrategy so that every node in every datacenter is a replica
+     * 2) checks the original single-version cluster by issuing an INSERT mutation
+     *    from a coordinator on each node, then checking that every value has been
+     *    written locally to each of the nodes.
+     * 3) upgrades nodes one at a time, rechecking that all writes are forwarded.
+     */
+    @Test
+    public void checkWritesForwardedToOtherDcTest() throws Throwable
+    {
+        int numDCs = 2;
+        int nodesPerDc = 2;
+        String ntsArgs = IntStream.range(1, numDCs + 1)
+                                  .mapToObj(dc -> String.format("'datacenter%d' : %d", dc, nodesPerDc))
+                                  .collect(Collectors.joining(","));
+
+        new TestCase()
+        .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK).set("request_timeout_in_ms", 30000))
+        .withBuilder(b -> b.withRacks(numDCs, 1, nodesPerDc))
+        .nodes(numDCs * nodesPerDc)
+        .upgrade(Versions.Major.v30, Versions.Major.v4)
+        .setup(cluster -> {
+            cluster.schemaChange("ALTER KEYSPACE " + KEYSPACE +
+                " WITH replication = {'class': 'NetworkTopologyStrategy', " + ntsArgs + " };");
+
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + "." + TABLE + " (pk int, PRIMARY KEY(pk))");
+
+            logger.info("Testing after setup, all nodes running {}", cluster.get(1).getReleaseVersionString());
+            writeReadTest(cluster);
+        })
+        .runAfterNodeUpgrade((UpgradeableCluster cluster, int nodeId) -> {
+            // Should be able to coordinate a write to any node and have a copy appear locally on all others
+            logger.info("Testing after upgrading node{} to {}", nodeId, cluster.get(nodeId).getReleaseVersionString());
+            writeReadTest(cluster);
+        })
+        .run();
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
index 90254d1..45fc197 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
@@ -100,6 +100,7 @@ public class UpgradeTestBase extends DistributedTestBase
         private RunOnCluster runAfterClusterUpgrade;
         private final Set<Integer> nodesToUpgrade = new LinkedHashSet<>();
         private Consumer<IInstanceConfig> configConsumer;
+        private Consumer<UpgradeableCluster.Builder> builderConsumer;
 
         public TestCase()
         {
@@ -162,6 +163,12 @@ public class UpgradeTestBase extends DistributedTestBase
             return this;
         }
 
+        public TestCase withBuilder(Consumer<UpgradeableCluster.Builder> builder)
+        {
+            this.builderConsumer = builder; // forwarded to UpgradeableCluster.create() for each upgrade path; may stay null
+            return this;
+        }
+
         public void run() throws Throwable
         {
             if (setup == null)
@@ -182,7 +189,7 @@ public class UpgradeTestBase extends DistributedTestBase
 
             for (TestVersions upgrade : this.upgrade)
             {
-                try (UpgradeableCluster cluster = init(UpgradeableCluster.create(nodeCount, upgrade.initial, configConsumer)))
+                try (UpgradeableCluster cluster = init(UpgradeableCluster.create(nodeCount, upgrade.initial, configConsumer, builderConsumer)))
                 {
                     setup.run(cluster);
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org