You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2022/05/31 16:19:34 UTC
[cassandra] branch cassandra-3.11 updated: Split compact storage upgrade tests to prevent OOM
This is an automated email from the ASF dual-hosted git repository.
adelapena pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.11 by this push:
new 8a9ba8866d Split compact storage upgrade tests to prevent OOM
8a9ba8866d is described below
commit 8a9ba8866db6162a7b7352a260122d6e3c219567
Author: Andrés de la Peña <a....@gmail.com>
AuthorDate: Mon May 16 17:29:32 2022 +0100
Split compact storage upgrade tests to prevent OOM
patch by Andrés de la Peña; reviewed by Ekaterina Dimitrova for CASSANDRA-17213
---
.../upgrade/CompactStorage2to3UpgradeTest.java | 363 ---------------------
.../upgrade/CompactStorageMultiColumnTest.java | 65 ++++
.../upgrade/CompactStorageSingleColumnTest.java | 66 ++++
...opCompactStorageBeforeUpgradeSSTablesTest.java} | 3 +-
.../upgrade/DropCompactStorageTester.java | 66 ++++
...actStorageWithClusteringAndValueColumnTest.java | 120 +++++++
...DropCompactStorageWithDeletesAndWritesTest.java | 159 +++++++++
7 files changed, 477 insertions(+), 365 deletions(-)
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage2to3UpgradeTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage2to3UpgradeTest.java
deleted file mode 100644
index 7235c728d1..0000000000
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage2to3UpgradeTest.java
+++ /dev/null
@@ -1,363 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.upgrade;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Test;
-
-import org.apache.cassandra.distributed.UpgradeableCluster;
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
-import org.apache.cassandra.distributed.api.ICoordinator;
-import org.apache.cassandra.distributed.api.IMessageFilters;
-import org.apache.cassandra.distributed.api.NodeToolResult;
-import org.apache.cassandra.distributed.shared.Versions;
-
-import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
-import static org.apache.cassandra.distributed.shared.AssertUtils.row;
-import static org.junit.Assert.assertEquals;
-import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
-import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
-import static org.apache.cassandra.distributed.api.Feature.NETWORK;
-
-
-public class CompactStorage2to3UpgradeTest extends UpgradeTestBase
-{
- @Test
- public void multiColumn() throws Throwable
- {
- new TestCase()
- .upgradesFrom(v22)
- .setup(cluster -> {
- assert cluster.size() == 3;
- int rf = cluster.size() - 1;
- assert rf == 2;
- cluster.schemaChange("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + (cluster.size() - 1) + "};");
- cluster.schemaChange("CREATE TABLE ks.tbl (pk int, v1 int, v2 text, PRIMARY KEY (pk)) WITH COMPACT STORAGE");
- ICoordinator coordinator = cluster.coordinator(1);
- // these shouldn't be replicated by the 3rd node
- coordinator.execute("INSERT INTO ks.tbl (pk, v1, v2) VALUES (3, 3, '3')", ConsistencyLevel.ALL);
- coordinator.execute("INSERT INTO ks.tbl (pk, v1, v2) VALUES (9, 9, '9')", ConsistencyLevel.ALL);
- for (int i = 0; i < cluster.size(); i++)
- {
- int nodeNum = i + 1;
- System.out.printf("****** node %s: %s%n", nodeNum, cluster.get(nodeNum).config());
- }
- })
- .runAfterNodeUpgrade(((cluster, node) -> {
- if (node != 2)
- return;
-
- Object[][] rows = cluster.coordinator(3).execute("SELECT * FROM ks.tbl LIMIT 2", ConsistencyLevel.ALL);
- Object[][] expected = {
- row(9, 9, "9"),
- row(3, 3, "3")
- };
- assertRows(rows, expected);
- })).run();
- }
-
- @Test
- public void singleColumn() throws Throwable
- {
- new TestCase()
- .upgradesFrom(v22)
- .setup(cluster -> {
- assert cluster.size() == 3;
- int rf = cluster.size() - 1;
- assert rf == 2;
- cluster.schemaChange("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + (cluster.size() - 1) + "};");
- cluster.schemaChange("CREATE TABLE ks.tbl (pk int, v int, PRIMARY KEY (pk)) WITH COMPACT STORAGE");
- ICoordinator coordinator = cluster.coordinator(1);
- // these shouldn't be replicated by the 3rd node
- coordinator.execute("INSERT INTO ks.tbl (pk, v) VALUES (3, 3)", ConsistencyLevel.ALL);
- coordinator.execute("INSERT INTO ks.tbl (pk, v) VALUES (9, 9)", ConsistencyLevel.ALL);
- for (int i = 0; i < cluster.size(); i++)
- {
- int nodeNum = i + 1;
- System.out.printf("****** node %s: %s%n", nodeNum, cluster.get(nodeNum).config());
- }
- })
- .runAfterNodeUpgrade(((cluster, node) -> {
-
- if (node < 2)
- return;
-
- Object[][] rows = cluster.coordinator(3).execute("SELECT * FROM ks.tbl LIMIT 2", ConsistencyLevel.ALL);
- Object[][] expected = {
- row(9, 9),
- row(3, 3)
- };
- assertRows(rows, expected);
- })).run();
- }
-
- @Test
- public void testDropCompactWithClusteringAndValueColumn() throws Throwable
- {
- final String table = "clustering_and_value";
- final int partitions = 10;
- final int rowsPerPartition = 10;
-
- final ResultsRecorder recorder = new ResultsRecorder();
- new TestCase()
- .nodes(2)
- .upgradesFrom(v22)
- .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))
- .setup(cluster -> {
- cluster.schemaChange(String.format(
- "CREATE TABLE %s.%s (key int, c1 int, c2 int, c3 int, PRIMARY KEY (key, c1, c2)) WITH COMPACT STORAGE",
- KEYSPACE, table));
- ICoordinator coordinator = cluster.coordinator(1);
-
- for (int i = 1; i <= partitions; i++)
- {
- for (int j = 1; j <= rowsPerPartition; j++)
- {
- coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 1, 1)",
- KEYSPACE, table, i, j), ConsistencyLevel.ALL);
- coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 2, 1)",
- KEYSPACE, table, i, j), ConsistencyLevel.ALL);
- }
- }
-
- runQueries(cluster.coordinator(1), recorder, new String[] {
- String.format("SELECT * FROM %s.%s", KEYSPACE, table),
-
- String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
- KEYSPACE, table, partitions - 3, rowsPerPartition - 2),
-
- String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
- KEYSPACE, table, partitions - 1, rowsPerPartition - 5),
-
- String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
- KEYSPACE, table, partitions - 1, rowsPerPartition - 5, 1),
-
- String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
- KEYSPACE, table, partitions - 4, rowsPerPartition - 9, 1),
-
- String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d and c2 > %d",
- KEYSPACE, table, partitions - 4, rowsPerPartition - 9, 1),
-
- String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d and c2 > %d",
- KEYSPACE, table, partitions - 4, rowsPerPartition - 9, 2),
-
- String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d",
- KEYSPACE, table, partitions - 8, rowsPerPartition - 3),
-
- });
- }).runBeforeNodeRestart((cluster, node) ->
- {
- cluster.get(node).config().set("enable_drop_compact_storage", true);
-
-
- }).runAfterClusterUpgrade(cluster ->
- {
- for (int i = 1; i <= cluster.size(); i++)
- {
- NodeToolResult result = cluster.get(i).nodetoolResult("upgradesstables");
- assertEquals("upgrade sstables failed for node " + i, 0, result.getRc());
- }
- Thread.sleep(1000);
-
- // make sure the results are the same after upgrade and upgrade sstables but before dropping compact storage
- recorder.validateResults(cluster, 1);
- recorder.validateResults(cluster, 2);
-
- // make sure the results are the same after dropping compact storage on only the first node
- IMessageFilters.Filter filter = cluster.verbs().allVerbs().to(2).drop();
- cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, table), 1);
-
- recorder.validateResults(cluster, 1, ConsistencyLevel.ONE);
-
- filter.off();
- recorder.validateResults(cluster, 1);
- recorder.validateResults(cluster, 2);
-
- // make sure the results continue to be the same after dropping compact storage on the second node
- cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, table), 2);
- recorder.validateResults(cluster, 1);
- recorder.validateResults(cluster, 2);
- })
- .run();
- }
-
- @Test
- public void testDropCompactWithClusteringAndValueColumnWithDeletesAndWrites() throws Throwable
- {
- final String table = "clustering_and_value_with_deletes";
- final int partitions = 10;
- final int rowsPerPartition = 10;
- final int additionalParititons = 5;
-
- new TestCase()
- .nodes(2)
- .upgradesFrom(v22)
- .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL).set("enable_drop_compact_storage", true))
- .setup(cluster -> {
- cluster.schemaChange(String.format(
- "CREATE TABLE %s.%s (key int, c1 int, c2 int, c3 int, PRIMARY KEY (key, c1, c2)) WITH COMPACT STORAGE",
- KEYSPACE, table));
- ICoordinator coordinator = cluster.coordinator(1);
-
- for (int i = 1; i <= partitions; i++)
- {
- for (int j = 1; j <= rowsPerPartition; j++)
- {
- coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 1, 1)",
- KEYSPACE, table, i, j), ConsistencyLevel.ALL);
- coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 2, 2)",
- KEYSPACE, table, i, j), ConsistencyLevel.ALL);
- coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 3, 3)",
- KEYSPACE, table, i, j), ConsistencyLevel.ALL);
- }
- }
-
- })
- .runAfterClusterUpgrade(cluster -> {
- cluster.forEach(n -> n.nodetoolResult("upgradesstables", KEYSPACE).asserts().success());
- Thread.sleep(1000);
-
- // drop compact storage on only one node before performing writes
- IMessageFilters.Filter filter = cluster.verbs().allVerbs().to(2).drop();
- cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, table), 1);
- filter.off();
-
- // add new partitions and delete some of the old ones
- ICoordinator coordinator = cluster.coordinator(1);
- for (int i = 0; i < additionalParititons; i++)
- {
- for (int j = 1; j <= rowsPerPartition; j++)
- {
- coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 1, 1)",
- KEYSPACE, table, i, j), ConsistencyLevel.ALL);
- }
- }
-
- coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d",
- KEYSPACE, table, 0, 3), ConsistencyLevel.ALL);
-
- coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d",
- KEYSPACE, table, 1), ConsistencyLevel.ALL);
-
- coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
- KEYSPACE, table, 7, 2, 2), ConsistencyLevel.ALL);
-
- coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
- KEYSPACE, table, 7, 6, 1), ConsistencyLevel.ALL);
-
- coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
- KEYSPACE, table, 4, 1, 1), ConsistencyLevel.ALL);
-
- coordinator.execute(String.format("DELETE c3 FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
- KEYSPACE, table, 8, 1, 3), ConsistencyLevel.ALL);
-
- coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d and c2 > 1",
- KEYSPACE, table, 6, 2), ConsistencyLevel.ALL);
-
- ResultsRecorder recorder = new ResultsRecorder();
- runQueries(coordinator, recorder, new String[] {
- String.format("SELECT * FROM %s.%s", KEYSPACE, table),
-
- String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
- KEYSPACE, table, partitions - 3, rowsPerPartition - 2),
-
- String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
- KEYSPACE, table, partitions - 1, rowsPerPartition - 5),
-
-
- String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d",
- KEYSPACE, table, partitions - 8, rowsPerPartition - 3),
-
- String.format("SELECT * FROM %s.%s WHERE key = %d",
- KEYSPACE, table, 7),
-
- String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
- KEYSPACE, table, 7, 2),
-
- String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
- KEYSPACE, table, 8, 1),
-
- String.format("SELECT c1, c2 FROM %s.%s WHERE key = %d and c1 = %d",
- KEYSPACE, table, 8, 1),
-
- String.format("SELECT c1, c2 FROM %s.%s WHERE key = %d and c1 = %d",
- KEYSPACE, table, 8, 1),
-
- String.format("SELECT c1, c2 FROM %s.%s WHERE key = %d and c1 = %d",
- KEYSPACE, table, 4, 1),
-
- String.format("SELECT c1, c2 FROM %s.%s WHERE key = %d",
- KEYSPACE, table, 6),
-
- String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d",
- KEYSPACE, table, 0, 1),
-
- String.format("SELECT * FROM %s.%s WHERE key = %d",
- KEYSPACE, table, partitions - (additionalParititons - 2)),
-
- String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d",
- KEYSPACE, table, partitions - (additionalParititons - 3), 4)
-
- });
-
- // drop compact storage on remaining node and check result
- cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, table), 2);
- recorder.validateResults(cluster, 1);
- recorder.validateResults(cluster, 2);
- }).run();
- }
-
- private void runQueries(ICoordinator coordinator, ResultsRecorder helper, String[] queries)
- {
- for (String query : queries)
- helper.addResult(query, coordinator.execute(query, ConsistencyLevel.ALL));
- }
-
- public static class ResultsRecorder
- {
- final private Map<String, Object[][]> preUpgradeResults = new HashMap<>();
-
- public void addResult(String query, Object[][] results)
- {
- preUpgradeResults.put(query, results);
- }
-
- public Map<String, Object[][]> queriesAndResults()
- {
- return preUpgradeResults;
- }
-
- public void validateResults(UpgradeableCluster cluster, int node)
- {
- validateResults(cluster, node, ConsistencyLevel.ALL);
- }
-
- public void validateResults(UpgradeableCluster cluster, int node, ConsistencyLevel cl)
- {
- for (Map.Entry<String, Object[][]> entry : queriesAndResults().entrySet())
- {
- Object[][] postUpgradeResult = cluster.coordinator(node).execute(entry.getKey(), cl);
- assertRows(postUpgradeResult, entry.getValue());
- }
-
- }
- }
-}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorageMultiColumnTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorageMultiColumnTest.java
new file mode 100644
index 0000000000..086982ead2
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorageMultiColumnTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+
+
+public class CompactStorageMultiColumnTest extends UpgradeTestBase
+{
+ @Test
+ public void multiColumn() throws Throwable
+ {
+ new TestCase()
+ .upgradesFrom(v22)
+ .setup(cluster -> {
+ assert cluster.size() == 3;
+ int rf = cluster.size() - 1;
+ assert rf == 2;
+ cluster.schemaChange("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + (cluster.size() - 1) + "};");
+ cluster.schemaChange("CREATE TABLE ks.tbl (pk int, v1 int, v2 text, PRIMARY KEY (pk)) WITH COMPACT STORAGE");
+ ICoordinator coordinator = cluster.coordinator(1);
+ // these shouldn't be replicated by the 3rd node
+ coordinator.execute("INSERT INTO ks.tbl (pk, v1, v2) VALUES (3, 3, '3')", ConsistencyLevel.ALL);
+ coordinator.execute("INSERT INTO ks.tbl (pk, v1, v2) VALUES (9, 9, '9')", ConsistencyLevel.ALL);
+ for (int i = 0; i < cluster.size(); i++)
+ {
+ int nodeNum = i + 1;
+ System.out.printf("****** node %s: %s%n", nodeNum, cluster.get(nodeNum).config());
+ }
+ })
+ .runAfterNodeUpgrade(((cluster, node) -> {
+ if (node != 2)
+ return;
+
+ Object[][] rows = cluster.coordinator(3).execute("SELECT * FROM ks.tbl LIMIT 2", ConsistencyLevel.ALL);
+ Object[][] expected = {
+ row(9, 9, "9"),
+ row(3, 3, "3")
+ };
+ assertRows(rows, expected);
+ })).run();
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorageSingleColumnTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorageSingleColumnTest.java
new file mode 100644
index 0000000000..8a7170f28d
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorageSingleColumnTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+
+
+public class CompactStorageSingleColumnTest extends UpgradeTestBase
+{
+ @Test
+ public void singleColumn() throws Throwable
+ {
+ new TestCase()
+ .upgradesFrom(v22)
+ .setup(cluster -> {
+ assert cluster.size() == 3;
+ int rf = cluster.size() - 1;
+ assert rf == 2;
+ cluster.schemaChange("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + (cluster.size() - 1) + "};");
+ cluster.schemaChange("CREATE TABLE ks.tbl (pk int, v int, PRIMARY KEY (pk)) WITH COMPACT STORAGE");
+ ICoordinator coordinator = cluster.coordinator(1);
+ // these shouldn't be replicated by the 3rd node
+ coordinator.execute("INSERT INTO ks.tbl (pk, v) VALUES (3, 3)", ConsistencyLevel.ALL);
+ coordinator.execute("INSERT INTO ks.tbl (pk, v) VALUES (9, 9)", ConsistencyLevel.ALL);
+ for (int i = 0; i < cluster.size(); i++)
+ {
+ int nodeNum = i + 1;
+ System.out.printf("****** node %s: %s%n", nodeNum, cluster.get(nodeNum).config());
+ }
+ })
+ .runAfterNodeUpgrade(((cluster, node) -> {
+
+ if (node < 2)
+ return;
+
+ Object[][] rows = cluster.coordinator(3).execute("SELECT * FROM ks.tbl LIMIT 2", ConsistencyLevel.ALL);
+ Object[][] expected = {
+ row(9, 9),
+ row(3, 3)
+ };
+ assertRows(rows, expected);
+ })).run();
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageBeforeUpgradeSSTablesTest.java
similarity index 96%
rename from test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageTest.java
rename to test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageBeforeUpgradeSSTablesTest.java
index 80ce02afe0..2fd94894ff 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageBeforeUpgradeSSTablesTest.java
@@ -22,7 +22,6 @@ import com.vdurmont.semver4j.Semver;
import org.junit.Test;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
-import org.apache.cassandra.distributed.shared.Versions;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
@@ -30,7 +29,7 @@ import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowable;
-public class DropCompactStorageTest extends UpgradeTestBase
+public class DropCompactStorageBeforeUpgradeSSTablesTest extends DropCompactStorageTester
{
@Test
public void dropCompactStorageBeforeUpgradesstablesTo3X() throws Throwable
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageTester.java b/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageTester.java
new file mode 100644
index 0000000000..bd9f4d67ad
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageTester.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+
+public abstract class DropCompactStorageTester extends UpgradeTestBase
+{
+ protected void runQueries(ICoordinator coordinator, ResultsRecorder helper, String[] queries)
+ {
+ for (String query : queries)
+ helper.addResult(query, coordinator.execute(query, ConsistencyLevel.ALL));
+ }
+
+ public static class ResultsRecorder
+ {
+ final private Map<String, Object[][]> preUpgradeResults = new HashMap<>();
+
+ public void addResult(String query, Object[][] results)
+ {
+ preUpgradeResults.put(query, results);
+ }
+
+ public Map<String, Object[][]> queriesAndResults()
+ {
+ return preUpgradeResults;
+ }
+
+ public void validateResults(UpgradeableCluster cluster, int node)
+ {
+ validateResults(cluster, node, ConsistencyLevel.ALL);
+ }
+
+ public void validateResults(UpgradeableCluster cluster, int node, ConsistencyLevel cl)
+ {
+ for (Map.Entry<String, Object[][]> entry : queriesAndResults().entrySet())
+ {
+ Object[][] postUpgradeResult = cluster.coordinator(node).execute(entry.getKey(), cl);
+ assertRows(postUpgradeResult, entry.getValue());
+ }
+ }
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageWithClusteringAndValueColumnTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageWithClusteringAndValueColumnTest.java
new file mode 100644
index 0000000000..53042d0ea0
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageWithClusteringAndValueColumnTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.junit.Assert.assertEquals;
+
+
+public class DropCompactStorageWithClusteringAndValueColumnTest extends DropCompactStorageTester
+{
+ @Test
+ public void testDropCompactWithClusteringAndValueColumn() throws Throwable
+ {
+ final String table = "clustering_and_value";
+ final int partitions = 10;
+ final int rowsPerPartition = 10;
+
+ final ResultsRecorder recorder = new ResultsRecorder();
+ new TestCase()
+ .nodes(2)
+ .upgradesFrom(v22)
+ .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))
+ .setup(cluster -> {
+ cluster.schemaChange(String.format(
+ "CREATE TABLE %s.%s (key int, c1 int, c2 int, c3 int, PRIMARY KEY (key, c1, c2)) WITH COMPACT STORAGE",
+ KEYSPACE, table));
+ ICoordinator coordinator = cluster.coordinator(1);
+
+ for (int i = 1; i <= partitions; i++)
+ {
+ for (int j = 1; j <= rowsPerPartition; j++)
+ {
+ coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 1, 1)",
+ KEYSPACE, table, i, j), ConsistencyLevel.ALL);
+ coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 2, 1)",
+ KEYSPACE, table, i, j), ConsistencyLevel.ALL);
+ }
+ }
+
+ runQueries(cluster.coordinator(1), recorder, new String[]{
+ String.format("SELECT * FROM %s.%s", KEYSPACE, table),
+
+ String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
+ KEYSPACE, table, partitions - 3, rowsPerPartition - 2),
+
+ String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
+ KEYSPACE, table, partitions - 1, rowsPerPartition - 5),
+
+ String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
+ KEYSPACE, table, partitions - 1, rowsPerPartition - 5, 1),
+
+ String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
+ KEYSPACE, table, partitions - 4, rowsPerPartition - 9, 1),
+
+ String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d and c2 > %d",
+ KEYSPACE, table, partitions - 4, rowsPerPartition - 9, 1),
+
+ String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d and c2 > %d",
+ KEYSPACE, table, partitions - 4, rowsPerPartition - 9, 2),
+
+ String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d",
+ KEYSPACE, table, partitions - 8, rowsPerPartition - 3),
+ });
+ }).runBeforeNodeRestart((cluster, node) -> {
+ cluster.get(node).config().set("enable_drop_compact_storage", true);
+ }).runAfterClusterUpgrade(cluster -> {
+ for (int i = 1; i <= cluster.size(); i++)
+ {
+ NodeToolResult result = cluster.get(i).nodetoolResult("upgradesstables");
+ assertEquals("upgrade sstables failed for node " + i, 0, result.getRc());
+ }
+ Thread.sleep(1000);
+
+ // make sure the results are the same after upgrade and upgrade sstables but before dropping compact storage
+ recorder.validateResults(cluster, 1);
+ recorder.validateResults(cluster, 2);
+
+ // make sure the results are the same after dropping compact storage on only the first node
+ IMessageFilters.Filter filter = cluster.verbs().allVerbs().to(2).drop();
+ cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, table), 1);
+
+ recorder.validateResults(cluster, 1, ConsistencyLevel.ONE);
+
+ filter.off();
+ recorder.validateResults(cluster, 1);
+ recorder.validateResults(cluster, 2);
+
+ // make sure the results continue to be the same after dropping compact storage on the second node
+ cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, table), 2);
+ recorder.validateResults(cluster, 1);
+ recorder.validateResults(cluster, 2);
+ })
+ .run();
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageWithDeletesAndWritesTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageWithDeletesAndWritesTest.java
new file mode 100644
index 0000000000..9d9e278fde
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageWithDeletesAndWritesTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+
+/**
+ * Upgrade test (starting from {@code v22}) for a compact storage table with two clustering columns and one value
+ * column. Rows are written and deleted after compact storage has been dropped through node 1 only, and the query
+ * results recorded at that point are validated against nodes 1 and 2 once it is also dropped through node 2.
+ */
+public class DropCompactStorageWithDeletesAndWritesTest extends DropCompactStorageTester
+{
+    @Test
+    public void testDropCompactWithClusteringAndValueColumnWithDeletesAndWrites() throws Throwable
+    {
+        final String table = "clustering_and_value_with_deletes";
+        final int partitions = 10;
+        final int rowsPerPartition = 10;
+        final int additionalPartitions = 5;
+
+        new TestCase()
+        .nodes(2)
+        .upgradesFrom(v22)
+        .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL).set("enable_drop_compact_storage", true))
+        .setup(cluster -> {
+            cluster.schemaChange(String.format(
+            "CREATE TABLE %s.%s (key int, c1 int, c2 int, c3 int, PRIMARY KEY (key, c1, c2)) WITH COMPACT STORAGE",
+            KEYSPACE, table));
+            ICoordinator coordinator = cluster.coordinator(1);
+
+            // populate keys 1..partitions: every (key, c1) pair gets three rows, with c2 = c3 = 1, 2 and 3
+            for (int i = 1; i <= partitions; i++)
+            {
+                for (int j = 1; j <= rowsPerPartition; j++)
+                {
+                    coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 1, 1)",
+                                                      KEYSPACE, table, i, j), ConsistencyLevel.ALL);
+                    coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 2, 2)",
+                                                      KEYSPACE, table, i, j), ConsistencyLevel.ALL);
+                    coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 3, 3)",
+                                                      KEYSPACE, table, i, j), ConsistencyLevel.ALL);
+                }
+            }
+        })
+        .runAfterClusterUpgrade(cluster -> {
+            // run upgradesstables for the test keyspace on every node, asserting that the command succeeds
+            cluster.forEach(n -> n.nodetoolResult("upgradesstables", KEYSPACE).asserts().success());
+            Thread.sleep(1000);
+
+            // drop compact storage on only one node before performing writes: the ALTER is issued through node 1
+            // while all messages to node 2 are dropped; the filter is removed in finally so a failure cannot leak it
+            IMessageFilters.Filter filter = cluster.verbs().allVerbs().to(2).drop();
+            try
+            {
+                cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, table), 1);
+            }
+            finally
+            {
+                filter.off();
+            }
+
+            // write rows for keys 0..additionalPartitions-1 (key 0 is a new partition, the other keys already exist
+            // from setup and get their c2 = 1 rows written again) and then delete some of the data
+            ICoordinator coordinator = cluster.coordinator(1);
+            for (int i = 0; i < additionalPartitions; i++)
+            {
+                for (int j = 1; j <= rowsPerPartition; j++)
+                {
+                    coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 1, 1)",
+                                                      KEYSPACE, table, i, j), ConsistencyLevel.ALL);
+                }
+            }
+
+            coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d",
+                                              KEYSPACE, table, 0, 3), ConsistencyLevel.ALL);
+
+            coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d",
+                                              KEYSPACE, table, 1), ConsistencyLevel.ALL);
+
+            coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
+                                              KEYSPACE, table, 7, 2, 2), ConsistencyLevel.ALL);
+
+            coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
+                                              KEYSPACE, table, 7, 6, 1), ConsistencyLevel.ALL);
+
+            coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
+                                              KEYSPACE, table, 4, 1, 1), ConsistencyLevel.ALL);
+
+            coordinator.execute(String.format("DELETE c3 FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
+                                              KEYSPACE, table, 8, 1, 3), ConsistencyLevel.ALL);
+
+            coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d and c2 > 1",
+                                              KEYSPACE, table, 6, 2), ConsistencyLevel.ALL);
+
+            // capture query results now, before compact storage is dropped through node 2, for the validation below
+            ResultsRecorder recorder = new ResultsRecorder();
+            runQueries(coordinator, recorder, new String[] {
+                String.format("SELECT * FROM %s.%s", KEYSPACE, table),
+
+                String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
+                              KEYSPACE, table, partitions - 3, rowsPerPartition - 2),
+
+                String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
+                              KEYSPACE, table, partitions - 1, rowsPerPartition - 5),
+
+                String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d",
+                              KEYSPACE, table, partitions - 8, rowsPerPartition - 3),
+
+                String.format("SELECT * FROM %s.%s WHERE key = %d", KEYSPACE, table, 7),
+                String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d", KEYSPACE, table, 7, 2),
+
+                String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d", KEYSPACE, table, 8, 1),
+                String.format("SELECT c1, c2 FROM %s.%s WHERE key = %d and c1 = %d", KEYSPACE, table, 8, 1),
+
+                String.format("SELECT c1, c2 FROM %s.%s WHERE key = %d and c1 = %d", KEYSPACE, table, 4, 1),
+                String.format("SELECT c1, c2 FROM %s.%s WHERE key = %d", KEYSPACE, table, 6),
+
+                String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d", KEYSPACE, table, 0, 1),
+
+                String.format("SELECT * FROM %s.%s WHERE key = %d",
+                              KEYSPACE, table, partitions - (additionalPartitions - 2)),
+
+                String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d",
+                              KEYSPACE, table, partitions - (additionalPartitions - 3), 4)
+            });
+
+            // drop compact storage on remaining node and check result
+            cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, table), 2);
+            recorder.validateResults(cluster, 1);
+            recorder.validateResults(cluster, 2);
+        }).run();
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org