You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2021/07/21 17:17:55 UTC

[cassandra] branch cassandra-4.0 updated (e0cecae -> cdf68e3)

This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a change to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from e0cecae  Merge branch 'cassandra-3.11' into cassandra-4.0
     new 4d4e1e8  Fix pre-4.0 FWD_FRM parameter serializer
     new cdf68e3  Merge branch 'cassandra-4.0.0' into cassandra-4.0

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/db/MutationVerbHandler.java   |   2 +-
 .../cassandra/locator/InetAddressAndPort.java      |  85 ++++++++++++++++-
 .../org/apache/cassandra/net/ForwardingInfo.java   |   8 +-
 src/java/org/apache/cassandra/net/Message.java     |  14 ++-
 src/java/org/apache/cassandra/net/ParamType.java   |   5 +-
 .../cassandra/distributed/UpgradeableCluster.java  |  11 ++-
 .../upgrade/MixedModeMessageForwardTest.java       | 104 +++++++++++++++++++++
 .../distributed/upgrade/UpgradeTestBase.java       |   9 +-
 9 files changed, 223 insertions(+), 16 deletions(-)
 create mode 100644 test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/02: Fix pre-4.0 FWD_FRM parameter serializer

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 4d4e1e88d095e10d53b59bf004a59709c3cee186
Author: Jon Meredith <jm...@apple.com>
AuthorDate: Fri Jul 16 22:08:23 2021 -0600

    Fix pre-4.0 FWD_FRM parameter serializer
    
    patch by Jon Meredith; reviewed by Caleb Rackliffe and Brandon Williams for CASSANDRA-16808
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/db/MutationVerbHandler.java   |   2 +-
 .../cassandra/locator/InetAddressAndPort.java      |  85 ++++++++++++++++-
 .../org/apache/cassandra/net/ForwardingInfo.java   |   8 +-
 src/java/org/apache/cassandra/net/Message.java     |  14 ++-
 src/java/org/apache/cassandra/net/ParamType.java   |   5 +-
 .../cassandra/distributed/UpgradeableCluster.java  |  11 ++-
 .../upgrade/MixedModeMessageForwardTest.java       | 104 +++++++++++++++++++++
 .../distributed/upgrade/UpgradeTestBase.java       |   9 +-
 9 files changed, 223 insertions(+), 16 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 48a500c..b158970 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0.0
+ * Fix pre-4.0 FWD_FRM parameter serializer (CASSANDRA-16808)
  * Fix fwd to/from headers in DC write forwarding (CASSANDRA-16797)
  * Fix CassandraVersion::compareTo (CASSANDRA-16794)
  * BinLog does not close chronicle queue leaving this to GC to cleanup (CASSANDRA-16774)
diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
index bcb9cc7..1d4f868 100644
--- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
@@ -73,7 +73,7 @@ public class MutationVerbHandler implements IVerbHandler<Mutation>
                    .withParam(ParamType.RESPOND_TO, originalMessage.from())
                    .withoutParam(ParamType.FORWARD_TO);
 
-        boolean useSameMessageID = forwardTo.useSameMessageID();
+        boolean useSameMessageID = forwardTo.useSameMessageID(originalMessage.id());
         // reuse the same Message if all ids are identical (as they will be for 4.0+ node originated messages)
         Message<Mutation> message = useSameMessageID ? builder.build() : null;
 
diff --git a/src/java/org/apache/cassandra/locator/InetAddressAndPort.java b/src/java/org/apache/cassandra/locator/InetAddressAndPort.java
index 06c4ea6..6e67a23 100644
--- a/src/java/org/apache/cassandra/locator/InetAddressAndPort.java
+++ b/src/java/org/apache/cassandra/locator/InetAddressAndPort.java
@@ -24,7 +24,6 @@ import java.net.Inet6Address;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.util.Objects;
 import java.util.regex.Pattern;
 
 import com.google.common.base.Preconditions;
@@ -278,8 +277,10 @@ public final class InetAddressAndPort implements Comparable<InetAddressAndPort>,
         return defaultPort;
     }
 
