You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2022/09/08 21:39:01 UTC

[cassandra] branch trunk updated: Optionally avoid hint transfer during decommission

This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d6aee7e08c Optionally avoid hint transfer during decommission
d6aee7e08c is described below

commit d6aee7e08c658db9d394a6b7e3e27791b4d6854f
Author: Caleb Rackliffe <ca...@gmail.com>
AuthorDate: Thu Sep 1 11:47:22 2022 -0500

    Optionally avoid hint transfer during decommission
    
    patch by Caleb Rackliffe; reviewed by Jon Meredith and Stefan Miklosovic for CASSANDRA-17808
---
 CHANGES.txt                                        |  1 +
 conf/cassandra.yaml                                |  5 +++
 src/java/org/apache/cassandra/config/Config.java   |  1 +
 .../cassandra/config/DatabaseDescriptor.java       | 10 +++++
 .../cassandra/hints/HintsDispatchExecutor.java     | 20 ++++++----
 .../apache/cassandra/service/StorageService.java   | 28 ++++++++++++--
 .../cassandra/service/StorageServiceMBean.java     |  3 ++
 .../test/HintedHandoffAddRemoveNodesTest.java      | 45 +++++++++++++++++++++-
 8 files changed, 102 insertions(+), 11 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 3328974dad..e4566fa723 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.2
+ * Optionally avoid hint transfer during decommission (CASSANDRA-17808)
  * Make disabling auto snapshot on selected tables possible (CASSANDRA-10383)
  * Introduce compaction priorities to prevent upgrade compaction inability to finish (CASSANDRA-17851)
  * Prevent a user from manually removing ephemeral snapshots (CASSANDRA-17757)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 55c8b5a756..93a581b058 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -99,6 +99,11 @@ max_hints_file_size: 128MiB
 # Disable the option in order to preserve those hints on the disk.
 auto_hints_cleanup_enabled: false
 
+# Enable/disable transferring hints to a peer during decommission. When disabled, hints stored on this node are deleted.
+# Even when enabled, logged-batch consistency is not guaranteed, and a strict hinted_handoff_throttle may delay decommission.
+# Default: true
+# transfer_hints_on_decommission: true
+
 # Compression to apply to the hint files. If omitted, hints files
 # will be written uncompressed. LZ4, Snappy, and Deflate compressors
 # are supported.
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 0281794759..c3a406b455 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -428,6 +428,7 @@ public class Config
 
     public ParameterizedClass hints_compression;
     public volatile boolean auto_hints_cleanup_enabled = false;
+    public volatile boolean transfer_hints_on_decommission = true;
 
     public volatile boolean incremental_backups = false;
     public boolean trickle_fsync = false;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 42b5e27fba..482e95fa75 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -3157,6 +3157,16 @@ public class DatabaseDescriptor
         conf.auto_hints_cleanup_enabled = value;
     }
 
+    public static boolean getTransferHintsOnDecommission()
+    {
+        return conf.transfer_hints_on_decommission;
+    }
+
+    public static void setTransferHintsOnDecommission(boolean enabled)
+    {
+        conf.transfer_hints_on_decommission = enabled;
+    }
+
     public static boolean isIncrementalBackupsEnabled()
     {
         return conf.incremental_backups;
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
index 0f34db6161..540f5bd85d 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
@@ -192,7 +192,7 @@ final class HintsDispatchExecutor
         private boolean transfer(UUID hostId)
         {
             catalog.stores()
-                   .map(store -> new DispatchHintsTask(store, hostId))
+                   .map(store -> new DispatchHintsTask(store, hostId, true))
                    .forEach(Runnable::run);
 
             return !catalog.hasFiles();
@@ -205,21 +205,27 @@ final class HintsDispatchExecutor
         private final UUID hostId;
         private final RateLimiter rateLimiter;
 
-        DispatchHintsTask(HintsStore store, UUID hostId)
+        DispatchHintsTask(HintsStore store, UUID hostId, boolean isTransfer)
         {
             this.store = store;
             this.hostId = hostId;
 
-            // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
-            // max rate is scaled by the number of nodes in the cluster (CASSANDRA-5272).
-            // the goal is to bound maximum hints traffic going towards a particular node from the rest of the cluster,
-            // not total outgoing hints traffic from this node - this is why the rate limiter is not shared between
+            // Rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
+            // Max rate is divided by the number of other nodes in the cluster (CASSANDRA-5272), unless we are transferring
+            // hints during decommission rather than dispatching them to their final destination.
+            // The goal is to bound maximum hints traffic going towards a particular node from the rest of the cluster,
+            // not total outgoing hints traffic from this node. This is why the rate limiter is not shared between
             // all the dispatch tasks (as there will be at most one dispatch task for a particular host id at a time).
-            int nodesCount = Math.max(1, StorageService.instance.getTokenMetadata().getAllEndpoints().size() - 1);
+            int nodesCount = isTransfer ? 1 : Math.max(1, StorageService.instance.getTokenMetadata().getAllEndpoints().size() - 1);
             double throttleInBytes = DatabaseDescriptor.getHintedHandoffThrottleInKiB() * 1024.0 / nodesCount;
             this.rateLimiter = RateLimiter.create(throttleInBytes == 0 ? Double.MAX_VALUE : throttleInBytes);
         }
 
+        DispatchHintsTask(HintsStore store, UUID hostId)
+        {
+            this(store, hostId, false);
+        }
+
         public void run()
         {
             try
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index f254c34221..47fad4a7e0 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -312,7 +312,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     /* the probability for tracing any particular request, 0 disables tracing and 1 enables for all */
     private double traceProbability = 0.0;
 
-    private enum Mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED }
+    public enum Mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED }
     private volatile Mode operationMode = Mode.STARTING;
 
     /* Used for tracking drain progress */
@@ -4980,9 +4980,20 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         logger.debug("waiting for batch log processing.");
         batchlogReplay.get();
 
-        setMode(Mode.LEAVING, "streaming hints to other nodes", true);
+        Future<?> hintsSuccess = ImmediateFuture.success(null);
 
-        Future hintsSuccess = streamHints();
+        if (DatabaseDescriptor.getTransferHintsOnDecommission()) 
+        {
+            setMode(Mode.LEAVING, "streaming hints to other nodes", true);
+            hintsSuccess = streamHints();
+        }
+        else
+        {
+            setMode(Mode.LEAVING, "pausing dispatch and deleting hints", true);
+            DatabaseDescriptor.setHintedHandoffEnabled(false);
+            HintsService.instance.pauseDispatch();
+            HintsService.instance.deleteAllHints();
+        }
 
         // wait for the transfer runnables to signal the latch.
         logger.debug("waiting for stream acks.");
@@ -6305,6 +6316,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         logger.info("updated hinted_handoff_throttle to {} KiB", throttleInKB);
     }
 
