You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/04/16 11:01:35 UTC

[cassandra] branch cassandra-2.2 updated: Duplicate results with DISTINCT queries in mixed mode

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-2.2
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-2.2 by this push:
     new 16f639a  Duplicate results with DISTINCT queries in mixed mode
16f639a is described below

commit 16f639af94f56cdc7145299730278e27a9113e2e
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Tue Apr 14 16:26:30 2020 +0200

    Duplicate results with DISTINCT queries in mixed mode
    
    Patch by marcuse; reviewed by Aleksey Yeschenko, Sam Tunnicliffe and Alex Petrov for CASSANDRA-15501
---
 CHANGES.txt                                        |  1 +
 .../cassandra/distributed/UpgradeableCluster.java  |  7 ++
 .../distributed/impl/AbstractCluster.java          | 11 ++-
 .../impl/DelegatingInvokableInstance.java          |  3 +-
 .../cassandra/distributed/impl/Instance.java       |  2 +-
 .../upgrade/MixedModeReadRepairTest.java           | 29 +++----
 .../cassandra/distributed/upgrade/PagingTest.java  | 96 ++++++++++++++++++++++
 .../distributed/upgrade/UpgradeTestBase.java       | 13 ++-
 8 files changed, 139 insertions(+), 23 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 56dd315..6913575 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.18
+ * Duplicate results with DISTINCT queries in mixed mode (CASSANDRA-15501)
  * Disable JMX rebinding (CASSANDRA-15653)
 Merged from 2.1:
   * Fix parse error in cqlsh COPY FROM and formatting for map of blobs (CASSANDRA-15679)
diff --git a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
index 73a3c8a..71f3f8c 100644
--- a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
@@ -19,7 +19,9 @@
 package org.apache.cassandra.distributed;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.List;
+import java.util.function.Consumer;
 
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.impl.AbstractCluster;
@@ -69,6 +71,11 @@ public class UpgradeableCluster extends AbstractCluster<IUpgradeableInstance> im
         return build(nodeCount).start();
     }
 
+    public static UpgradeableCluster create(int nodeCount, Versions.Version version, Consumer<IInstanceConfig> configUpdater) throws IOException
+    {
+        return build(nodeCount).withConfig(configUpdater).withVersion(version).start();
+    }
+
     public static UpgradeableCluster create(int nodeCount, Versions.Version version) throws Throwable
     {
         return build(nodeCount).withVersion(version).start();
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 05c8af8..f123338 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -122,6 +122,13 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
         protected IInvokableInstance delegate()
         {
             if (delegate == null)
+                throw new IllegalStateException("Can't use shut down instances, delegate is null");
+            return delegate;
+        }
+
+        protected IInvokableInstance delegateForStartup()
+        {
+            if (delegate == null)
                 delegate = newInstance(generation);
             return delegate;
         }
@@ -157,7 +164,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
         {
             if (!isShutdown)
                 throw new IllegalStateException();
-            delegate().startup(AbstractCluster.this);
+            delegateForStartup().startup(AbstractCluster.this);
             isShutdown = false;
             updateMessagingVersions();
         }
@@ -249,7 +256,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
             I instance = newInstanceWrapperInternal(generation, initialVersion, config);
             instances.add(instance);
             // we use the config().broadcastAddressAndPort() here because we have not initialised the Instance
-            I prev = instanceMap.put(instance.broadcastAddress(), instance);
+            I prev = instanceMap.put(instance.config().broadcastAddress(), instance);
             if (null != prev)
                 throw new IllegalStateException("Cluster cannot have multiple nodes with same InetAddressAndPort: " + instance.broadcastAddress() + " vs " + prev.broadcastAddress());
         }
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java b/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
index 2f7a043..019481d 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.distributed.shared.NetworkTopology;
 public abstract class DelegatingInvokableInstance implements IInvokableInstance
 {
     protected abstract IInvokableInstance delegate();
+    protected abstract IInvokableInstance delegateForStartup();
     
     @Override
     public <E extends Serializable> E transfer(E object)
@@ -124,7 +125,7 @@ public abstract class DelegatingInvokableInstance implements IInvokableInstance
     @Override
     public void startup(ICluster cluster)
     {
-        delegate().startup(cluster);
+        delegateForStartup().startup(cluster);
     }
 
     @Override
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 1c19bca..7a1c988 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -253,7 +253,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                 int toNum = to.config().num();
                 return cluster.filters().permitOutbound(fromNum, toNum, serializeMessage(message, id,
                                                                                  broadcastAddress(),
-                                                                                 to.broadcastAddress()));
+                                                                                 to.config().broadcastAddress()));
             }
 
             public boolean allowIncomingMessage(MessageIn message, int id)
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
index e69e38a..fabf172 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
@@ -21,9 +21,9 @@ package org.apache.cassandra.distributed.upgrade;
 import org.junit.Test;
 
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.shared.DistributedTestBase;
 import org.apache.cassandra.distributed.shared.Versions;
 
