You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by mc...@apache.org on 2021/07/12 20:56:59 UTC
[cassandra] 01/01: Merge branch 'cassandra-2.2' into cassandra-3.0
This is an automated email from the ASF dual-hosted git repository.
mck pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 0a84dda36a2ec971681f0002b45a00c3432a8024
Merge: eadb171 b3f9921
Author: Mick Semb Wever <mc...@apache.org>
AuthorDate: Mon Jul 12 22:14:13 2021 +0200
Merge branch 'cassandra-2.2' into cassandra-3.0
build.xml | 2 +-
.../cassandra/distributed/impl/InstanceConfig.java | 14 ++--
.../upgrade/CompactStorage2to3UpgradeTest.java | 23 +++----
.../upgrade/DropCompactStorageTest.java | 7 +-
.../upgrade/MigrateDropColumnsTest.java | 3 +-
.../upgrade/MixedModeRangeTombstoneTest.java | 2 +-
.../upgrade/MixedModeReadRepairTest.java | 4 +-
.../cassandra/distributed/upgrade/PagingTest.java | 2 +-
.../cassandra/distributed/upgrade/UpgradeTest.java | 27 +-------
.../distributed/upgrade/UpgradeTestBase.java | 77 +++++++++++++++-------
10 files changed, 83 insertions(+), 78 deletions(-)
diff --cc build.xml
index 8dc7dfd,7d1cc58..87851b8
--- a/build.xml
+++ b/build.xml
@@@ -392,9 -393,8 +392,9 @@@
</dependency>
<dependency groupId="junit" artifactId="junit" version="4.6" />
<dependency groupId="org.mockito" artifactId="mockito-core" version="3.2.4" />
- <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.7" />
+ <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.8" />
<dependency groupId="org.reflections" artifactId="reflections" version="0.9.12" />
+ <dependency groupId="org.quicktheories" artifactId="quicktheories" version="0.25" />
<dependency groupId="org.apache.rat" artifactId="apache-rat" version="0.10">
<exclusion groupId="commons-lang" artifactId="commons-lang"/>
</dependency>
diff --cc test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index dfd842d,f4ef623..b97cb75
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@@ -33,7 -32,7 +32,6 @@@ import com.vdurmont.semver4j.Semver
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
--import org.apache.cassandra.config.YamlConfigurationLoader;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.shared.NetworkTopology;
diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage2to3UpgradeTest.java
index 37e8bb4,0000000..9ea54c3
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage2to3UpgradeTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage2to3UpgradeTest.java
@@@ -1,363 -1,0 +1,364 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.shared.Versions;
+
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+import static org.junit.Assert.assertEquals;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+
+public class CompactStorage2to3UpgradeTest extends UpgradeTestBase
+{
+ @Test
+ public void multiColumn() throws Throwable
+ {
+ new TestCase()
- .upgrade(Versions.Major.v22, Versions.Major.v30)
++ .upgradesFrom(v22)
+ .setup(cluster -> {
+ assert cluster.size() == 3;
+ int rf = cluster.size() - 1;
+ assert rf == 2;
+ cluster.schemaChange("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + (cluster.size() - 1) + "};");
+ cluster.schemaChange("CREATE TABLE ks.tbl (pk int, v1 int, v2 text, PRIMARY KEY (pk)) WITH COMPACT STORAGE");
+ ICoordinator coordinator = cluster.coordinator(1);
+ // these shouldn't be replicated by the 3rd node
+ coordinator.execute("INSERT INTO ks.tbl (pk, v1, v2) VALUES (3, 3, '3')", ConsistencyLevel.ALL);
+ coordinator.execute("INSERT INTO ks.tbl (pk, v1, v2) VALUES (9, 9, '9')", ConsistencyLevel.ALL);
+ for (int i = 0; i < cluster.size(); i++)
+ {
+ int nodeNum = i + 1;
+ System.out.printf("****** node %s: %s%n", nodeNum, cluster.get(nodeNum).config());
+ }
+ })
+ .runAfterNodeUpgrade(((cluster, node) -> {
+ if (node != 2)
+ return;
+
+ Object[][] rows = cluster.coordinator(3).execute("SELECT * FROM ks.tbl LIMIT 2", ConsistencyLevel.ALL);
+ Object[][] expected = {
+ row(9, 9, "9"),
+ row(3, 3, "3")
+ };
+ assertRows(rows, expected);
+ })).run();
+ }
+
+ @Test
+ public void singleColumn() throws Throwable
+ {
+ new TestCase()
- .upgrade(Versions.Major.v22, Versions.Major.v30)
++ .upgradesFrom(v22)
+ .setup(cluster -> {
+ assert cluster.size() == 3;
+ int rf = cluster.size() - 1;
+ assert rf == 2;
+ cluster.schemaChange("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + (cluster.size() - 1) + "};");
+ cluster.schemaChange("CREATE TABLE ks.tbl (pk int, v int, PRIMARY KEY (pk)) WITH COMPACT STORAGE");
+ ICoordinator coordinator = cluster.coordinator(1);
+ // these shouldn't be replicated by the 3rd node
+ coordinator.execute("INSERT INTO ks.tbl (pk, v) VALUES (3, 3)", ConsistencyLevel.ALL);
+ coordinator.execute("INSERT INTO ks.tbl (pk, v) VALUES (9, 9)", ConsistencyLevel.ALL);
+ for (int i = 0; i < cluster.size(); i++)
+ {
+ int nodeNum = i + 1;
+ System.out.printf("****** node %s: %s%n", nodeNum, cluster.get(nodeNum).config());
+ }
+ })
+ .runAfterNodeUpgrade(((cluster, node) -> {
+
+ if (node < 2)
+ return;
+
+ Object[][] rows = cluster.coordinator(3).execute("SELECT * FROM ks.tbl LIMIT 2", ConsistencyLevel.ALL);
+ Object[][] expected = {
+ row(9, 9),
+ row(3, 3)
+ };
+ assertRows(rows, expected);
+ })).run();
+ }
+
+ @Test
+ public void testDropCompactWithClusteringAndValueColumn() throws Throwable
+ {
+ final String table = "clustering_and_value";
+ final int partitions = 10;
+ final int rowsPerPartition = 10;
+
+ final ResultsRecorder recorder = new ResultsRecorder();
+ new TestCase()
- .nodes(2)
- .upgrade(Versions.Major.v22, Versions.Major.v30)
- .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))
- .setup(cluster -> {
- cluster.schemaChange(String.format(
- "CREATE TABLE %s.%s (key int, c1 int, c2 int, c3 int, PRIMARY KEY (key, c1, c2)) WITH COMPACT STORAGE",
- KEYSPACE, table));
- ICoordinator coordinator = cluster.coordinator(1);
++
++ .nodes(2)
++ .upgradesFrom(v22)
++ .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))
++ .setup(cluster -> {
++ cluster.schemaChange(String.format(
++ "CREATE TABLE %s.%s (key int, c1 int, c2 int, c3 int, PRIMARY KEY (key, c1, c2)) WITH COMPACT STORAGE",
++ KEYSPACE, table));
++ ICoordinator coordinator = cluster.coordinator(1);
+
+
+ for (int i = 1; i <= partitions; i++)
+ {
+ for (int j = 1; j <= rowsPerPartition; j++)
+ {
+ coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 1, 1)",
+ KEYSPACE, table, i, j), ConsistencyLevel.ALL);
+ coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 2, 1)",
+ KEYSPACE, table, i, j), ConsistencyLevel.ALL);
+ }
+ }
+
+ runQueries(cluster.coordinator(1), recorder, new String[] {
+ String.format("SELECT * FROM %s.%s", KEYSPACE, table),
+
+ String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
+ KEYSPACE, table, partitions - 3, rowsPerPartition - 2),
+
+ String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
+ KEYSPACE, table, partitions - 1, rowsPerPartition - 5),
+
+ String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
+ KEYSPACE, table, partitions - 1, rowsPerPartition - 5, 1),
+
+ String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
+ KEYSPACE, table, partitions - 4, rowsPerPartition - 9, 1),
+
+ String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d and c2 > %d",
+ KEYSPACE, table, partitions - 4, rowsPerPartition - 9, 1),
+
+ String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d and c2 > %d",
+ KEYSPACE, table, partitions - 4, rowsPerPartition - 9, 2),
+
+ String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d",
+ KEYSPACE, table, partitions - 8, rowsPerPartition - 3),
+
+ });
+
+ }).runBeforeNodeRestart((cluster, node) ->
+ {
+ cluster.get(node).config().set("enable_drop_compact_storage", true);
+
+ }).runAfterClusterUpgrade(cluster ->
+ {
+ for (int i = 1; i <= cluster.size(); i++)
+ {
+ NodeToolResult result = cluster.get(i).nodetoolResult("upgradesstables");
+ assertEquals("upgrade sstables failed for node " + i, 0, result.getRc());
+ }
+ Thread.sleep(600);
+
+ // make sure the results are the same after upgrade and upgrade sstables but before dropping compact storage
+ recorder.validateResults(cluster, 1);
+ recorder.validateResults(cluster, 2);
+
+ // make sure the results are the same after dropping compact storage on only the first node
+ IMessageFilters.Filter filter = cluster.verbs().allVerbs().to(2).drop();
+ cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, table), 1);
+
+ recorder.validateResults(cluster, 1, ConsistencyLevel.ONE);
+
+ filter.off();
+ recorder.validateResults(cluster, 1);
+ recorder.validateResults(cluster, 2);
+
+ // make sure the results continue to be the same after dropping compact storage on the second node
+ cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, table), 2);
+ recorder.validateResults(cluster, 1);
+ recorder.validateResults(cluster, 2);
+ })
+ .run();
+ }
+
+ @Test
+ public void testDropCompactWithClusteringAndValueColumnWithDeletesAndWrites() throws Throwable
+ {
+ final String table = "clustering_and_value_with_deletes";
+ final int partitions = 10;
+ final int rowsPerPartition = 10;
+ final int additionalParititons = 5;
+
+ new TestCase()
+ .nodes(2)
- .upgrade(Versions.Major.v22, Versions.Major.v30)
++ .upgradesFrom(v22)
+ .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL).set("enable_drop_compact_storage", true))
+ .setup(cluster -> {
+ cluster.schemaChange(String.format(
+ "CREATE TABLE %s.%s (key int, c1 int, c2 int, c3 int, PRIMARY KEY (key, c1, c2)) WITH COMPACT STORAGE",
+ KEYSPACE, table));
+ ICoordinator coordinator = cluster.coordinator(1);
+
+ for (int i = 1; i <= partitions; i++)
+ {
+ for (int j = 1; j <= rowsPerPartition; j++)
+ {
+ coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 1, 1)",
+ KEYSPACE, table, i, j), ConsistencyLevel.ALL);
+ coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 2, 2)",
+ KEYSPACE, table, i, j), ConsistencyLevel.ALL);
+ coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 3, 3)",
+ KEYSPACE, table, i, j), ConsistencyLevel.ALL);
+ }
+ }
+ })
+ .runAfterClusterUpgrade(cluster -> {
+ cluster.forEach(n -> n.nodetoolResult("upgradesstables", KEYSPACE).asserts().success());
+ Thread.sleep(1000);
+ // drop compact storage on only one node before performing writes
+ IMessageFilters.Filter filter = cluster.verbs().allVerbs().to(2).drop();
+ cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, table), 1);
+ filter.off();
+
+ // add new partitions and delete some of the old ones
+ ICoordinator coordinator = cluster.coordinator(1);
+ for (int i = 0; i < additionalParititons; i++)
+ {
+ for (int j = 1; j <= rowsPerPartition; j++)
+ {
+ coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 1, 1)",
+ KEYSPACE, table, i, j), ConsistencyLevel.ALL);
+ }
+ }
+
+ coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d",
+ KEYSPACE, table, 0, 3), ConsistencyLevel.ALL);
+
+ coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d",
+ KEYSPACE, table, 1), ConsistencyLevel.ALL);
+
+ coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
+ KEYSPACE, table, 7, 2, 2), ConsistencyLevel.ALL);
+
+ coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
+ KEYSPACE, table, 7, 6, 1), ConsistencyLevel.ALL);
+
+ coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
+ KEYSPACE, table, 4, 1, 1), ConsistencyLevel.ALL);
+
+ coordinator.execute(String.format("DELETE c3 FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
+ KEYSPACE, table, 8, 1, 3), ConsistencyLevel.ALL);
+
+ coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d and c2 > 1",
+ KEYSPACE, table, 6, 2), ConsistencyLevel.ALL);
+
+ ResultsRecorder recorder = new ResultsRecorder();
+ runQueries(coordinator, recorder, new String[] {
+ String.format("SELECT * FROM %s.%s", KEYSPACE, table),
+
+ String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
+ KEYSPACE, table, partitions - 3, rowsPerPartition - 2),
+
+ String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
+ KEYSPACE, table, partitions - 1, rowsPerPartition - 5),
+
+
+ String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d",
+ KEYSPACE, table, partitions - 8, rowsPerPartition - 3),
+
+ String.format("SELECT * FROM %s.%s WHERE key = %d",
+ KEYSPACE, table, 7),
+
+ String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
+ KEYSPACE, table, 7, 2),
+
+ String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
+ KEYSPACE, table, 8, 1),
+
+ String.format("SELECT c1, c2 FROM %s.%s WHERE key = %d and c1 = %d",
+ KEYSPACE, table, 8, 1),
+
+ String.format("SELECT c1, c2 FROM %s.%s WHERE key = %d and c1 = %d",
+ KEYSPACE, table, 8, 1),
+
+ String.format("SELECT c1, c2 FROM %s.%s WHERE key = %d and c1 = %d",
+ KEYSPACE, table, 4, 1),
+
+ String.format("SELECT c1, c2 FROM %s.%s WHERE key = %d",
+ KEYSPACE, table, 6),
+
+ String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d",
+ KEYSPACE, table, 0, 1),
+
+ String.format("SELECT * FROM %s.%s WHERE key = %d",
+ KEYSPACE, table, partitions - (additionalParititons - 2)),
+
+ String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d",
+ KEYSPACE, table, partitions - (additionalParititons - 3), 4)
+
+ });
+
+ // drop compact storage on remaining node and check result
+ cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, table), 2);
+ recorder.validateResults(cluster, 1);
+ recorder.validateResults(cluster, 2);
+ }).run();
+ }
+
+ private void runQueries(ICoordinator coordinator, ResultsRecorder helper, String[] queries)
+ {
+ for (String query : queries)
+ helper.addResult(query, coordinator.execute(query, ConsistencyLevel.ALL));
+ }
+
+ public static class ResultsRecorder
+ {
+ final private Map<String, Object[][]> preUpgradeResults = new HashMap<>();
+
+ public void addResult(String query, Object[][] results)
+ {
+ preUpgradeResults.put(query, results);
+ }
+
+ public Map<String, Object[][]> queriesAndResults()
+ {
+ return preUpgradeResults;
+ }
+
+ public void validateResults(UpgradeableCluster cluster, int node)
+ {
+ validateResults(cluster, node, ConsistencyLevel.ALL);
+ }
+
+ public void validateResults(UpgradeableCluster cluster, int node, ConsistencyLevel cl)
+ {
+ for (Map.Entry<String, Object[][]> entry : queriesAndResults().entrySet())
+ {
+ Object[][] postUpgradeResult = cluster.coordinator(node).execute(entry.getKey(), cl);
+ assertRows(postUpgradeResult, entry.getValue());
+ }
+
+ }
+ }
+
+}
diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageTest.java
index 417af4e,0000000..920458a
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageTest.java
@@@ -1,69 -1,0 +1,70 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
++import com.vdurmont.semver4j.Semver;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.shared.Versions;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+
+public class DropCompactStorageTest extends UpgradeTestBase
+{
+ @Test
+ public void dropCompactStorageBeforeUpgradesstablesTo30() throws Throwable
+ {
- dropCompactStorageBeforeUpgradeSstables(Versions.Major.v30);
++ dropCompactStorageBeforeUpgradeSstables(v30);
+ }
+
+ /**
+ * Upgrades a node from 2.2 to 3.x and DROP COMPACT just after the upgrade but _before_ upgrading the underlying
+ * sstables.
+ *
+ * <p>This test reproduces the issue from CASSANDRA-15897.
+ */
- public void dropCompactStorageBeforeUpgradeSstables(Versions.Major upgradeTo) throws Throwable
++ public void dropCompactStorageBeforeUpgradeSstables(Semver upgradeTo) throws Throwable
+ {
+ new TestCase()
+ .nodes(1)
- .upgrade(Versions.Major.v22, upgradeTo)
++ .singleUpgrade(v22, upgradeTo)
+ .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL).set("enable_drop_compact_storage", true))
+ .setup((cluster) -> {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (id int, ck int, v int, PRIMARY KEY (id, ck)) WITH COMPACT STORAGE");
+ for (int i = 0; i < 5; i++)
+ cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (id, ck, v) values (1, ?, ?)", ConsistencyLevel.ALL, i, i);
+ cluster.get(1).flush(KEYSPACE);
+ })
+ .runAfterNodeUpgrade((cluster, node) -> {
+ Throwable thrown = catchThrowable(() -> cluster.schemaChange("ALTER TABLE "+KEYSPACE+".tbl DROP COMPACT STORAGE"));
+ assertThat(thrown).hasMessageContainingAll("Cannot DROP COMPACT STORAGE as some nodes in the cluster",
+ "has some non-upgraded 2.x sstables");
+
+ assertThat(cluster.get(1).nodetool("upgradesstables")).isEqualTo(0);
+ cluster.schemaChange("ALTER TABLE "+KEYSPACE+".tbl DROP COMPACT STORAGE");
+ cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL);
+ })
+ .run();
+ }
+}
diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/MigrateDropColumnsTest.java
index 2c45a10,0000000..43027a4
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/MigrateDropColumnsTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MigrateDropColumnsTest.java
@@@ -1,124 -1,0 +1,125 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
++import com.vdurmont.semver4j.Semver;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.QueryResults;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.shared.AssertUtils;
+import org.apache.cassandra.distributed.shared.Versions;
+import org.apache.cassandra.distributed.test.ThriftClientUtils;
+import org.apache.cassandra.thrift.Deletion;
+import org.apache.cassandra.thrift.Mutation;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class MigrateDropColumnsTest extends UpgradeTestBase
+{
+ private static final MapType MAP_TYPE = MapType.getInstance(Int32Type.instance, Int32Type.instance, true);
+
+ @Test
+ public void dropColumns() throws Throwable
+ {
+ new TestCase()
- .upgrade(Versions.Major.v22, Versions.Major.v30)
++ .upgradesFrom(v22)
+// .upgrade(Versions.Major.v22, Versions.Major.v3X)
+// .upgrade(Versions.Major.v30, Versions.Major.v3X)
+// .upgrade(Versions.Major.v22, Versions.Major.v30, Versions.Major.v3X)
+ .withConfig(c -> c.with(Feature.NATIVE_PROTOCOL))
+ .setup(cluster -> {
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl(pk int, tables map<int, int>, PRIMARY KEY (pk))"));
+
+ ICoordinator coordinator = cluster.coordinator(1);
+
+ // write a RT to pk=0
+ ThriftClientUtils.thriftClient(cluster.get(1), thrift -> {
+ thrift.set_keyspace(KEYSPACE);
+
+ Mutation mutation = new Mutation();
+ Deletion deletion = new Deletion();
+ SlicePredicate slice = new SlicePredicate();
+ SliceRange range = new SliceRange();
+ range.setStart(CompositeType.build(ByteBufferUtil.bytes("tables")));
+ range.setFinish(CompositeType.build(ByteBufferUtil.bytes("tables")));
+ slice.setSlice_range(range);
+ deletion.setPredicate(slice);
+ deletion.setTimestamp(System.currentTimeMillis());
+ mutation.setDeletion(deletion);
+
+ thrift.batch_mutate(Collections.singletonMap(ByteBufferUtil.bytes(0),
+ Collections.singletonMap("tbl", Arrays.asList(mutation))),
+ org.apache.cassandra.thrift.ConsistencyLevel.ALL);
+ });
+
+ // write table to pk=1
+ // NOTE: because jvm-dtest doesn't support collections in the execute interface (see CASSANDRA-15969)
+ // need to encode to a ByteBuffer first
+ coordinator.execute(withKeyspace("INSERT INTO %s.tbl (pk, tables) VALUES (?, ?)"), ConsistencyLevel.ONE, 1, MAP_TYPE.decompose(ImmutableMap.of(1, 1)));
+
+ cluster.forEach(inst -> inst.flush(KEYSPACE));
+
+ cluster.schemaChange(withKeyspace("ALTER TABLE %s.tbl DROP tables"));
+ })
+ .runAfterClusterUpgrade(cluster -> {
+ ICoordinator coordinator = cluster.coordinator(1);
+ SimpleQueryResult qr = coordinator.executeWithResult("SELECT column_name " +
+ "FROM system_schema.dropped_columns " +
+ "WHERE keyspace_name=?" +
+ " AND table_name=?;",
+ ConsistencyLevel.ALL, KEYSPACE, "tbl");
+ Assert.assertEquals(ImmutableSet.of("tables"), Sets.newHashSet(qr.map(r -> r.getString("column_name"))));
+
+ assertRows(coordinator);
+
+ // upgradesstables, make sure everything is still working
+ cluster.forEach(n -> n.nodetoolResult("upgradesstables", KEYSPACE).asserts().success());
+
+ assertRows(coordinator);
+ })
+ .run();
+ }
+
+ private static void assertRows(ICoordinator coordinator)
+ {
+ // since only a RT was written to this row there is no liveness information, so the row will be skipped
+ AssertUtils.assertRows(
+ coordinator.executeWithResult(withKeyspace("SELECT * FROM %s.tbl WHERE pk=?"), ConsistencyLevel.ALL, 0),
+ QueryResults.empty());
+
+ AssertUtils.assertRows(
+ coordinator.executeWithResult(withKeyspace("SELECT * FROM %s.tbl WHERE pk=?"), ConsistencyLevel.ALL, 1),
+ QueryResults.builder().row(1).build());
+ }
+}
diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeRangeTombstoneTest.java
index e4b3a17,0000000..cd2c9f9
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeRangeTombstoneTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeRangeTombstoneTest.java
@@@ -1,73 -1,0 +1,73 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.shared.DistributedTestBase;
+import org.apache.cassandra.distributed.shared.Versions;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+
+/**
+ * Tests related to the handle of range tombstones during 2.x to 3.x upgrades.
+ */
+public class MixedModeRangeTombstoneTest extends UpgradeTestBase
+{
+ /**
+ * Tests the interaction of range tombstones covering multiple rows and collection tombsones within the covered
+ * rows.
+ *
+ * <p>This test reproduces the issue of CASSANDRA-15805.
+ */
+ @Test
+ public void multiRowsRangeTombstoneAndCollectionTombstoneInteractionTest() throws Throwable {
+ String tableName = DistributedTestBase.KEYSPACE + ".t";
+ String schema = "CREATE TABLE " + tableName + " (" +
+ " k int," +
+ " c1 text," +
+ " c2 text," +
+ " a text," +
+ " b set<text>," +
+ " c text," +
+ " PRIMARY KEY((k), c1, c2)" +
+ " )";
+
+
+ new TestCase()
+ .nodes(2)
- .upgrade(Versions.Major.v22, Versions.Major.v30)
++ .singleUpgrade(v22, v30)
+ .setup(cluster -> {
+ cluster.schemaChange(schema);
+ cluster.coordinator(1).execute(format("DELETE FROM %s USING TIMESTAMP 1 WHERE k = 0 AND c1 = 'A'", tableName), ConsistencyLevel.ALL);
+ cluster.coordinator(1).execute(format("INSERT INTO %s(k, c1, c2, a, b, c) VALUES (0, 'A', 'X', 'foo', {'whatever'}, 'bar') USING TIMESTAMP 2", tableName), ConsistencyLevel.ALL);
+ cluster.coordinator(1).execute(format("DELETE b FROM %s USING TIMESTAMP 3 WHERE k = 0 AND c1 = 'A' and c2 = 'X'", tableName), ConsistencyLevel.ALL);
+ cluster.get(1).flush(DistributedTestBase.KEYSPACE);
+ cluster.get(2).flush(DistributedTestBase.KEYSPACE);
+ })
+ .runAfterNodeUpgrade((cluster, node) -> {
+ assertRows(cluster.coordinator(node).execute(format("SELECT * FROM %s", tableName), ConsistencyLevel.ALL),
+ row(0, "A", "X", "foo", null, "bar"));
+ })
+ .run();
+ }
+}
diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
index e9391e0,549596b..b8b648b
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
@@@ -38,7 -32,7 +38,7 @@@ public class MixedModeReadRepairTest ex
{
new TestCase()
.nodes(2)
- .upgrade(Versions.Major.v22, Versions.Major.v30)
- .upgradesFrom(v22)
++ .singleUpgrade(v22, v30)
.setup((cluster) -> cluster.schemaChange("CREATE TABLE " + DistributedTestBase.KEYSPACE + ".tbl (pk ascii, b boolean, v blob, PRIMARY KEY (pk)) WITH COMPACT STORAGE"))
.runAfterNodeUpgrade((cluster, node) -> {
if (node != 1)
@@@ -56,82 -50,4 +56,82 @@@
.runAfterClusterUpgrade((cluster) -> cluster.get(2).forceCompact(DistributedTestBase.KEYSPACE, "tbl"))
.run();
}
+
+ @Test
+ public void mixedModeReadRepairDuplicateRows() throws Throwable
+ {
+ final String[] workload1 = new String[]
+ {
+ "DELETE FROM " + DistributedTestBase.KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 1 AND ck = 2;",
+ "INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, {'a':'b'}) USING TIMESTAMP 3;",
+ "INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, {'c':'d'}) USING TIMESTAMP 3;",
+ "INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, {'e':'f'}) USING TIMESTAMP 3;",
+ };
+
+ final String[] workload2 = new String[]
+ {
+ "INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, {'g':'h'}) USING TIMESTAMP 5;",
+ };
+
+ new TestCase()
+ .nodes(2)
- .upgrade(Versions.Major.v22, Versions.Major.v30)
++ .singleUpgrade(v22, v30)
+ .setup((cluster) ->
+ {
+ cluster.schemaChange("CREATE TABLE " + DistributedTestBase.KEYSPACE + ".tbl (pk int, ck int, v map<text, text>, PRIMARY KEY (pk, ck));");
+ })
+ .runAfterNodeUpgrade((cluster, node) ->
+ {
+ if (node == 2)
+ return;
+
+ // now node1 is 3.0 and node2 is 2.2
+ for (int i = 0; i < workload1.length; i++ )
+ cluster.coordinator(2).execute(workload1[i], ConsistencyLevel.QUORUM);
+
+ cluster.get(1).flush(KEYSPACE);
+ cluster.get(2).flush(KEYSPACE);
+
+ validate(cluster, 2, false);
+
+ for (int i = 0; i < workload2.length; i++ )
+ cluster.coordinator(2).execute(workload2[i], ConsistencyLevel.QUORUM);
+
+ cluster.get(1).flush(KEYSPACE);
+ cluster.get(2).flush(KEYSPACE);
+
+ validate(cluster, 1, true);
+ })
+ .run();
+ }
+
+ private void validate(UpgradeableCluster cluster, int nodeid, boolean local)
+ {
+ String query = "SELECT * FROM " + KEYSPACE + ".tbl";
+
+ Iterator<Object[]> iter = local
+ ? Iterators.forArray(cluster.get(nodeid).executeInternal(query))
+ : cluster.coordinator(nodeid).executeWithPaging(query, ConsistencyLevel.ALL, 2);
+
+ Object[] prevRow = null;
+ Object prevClustering = null;
+
+ while (iter.hasNext())
+ {
+ Object[] row = iter.next();
+ Object clustering = row[1];
+
+ if (clustering.equals(prevClustering))
+ {
+ fail(String.format("Duplicate rows on node %d in %s mode: \n%s\n%s",
+ nodeid,
+ local ? "local" : "distributed",
+ Arrays.toString(prevRow),
+ Arrays.toString(row)));
+ }
+
+ prevRow = row;
+ prevClustering = clustering;
+ }
+ }
}
diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/PagingTest.java
index 1af5856,7b958a7..30e248d
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/PagingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/PagingTest.java
@@@ -48,7 -48,7 +48,7 @@@ public class PagingTest extends Upgrade
{
new UpgradeTestBase.TestCase()
.nodes(2)
- .upgrade(Versions.Major.v22, Versions.Major.v30)
- .upgradesFrom(v22)
++ .upgrades(v22, v30)
.nodesToUpgrade(2)
.withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))
.setup((cluster) -> {
diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java
index 07494d1,f1a4ffd..943e305
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java
@@@ -55,59 -51,4 +55,36 @@@ public class UpgradeTest extends Upgrad
}).run();
}
+ @Test
+ public void mixedModePagingTest() throws Throwable
+ {
+ new TestCase()
- .upgrade(Versions.Major.v22, Versions.Major.v30)
++ .singleUpgrade(v22, v30)
+ .nodes(2)
+ .nodesToUpgrade(2)
+ .setup((cluster) -> {
+ cluster.schemaChange("ALTER KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) with compact storage");
+ for (int i = 0; i < 100; i++)
+ for (int j = 0; j < 200; j++)
+ cluster.coordinator(2).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, ?, 1)", ConsistencyLevel.ALL, i, j);
+ cluster.forEach((i) -> i.flush(KEYSPACE));
+ for (int i = 0; i < 100; i++)
+ for (int j = 10; j < 30; j++)
+ cluster.coordinator(2).execute("DELETE FROM " + KEYSPACE + ".tbl where pk=? and ck=?", ConsistencyLevel.ALL, i, j);
+ cluster.forEach((i) -> i.flush(KEYSPACE));
+ })
+ .runAfterClusterUpgrade((cluster) -> {
+ for (int i = 0; i < 100; i++)
+ {
+ for (int pageSize = 10; pageSize < 100; pageSize++)
+ {
+ Iterator<Object[]> res = cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
+ ConsistencyLevel.ALL,
+ pageSize, i);
+ Assert.assertEquals(180, Iterators.size(res));
+ }
+ }
+ }).run();
+ }
-
- @Test
- public void simpleUpgradeWithNetworkAndGossipTest() throws Throwable
- {
- new TestCase()
- .nodes(2)
- .nodesToUpgrade(1)
- .withConfig((cfg) -> cfg.with(Feature.NETWORK, Feature.GOSSIP))
- .upgrade(Versions.Major.v30, Versions.Major.v4)
- .setup((cluster) -> {
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
- cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)", ConsistencyLevel.ALL);
- })
- .runAfterNodeUpgrade((cluster, node) -> {
- for (int i : new int[]{ 1, 2 })
- {
- assertRows(cluster.coordinator(i).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
- ConsistencyLevel.ALL,
- 1),
- row(1, 1, 1));
- }
- }).run();
- }
}
diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
index de29408,f81139d..db34c61
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
@@@ -70,6 -76,13 +76,15 @@@ public class UpgradeTestBase extends Di
public void run(UpgradeableCluster cluster, int node) throws Throwable;
}
- public static final Semver v22 = new Semver("2.2.0-beta1", SemverType.LOOSE);
++ public static final Semver v22 = new Semver("2.2", SemverType.LOOSE);
++ public static final Semver v30 = new Semver("3.0", SemverType.LOOSE);
+
- protected static final List<Pair<Semver,Semver>> SUPPORTED_UPGRADE_PATHS = ImmutableList.of(Pair.create(v22, v22));
++ protected static final List<Pair<Semver,Semver>> SUPPORTED_UPGRADE_PATHS = ImmutableList.of(
++ Pair.create(v22, v30));
+
+ // the last is always the current
+ public static final Semver CURRENT = SUPPORTED_UPGRADE_PATHS.get(SUPPORTED_UPGRADE_PATHS.size() - 1).right;
+
public static class TestVersions
{
final Version initial;
@@@ -160,11 -177,9 +186,11 @@@
if (setup == null)
throw new AssertionError();
if (upgrade.isEmpty())
- throw new AssertionError();
+ throw new AssertionError("no upgrade paths have been specified (or exist)");
if (runAfterClusterUpgrade == null && runAfterNodeUpgrade == null)
throw new AssertionError();
+ if (runBeforeNodeRestart == null)
+ runBeforeNodeRestart = (c, n) -> {};
if (runAfterClusterUpgrade == null)
runAfterClusterUpgrade = (c) -> {};
if (runAfterNodeUpgrade == null)
@@@ -179,19 -195,15 +206,16 @@@
{
setup.run(cluster);
- for (Version version : upgrade.upgrade)
+ for (int n : nodesToUpgrade)
{
- for (int n=1; n<=nodesToUpgrade.size(); n++)
- {
- cluster.get(n).shutdown().get();
- cluster.get(n).setVersion(version);
- runBeforeNodeRestart.run(cluster, n);
- cluster.get(n).startup();
- runAfterNodeUpgrade.run(cluster, n);
- }
-
- runAfterClusterUpgrade.run(cluster);
+ cluster.get(n).shutdown().get();
+ cluster.get(n).setVersion(upgrade.upgrade);
++ runBeforeNodeRestart.run(cluster, n);
+ cluster.get(n).startup();
+ runAfterNodeUpgrade.run(cluster, n);
}
+
+ runAfterClusterUpgrade.run(cluster);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org