You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2022/05/31 16:19:34 UTC

[cassandra] branch cassandra-3.11 updated: Split compact storage upgrade tests to prevent OOM

This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.11 by this push:
     new 8a9ba8866d Split compact storage upgrade tests to prevent OOM
8a9ba8866d is described below

commit 8a9ba8866db6162a7b7352a260122d6e3c219567
Author: Andrés de la Peña <a....@gmail.com>
AuthorDate: Mon May 16 17:29:32 2022 +0100

    Split compact storage upgrade tests to prevent OOM
    
    patch by Andrés de la Peña; reviewed by Ekaterina Dimitrova for CASSANDRA-17213
---
 .../upgrade/CompactStorage2to3UpgradeTest.java     | 363 ---------------------
 .../upgrade/CompactStorageMultiColumnTest.java     |  65 ++++
 .../upgrade/CompactStorageSingleColumnTest.java    |  66 ++++
 ...opCompactStorageBeforeUpgradeSSTablesTest.java} |   3 +-
 .../upgrade/DropCompactStorageTester.java          |  66 ++++
 ...actStorageWithClusteringAndValueColumnTest.java | 120 +++++++
 ...DropCompactStorageWithDeletesAndWritesTest.java | 159 +++++++++
 7 files changed, 477 insertions(+), 365 deletions(-)

diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage2to3UpgradeTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage2to3UpgradeTest.java
deleted file mode 100644
index 7235c728d1..0000000000
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage2to3UpgradeTest.java
+++ /dev/null
@@ -1,363 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.upgrade;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Test;
-
-import org.apache.cassandra.distributed.UpgradeableCluster;
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
-import org.apache.cassandra.distributed.api.ICoordinator;
-import org.apache.cassandra.distributed.api.IMessageFilters;
-import org.apache.cassandra.distributed.api.NodeToolResult;
-import org.apache.cassandra.distributed.shared.Versions;
-
-import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
-import static org.apache.cassandra.distributed.shared.AssertUtils.row;
-import static org.junit.Assert.assertEquals;
-import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
-import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
-import static org.apache.cassandra.distributed.api.Feature.NETWORK;
-
-
-public class CompactStorage2to3UpgradeTest extends UpgradeTestBase
-{
-    @Test
-    public void multiColumn() throws Throwable
-    {
-        new TestCase()
-        .upgradesFrom(v22)
-        .setup(cluster -> {
-            assert cluster.size() == 3;
-            int rf = cluster.size() - 1;
-            assert rf == 2;
-            cluster.schemaChange("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + (cluster.size() - 1) + "};");
-            cluster.schemaChange("CREATE TABLE ks.tbl (pk int, v1 int, v2 text, PRIMARY KEY (pk)) WITH COMPACT STORAGE");
-            ICoordinator coordinator = cluster.coordinator(1);
-            // these shouldn't be replicated by the 3rd node
-            coordinator.execute("INSERT INTO ks.tbl (pk, v1, v2) VALUES (3, 3, '3')", ConsistencyLevel.ALL);
-            coordinator.execute("INSERT INTO ks.tbl (pk, v1, v2) VALUES (9, 9, '9')", ConsistencyLevel.ALL);
-            for (int i = 0; i < cluster.size(); i++)
-            {
-                int nodeNum = i + 1;
-                System.out.printf("****** node %s: %s%n", nodeNum, cluster.get(nodeNum).config());
-            }
-        })
-        .runAfterNodeUpgrade(((cluster, node) -> {
-            if (node != 2)
-                return;
-
-            Object[][] rows = cluster.coordinator(3).execute("SELECT * FROM ks.tbl LIMIT 2", ConsistencyLevel.ALL);
-            Object[][] expected = {
-            row(9, 9, "9"),
-            row(3, 3, "3")
-            };
-            assertRows(rows, expected);
-        })).run();
-    }
-
-    @Test
-    public void singleColumn() throws Throwable
-    {
-        new TestCase()
-        .upgradesFrom(v22)
-        .setup(cluster -> {
-            assert cluster.size() == 3;
-            int rf = cluster.size() - 1;
-            assert rf == 2;
-            cluster.schemaChange("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + (cluster.size() - 1) + "};");
-            cluster.schemaChange("CREATE TABLE ks.tbl (pk int, v int, PRIMARY KEY (pk)) WITH COMPACT STORAGE");
-            ICoordinator coordinator = cluster.coordinator(1);
-            // these shouldn't be replicated by the 3rd node
-            coordinator.execute("INSERT INTO ks.tbl (pk, v) VALUES (3, 3)", ConsistencyLevel.ALL);
-            coordinator.execute("INSERT INTO ks.tbl (pk, v) VALUES (9, 9)", ConsistencyLevel.ALL);
-            for (int i = 0; i < cluster.size(); i++)
-            {
-                int nodeNum = i + 1;
-                System.out.printf("****** node %s: %s%n", nodeNum, cluster.get(nodeNum).config());
-            }
-        })
-        .runAfterNodeUpgrade(((cluster, node) -> {
-
-            if (node < 2)
-                return;
-
-            Object[][] rows = cluster.coordinator(3).execute("SELECT * FROM ks.tbl LIMIT 2", ConsistencyLevel.ALL);
-            Object[][] expected = {
-            row(9, 9),
-            row(3, 3)
-            };
-            assertRows(rows, expected);
-        })).run();
-    }
-
-    @Test
-    public void testDropCompactWithClusteringAndValueColumn() throws Throwable
-    {
-        final String table = "clustering_and_value";
-        final int partitions = 10;
-        final int rowsPerPartition = 10;
-
-        final ResultsRecorder recorder = new ResultsRecorder();
-        new TestCase()
-        .nodes(2)
-        .upgradesFrom(v22)
-        .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))
-        .setup(cluster -> {
-            cluster.schemaChange(String.format(
-            "CREATE TABLE %s.%s (key int, c1 int, c2 int, c3 int, PRIMARY KEY (key, c1, c2)) WITH COMPACT STORAGE",
-            KEYSPACE, table));
-            ICoordinator coordinator = cluster.coordinator(1);
-
-            for (int i = 1; i <= partitions; i++)
-            {
-                for (int j = 1; j <= rowsPerPartition; j++)
-                {
-                    coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 1, 1)",
-                                                      KEYSPACE, table, i, j), ConsistencyLevel.ALL);
-                    coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 2, 1)",
-                                                      KEYSPACE, table, i, j), ConsistencyLevel.ALL);
-                }
-            }
-
-            runQueries(cluster.coordinator(1), recorder, new String[] {
-            String.format("SELECT * FROM %s.%s", KEYSPACE, table),
-
-            String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
-                          KEYSPACE, table, partitions - 3, rowsPerPartition - 2),
-
-            String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
-                          KEYSPACE, table, partitions - 1, rowsPerPartition - 5),
-
-            String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
-                          KEYSPACE, table, partitions - 1, rowsPerPartition - 5, 1),
-
-            String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
-                          KEYSPACE, table, partitions - 4, rowsPerPartition - 9, 1),
-
-            String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d and c2 > %d",
-                          KEYSPACE, table, partitions - 4, rowsPerPartition - 9, 1),
-
-            String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d and c2 > %d",
-                          KEYSPACE, table, partitions - 4, rowsPerPartition - 9, 2),
-
-            String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d",
-                          KEYSPACE, table, partitions - 8, rowsPerPartition - 3),
-
-            });
-        }).runBeforeNodeRestart((cluster, node) ->
-        {
-            cluster.get(node).config().set("enable_drop_compact_storage", true);
-
-
-        }).runAfterClusterUpgrade(cluster ->
-                                          {
-                                              for (int i = 1; i <= cluster.size(); i++)
-                                              {
-                                                  NodeToolResult result = cluster.get(i).nodetoolResult("upgradesstables");
-                                                  assertEquals("upgrade sstables failed for node " + i, 0, result.getRc());
-                                              }
-                                              Thread.sleep(1000);
-
-                                              // make sure the results are the same after upgrade and upgrade sstables but before dropping compact storage
-                                              recorder.validateResults(cluster, 1);
-                                              recorder.validateResults(cluster, 2);
-
-                                              // make sure the results are the same after dropping compact storage on only the first node
-                                              IMessageFilters.Filter filter = cluster.verbs().allVerbs().to(2).drop();
-                                              cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, table), 1);
-
-                                              recorder.validateResults(cluster, 1, ConsistencyLevel.ONE);
-
-                                              filter.off();
-                                              recorder.validateResults(cluster, 1);
-                                              recorder.validateResults(cluster, 2);
-
-                                              // make sure the results continue to be the same after dropping compact storage on the second node
-                                              cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, table), 2);
-                                              recorder.validateResults(cluster, 1);
-                                              recorder.validateResults(cluster, 2);
-                                          })
-        .run();
-    }
-
-    @Test
-    public void testDropCompactWithClusteringAndValueColumnWithDeletesAndWrites() throws Throwable
-    {
-        final String table = "clustering_and_value_with_deletes";
-        final int partitions = 10;
-        final int rowsPerPartition = 10;
-        final int additionalParititons = 5;
-
-        new TestCase()
-        .nodes(2)
-        .upgradesFrom(v22)
-        .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL).set("enable_drop_compact_storage", true))
-        .setup(cluster -> {
-            cluster.schemaChange(String.format(
-            "CREATE TABLE %s.%s (key int, c1 int, c2 int, c3 int, PRIMARY KEY (key, c1, c2)) WITH COMPACT STORAGE",
-            KEYSPACE, table));
-            ICoordinator coordinator = cluster.coordinator(1);
-
-            for (int i = 1; i <= partitions; i++)
-            {
-                for (int j = 1; j <= rowsPerPartition; j++)
-                {
-                    coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 1, 1)",
-                                                      KEYSPACE, table, i, j), ConsistencyLevel.ALL);
-                    coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 2, 2)",
-                                                      KEYSPACE, table, i, j), ConsistencyLevel.ALL);
-                    coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 3, 3)",
-                                                      KEYSPACE, table, i, j), ConsistencyLevel.ALL);
-                }
-            }
-
-        })
-        .runAfterClusterUpgrade(cluster -> {
-            cluster.forEach(n -> n.nodetoolResult("upgradesstables", KEYSPACE).asserts().success());
-            Thread.sleep(1000);
-
-            // drop compact storage on only one node before performing writes
-            IMessageFilters.Filter filter = cluster.verbs().allVerbs().to(2).drop();
-            cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, table), 1);
-            filter.off();
-
-            // add new partitions and delete some of the old ones
-            ICoordinator coordinator = cluster.coordinator(1);
-            for (int i = 0; i < additionalParititons; i++)
-            {
-                for (int j = 1; j <= rowsPerPartition; j++)
-                {
-                    coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 1, 1)",
-                                                      KEYSPACE, table, i, j), ConsistencyLevel.ALL);
-                }
-            }
-
-            coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d",
-                                              KEYSPACE, table, 0, 3), ConsistencyLevel.ALL);
-
-            coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d",
-                                              KEYSPACE, table, 1), ConsistencyLevel.ALL);
-
-            coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
-                                              KEYSPACE, table, 7, 2, 2), ConsistencyLevel.ALL);
-
-            coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
-                                              KEYSPACE, table, 7, 6, 1), ConsistencyLevel.ALL);
-
-            coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
-                                              KEYSPACE, table, 4, 1, 1), ConsistencyLevel.ALL);
-
-            coordinator.execute(String.format("DELETE c3 FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
-                                              KEYSPACE, table, 8, 1, 3), ConsistencyLevel.ALL);
-
-            coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d and c2 > 1",
-                                              KEYSPACE, table, 6, 2), ConsistencyLevel.ALL);
-
-            ResultsRecorder recorder = new ResultsRecorder();
-            runQueries(coordinator, recorder, new String[] {
-            String.format("SELECT * FROM %s.%s", KEYSPACE, table),
-
-            String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
-                          KEYSPACE, table, partitions - 3, rowsPerPartition - 2),
-
-            String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
-                          KEYSPACE, table, partitions - 1, rowsPerPartition - 5),
-
-
-            String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d",
-                          KEYSPACE, table, partitions - 8, rowsPerPartition - 3),
-
-            String.format("SELECT * FROM %s.%s WHERE key = %d",
-                          KEYSPACE, table, 7),
-
-            String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
-                          KEYSPACE, table, 7, 2),
-
-            String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
-                          KEYSPACE, table, 8, 1),
-
-            String.format("SELECT c1, c2 FROM %s.%s WHERE key = %d and c1 = %d",
-                          KEYSPACE, table, 8, 1),
-
-            String.format("SELECT c1, c2 FROM %s.%s WHERE key = %d and c1 = %d",
-                          KEYSPACE, table, 8, 1),
-
-            String.format("SELECT c1, c2 FROM %s.%s WHERE key = %d and c1 = %d",
-                          KEYSPACE, table, 4, 1),
-
-            String.format("SELECT c1, c2 FROM %s.%s WHERE key = %d",
-                          KEYSPACE, table, 6),
-
-            String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d",
-                          KEYSPACE, table, 0, 1),
-
-            String.format("SELECT * FROM %s.%s WHERE key = %d",
-                          KEYSPACE, table, partitions - (additionalParititons - 2)),
-
-            String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d",
-                          KEYSPACE, table, partitions - (additionalParititons - 3), 4)
-
-            });
-
-            // drop compact storage on remaining node and check result
-            cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, table), 2);
-            recorder.validateResults(cluster, 1);
-            recorder.validateResults(cluster, 2);
-        }).run();
-    }
-
-    private void runQueries(ICoordinator coordinator, ResultsRecorder helper, String[] queries)
-    {
-        for (String query : queries)
-            helper.addResult(query, coordinator.execute(query, ConsistencyLevel.ALL));
-    }
-
-    public static class ResultsRecorder
-    {
-        final private Map<String, Object[][]> preUpgradeResults = new HashMap<>();
-
-        public void addResult(String query, Object[][] results)
-        {
-            preUpgradeResults.put(query, results);
-        }
-
-        public Map<String, Object[][]> queriesAndResults()
-        {
-            return preUpgradeResults;
-        }
-
-        public void validateResults(UpgradeableCluster cluster, int node)
-        {
-            validateResults(cluster, node, ConsistencyLevel.ALL);
-        }
-
-        public void validateResults(UpgradeableCluster cluster, int node, ConsistencyLevel cl)
-        {
-            for (Map.Entry<String, Object[][]> entry : queriesAndResults().entrySet())
-            {
-                Object[][] postUpgradeResult = cluster.coordinator(node).execute(entry.getKey(), cl);
-                assertRows(postUpgradeResult, entry.getValue());
-            }
-
-        }
-    }
-}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorageMultiColumnTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorageMultiColumnTest.java
new file mode 100644
index 0000000000..086982ead2
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorageMultiColumnTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+
+
+public class CompactStorageMultiColumnTest extends UpgradeTestBase
+{
+    @Test
+    public void multiColumn() throws Throwable
+    {
+        new TestCase()
+        .upgradesFrom(v22)
+        .setup(cluster -> {
+            assert cluster.size() == 3;
+            int rf = cluster.size() - 1;
+            assert rf == 2;
+            cluster.schemaChange("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + (cluster.size() - 1) + "};");
+            cluster.schemaChange("CREATE TABLE ks.tbl (pk int, v1 int, v2 text, PRIMARY KEY (pk)) WITH COMPACT STORAGE");
+            ICoordinator coordinator = cluster.coordinator(1);
+            // these shouldn't be replicated by the 3rd node
+            coordinator.execute("INSERT INTO ks.tbl (pk, v1, v2) VALUES (3, 3, '3')", ConsistencyLevel.ALL);
+            coordinator.execute("INSERT INTO ks.tbl (pk, v1, v2) VALUES (9, 9, '9')", ConsistencyLevel.ALL);
+            for (int i = 0; i < cluster.size(); i++)
+            {
+                int nodeNum = i + 1;
+                System.out.printf("****** node %s: %s%n", nodeNum, cluster.get(nodeNum).config());
+            }
+        })
+        .runAfterNodeUpgrade(((cluster, node) -> {
+            if (node != 2)
+                return;
+
+            Object[][] rows = cluster.coordinator(3).execute("SELECT * FROM ks.tbl LIMIT 2", ConsistencyLevel.ALL);
+            Object[][] expected = {
+            row(9, 9, "9"),
+            row(3, 3, "3")
+            };
+            assertRows(rows, expected);
+        })).run();
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorageSingleColumnTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorageSingleColumnTest.java
new file mode 100644
index 0000000000..8a7170f28d
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorageSingleColumnTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+
+
+public class CompactStorageSingleColumnTest extends UpgradeTestBase
+{
+    @Test
+    public void singleColumn() throws Throwable
+    {
+        new TestCase()
+        .upgradesFrom(v22)
+        .setup(cluster -> {
+            assert cluster.size() == 3;
+            int rf = cluster.size() - 1;
+            assert rf == 2;
+            cluster.schemaChange("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + (cluster.size() - 1) + "};");
+            cluster.schemaChange("CREATE TABLE ks.tbl (pk int, v int, PRIMARY KEY (pk)) WITH COMPACT STORAGE");
+            ICoordinator coordinator = cluster.coordinator(1);
+            // these shouldn't be replicated by the 3rd node
+            coordinator.execute("INSERT INTO ks.tbl (pk, v) VALUES (3, 3)", ConsistencyLevel.ALL);
+            coordinator.execute("INSERT INTO ks.tbl (pk, v) VALUES (9, 9)", ConsistencyLevel.ALL);
+            for (int i = 0; i < cluster.size(); i++)
+            {
+                int nodeNum = i + 1;
+                System.out.printf("****** node %s: %s%n", nodeNum, cluster.get(nodeNum).config());
+            }
+        })
+        .runAfterNodeUpgrade(((cluster, node) -> {
+
+            if (node < 2)
+                return;
+
+            Object[][] rows = cluster.coordinator(3).execute("SELECT * FROM ks.tbl LIMIT 2", ConsistencyLevel.ALL);
+            Object[][] expected = {
+            row(9, 9),
+            row(3, 3)
+            };
+            assertRows(rows, expected);
+        })).run();
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageBeforeUpgradeSSTablesTest.java
similarity index 96%
rename from test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageTest.java
rename to test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageBeforeUpgradeSSTablesTest.java
index 80ce02afe0..2fd94894ff 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageBeforeUpgradeSSTablesTest.java
@@ -22,7 +22,6 @@ import com.vdurmont.semver4j.Semver;
 import org.junit.Test;
 
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
-import org.apache.cassandra.distributed.shared.Versions;
 
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
@@ -30,7 +29,7 @@ import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.catchThrowable;
 
-public class DropCompactStorageTest extends UpgradeTestBase
+public class DropCompactStorageBeforeUpgradeSSTablesTest extends DropCompactStorageTester
 {
     @Test
     public void dropCompactStorageBeforeUpgradesstablesTo3X() throws Throwable
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageTester.java b/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageTester.java
new file mode 100644
index 0000000000..bd9f4d67ad
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageTester.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+
+public abstract class DropCompactStorageTester extends UpgradeTestBase
+{
+    protected void runQueries(ICoordinator coordinator, ResultsRecorder helper, String[] queries)
+    {
+        for (String query : queries)
+            helper.addResult(query, coordinator.execute(query, ConsistencyLevel.ALL));
+    }
+
+    public static class ResultsRecorder
+    {
+        final private Map<String, Object[][]> preUpgradeResults = new HashMap<>();
+
+        public void addResult(String query, Object[][] results)
+        {
+            preUpgradeResults.put(query, results);
+        }
+
+        public Map<String, Object[][]> queriesAndResults()
+        {
+            return preUpgradeResults;
+        }
+
+        public void validateResults(UpgradeableCluster cluster, int node)
+        {
+            validateResults(cluster, node, ConsistencyLevel.ALL);
+        }
+
+        public void validateResults(UpgradeableCluster cluster, int node, ConsistencyLevel cl)
+        {
+            for (Map.Entry<String, Object[][]> entry : queriesAndResults().entrySet())
+            {
+                Object[][] postUpgradeResult = cluster.coordinator(node).execute(entry.getKey(), cl);
+                assertRows(postUpgradeResult, entry.getValue());
+            }
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageWithClusteringAndValueColumnTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageWithClusteringAndValueColumnTest.java
new file mode 100644
index 0000000000..53042d0ea0
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageWithClusteringAndValueColumnTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.junit.Assert.assertEquals;
+
+
+public class DropCompactStorageWithClusteringAndValueColumnTest extends DropCompactStorageTester
+{
+    @Test
+    public void testDropCompactWithClusteringAndValueColumn() throws Throwable
+    {
+        final String table = "clustering_and_value";
+        final int partitions = 10;
+        final int rowsPerPartition = 10;
+
+        final ResultsRecorder recorder = new ResultsRecorder();
+        new TestCase()
+        .nodes(2)
+        .upgradesFrom(v22)
+        .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))
+        .setup(cluster -> {
+            cluster.schemaChange(String.format(
+            "CREATE TABLE %s.%s (key int, c1 int, c2 int, c3 int, PRIMARY KEY (key, c1, c2)) WITH COMPACT STORAGE",
+            KEYSPACE, table));
+            ICoordinator coordinator = cluster.coordinator(1);
+
+            for (int i = 1; i <= partitions; i++)
+            {
+                for (int j = 1; j <= rowsPerPartition; j++)
+                {
+                    coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 1, 1)",
+                                                      KEYSPACE, table, i, j), ConsistencyLevel.ALL);
+                    coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 2, 1)",
+                                                      KEYSPACE, table, i, j), ConsistencyLevel.ALL);
+                }
+            }
+
+            runQueries(cluster.coordinator(1), recorder, new String[]{
+            String.format("SELECT * FROM %s.%s", KEYSPACE, table),
+
+            String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
+                          KEYSPACE, table, partitions - 3, rowsPerPartition - 2),
+
+            String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
+                          KEYSPACE, table, partitions - 1, rowsPerPartition - 5),
+
+            String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
+                          KEYSPACE, table, partitions - 1, rowsPerPartition - 5, 1),
+
+            String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
+                          KEYSPACE, table, partitions - 4, rowsPerPartition - 9, 1),
+
+            String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d and c2 > %d",
+                          KEYSPACE, table, partitions - 4, rowsPerPartition - 9, 1),
+
+            String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d and c2 > %d",
+                          KEYSPACE, table, partitions - 4, rowsPerPartition - 9, 2),
+
+            String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d",
+                          KEYSPACE, table, partitions - 8, rowsPerPartition - 3),
+            });
+        }).runBeforeNodeRestart((cluster, node) -> {
+            cluster.get(node).config().set("enable_drop_compact_storage", true);
+        }).runAfterClusterUpgrade(cluster -> {
+            for (int i = 1; i <= cluster.size(); i++)
+            {
+                NodeToolResult result = cluster.get(i).nodetoolResult("upgradesstables");
+                assertEquals("upgrade sstables failed for node " + i, 0, result.getRc());
+            }
+            Thread.sleep(1000);
+
+            // make sure the results are the same after upgrade and upgrade sstables but before dropping compact storage
+            recorder.validateResults(cluster, 1);
+            recorder.validateResults(cluster, 2);
+
+            // make sure the results are the same after dropping compact storage on only the first node
+            IMessageFilters.Filter filter = cluster.verbs().allVerbs().to(2).drop();
+            cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, table), 1);
+
+            recorder.validateResults(cluster, 1, ConsistencyLevel.ONE);
+
+            filter.off();
+            recorder.validateResults(cluster, 1);
+            recorder.validateResults(cluster, 2);
+
+            // make sure the results continue to be the same after dropping compact storage on the second node
+            cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, table), 2);
+            recorder.validateResults(cluster, 1);
+            recorder.validateResults(cluster, 2);
+        })
+        .run();
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageWithDeletesAndWritesTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageWithDeletesAndWritesTest.java
new file mode 100644
index 0000000000..9d9e278fde
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/DropCompactStorageWithDeletesAndWritesTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+
+public class DropCompactStorageWithDeletesAndWritesTest extends DropCompactStorageTester
+{
+    /**
+     * Issues DROP COMPACT STORAGE on node 1 only, then writes and deletes data and records a set of reads; finally
+     * issues the drop on node 2 as well and checks that reads through either node still return the recorded rows.
+     */
+    @Test
+    public void testDropCompactWithClusteringAndValueColumnWithDeletesAndWrites() throws Throwable
+    {
+        final String table = "clustering_and_value_with_deletes";
+        final int partitions = 10;
+        final int rowsPerPartition = 10;
+        final int additionalPartitions = 5;
+
+        new TestCase()
+        .nodes(2)
+        .upgradesFrom(v22)
+        .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL).set("enable_drop_compact_storage", true))
+        .setup(cluster -> {
+            cluster.schemaChange(String.format(
+            "CREATE TABLE %s.%s (key int, c1 int, c2 int, c3 int, PRIMARY KEY (key, c1, c2)) WITH COMPACT STORAGE",
+            KEYSPACE, table));
+            ICoordinator coordinator = cluster.coordinator(1);
+
+            for (int i = 1; i <= partitions; i++)
+            {
+                for (int j = 1; j <= rowsPerPartition; j++)
+                {
+                    coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 1, 1)",
+                                                      KEYSPACE, table, i, j), ConsistencyLevel.ALL);
+                    coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 2, 2)",
+                                                      KEYSPACE, table, i, j), ConsistencyLevel.ALL);
+                    coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 3, 3)",
+                                                      KEYSPACE, table, i, j), ConsistencyLevel.ALL);
+                }
+            }
+        })
+        .runAfterClusterUpgrade(cluster -> {
+            cluster.forEach(n -> n.nodetoolResult("upgradesstables", KEYSPACE).asserts().success());
+            Thread.sleep(1000);
+
+            // drop compact storage on only one node before performing writes
+            IMessageFilters.Filter filter = cluster.verbs().allVerbs().to(2).drop();
+            cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, table), 1);
+            filter.off();
+
+            // write (c2 = 1, c3 = 1) rows for keys 0 to additionalPartitions - 1: key 0 is a new partition, the
+            // other keys rewrite rows that setup already inserted with the same values
+            ICoordinator coordinator = cluster.coordinator(1);
+            for (int i = 0; i < additionalPartitions; i++)
+            {
+                for (int j = 1; j <= rowsPerPartition; j++)
+                {
+                    coordinator.execute(String.format("INSERT INTO %s.%s (key, c1, c2, c3) VALUES (%d, %d, 1, 1)",
+                                                      KEYSPACE, table, i, j), ConsistencyLevel.ALL);
+                }
+            }
+
+            // deletes of several shapes: by c1 prefix, whole partition, single rows, a single column, a c2 range
+            coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d",
+                                              KEYSPACE, table, 0, 3), ConsistencyLevel.ALL);
+
+            coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d",
+                                              KEYSPACE, table, 1), ConsistencyLevel.ALL);
+
+            coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
+                                              KEYSPACE, table, 7, 2, 2), ConsistencyLevel.ALL);
+
+            coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
+                                              KEYSPACE, table, 7, 6, 1), ConsistencyLevel.ALL);
+
+            coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
+                                              KEYSPACE, table, 4, 1, 1), ConsistencyLevel.ALL);
+
+            coordinator.execute(String.format("DELETE c3 FROM %s.%s WHERE key = %d and c1 = %d and c2 = %d",
+                                              KEYSPACE, table, 8, 1, 3), ConsistencyLevel.ALL);
+
+            coordinator.execute(String.format("DELETE FROM %s.%s WHERE key = %d and c1 = %d and c2 > 1",
+                                              KEYSPACE, table, 6, 2), ConsistencyLevel.ALL);
+
+            ResultsRecorder recorder = new ResultsRecorder();
+            runQueries(coordinator, recorder, new String[] {
+            String.format("SELECT * FROM %s.%s", KEYSPACE, table),
+
+            String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
+                          KEYSPACE, table, partitions - 3, rowsPerPartition - 2),
+
+            String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
+                          KEYSPACE, table, partitions - 1, rowsPerPartition - 5),
+
+            String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d",
+                          KEYSPACE, table, partitions - 8, rowsPerPartition - 3),
+
+            String.format("SELECT * FROM %s.%s WHERE key = %d",
+                          KEYSPACE, table, 7),
+
+            String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
+                          KEYSPACE, table, 7, 2),
+
+            String.format("SELECT * FROM %s.%s WHERE key = %d and c1 = %d",
+                          KEYSPACE, table, 8, 1),
+
+            String.format("SELECT c1, c2 FROM %s.%s WHERE key = %d and c1 = %d",
+                          KEYSPACE, table, 8, 1),
+
+            String.format("SELECT c1, c2 FROM %s.%s WHERE key = %d and c1 = %d",
+                          KEYSPACE, table, 4, 1),
+
+            String.format("SELECT c1, c2 FROM %s.%s WHERE key = %d",
+                          KEYSPACE, table, 6),
+
+            String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d",
+                          KEYSPACE, table, 0, 1),
+
+            String.format("SELECT * FROM %s.%s WHERE key = %d",
+                          KEYSPACE, table, partitions - (additionalPartitions - 2)),
+
+            String.format("SELECT * FROM %s.%s WHERE key = %d and c1 > %d",
+                          KEYSPACE, table, partitions - (additionalPartitions - 3), 4)
+            });
+
+            // drop compact storage on remaining node and check result
+            cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, table), 2);
+            recorder.validateResults(cluster, 1);
+            recorder.validateResults(cluster, 2);
+        }).run();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org