You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2022/10/20 05:49:00 UTC

[cassandra] branch trunk updated: Keep sstable level when streaming for decommission and move

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7e6799a4b9 Keep sstable level when streaming for decommission and move
7e6799a4b9 is described below

commit 7e6799a4b903b4cdc1a467ada69f2f12a6099fcb
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Tue Oct 18 08:26:21 2022 +0200

    Keep sstable level when streaming for decommission and move
    
    Patch by marcuse; reviewed by David Capwell for CASSANDRA-17969
---
 CHANGES.txt                                        |   1 +
 .../db/streaming/CassandraOutgoingFile.java        |   4 +-
 .../cassandra/streaming/StreamOperation.java       |  43 ++++---
 .../test/streaming/LCSStreamingKeepLevelTest.java  | 128 +++++++++++++++++++++
 4 files changed, 154 insertions(+), 22 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 96e30bab5c..b2a7ae0d05 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.2
+ * Keep sstable level when streaming for decommission and move (CASSANDRA-17969)
  * Add Unavailables metric for CASWrite in the docs (CASSANDRA-16357)
  * Make Cassandra logs able to be viewed in the virtual table system_views.system_logs (CASSANDRA-17946)
  * IllegalArgumentException in Gossiper#order due to concurrent mutations to elements being applied (CASSANDRA-17908)
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
index 367c304b08..6a712276bb 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
@@ -76,8 +76,6 @@ public class CassandraOutgoingFile implements OutgoingStream
                                                     boolean shouldStreamEntireSSTable,
                                                     ComponentManifest manifest)
     {
-        boolean keepSSTableLevel = operation == StreamOperation.BOOTSTRAP || operation == StreamOperation.REBUILD;
-
         CompressionInfo compressionInfo = sstable.compression
                 ? CompressionInfo.newLazyInstance(sstable.getCompressionMetadata(), sections)
                 : null;
@@ -85,7 +83,7 @@ public class CassandraOutgoingFile implements OutgoingStream
         return CassandraStreamHeader.builder()
                                     .withSSTableFormat(sstable.descriptor.formatType)
                                     .withSSTableVersion(sstable.descriptor.version)
-                                    .withSSTableLevel(keepSSTableLevel ? sstable.getSSTableLevel() : 0)
+                                    .withSSTableLevel(operation.keepSSTableLevel() ? sstable.getSSTableLevel() : 0)
                                     .withEstimatedKeys(estimatedKeys)
                                     .withSections(sections)
                                     .withCompressionInfo(compressionInfo)
diff --git a/src/java/org/apache/cassandra/streaming/StreamOperation.java b/src/java/org/apache/cassandra/streaming/StreamOperation.java
index 8151b47b22..98a4070d2b 100644
--- a/src/java/org/apache/cassandra/streaming/StreamOperation.java
+++ b/src/java/org/apache/cassandra/streaming/StreamOperation.java
@@ -19,43 +19,43 @@ package org.apache.cassandra.streaming;
 
 public enum StreamOperation
 {
-    OTHER("Other"), // Fallback to avoid null types when deserializing from string
-    RESTORE_REPLICA_COUNT("Restore replica count", false), // Handles removeNode
-    DECOMMISSION("Unbootstrap", false),
-    RELOCATION("Relocation", false),
-    BOOTSTRAP("Bootstrap", false),
-    REBUILD("Rebuild", false),
-    BULK_LOAD("Bulk Load"),
-    REPAIR("Repair");
+    OTHER("Other", true, false), // Fallback to avoid null types when deserializing from string
+    RESTORE_REPLICA_COUNT("Restore replica count", false, false), // Handles removeNode
+    DECOMMISSION("Unbootstrap", false, true),
+    RELOCATION("Relocation", false, true),
+    BOOTSTRAP("Bootstrap", false, true),
+    REBUILD("Rebuild", false, true),
+    BULK_LOAD("Bulk Load", true, false),
+    REPAIR("Repair", true, false);
 
     private final String description;
     private final boolean requiresViewBuild;
-
-
-    StreamOperation(String description) {
-        this(description, true);
-    }
+    private final boolean keepSSTableLevel;
 
     /**
      * @param description The operation description
      * @param requiresViewBuild Whether this operation requires views to be updated if it involves a base table
      */
-    StreamOperation(String description, boolean requiresViewBuild) {
+    StreamOperation(String description, boolean requiresViewBuild, boolean keepSSTableLevel)
+    {
         this.description = description;
         this.requiresViewBuild = requiresViewBuild;
+        this.keepSSTableLevel = keepSSTableLevel;
     }
 
-    public static StreamOperation fromString(String text) {
-        for (StreamOperation b : StreamOperation.values()) {
-            if (b.description.equalsIgnoreCase(text)) {
+    public static StreamOperation fromString(String text)
+    {
+        for (StreamOperation b : StreamOperation.values())
+        {
+            if (b.description.equalsIgnoreCase(text))
                 return b;
-            }
         }
 
         return OTHER;
     }
 
-    public String getDescription() {
+    public String getDescription()
+    {
         return description;
     }
 
@@ -66,4 +66,9 @@ public enum StreamOperation
     {
         return this.requiresViewBuild;
     }
+
+    public boolean keepSSTableLevel()
+    {
+        return keepSSTableLevel;
+    }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/streaming/LCSStreamingKeepLevelTest.java b/test/distributed/org/apache/cassandra/distributed/test/streaming/LCSStreamingKeepLevelTest.java
new file mode 100644
index 0000000000..4319768169
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/streaming/LCSStreamingKeepLevelTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.junit.Test;
+
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.junit.Assert.assertTrue;
+
+public class LCSStreamingKeepLevelTest extends TestBaseImpl
+{
+    @Test
+    public void testDecom() throws IOException
+    {
+        try (Cluster cluster = builder().withNodes(4)
+                                        .withConfig(config -> config.with(NETWORK, GOSSIP, NATIVE_PROTOCOL))
+                                        .withoutVNodes()
+                                        .withDataDirCount(1)
+                                        .start())
+        {
+            populate(cluster);
+
+            cluster.get(4).nodetoolResult("decommission").asserts().success();
+
+            assertEmptyL0(cluster);
+        }
+    }
+
+    @Test
+    public void testMove() throws IOException
+    {
+        try (Cluster cluster = builder().withNodes(4)
+                                        .withConfig(config -> config.with(NETWORK, GOSSIP, NATIVE_PROTOCOL))
+                                        .withoutVNodes()
+                                        .withDataDirCount(1)
+                                        .start())
+        {
+            populate(cluster);
+
+            long tokenVal = ((Murmur3Partitioner.LongToken)cluster.tokens().get(3).getToken()).token;
+            long prevTokenVal = ((Murmur3Partitioner.LongToken)cluster.tokens().get(2).getToken()).token;
+            // move node 4 to the middle point between its current position and the previous node
+            long newToken = (tokenVal + prevTokenVal) / 2;
+            cluster.get(4).nodetoolResult("move", String.valueOf(newToken)).asserts().success();
+
+            assertEmptyL0(cluster);
+        }
+    }
+
+    private static void populate(Cluster cluster)
+    {
+        cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2}"));
+        cluster.schemaChange(withKeyspace("CREATE TABLE %s.decom_test (id int PRIMARY KEY, value int) with compaction = { 'class':'LeveledCompactionStrategy', 'enabled':'false' }"));
+
+        for (int i = 0; i < 500; i++)
+        {
+            cluster.coordinator(1).execute(withKeyspace("insert into %s.decom_test (id, value) VALUES (?, ?)"), ConsistencyLevel.ALL, i, i);
+            if (i % 100 == 0)
+                cluster.forEach((inst) -> inst.flush(KEYSPACE));
+        }
+        cluster.forEach((i) -> i.flush(KEYSPACE));
+        relevel(cluster);
+    }
+
+    private static void relevel(Cluster cluster)
+    {
+        for (IInvokableInstance i : cluster)
+        {
+            i.runOnInstance(() -> {
+                Set<SSTableReader> sstables = Keyspace.open(KEYSPACE).getColumnFamilyStore("decom_test").getLiveSSTables();
+                int lvl = 1;
+                for (SSTableReader sstable : sstables)
+                {
+                    try
+                    {
+                        sstable.mutateLevelAndReload(lvl++);
+                    }
+                    catch (IOException e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
+        }
+
+        assertEmptyL0(cluster);
+    }
+
+    private static void assertEmptyL0(Cluster cluster)
+    {
+        for (IInvokableInstance i : cluster)
+        {
+            i.runOnInstance(() -> {
+                for (SSTableReader sstable : Keyspace.open(KEYSPACE).getColumnFamilyStore("decom_test").getLiveSSTables())
+                    assertTrue(sstable.getSSTableLevel() > 0);
+            });
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org