You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/08/24 07:12:44 UTC
[cassandra] branch trunk updated: Don't allow repair to overrun compaction
This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new dfd0aeb Don't allow repair to overrun compaction
dfd0aeb is described below
commit dfd0aebf73faa8f910e68ff5d50f91fae4f2669e
Author: Jeff Jirsa <jj...@apple.com>
AuthorDate: Fri May 15 16:46:41 2020 -0700
Don't allow repair to overrun compaction
Patch by Jeff Jirsa; reviewed by David Capwell and marcuse
for CASSANDRA-15817
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 10 +++
.../cassandra/repair/RepairMessageVerbHandler.java | 15 +++-
.../cassandra/service/ActiveRepairService.java | 29 ++++++++
.../service/ActiveRepairServiceMBean.java | 3 +
.../distributed/test/RepairOperationalTest.java | 83 ++++++++++++++++++++++
7 files changed, 141 insertions(+), 1 deletion(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index f4efcbd..77a0652 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-beta2
+ * Prevent repair from overrunning compaction (CASSANDRA-15817)
* fix cqlsh COPY functions in Python 3.8 on Mac (CASSANDRA-16053)
* Strip comment blocks from cqlsh input before processing statements (CASSANDRA-15802)
* Fix unicode chars error input (CASSANDRA-15990)
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 21c8e79..6abdfba 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -217,6 +217,7 @@ public class Config
public int min_free_space_per_drive_in_mb = 50;
public volatile int concurrent_materialized_view_builders = 1;
+ public volatile int reject_repair_compaction_threshold = Integer.MAX_VALUE;
/**
* @deprecated retry support removed on CASSANDRA-10992
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 397bf02..b47422b 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -3110,6 +3110,16 @@ public class DatabaseDescriptor
conf.check_for_duplicate_rows_during_compaction = enabled;
}
+ /**
+ * Returns {@code reject_repair_compaction_threshold}: the pending-compaction count above which this
+ * node rejects repairs. The Config default is {@code Integer.MAX_VALUE}, which an {@code int} pending
+ * count can never exceed, so rejection is effectively disabled unless configured.
+ */
+ public static int getRepairPendingCompactionRejectThreshold()
+ {
+ return conf.reject_repair_compaction_threshold;
+ }
+
+ /**
+ * Updates {@code reject_repair_compaction_threshold} in the in-memory (volatile) Config.
+ * NOTE(review): no range check is applied; presumably pending-task counts are non-negative, so a
+ * negative value would reject every repair — confirm whether validation is wanted.
+ *
+ * @param value new threshold; repairs are rejected when pending compactions are strictly greater
+ */
+ public static void setRepairPendingCompactionRejectThreshold(int value)
+ {
+ conf.reject_repair_compaction_threshold = value;
+ }
+
public static int getInitialRangeTombstoneListAllocationSize()
{
return conf.initial_range_tombstone_list_allocation_size;
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 47da8bb..51518cb 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -67,6 +67,14 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
case PREPARE_MSG:
PrepareMessage prepareMessage = (PrepareMessage) message.payload;
logger.debug("Preparing, {}", prepareMessage);
+
+ if (!ActiveRepairService.verifyCompactionsPendingThreshold(prepareMessage.parentRepairSession, prepareMessage.previewKind))
+ {
+ // error is logged in verifyCompactionsPendingThreshold
+ sendFailureResponse(message);
+ return;
+ }
+
List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(prepareMessage.tableIds.size());
for (TableId tableId : prepareMessage.tableIds)
{
@@ -221,7 +229,12 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
private void logErrorAndSendFailureResponse(String errorMessage, Message<?> respondTo)
{
logger.error(errorMessage);
- Message reply = respondTo.failureResponse(RequestFailureReason.UNKNOWN);
+ sendFailureResponse(respondTo);
+ }
+
+ private void sendFailureResponse(Message<?> respondTo)
+ {
+ Message<?> reply = respondTo.failureResponse(RequestFailureReason.UNKNOWN);
MessagingService.instance().send(reply, respondTo.from());
}
}
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index d386d37..e61bd38 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -34,6 +34,8 @@ import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+
+import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.locator.EndpointsByRange;
import org.apache.cassandra.locator.EndpointsForRange;
import org.slf4j.Logger;
@@ -433,8 +435,25 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
}
}
+ /**
+ * Decides whether this node has too large a compaction backlog to take part in a repair.
+ * <p>
+ * Compares {@link CompactionManager#getPendingTasks()} with the configured
+ * {@code reject_repair_compaction_threshold}. The repair is rejected only when the pending count is
+ * strictly greater than the threshold, so the default threshold of {@code Integer.MAX_VALUE} never
+ * rejects. The threshold is re-read on every call, so runtime changes take effect immediately.
+ * <p>
+ * Called both from {@link #prepareForRepair} (this node coordinates the repair) and from
+ * RepairMessageVerbHandler on PREPARE_MSG (a remote coordinator asks this node to participate).
+ *
+ * @param parentRepairSession id of the parent repair session; used only to build the log prefix
+ * @param previewKind preview kind of the repair; used only to build the log prefix
+ * @return {@code true} if the repair may proceed; {@code false} if it is rejected, in which case an
+ * error has already been logged
+ */
+ public static boolean verifyCompactionsPendingThreshold(UUID parentRepairSession, PreviewKind previewKind)
+ {
+ // Snapshot values so failure message is consistent with decision
+ int pendingCompactions = CompactionManager.instance.getPendingTasks();
+ int pendingThreshold = ActiveRepairService.instance.getRepairPendingCompactionRejectThreshold();
+ if (pendingCompactions > pendingThreshold)
+ {
+ logger.error("[{}] Rejecting incoming repair, pending compactions ({}) above threshold ({})",
+ previewKind.logPrefix(parentRepairSession), pendingCompactions, pendingThreshold);
+ return false;
+ }
+ return true;
+ }
+
public UUID prepareForRepair(UUID parentRepairSession, InetAddressAndPort coordinator, Set<InetAddressAndPort> endpoints, RepairOption options, boolean isForcedRepair, List<ColumnFamilyStore> columnFamilyStores)
{
+ if (!verifyCompactionsPendingThreshold(parentRepairSession, options.getPreviewKind()))
+ failRepair(parentRepairSession, "Rejecting incoming repair, pending compactions above threshold"); // failRepair throws exception
+
long repairedAt = getRepairedAt(options, isForcedRepair);
registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal(), options.getPreviewKind());
final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size());
@@ -710,4 +729,14 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
}
}
+ /**
+ * Returns the pending-compaction count above which this node rejects repairs.
+ * Same signature as the accessor declared in ActiveRepairServiceMBean; delegates to
+ * {@link DatabaseDescriptor#getRepairPendingCompactionRejectThreshold()}.
+ */
+ public int getRepairPendingCompactionRejectThreshold()
+ {
+ return DatabaseDescriptor.getRepairPendingCompactionRejectThreshold();
+ }
+
+ /**
+ * Sets the pending-compaction rejection threshold in the in-memory configuration.
+ * Same signature as the mutator declared in ActiveRepairServiceMBean; delegates to
+ * {@link DatabaseDescriptor#setRepairPendingCompactionRejectThreshold(int)}.
+ *
+ * @param value new threshold; repairs are rejected when pending compactions are strictly greater
+ */
+ public void setRepairPendingCompactionRejectThreshold(int value)
+ {
+ DatabaseDescriptor.setRepairPendingCompactionRejectThreshold(value);
+ }
+
}
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java b/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
index d967280..f4d6c48 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
@@ -33,4 +33,7 @@ public interface ActiveRepairServiceMBean
public boolean getUseOffheapMerkleTrees();
public void setUseOffheapMerkleTrees(boolean value);
+
+ public int getRepairPendingCompactionRejectThreshold();
+ public void setRepairPendingCompactionRejectThreshold(int value);
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairOperationalTest.java b/test/distributed/org/apache/cassandra/distributed/test/RepairOperationalTest.java
new file mode 100644
index 0000000..499eae3
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairOperationalTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+/**
+ * In-JVM distributed test for the repair rejection threshold ({@code reject_repair_compaction_threshold}):
+ * a node whose pending-compaction count exceeds the threshold must refuse repairs.
+ */
+public class RepairOperationalTest extends TestBaseImpl
+{
+ /**
+ * Runs a full repair from each node of a two-node cluster in three phases:
+ * <ol>
+ * <li>with nothing configured, where it must succeed;</li>
+ * <li>with node 2 faking 1000 pending compactions against a threshold of 500, where it must fail;</li>
+ * <li>with node 2 faking 499 pending compactions against the same threshold, where it must succeed.</li>
+ * </ol>
+ */
+ @Test
+ public void compactionBehindTest() throws IOException
+ {
+ // ByteBuddyHelper::install overrides CompactionManager#getPendingTasks on node 2 only.
+ try(Cluster cluster = init(Cluster.build(2)
+ .withConfig(config -> config.with(GOSSIP).with(NETWORK))
+ .withInstanceInitializer(ByteBuddyHelper::install)
+ .start()))
+ {
+ // Write 10 rows at CL ALL (so both replicas have them) and flush them to sstables.
+ cluster.schemaChange("create table "+KEYSPACE+".tbl (id int primary key, x int)");
+ for (int i = 0; i < 10; i++)
+ cluster.coordinator(1).execute("insert into "+KEYSPACE+".tbl (id, x) VALUES (?,?)", ConsistencyLevel.ALL, i, i);
+ cluster.forEach(i -> i.flush(KEYSPACE));
+ // Baseline: no threshold has been set yet, so repair started from either node must succeed.
+ cluster.forEach(i -> i.nodetoolResult("repair", "--full").asserts().success());
+ // On node 2 only: report 1000 pending compactions and lower the rejection threshold to 500.
+ cluster.get(2).runOnInstance(() -> {
+ ByteBuddyHelper.pendingCompactions = 1000;
+ DatabaseDescriptor.setRepairPendingCompactionRejectThreshold(500);
+ });
+ // Repair must now fail whichever node starts it. Presumably node 2 rejects locally in
+ // prepareForRepair when it coordinates, and rejects node 1's PREPARE_MSG otherwise.
+ cluster.forEach(i -> i.nodetoolResult("repair", "--full").asserts().failure());
+ // 499 is not above the (unchanged) threshold of 500, so repair must succeed again.
+ cluster.get(2).runOnInstance(() -> ByteBuddyHelper.pendingCompactions = 499);
+ cluster.forEach(i -> i.nodetoolResult("repair", "--full").asserts().success());
+ }
+ }
+
+ /**
+ * Test hook that lets the test control the value returned by
+ * CompactionManager#getPendingTasks on node 2.
+ */
+ public static class ByteBuddyHelper
+ {
+ // Value returned by the redefined getPendingTasks. It is set from the test through runOnInstance;
+ // presumably volatile so the write is visible to whichever thread later performs the repair check.
+ public static volatile int pendingCompactions = 0;
+ // Instance initializer. On node 2 only, redefines CompactionManager#getPendingTasks in that node's
+ // class loader so that calls delegate to ByteBuddyHelper#getPendingTasks.
+ static void install(ClassLoader cl, int nodeNumber)
+ {
+ if (nodeNumber == 2)
+ {
+ new ByteBuddy().redefine(CompactionManager.class)
+ .method(named("getPendingTasks"))
+ .intercept(MethodDelegation.to(ByteBuddyHelper.class))
+ .make()
+ .load(cl, ClassLoadingStrategy.Default.INJECTION);
+ }
+ }
+
+ // Delegation target for the redefined CompactionManager#getPendingTasks.
+ public static int getPendingTasks()
+ {
+ return pendingCompactions;
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org