You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/04/16 11:01:35 UTC
[cassandra] branch cassandra-2.2 updated: Duplicate results with DISTINCT queries in mixed mode
This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch cassandra-2.2
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-2.2 by this push:
new 16f639a Duplicate results with DISTINCT queries in mixed mode
16f639a is described below
commit 16f639af94f56cdc7145299730278e27a9113e2e
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Tue Apr 14 16:26:30 2020 +0200
Duplicate results with DISTINCT queries in mixed mode
Patch by marcuse; reviewed by Aleksey Yeschenko, Sam Tunnicliffe and Alex Petrov for CASSANDRA-15501
---
CHANGES.txt | 1 +
.../cassandra/distributed/UpgradeableCluster.java | 7 ++
.../distributed/impl/AbstractCluster.java | 11 ++-
.../impl/DelegatingInvokableInstance.java | 3 +-
.../cassandra/distributed/impl/Instance.java | 2 +-
.../upgrade/MixedModeReadRepairTest.java | 29 +++----
.../cassandra/distributed/upgrade/PagingTest.java | 96 ++++++++++++++++++++++
.../distributed/upgrade/UpgradeTestBase.java | 13 ++-
8 files changed, 139 insertions(+), 23 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 56dd315..6913575 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.18
+ * Duplicate results with DISTINCT queries in mixed mode (CASSANDRA-15501)
* Disable JMX rebinding (CASSANDRA-15653)
Merged from 2.1:
* Fix parse error in cqlsh COPY FROM and formatting for map of blobs (CASSANDRA-15679)
diff --git a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
index 73a3c8a..71f3f8c 100644
--- a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
@@ -19,7 +19,9 @@
package org.apache.cassandra.distributed;
import java.io.File;
+import java.io.IOException;
import java.util.List;
+import java.util.function.Consumer;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.impl.AbstractCluster;
@@ -69,6 +71,11 @@ public class UpgradeableCluster extends AbstractCluster<IUpgradeableInstance> im
return build(nodeCount).start();
}
+ public static UpgradeableCluster create(int nodeCount, Versions.Version version, Consumer<IInstanceConfig> configUpdater) throws IOException
+ {
+ return build(nodeCount).withConfig(configUpdater).withVersion(version).start();
+ }
+
public static UpgradeableCluster create(int nodeCount, Versions.Version version) throws Throwable
{
return build(nodeCount).withVersion(version).start();
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 05c8af8..f123338 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -122,6 +122,13 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
protected IInvokableInstance delegate()
{
if (delegate == null)
+ throw new IllegalStateException("Can't use shut down instances, delegate is null");
+ return delegate;
+ }
+
+ protected IInvokableInstance delegateForStartup()
+ {
+ if (delegate == null)
delegate = newInstance(generation);
return delegate;
}
@@ -157,7 +164,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
{
if (!isShutdown)
throw new IllegalStateException();
- delegate().startup(AbstractCluster.this);
+ delegateForStartup().startup(AbstractCluster.this);
isShutdown = false;
updateMessagingVersions();
}
@@ -249,7 +256,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
I instance = newInstanceWrapperInternal(generation, initialVersion, config);
instances.add(instance);
// we use the config().broadcastAddressAndPort() here because we have not initialised the Instance
- I prev = instanceMap.put(instance.broadcastAddress(), instance);
+ I prev = instanceMap.put(instance.config().broadcastAddress(), instance);
if (null != prev)
throw new IllegalStateException("Cluster cannot have multiple nodes with same InetAddressAndPort: " + instance.broadcastAddress() + " vs " + prev.broadcastAddress());
}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java b/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
index 2f7a043..019481d 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.distributed.shared.NetworkTopology;
public abstract class DelegatingInvokableInstance implements IInvokableInstance
{
protected abstract IInvokableInstance delegate();
+ protected abstract IInvokableInstance delegateForStartup();
@Override
public <E extends Serializable> E transfer(E object)
@@ -124,7 +125,7 @@ public abstract class DelegatingInvokableInstance implements IInvokableInstance
@Override
public void startup(ICluster cluster)
{
- delegate().startup(cluster);
+ delegateForStartup().startup(cluster);
}
@Override
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 1c19bca..7a1c988 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -253,7 +253,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
int toNum = to.config().num();
return cluster.filters().permitOutbound(fromNum, toNum, serializeMessage(message, id,
broadcastAddress(),
- to.broadcastAddress()));
+ to.config().broadcastAddress()));
}
public boolean allowIncomingMessage(MessageIn message, int id)
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
index e69e38a..fabf172 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
@@ -21,9 +21,9 @@ package org.apache.cassandra.distributed.upgrade;
import org.junit.Test;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.shared.DistributedTestBase;
import org.apache.cassandra.distributed.shared.Versions;
-import static org.apache.cassandra.distributed.shared.Versions.find;
public class MixedModeReadRepairTest extends UpgradeTestBase
{
@@ -33,26 +33,21 @@ public class MixedModeReadRepairTest extends UpgradeTestBase
new TestCase()
.nodes(2)
.upgrade(Versions.Major.v22, Versions.Major.v30)
- .nodesToUpgrade(2)
- .setup((cluster) -> cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk ascii, b boolean, v blob, PRIMARY KEY (pk)) WITH COMPACT STORAGE"))
- .runAfterClusterUpgrade((cluster) -> {
- // now node2 is 3.0 and node1 is 2.2
+ .setup((cluster) -> cluster.schemaChange("CREATE TABLE " + DistributedTestBase.KEYSPACE + ".tbl (pk ascii, b boolean, v blob, PRIMARY KEY (pk)) WITH COMPACT STORAGE"))
+ .runAfterNodeUpgrade((cluster, node) -> {
+ if (node != 1)
+ return;
+ // now node1 is 3.0 and node2 is 2.2
// make sure 2.2 side does not get the mutation
- cluster.get(2).executeInternal("DELETE FROM " + KEYSPACE + ".tbl WHERE pk = ?",
+ cluster.get(1).executeInternal("DELETE FROM " + DistributedTestBase.KEYSPACE + ".tbl WHERE pk = ?",
"something");
// trigger a read repair
- cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
+ cluster.coordinator(2).execute("SELECT * FROM " + DistributedTestBase.KEYSPACE + ".tbl WHERE pk = ?",
ConsistencyLevel.ALL,
"something");
- cluster.get(1).flush(KEYSPACE);
- // upgrade node1 to 3.0
- cluster.get(1).shutdown().get();
- Versions allVersions = find();
- cluster.get(1).setVersion(allVersions.getLatest(Versions.Major.v30));
- cluster.get(1).startup();
-
- // and make sure the sstables are readable
- cluster.get(1).forceCompact(KEYSPACE, "tbl");
- }).run();
+ cluster.get(2).flush(DistributedTestBase.KEYSPACE);
+ })
+ .runAfterClusterUpgrade((cluster) -> cluster.get(2).forceCompact(DistributedTestBase.KEYSPACE, "tbl"))
+ .run();
}
}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/PagingTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/PagingTest.java
new file mode 100644
index 0000000..1af5856
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/PagingTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.junit.Test;
+
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.shared.DistributedTestBase;
+import org.apache.cassandra.distributed.shared.Versions;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class PagingTest extends UpgradeTestBase
+{
+ @Test
+ public void testReads() throws Throwable
+ {
+ new UpgradeTestBase.TestCase()
+ .nodes(2)
+ .upgrade(Versions.Major.v22, Versions.Major.v30)
+ .nodesToUpgrade(2)
+ .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))
+ .setup((cluster) -> {
+ cluster.disableAutoCompaction(DistributedTestBase.KEYSPACE);
+ cluster.schemaChange("CREATE TABLE " + DistributedTestBase.KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck)) ");
+ for (int j = 0; j < 5000; j++)
+ {
+ for (int i = 0; i < 10; i++)
+ cluster.coordinator(1).execute("insert into " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (" + j + ", " + i + ", 'hello')", ConsistencyLevel.ALL);
+ }
+ cluster.forEach(c -> c.flush(DistributedTestBase.KEYSPACE));
+ checkDuplicates("BOTH ON 2.2");
+ })
+ .runAfterClusterUpgrade((cluster) -> checkDuplicates("MIXED MODE"))
+ .run();
+ }
+
+ private void checkDuplicates(String message) throws InterruptedException
+ {
+ Thread.sleep(5000); // sometimes one node doesn't have time to come up properly?
+ try (com.datastax.driver.core.Cluster c = com.datastax.driver.core.Cluster.builder()
+ .addContactPoint("127.0.0.1")
+ .withProtocolVersion(ProtocolVersion.V3)
+ .withQueryOptions(new QueryOptions().setFetchSize(101))
+ .build();
+ Session s = c.connect())
+ {
+ Statement stmt = new SimpleStatement("select distinct token(pk) from " + DistributedTestBase.KEYSPACE + ".tbl WHERE token(pk) > " + Long.MIN_VALUE + " AND token(pk) < " + Long.MAX_VALUE);
+ stmt.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.ALL);
+ ResultSet res = s.execute(stmt);
+ Set<Object> seenTokens = new HashSet<>();
+ Iterator<Row> rows = res.iterator();
+ Set<Object> dupes = new HashSet<>();
+ while (rows.hasNext())
+ {
+ Object token = rows.next().getObject(0);
+ if (seenTokens.contains(token))
+ dupes.add(token);
+ seenTokens.add(token);
+ }
+ assertEquals(message+": too few rows", 5000, seenTokens.size());
+ assertTrue(message+": dupes is not empty", dupes.isEmpty());
+ }
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
index effec83..3567453 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.function.Consumer;
import org.junit.After;
import org.junit.BeforeClass;
@@ -30,8 +31,9 @@ import org.junit.BeforeClass;
import org.apache.cassandra.distributed.UpgradeableCluster;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.impl.Instance;
-
+import org.apache.cassandra.distributed.impl.InstanceConfig;
import org.apache.cassandra.distributed.shared.Builder;
import org.apache.cassandra.distributed.shared.DistributedTestBase;
import org.apache.cassandra.distributed.shared.Versions;
@@ -91,6 +93,7 @@ public class UpgradeTestBase extends DistributedTestBase
private RunOnClusterAndNode runAfterNodeUpgrade;
private RunOnCluster runAfterClusterUpgrade;
private final Set<Integer> nodesToUpgrade = new HashSet<>();
+ private Consumer<IInstanceConfig> configConsumer;
public TestCase()
{
@@ -141,6 +144,12 @@ public class UpgradeTestBase extends DistributedTestBase
return this;
}
+ public TestCase withConfig(Consumer<IInstanceConfig> config)
+ {
+ this.configConsumer = config;
+ return this;
+ }
+
public void run() throws Throwable
{
if (setup == null)
@@ -159,7 +168,7 @@ public class UpgradeTestBase extends DistributedTestBase
for (TestVersions upgrade : this.upgrade)
{
- try (UpgradeableCluster cluster = init(UpgradeableCluster.create(nodeCount, upgrade.initial)))
+ try (UpgradeableCluster cluster = init(UpgradeableCluster.create(nodeCount, upgrade.initial, configConsumer)))
{
setup.run(cluster);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org