-    /*
+    /**
      * As of version 4.0 the endpoint description includes a port number as an unsigned short
+     * This serializer matches the 3.0 CompactEndpointSerializationHelper, encoding the number of address bytes
+     * in a single byte before the address itself.
      */
     public static final class Serializer implements IVersionedSerializer<InetAddressAndPort>
     {
@@ -382,4 +383,84 @@ public final class InetAddressAndPort implements Comparable<InetAddressAndPort>,
             }
         }
     }
+
+    /** Serializer for handling FWD_FRM message parameters. Pre-4.0 deserialization is special-cased
+     * in Message, which passes the parameter length to {@link #pre40DeserializeWithLength}.
+     */
+    public static final class FwdFrmSerializer implements IVersionedSerializer<InetAddressAndPort>
+    {
+        public static final FwdFrmSerializer fwdFrmSerializer = new FwdFrmSerializer();
+        private FwdFrmSerializer() { }
+
+        public void serialize(InetAddressAndPort endpoint, DataOutputPlus out, int version) throws IOException
+        {
+            byte[] buf = endpoint.addressBytes;
+
+            if (version >= MessagingService.VERSION_40)
+            {
+                out.writeByte(buf.length + 2);
+                out.write(buf);
+                out.writeShort(endpoint.port);
+            }
+            else
+            {
+                out.write(buf);
+            }
+        }
+
+        public long serializedSize(InetAddressAndPort from, int version)
+        {
+            //4.0 includes a port number
+            if (version >= MessagingService.VERSION_40)
+            {
+                if (from.address instanceof Inet4Address)
+                    return 1 + 4 + 2;
+                assert from.address instanceof Inet6Address;
+                return 1 + 16 + 2;
+            }
+            else
+            {
+                if (from.address instanceof Inet4Address)
+                    return 4;
+                assert from.address instanceof Inet6Address;
+                return 16;
+            }
+        }
+
+        @Override
+        public InetAddressAndPort deserialize(DataInputPlus in, int version) throws IOException
+        {
+            if (version >= MessagingService.VERSION_40)
+            {
+                int size = in.readByte() & 0xFF;
+                switch (size)
+                {
+                    //Address and one port
+                    case 6:
+                    case 18:
+                    {
+                        byte[] bytes = new byte[size - 2];
+                        in.readFully(bytes);
+
+                        int port = in.readShort() & 0xFFFF;
+                        return getByAddressOverrideDefaults(InetAddress.getByAddress(bytes), bytes, port);
+                    }
+                    default:
+                        throw new AssertionError("Unexpected size " + size);
+                }
+            }
+            else
+            {
+                throw new IllegalStateException("FWD_FRM deserializations should be special-cased pre-4.0");
+            }
+        }
+
+        public InetAddressAndPort pre40DeserializeWithLength(DataInputPlus in, int version, int length) throws IOException
+        {
+            assert length == 4 || length == 16 : "unexpected length " + length;
+            byte[] from = new byte[length];
+            in.readFully(from, 0, length);
+            return InetAddressAndPort.getByAddress(from);
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/net/ForwardingInfo.java b/src/java/org/apache/cassandra/net/ForwardingInfo.java
index 737da48..76e2a75 100644
--- a/src/java/org/apache/cassandra/net/ForwardingInfo.java
+++ b/src/java/org/apache/cassandra/net/ForwardingInfo.java
@@ -58,13 +58,9 @@ public final class ForwardingInfo implements Serializable
      * @return {@code true} if all host are to use the same message id, {@code false} otherwise. Starting with 4.0 and
      * above, we should be reusing the same id, always, but it won't always be true until 3.0/3.11 are phased out.
      */
-    public boolean useSameMessageID()
+    public boolean useSameMessageID(long id)
     {
-        if (messageIds.length < 2)
-            return true;
-
-        long id = messageIds[0];
-        for (int i = 1; i < messageIds.length; i++)
+        for (int i = 0; i < messageIds.length; i++)
             if (id != messageIds[i])
                 return false;
 
diff --git a/src/java/org/apache/cassandra/net/Message.java b/src/java/org/apache/cassandra/net/Message.java
index 214c5c0..ca74012 100644
--- a/src/java/org/apache/cassandra/net/Message.java
+++ b/src/java/org/apache/cassandra/net/Message.java
@@ -1151,9 +1151,21 @@ public class Message<T>
                     : in.readInt();
 
                 if (null != type)
-                    params.put(type, type.serializer.deserialize(in, version));
+                {
+                    // Have to special case deserializer as pre-4.0 needs length to decode correctly
+                    if (version < VERSION_40 && type == ParamType.RESPOND_TO)
+                    {
+                        params.put(type, InetAddressAndPort.FwdFrmSerializer.fwdFrmSerializer.pre40DeserializeWithLength(in, version, length));
+                    }
+                    else
+                    {
+                        params.put(type, type.serializer.deserialize(in, version));
+                    }
+                }
                 else
+                {
                     in.skipBytesFully(length); // forward compatibiliy with minor version changes
+                }
             }
 
             return params;
diff --git a/src/java/org/apache/cassandra/net/ParamType.java b/src/java/org/apache/cassandra/net/ParamType.java
index d82689d..5d4b982 100644
--- a/src/java/org/apache/cassandra/net/ParamType.java
+++ b/src/java/org/apache/cassandra/net/ParamType.java
@@ -24,12 +24,11 @@ import javax.annotation.Nullable;
 
 import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.UUIDSerializer;
 
 import static java.lang.Math.max;
-import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAddressAndPortSerializer;
+import static org.apache.cassandra.locator.InetAddressAndPort.FwdFrmSerializer.fwdFrmSerializer;
 
 /**
  * Type names and serializers for various parameters that can be put in {@link Message} params map.
@@ -42,7 +41,7 @@ import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAdd
 public enum ParamType
 {
     FORWARD_TO          (0, "FWD_TO",        ForwardingInfo.serializer),
-    RESPOND_TO          (1, "FWD_FRM",       inetAddressAndPortSerializer),
+    RESPOND_TO          (1, "FWD_FRM",       fwdFrmSerializer),
 
     @Deprecated
     FAILURE_RESPONSE    (2, "FAIL",          LegacyFlag.serializer),
diff --git a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
index 9223c3d..7a4d2bb 100644
--- a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
@@ -60,10 +60,17 @@ public class UpgradeableCluster extends AbstractCluster<IUpgradeableInstance> im
     {
         return build(nodeCount).start();
     }
-
     public static UpgradeableCluster create(int nodeCount, Versions.Version version, Consumer<IInstanceConfig> configUpdater) throws IOException
     {
-        return build(nodeCount).withConfig(configUpdater).withVersion(version).start();
+        return create(nodeCount, version, configUpdater, null);
+    }
+
+    public static UpgradeableCluster create(int nodeCount, Versions.Version version, Consumer<IInstanceConfig> configUpdater, Consumer<Builder> builderUpdater) throws IOException
+    {
+        Builder builder = build(nodeCount).withConfig(configUpdater).withVersion(version);
+        if (builderUpdater != null)
+            builderUpdater.accept(builder);
+        return builder.start();
     }
 
     public static UpgradeableCluster create(int nodeCount, Versions.Version version) throws Throwable
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java
new file mode 100644
index 0000000..e82e96f
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.shared.Shared;
+import org.apache.cassandra.distributed.shared.Versions;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.cassandra.distributed.shared.AssertUtils.*;
+
+@Shared
+public class MixedModeMessageForwardTest extends UpgradeTestBase
+{
+    private static final Logger logger = LoggerFactory.getLogger(MixedModeMessageForwardTest.class);
+    private static int nextKey = 1;
+    private static String TABLE = "tbl";
+    private static String INSERT_QUERY = String.format("INSERT INTO %s.%s(pk) VALUES (?)", KEYSPACE, TABLE);
+    private static String CHECK_QUERY = String.format("SELECT pk FROM %s.%s WHERE pk = ?", KEYSPACE, TABLE);
+
+    private void writeReadTest(UpgradeableCluster cluster)
+    {
+        // Coordinate a write from each node and then check present on all replicas
+        int readKey = nextKey;
+        for (int coordId = 1; coordId <= cluster.size(); coordId++)
+        {
+            logger.info("Coordinating CL.ALL Insert from node{} ", coordId);
+            cluster.get(coordId).coordinator().execute(INSERT_QUERY, ConsistencyLevel.ALL, nextKey++);
+        }
+
+        for (int coordId = 1; coordId <= cluster.size(); coordId++)
+        {
+            for (int nodeId = 1; nodeId <= cluster.size(); nodeId++) {
+                Object[][] results = cluster.get(nodeId).executeInternal(CHECK_QUERY, readKey);
+                assertRows(results, row(readKey));
+            }
+            readKey++;
+        }
+    }
+
+    /* Verify that messages sent with sendToHintedReplicas to non-local DCs
+     * are forwarded on to the hosts there.
+     *
+     * 1) creates a mixed cluster with multiple datacenters and a keyspace
+     *    configured to write to all replicas in the datacenter
+     * 2) check the original single-version cluster by issuing an INSERT
+     *    mutation from a coordinator on each node, then check that value
+     *    has locally been written to each of the nodes.
+     * 3) Upgrade nodes one at a time, rechecking that all writes are forwarded.
+     */
+    @Test
+    public void checkWritesForwardedToOtherDcTest() throws Throwable
+    {
+        int numDCs = 2;
+        int nodesPerDc = 2;
+        String ntsArgs = IntStream.range(1, numDCs + 1)
+                                  .mapToObj(dc -> String.format("'datacenter%d' : %d", dc, nodesPerDc))
+                                  .collect(Collectors.joining(","));
+
+        new TestCase()
+        .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK).set("request_timeout_in_ms", 30000))
+        .withBuilder(b -> b.withRacks(numDCs, 1, nodesPerDc))
+        .nodes(numDCs * nodesPerDc)
+        .upgrade(Versions.Major.v30, Versions.Major.v4)
+        .setup(cluster -> {
+            cluster.schemaChange("ALTER KEYSPACE " + KEYSPACE +
+                " WITH replication = {'class': 'NetworkTopologyStrategy', " + ntsArgs + " };");
+
+            cluster.schemaChange("CREATE TABLE "+ KEYSPACE + "." + TABLE + " (pk int, PRIMARY KEY(pk))");
+
+            logger.info("Testing after setup, all nodes running {}", cluster.get(1).getReleaseVersionString());
+            writeReadTest(cluster);
+        })
+        .runAfterNodeUpgrade((UpgradeableCluster cluster, int nodeId) -> {
+            // Should be able to coordinate a write to any node and have a copy appear locally on all others
+            logger.info("Testing after upgrading node{} to {}", nodeId, cluster.get(nodeId).getReleaseVersionString());
+            writeReadTest(cluster);
+        })
+        .run();
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
index 90254d1..45fc197 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
@@ -100,6 +100,7 @@ public class UpgradeTestBase extends DistributedTestBase
         private RunOnCluster runAfterClusterUpgrade;
         private final Set<Integer> nodesToUpgrade = new LinkedHashSet<>();
         private Consumer<IInstanceConfig> configConsumer;
+        private Consumer<UpgradeableCluster.Builder> builderConsumer;
 
         public TestCase()
         {
@@ -162,6 +163,12 @@ public class UpgradeTestBase extends DistributedTestBase
             return this;
         }
 
+        public TestCase withBuilder(Consumer<UpgradeableCluster.Builder> builder)
+        {
+            this.builderConsumer = builder;
+            return this;
+        }
+
         public void run() throws Throwable
         {
             if (setup == null)
@@ -182,7 +189,7 @@ public class UpgradeTestBase extends DistributedTestBase
 
             for (TestVersions upgrade : this.upgrade)
             {
-                try (UpgradeableCluster cluster = init(UpgradeableCluster.create(nodeCount, upgrade.initial, configConsumer)))
+                try (UpgradeableCluster cluster = init(UpgradeableCluster.create(nodeCount, upgrade.initial, configConsumer, builderConsumer)))
                 {
                     setup.run(cluster);
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 02/02: Merge branch 'cassandra-4.0.0' into cassandra-4.0

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit cdf68e39ba9ab399936d3359bc0d24b4f38dbae8
Merge: e0cecae 4d4e1e8
Author: Caleb Rackliffe <ca...@gmail.com>
AuthorDate: Wed Jul 21 12:07:03 2021 -0500

    Merge branch 'cassandra-4.0.0' into cassandra-4.0

 CHANGES.txt                                        |   1 +
 .../apache/cassandra/db/MutationVerbHandler.java   |   2 +-
 .../cassandra/locator/InetAddressAndPort.java      |  85 ++++++++++++++++-
 .../org/apache/cassandra/net/ForwardingInfo.java   |   8 +-
 src/java/org/apache/cassandra/net/Message.java     |  14 ++-
 src/java/org/apache/cassandra/net/ParamType.java   |   5 +-
 .../cassandra/distributed/UpgradeableCluster.java  |  11 ++-
 .../upgrade/MixedModeMessageForwardTest.java       | 104 +++++++++++++++++++++
 .../distributed/upgrade/UpgradeTestBase.java       |   9 +-
 9 files changed, 223 insertions(+), 16 deletions(-)

diff --cc CHANGES.txt
index 414dc3b,b158970..b39b35d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,22 -1,5 +1,23 @@@
 +4.0.1
 + * Add repaired/unrepaired bytes back to nodetool (CASSANDRA-15282)
 + * Upgrade lz4-java to 1.8.0 to add RH6 support back (CASSANDRA-16753)
 + * Improve DiagnosticEventService.publish(event) logging message of events (CASSANDRA-16749)
 + * Cleanup dependency scopes (CASSANDRA-16704)
 + * Make JmxHistogram#getRecentValues() and JmxTimer#getRecentValues() thread-safe (CASSANDRA-16707)
 +Merged from 3.11:
 + * Make cqlsh use the same set of reserved keywords than the server uses (CASSANDRA-15663)
 + * Optimize bytes skipping when reading SSTable files (CASSANDRA-14415)
 + * Enable tombstone compactions when unchecked_tombstone_compaction is set in TWCS (CASSANDRA-14496)
 + * Read only the required SSTables for single partition queries (CASSANDRA-16737)
 +Merged from 3.0:
 + * Receipt of gossip shutdown notification updates TokenMetadata (CASSANDRA-16796)
 + * Count bloom filter misses correctly (CASSANDRA-12922)
 + * Reject token() in MV WHERE clause (CASSANDRA-13464)
 + * Ensure java executable is on the path (CASSANDRA-14325)
 + * Clean transaction log leftovers at the beginning of sstablelevelreset and sstableofflinerelevel (CASSANDRA-12519)
 +
  4.0.0
+  * Fix pre-4.0 FWD_FRM parameter serializer (CASSANDRA-16808)
   * Fix fwd to/from headers in DC write forwarding (CASSANDRA-16797)
   * Fix CassandraVersion::compareTo (CASSANDRA-16794)
   * BinLog does not close chronicle queue leaving this to GC to cleanup (CASSANDRA-16774)
diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java
index 0000000,e82e96f..78e2219
mode 000000,100644..100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java
@@@ -1,0 -1,104 +1,104 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.distributed.upgrade;
+ 
 -import org.apache.cassandra.distributed.UpgradeableCluster;
 -import org.apache.cassandra.distributed.api.ConsistencyLevel;
 -import org.apache.cassandra.distributed.api.Feature;
 -import org.apache.cassandra.distributed.shared.Shared;
 -import org.apache.cassandra.distributed.shared.Versions;
++import java.util.stream.Collectors;
++import java.util.stream.IntStream;
++
+ import org.junit.Test;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
 -import java.util.stream.Collectors;
 -import java.util.stream.IntStream;
++import org.apache.cassandra.distributed.UpgradeableCluster;
++import org.apache.cassandra.distributed.api.ConsistencyLevel;
++import org.apache.cassandra.distributed.api.Feature;
++import org.apache.cassandra.distributed.shared.Shared;
+ 
+ import static org.apache.cassandra.distributed.shared.AssertUtils.*;
+ 
+ @Shared
+ public class MixedModeMessageForwardTest extends UpgradeTestBase
+ {
+     private static final Logger logger = LoggerFactory.getLogger(MixedModeMessageForwardTest.class);
+     private static int nextKey = 1;
 -    private static String TABLE = "tbl";
 -    private static String INSERT_QUERY = String.format("INSERT INTO %s.%s(pk) VALUES (?)", KEYSPACE, TABLE);
 -    private static String CHECK_QUERY = String.format("SELECT pk FROM %s.%s WHERE pk = ?", KEYSPACE, TABLE);
++    private static final String TABLE = "tbl";
++    private static final String INSERT_QUERY = String.format("INSERT INTO %s.%s(pk) VALUES (?)", KEYSPACE, TABLE);
++    private static final String CHECK_QUERY = String.format("SELECT pk FROM %s.%s WHERE pk = ?", KEYSPACE, TABLE);
+ 
+     private void writeReadTest(UpgradeableCluster cluster)
+     {
+         // Coordinate a write from each node and then check present on all replicas
+         int readKey = nextKey;
+         for (int coordId = 1; coordId <= cluster.size(); coordId++)
+         {
+             logger.info("Coordinating CL.ALL Insert from node{} ", coordId);
+             cluster.get(coordId).coordinator().execute(INSERT_QUERY, ConsistencyLevel.ALL, nextKey++);
+         }
+ 
+         for (int coordId = 1; coordId <= cluster.size(); coordId++)
+         {
+             for (int nodeId = 1; nodeId <= cluster.size(); nodeId++) {
+                 Object[][] results = cluster.get(nodeId).executeInternal(CHECK_QUERY, readKey);
+                 assertRows(results, row(readKey));
+             }
+             readKey++;
+         }
+     }
+ 
+     /* Verify that messages sent with sendToHintedReplicas to non-local DCs
+      * are forwarded on to the hosts there.
+      *
+      * 1) creates a mixed cluster with multiple datacenters and a keyspace
+      *    configured to write to all replicas in the datacenter
+      * 2) check the original single-version cluster by issuing an INSERT
+      *    mutation from a coordinator on each node, then check that value
+      *    has locally been written to each of the nodes.
+      * 3) Upgrade nodes one at a time, rechecking that all writes are forwarded.
+      */
+     @Test
+     public void checkWritesForwardedToOtherDcTest() throws Throwable
+     {
+         int numDCs = 2;
+         int nodesPerDc = 2;
+         String ntsArgs = IntStream.range(1, numDCs + 1)
+                                   .mapToObj(dc -> String.format("'datacenter%d' : %d", dc, nodesPerDc))
+                                   .collect(Collectors.joining(","));
+ 
+         new TestCase()
+         .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK).set("request_timeout_in_ms", 30000))
+         .withBuilder(b -> b.withRacks(numDCs, 1, nodesPerDc))
+         .nodes(numDCs * nodesPerDc)
 -        .upgrade(Versions.Major.v30, Versions.Major.v4)
++        .singleUpgrade(v30, v40)
+         .setup(cluster -> {
+             cluster.schemaChange("ALTER KEYSPACE " + KEYSPACE +
+                 " WITH replication = {'class': 'NetworkTopologyStrategy', " + ntsArgs + " };");
+ 
 -            cluster.schemaChange("CREATE TABLE "+ KEYSPACE + "." + TABLE + " (pk int, PRIMARY KEY(pk))");
++            cluster.schemaChange(String.format("CREATE TABLE %s.%s (pk int, PRIMARY KEY(pk))", KEYSPACE, TABLE));
+ 
+             logger.info("Testing after setup, all nodes running {}", cluster.get(1).getReleaseVersionString());
+             writeReadTest(cluster);
+         })
+         .runAfterNodeUpgrade((UpgradeableCluster cluster, int nodeId) -> {
+             // Should be able to coordinate a write to any node and have a copy appear locally on all others
+             logger.info("Testing after upgrading node{} to {}", nodeId, cluster.get(nodeId).getReleaseVersionString());
+             writeReadTest(cluster);
+         })
+         .run();
+     }
+ }
diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
index c2afc2d,45fc197..11178c3
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
@@@ -210,8 -189,7 +217,8 @@@ public class UpgradeTestBase extends Di
  
              for (TestVersions upgrade : this.upgrade)
              {
 +                System.out.printf("testing upgrade from %s to %s%n", upgrade.initial.version, upgrade.upgrade.version);
-                 try (UpgradeableCluster cluster = init(UpgradeableCluster.create(nodeCount, upgrade.initial, configConsumer)))
+                 try (UpgradeableCluster cluster = init(UpgradeableCluster.create(nodeCount, upgrade.initial, configConsumer, builderConsumer)))
                  {
                      setup.run(cluster);
  

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org