-import static org.apache.cassandra.distributed.shared.Versions.find;
 
 public class MixedModeReadRepairTest extends UpgradeTestBase
 {
@@ -33,26 +33,21 @@ public class MixedModeReadRepairTest extends UpgradeTestBase
         new TestCase()
         .nodes(2)
         .upgrade(Versions.Major.v22, Versions.Major.v30)
-        .nodesToUpgrade(2)
-        .setup((cluster) -> cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk ascii, b boolean, v blob, PRIMARY KEY (pk)) WITH COMPACT STORAGE"))
-        .runAfterClusterUpgrade((cluster) -> {
-            // now node2 is 3.0 and node1 is 2.2
+        .setup((cluster) -> cluster.schemaChange("CREATE TABLE " + DistributedTestBase.KEYSPACE + ".tbl (pk ascii, b boolean, v blob, PRIMARY KEY (pk)) WITH COMPACT STORAGE"))
+        .runAfterNodeUpgrade((cluster, node) -> {
+            if (node != 1)
+                return;
+            // now node1 is 3.0 and node2 is 2.2
             // make sure 2.2 side does not get the mutation
-            cluster.get(2).executeInternal("DELETE FROM " + KEYSPACE + ".tbl WHERE pk = ?",
+            cluster.get(1).executeInternal("DELETE FROM " + DistributedTestBase.KEYSPACE + ".tbl WHERE pk = ?",
                                                                           "something");
             // trigger a read repair
-            cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
+            cluster.coordinator(2).execute("SELECT * FROM " + DistributedTestBase.KEYSPACE + ".tbl WHERE pk = ?",
                                            ConsistencyLevel.ALL,
                                            "something");
-            cluster.get(1).flush(KEYSPACE);
-            // upgrade node1 to 3.0
-            cluster.get(1).shutdown().get();
-            Versions allVersions = find();
-            cluster.get(1).setVersion(allVersions.getLatest(Versions.Major.v30));
-            cluster.get(1).startup();
-
-            // and make sure the sstables are readable
-            cluster.get(1).forceCompact(KEYSPACE, "tbl");
-        }).run();
+            cluster.get(2).flush(DistributedTestBase.KEYSPACE);
+        })
+        .runAfterClusterUpgrade((cluster) -> cluster.get(2).forceCompact(DistributedTestBase.KEYSPACE, "tbl"))
+        .run();
     }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/PagingTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/PagingTest.java
new file mode 100644
index 0000000..1af5856
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/PagingTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.junit.Test;
+
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.shared.DistributedTestBase;
+import org.apache.cassandra.distributed.shared.Versions;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class PagingTest extends UpgradeTestBase
+{
+    @Test
+    public void testReads() throws Throwable
+    {
+        new UpgradeTestBase.TestCase()
+        .nodes(2)
+        .upgrade(Versions.Major.v22, Versions.Major.v30)
+        .nodesToUpgrade(2)
+        .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))
+        .setup((cluster) -> {
+            cluster.disableAutoCompaction(DistributedTestBase.KEYSPACE);
+            cluster.schemaChange("CREATE TABLE " + DistributedTestBase.KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck)) ");
+            for (int j = 0; j < 5000; j++)
+            {
+                for (int i = 0; i < 10; i++)
+                    cluster.coordinator(1).execute("insert into " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (" + j + ", " + i + ", 'hello')", ConsistencyLevel.ALL);
+            }
+            cluster.forEach(c -> c.flush(DistributedTestBase.KEYSPACE));
+            checkDuplicates("BOTH ON 2.2");
+        })
+        .runAfterClusterUpgrade((cluster) -> checkDuplicates("MIXED MODE"))
+        .run();
+    }
+
+    private void checkDuplicates(String message) throws InterruptedException
+    {
+        Thread.sleep(5000); // sometimes one node doesn't have time to come up properly?
+        try (com.datastax.driver.core.Cluster c = com.datastax.driver.core.Cluster.builder()
+                                                                                  .addContactPoint("127.0.0.1")
+                                                                                  .withProtocolVersion(ProtocolVersion.V3)
+                                                                                  .withQueryOptions(new QueryOptions().setFetchSize(101))
+                                                                                  .build();
+             Session s = c.connect())
+        {
+            Statement stmt = new SimpleStatement("select distinct token(pk) from " + DistributedTestBase.KEYSPACE + ".tbl WHERE token(pk) > " + Long.MIN_VALUE + " AND token(pk) < " + Long.MAX_VALUE);
+            stmt.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.ALL);
+            ResultSet res = s.execute(stmt);
+            Set<Object> seenTokens = new HashSet<>();
+            Iterator<Row> rows = res.iterator();
+            Set<Object> dupes = new HashSet<>();
+            while (rows.hasNext())
+            {
+                Object token = rows.next().getObject(0);
+                if (seenTokens.contains(token))
+                    dupes.add(token);
+                seenTokens.add(token);
+            }
+            assertEquals(message+": too few rows", 5000, seenTokens.size());
+            assertTrue(message+": dupes is not empty", dupes.isEmpty());
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
index effec83..3567453 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.function.Consumer;
 
 import org.junit.After;
 import org.junit.BeforeClass;
@@ -30,8 +31,9 @@ import org.junit.BeforeClass;
 import org.apache.cassandra.distributed.UpgradeableCluster;
 import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.impl.Instance;
-
+import org.apache.cassandra.distributed.impl.InstanceConfig;
 import org.apache.cassandra.distributed.shared.Builder;
 import org.apache.cassandra.distributed.shared.DistributedTestBase;
 import org.apache.cassandra.distributed.shared.Versions;
@@ -91,6 +93,7 @@ public class UpgradeTestBase extends DistributedTestBase
         private RunOnClusterAndNode runAfterNodeUpgrade;
         private RunOnCluster runAfterClusterUpgrade;
         private final Set<Integer> nodesToUpgrade = new HashSet<>();
+        private Consumer<IInstanceConfig> configConsumer;
 
         public TestCase()
         {
@@ -141,6 +144,12 @@ public class UpgradeTestBase extends DistributedTestBase
             return this;
         }
 
+        public TestCase withConfig(Consumer<IInstanceConfig> config)
+        {
+            this.configConsumer = config;
+            return this;
+        }
+
         public void run() throws Throwable
         {
             if (setup == null)
@@ -159,7 +168,7 @@ public class UpgradeTestBase extends DistributedTestBase
 
             for (TestVersions upgrade : this.upgrade)
             {
-                try (UpgradeableCluster cluster = init(UpgradeableCluster.create(nodeCount, upgrade.initial)))
+                try (UpgradeableCluster cluster = init(UpgradeableCluster.create(nodeCount, upgrade.initial, configConsumer)))
                 {
                     setup.run(cluster);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org