+    public boolean getTransferHintsOnDecommission()
+    {
+        return DatabaseDescriptor.getTransferHintsOnDecommission();
+    }
+
+    public void setTransferHintsOnDecommission(boolean enabled)
+    {
+        DatabaseDescriptor.setTransferHintsOnDecommission(enabled);
+        logger.info("updated transfer_hints_on_decommission to {}", enabled);
+    }
+
     @Override
     public void clearConnectionHistory()
     {
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 02485274a3..7e512cdaf7 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -886,6 +886,9 @@ public interface StorageServiceMBean extends NotificationEmitter
     /** Sets the hinted handoff throttle in KiB per second, per delivery thread. */
     public void setHintedHandoffThrottleInKB(int throttleInKB);
 
+    public boolean getTransferHintsOnDecommission();
+    public void setTransferHintsOnDecommission(boolean enabled);
+
     /**
      * Resume bootstrap streaming when there is failed data streaming.
      *
diff --git a/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java b/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java
index 5cf1ab66df..add6fdf500 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java
@@ -23,18 +23,22 @@ import org.junit.Test;
 
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.action.GossipHelper;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.TokenSupplier;
 import org.apache.cassandra.distributed.shared.NetworkTopology;
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.metrics.HintsServiceMetrics;
 import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.service.StorageService;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
 
 import static org.apache.cassandra.distributed.action.GossipHelper.decommission;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.TWO;
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
@@ -46,6 +50,39 @@ import static org.apache.cassandra.distributed.shared.AssertUtils.row;
  */
 public class HintedHandoffAddRemoveNodesTest extends TestBaseImpl
 {
+    @SuppressWarnings("Convert2MethodRef")
+    @Test
+    public void shouldAvoidHintTransferOnDecommission() throws Exception
+    {
+        try (Cluster cluster = init(builder().withNodes(3)
+                                             .withConfig(config -> config.set("transfer_hints_on_decommission", false).with(GOSSIP))
+                                             .withoutVNodes()
+                                             .start()))
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.decom_no_hints_test (key int PRIMARY KEY, value int)"));
+
+            cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.decom_no_hints_test (key, value) VALUES (?, ?)"), ALL, 0, 0);
+            long hintsBeforeShutdown = countTotalHints(cluster.get(1));
+            assertThat(hintsBeforeShutdown).isEqualTo(0);
+            long hintsDelivered = countHintsDelivered(cluster.get(1));
+            assertThat(hintsDelivered).isEqualTo(0);
+
+            // Shut down node 3 so that hints are written for it.
+            cluster.get(3).shutdown().get();
+
+            cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.decom_no_hints_test (key, value) VALUES (?, ?)"), TWO, 0, 0);
+            Awaitility.await().until(() -> countTotalHints(cluster.get(1)) > 0);
+            long hintsAfterShutdown = countTotalHints(cluster.get(1));
+            assertThat(hintsAfterShutdown).isEqualTo(1);
+
+            cluster.get(1).nodetoolResult("decommission", "--force").asserts().success();
+            long hintsDeliveredByDecom = countHintsDelivered(cluster.get(1));
+            String mode = cluster.get(1).callOnInstance(() -> StorageService.instance.getOperationMode());
+            assertEquals(StorageService.Mode.DECOMMISSIONED.toString(), mode);
+            assertThat(hintsDeliveredByDecom).isEqualTo(0);
+        }
+    }
+
     /**
      * Replaces Python dtest {@code hintedhandoff_test.py:TestHintedHandoff.test_hintedhandoff_decom()}.
      */
@@ -130,6 +167,12 @@ public class HintedHandoffAddRemoveNodesTest extends TestBaseImpl
         return instance.callOnInstance(() -> StorageMetrics.totalHints.getCount());
     }
 
+    @SuppressWarnings("Convert2MethodRef")
+    private long countHintsDelivered(IInvokableInstance instance)
+    {
+        return instance.callOnInstance(() -> HintsServiceMetrics.hintsSucceeded.getCount());
+    }
+
     @SuppressWarnings("SameParameterValue")
     private void populate(Cluster cluster, String table, int coordinator, int start, int count, ConsistencyLevel cl)
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org