You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2022/09/08 21:39:01 UTC
[cassandra] branch trunk updated: Optionally avoid hint transfer during decommission
This is an automated email from the ASF dual-hosted git repository.
maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new d6aee7e08c Optionally avoid hint transfer during decommission
d6aee7e08c is described below
commit d6aee7e08c658db9d394a6b7e3e27791b4d6854f
Author: Caleb Rackliffe <ca...@gmail.com>
AuthorDate: Thu Sep 1 11:47:22 2022 -0500
Optionally avoid hint transfer during decommission
patch by Caleb Rackliffe; reviewed by Jon Meredith and Stefan Miklosovic for CASSANDRA-17808
---
CHANGES.txt | 1 +
conf/cassandra.yaml | 5 +++
src/java/org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 10 +++++
.../cassandra/hints/HintsDispatchExecutor.java | 20 ++++++----
.../apache/cassandra/service/StorageService.java | 28 ++++++++++++--
.../cassandra/service/StorageServiceMBean.java | 3 ++
.../test/HintedHandoffAddRemoveNodesTest.java | 45 +++++++++++++++++++++-
8 files changed, 102 insertions(+), 11 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 3328974dad..e4566fa723 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.2
+ * Optionally avoid hint transfer during decommission (CASSANDRA-17808)
* Make disabling auto snapshot on selected tables possible (CASSANDRA-10383)
* Introduce compaction priorities to prevent upgrade compaction inability to finish (CASSANDRA-17851)
* Prevent a user from manually removing ephemeral snapshots (CASSANDRA-17757)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 55c8b5a756..93a581b058 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -99,6 +99,11 @@ max_hints_file_size: 128MiB
# Disable the option in order to preserve those hints on the disk.
auto_hints_cleanup_enabled: false
+# Enable/disable transferring hints to a peer during decommission. When disabled, the decommissioning node
+# deletes its undelivered hints instead. Even when enabled, this does not guarantee consistency for logged
+# batches, and it may delay decommission when coupled with a strict hinted_handoff_throttle.
+# Default: true
+# transfer_hints_on_decommission: true
+
# Compression to apply to the hint files. If omitted, hints files
# will be written uncompressed. LZ4, Snappy, and Deflate compressors
# are supported.
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 0281794759..c3a406b455 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -428,6 +428,7 @@ public class Config
public ParameterizedClass hints_compression;
public volatile boolean auto_hints_cleanup_enabled = false;
+ public volatile boolean transfer_hints_on_decommission = true;
public volatile boolean incremental_backups = false;
public boolean trickle_fsync = false;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 42b5e27fba..482e95fa75 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -3157,6 +3157,16 @@ public class DatabaseDescriptor
conf.auto_hints_cleanup_enabled = value;
}
+ /**
+ * @return the current {@code transfer_hints_on_decommission} setting, i.e. whether decommission
+ * transfers this node's hints to a peer
+ */
+ public static boolean getTransferHintsOnDecommission()
+ {
+ return conf.transfer_hints_on_decommission;
+ }
+
+ /**
+ * Sets {@code transfer_hints_on_decommission} on the live config object (the field is volatile,
+ * so the new value is visible to other threads without further synchronization).
+ */
+ public static void setTransferHintsOnDecommission(boolean enabled)
+ {
+ conf.transfer_hints_on_decommission = enabled;
+ }
+
public static boolean isIncrementalBackupsEnabled()
{
return conf.incremental_backups;
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
index 0f34db6161..540f5bd85d 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
@@ -192,7 +192,7 @@ final class HintsDispatchExecutor
private boolean transfer(UUID hostId)
{
catalog.stores()
- .map(store -> new DispatchHintsTask(store, hostId))
+ .map(store -> new DispatchHintsTask(store, hostId, true))
.forEach(Runnable::run);
return !catalog.hasFiles();
@@ -205,21 +205,27 @@ final class HintsDispatchExecutor
private final UUID hostId;
private final RateLimiter rateLimiter;
- DispatchHintsTask(HintsStore store, UUID hostId)
+ DispatchHintsTask(HintsStore store, UUID hostId, boolean isTransfer)
{
this.store = store;
this.hostId = hostId;
- // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
- // max rate is scaled by the number of nodes in the cluster (CASSANDRA-5272).
- // the goal is to bound maximum hints traffic going towards a particular node from the rest of the cluster,
- // not total outgoing hints traffic from this node - this is why the rate limiter is not shared between
+ // Rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
+ // Max rate is scaled by the number of nodes in the cluster (CASSANDRA-5272), unless we are transferring
+ // hints during decommission rather than dispatching them to their final destination.
+ // The goal is to bound maximum hints traffic going towards a particular node from the rest of the cluster,
+ // not total outgoing hints traffic from this node. This is why the rate limiter is not shared between
// all the dispatch tasks (as there will be at most one dispatch task for a particular host id at a time).
- int nodesCount = Math.max(1, StorageService.instance.getTokenMetadata().getAllEndpoints().size() - 1);
+ int nodesCount = isTransfer ? 1 : Math.max(1, StorageService.instance.getTokenMetadata().getAllEndpoints().size() - 1);
double throttleInBytes = DatabaseDescriptor.getHintedHandoffThrottleInKiB() * 1024.0 / nodesCount;
this.rateLimiter = RateLimiter.create(throttleInBytes == 0 ? Double.MAX_VALUE : throttleInBytes);
}
+ /**
+ * Creates a regular dispatch task (not a decommission transfer). The hinted handoff throttle is
+ * therefore divided by the number of other nodes in the cluster.
+ */
+ DispatchHintsTask(HintsStore store, UUID hostId)
+ {
+ this(store, hostId, false);
+ }
+
public void run()
{
try
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index f254c34221..47fad4a7e0 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -312,7 +312,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
/* the probability for tracing any particular request, 0 disables tracing and 1 enables for all */
private double traceProbability = 0.0;
- private enum Mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED }
+ public enum Mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED }
private volatile Mode operationMode = Mode.STARTING;
/* Used for tracking drain progress */
@@ -4980,9 +4980,20 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
logger.debug("waiting for batch log processing.");
batchlogReplay.get();
- setMode(Mode.LEAVING, "streaming hints to other nodes", true);
+ Future<?> hintsSuccess = ImmediateFuture.success(null);
- Future hintsSuccess = streamHints();
+ if (DatabaseDescriptor.getTransferHintsOnDecommission())
+ {
+ setMode(Mode.LEAVING, "streaming hints to other nodes", true);
+ hintsSuccess = streamHints();
+ }
+ else
+ {
+ setMode(Mode.LEAVING, "pausing dispatch and deleting hints", true);
+ DatabaseDescriptor.setHintedHandoffEnabled(false);
+ HintsService.instance.pauseDispatch();
+ HintsService.instance.deleteAllHints();
+ }
// wait for the transfer runnables to signal the latch.
logger.debug("waiting for stream acks.");
@@ -6305,6 +6316,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
logger.info("updated hinted_handoff_throttle to {} KiB", throttleInKB);
}
+ /**
+ * @return whether decommission streams this node's hints to other nodes
+ * ({@code transfer_hints_on_decommission})
+ */
+ @Override
+ public boolean getTransferHintsOnDecommission()
+ {
+ return DatabaseDescriptor.getTransferHintsOnDecommission();
+ }
+
+ /**
+ * Enables or disables hint transfer during decommission. When disabled, decommission pauses
+ * hint dispatch and deletes all local hints instead of streaming them.
+ *
+ * @param enabled the new value of {@code transfer_hints_on_decommission}
+ */
+ @Override
+ public void setTransferHintsOnDecommission(boolean enabled)
+ {
+ DatabaseDescriptor.setTransferHintsOnDecommission(enabled);
+ logger.info("updated transfer_hints_on_decommission to {}", enabled);
+ }
+
@Override
public void clearConnectionHistory()
{
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 02485274a3..7e512cdaf7 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -886,6 +886,9 @@ public interface StorageServiceMBean extends NotificationEmitter
/** Sets the hinted handoff throttle in KiB per second, per delivery thread. */
public void setHintedHandoffThrottleInKB(int throttleInKB);
+ /** Returns whether hints are transferred to a peer during decommission. */
+ public boolean getTransferHintsOnDecommission();
+ /** Enables/disables transferring hints to a peer during decommission; when disabled, hints are deleted instead. */
+ public void setTransferHintsOnDecommission(boolean enabled);
+
/**
* Resume bootstrap streaming when there is failed data streaming.
*
diff --git a/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java b/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java
index 5cf1ab66df..add6fdf500 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java
@@ -23,18 +23,22 @@ import org.junit.Test;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.action.GossipHelper;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.TokenSupplier;
import org.apache.cassandra.distributed.shared.NetworkTopology;
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.metrics.HintsServiceMetrics;
import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.service.StorageService;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.apache.cassandra.distributed.action.GossipHelper.decommission;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.TWO;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
@@ -46,6 +50,39 @@ import static org.apache.cassandra.distributed.shared.AssertUtils.row;
*/
public class HintedHandoffAddRemoveNodesTest extends TestBaseImpl
{
+ /**
+ * With {@code transfer_hints_on_decommission} disabled, a node holding an undelivered hint must still
+ * reach DECOMMISSIONED, and must not deliver any hints while decommissioning.
+ */
+ @SuppressWarnings("Convert2MethodRef")
+ @Test
+ public void shouldAvoidHintTransferOnDecommission() throws Exception
+ {
+ try (Cluster cluster = init(builder().withNodes(3)
+ .withConfig(config -> config.set("transfer_hints_on_decommission", false).with(GOSSIP))
+ .withoutVNodes()
+ .start()))
+ {
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.decom_no_hints_test (key int PRIMARY KEY, value int)"));
+
+ // With all replicas up, a write at ALL must not produce hints or hint deliveries.
+ cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.decom_no_hints_test (key, value) VALUES (?, ?)"), ALL, 0, 0);
+ long hintsBeforeShutdown = countTotalHints(cluster.get(1));
+ assertThat(hintsBeforeShutdown).isEqualTo(0);
+ long hintsDelivered = countHintsDelivered(cluster.get(1));
+ assertThat(hintsDelivered).isEqualTo(0);
+
+ // Shut down node 3 so that the coordinator writes a hint for it.
+ cluster.get(3).shutdown().get();
+
+ cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.decom_no_hints_test (key, value) VALUES (?, ?)"), TWO, 0, 0);
+ // Use the statically imported await(), consistent with the rest of this file.
+ await().until(() -> countTotalHints(cluster.get(1)) > 0);
+ long hintsAfterShutdown = countTotalHints(cluster.get(1));
+ assertThat(hintsAfterShutdown).isEqualTo(1);
+
+ cluster.get(1).nodetoolResult("decommission", "--force").asserts().success();
+ long hintsDeliveredByDecom = countHintsDelivered(cluster.get(1));
+ String mode = cluster.get(1).callOnInstance(() -> StorageService.instance.getOperationMode());
+ assertEquals(StorageService.Mode.DECOMMISSIONED.toString(), mode);
+ assertThat(hintsDeliveredByDecom).isEqualTo(0);
+ }
+ }
+
/**
* Replaces Python dtest {@code hintedhandoff_test.py:TestHintedHandoff.test_hintedhandoff_decom()}.
*/
@@ -130,6 +167,12 @@ public class HintedHandoffAddRemoveNodesTest extends TestBaseImpl
return instance.callOnInstance(() -> StorageMetrics.totalHints.getCount());
}
+ /** Returns the count reported by {@code HintsServiceMetrics.hintsSucceeded} on the given instance. */
+ @SuppressWarnings("Convert2MethodRef")
+ private long countHintsDelivered(IInvokableInstance instance)
+ {
+ // NOTE(review): kept as a lambda rather than a method reference, presumably so the static metric is
+ // resolved inside the instance when the callable runs there; confirm before converting.
+ return instance.callOnInstance(() -> HintsServiceMetrics.hintsSucceeded.getCount());
+ }
+
@SuppressWarnings("SameParameterValue")
private void populate(Cluster cluster, String table, int coordinator, int start, int count, ConsistencyLevel cl)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org