You are viewing a plain-text version of this content. (The link to the canonical version did not survive conversion to plain text.)
Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/08/24 07:12:44 UTC

[cassandra] branch trunk updated: Don't allow repair to overrun compaction

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dfd0aeb  Don't allow repair to overrun compaction
dfd0aeb is described below

commit dfd0aebf73faa8f910e68ff5d50f91fae4f2669e
Author: Jeff Jirsa <jj...@apple.com>
AuthorDate: Fri May 15 16:46:41 2020 -0700

    Don't allow repair to overrun compaction
    
    Patch by Jeff Jirsa; reviewed by David Capwell and marcuse
    for CASSANDRA-15817
---
 CHANGES.txt                                        |  1 +
 src/java/org/apache/cassandra/config/Config.java   |  1 +
 .../cassandra/config/DatabaseDescriptor.java       | 10 +++
 .../cassandra/repair/RepairMessageVerbHandler.java | 15 +++-
 .../cassandra/service/ActiveRepairService.java     | 29 ++++++++
 .../service/ActiveRepairServiceMBean.java          |  3 +
 .../distributed/test/RepairOperationalTest.java    | 83 ++++++++++++++++++++++
 7 files changed, 141 insertions(+), 1 deletion(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index f4efcbd..77a0652 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-beta2
+ * Prevent repair from overrunning compaction (CASSANDRA-15817)
  * fix cqlsh COPY functions in Python 3.8 on Mac (CASSANDRA-16053)
  * Strip comment blocks from cqlsh input before processing statements (CASSANDRA-15802)
  * Fix unicode chars error input (CASSANDRA-15990)
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 21c8e79..6abdfba 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -217,6 +217,7 @@ public class Config
     public int min_free_space_per_drive_in_mb = 50;
 
     public volatile int concurrent_materialized_view_builders = 1;
+    public volatile int reject_repair_compaction_threshold = Integer.MAX_VALUE;
 
     /**
      * @deprecated retry support removed on CASSANDRA-10992
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 397bf02..b47422b 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -3110,6 +3110,16 @@ public class DatabaseDescriptor
         conf.check_for_duplicate_rows_during_compaction = enabled;
     }
 
+    public static int getRepairPendingCompactionRejectThreshold() // repairs are rejected once pending compactions exceed this (default Integer.MAX_VALUE)
+    {
+        return conf.reject_repair_compaction_threshold;
+    }
+
+    public static void setRepairPendingCompactionRejectThreshold(int value) // NOTE(review): not validated; a negative value rejects any non-negative pending count
+    {
+        conf.reject_repair_compaction_threshold = value;
+    }
+
     public static int getInitialRangeTombstoneListAllocationSize()
     {
         return conf.initial_range_tombstone_list_allocation_size;
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 47da8bb..51518cb 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -67,6 +67,14 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                 case PREPARE_MSG:
                     PrepareMessage prepareMessage = (PrepareMessage) message.payload;
                     logger.debug("Preparing, {}", prepareMessage);
+
+                    if (!ActiveRepairService.verifyCompactionsPendingThreshold(prepareMessage.parentRepairSession, prepareMessage.previewKind))
+                    {
+                        // error is logged in verifyCompactionsPendingThreshold
+                        sendFailureResponse(message);
+                        return;
+                    }
+
                     List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(prepareMessage.tableIds.size());
                     for (TableId tableId : prepareMessage.tableIds)
                     {
@@ -221,7 +229,12 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
     private void logErrorAndSendFailureResponse(String errorMessage, Message<?> respondTo)
     {
         logger.error(errorMessage);
-        Message reply = respondTo.failureResponse(RequestFailureReason.UNKNOWN);
+        sendFailureResponse(respondTo); // reply construction is shared with the PREPARE_MSG threshold-rejection path
+    }
+
+    private void sendFailureResponse(Message<?> respondTo) // replies to the sender with an UNKNOWN failure; callers do their own logging
+    {
+        Message<?> reply = respondTo.failureResponse(RequestFailureReason.UNKNOWN);
         MessagingService.instance().send(reply, respondTo.from());
     }
 }
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index d386d37..e61bd38 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -34,6 +34,8 @@ import com.google.common.util.concurrent.AbstractFuture;
 
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+
+import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.locator.EndpointsByRange;
 import org.apache.cassandra.locator.EndpointsForRange;
 import org.slf4j.Logger;
@@ -433,8 +435,25 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         }
     }
 
+    public static boolean verifyCompactionsPendingThreshold(UUID parentRepairSession, PreviewKind previewKind) // true if repair may proceed; false (after logging) if pending compactions exceed the threshold
+    {
+        // Read both values once so the logged message reports exactly the numbers the decision was based on
+        int pendingCompactions = CompactionManager.instance.getPendingTasks();
+        int pendingThreshold = ActiveRepairService.instance.getRepairPendingCompactionRejectThreshold();
+        if (pendingCompactions > pendingThreshold) // strictly greater: a count equal to the threshold is still accepted
+        {
+            logger.error("[{}] Rejecting incoming repair, pending compactions ({}) above threshold ({})",
+                          previewKind.logPrefix(parentRepairSession), pendingCompactions, pendingThreshold);
+            return false;
+        }
+        return true;
+    }
+
     public UUID prepareForRepair(UUID parentRepairSession, InetAddressAndPort coordinator, Set<InetAddressAndPort> endpoints, RepairOption options, boolean isForcedRepair, List<ColumnFamilyStore> columnFamilyStores)
     {
+        if (!verifyCompactionsPendingThreshold(parentRepairSession, options.getPreviewKind()))
+            failRepair(parentRepairSession, "Rejecting incoming repair, pending compactions above threshold"); // failRepair throws exception
+
         long repairedAt = getRepairedAt(options, isForcedRepair);
         registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal(), options.getPreviewKind());
         final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size());
@@ -710,4 +729,14 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         }
     }
 
+    public int getRepairPendingCompactionRejectThreshold() // declared in ActiveRepairServiceMBean; delegates to DatabaseDescriptor
+    {
+        return DatabaseDescriptor.getRepairPendingCompactionRejectThreshold();
+    }
+
+    public void setRepairPendingCompactionRejectThreshold(int value) // declared in ActiveRepairServiceMBean; writes through to DatabaseDescriptor
+    {
+        DatabaseDescriptor.setRepairPendingCompactionRejectThreshold(value);
+    }
+
 }
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java b/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
index d967280..f4d6c48 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
@@ -33,4 +33,7 @@ public interface ActiveRepairServiceMBean
 
     public boolean getUseOffheapMerkleTrees();
     public void setUseOffheapMerkleTrees(boolean value);
+
+    public int getRepairPendingCompactionRejectThreshold();
+    public void setRepairPendingCompactionRejectThreshold(int value);
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairOperationalTest.java b/test/distributed/org/apache/cassandra/distributed/test/RepairOperationalTest.java
new file mode 100644
index 0000000..499eae3
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairOperationalTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class RepairOperationalTest extends TestBaseImpl
+{
+    @Test
+    public void compactionBehindTest() throws IOException
+    {
+        try(Cluster cluster = init(Cluster.build(2)
+                                          .withConfig(config -> config.with(GOSSIP).with(NETWORK))
+                                          .withInstanceInitializer(ByteBuddyHelper::install)
+                                          .start()))
+        {
+            cluster.schemaChange("create table "+KEYSPACE+".tbl (id int primary key, x int)");
+            for (int i = 0; i < 10; i++)
+                cluster.coordinator(1).execute("insert into "+KEYSPACE+".tbl (id, x) VALUES (?,?)", ConsistencyLevel.ALL, i, i);
+            cluster.forEach(i -> i.flush(KEYSPACE));
+            cluster.forEach(i -> i.nodetoolResult("repair", "--full").asserts().success()); // baseline: repair succeeds before the threshold is lowered
+            cluster.get(2).runOnInstance(() -> {
+                ByteBuddyHelper.pendingCompactions = 1000;
+                DatabaseDescriptor.setRepairPendingCompactionRejectThreshold(500);
+            });
+            // node 2 now reports 1000 pending compactions vs. a threshold of 500: repair must fail whichever node runs it
+            cluster.forEach(i -> i.nodetoolResult("repair", "--full").asserts().failure());
+            cluster.get(2).runOnInstance(() -> ByteBuddyHelper.pendingCompactions = 499); // 499 is not above 500, so repair is accepted again
+            cluster.forEach(i -> i.nodetoolResult("repair", "--full").asserts().success());
+        }
+    }
+
+    public static class ByteBuddyHelper
+    {
+        public static volatile int pendingCompactions = 0; // value returned by the stubbed getPendingTasks() on node 2
+        static void install(ClassLoader cl, int nodeNumber) // instance initializer: only node 2 has getPendingTasks() redirected here
+        {
+            if (nodeNumber == 2)
+            {
+                new ByteBuddy().redefine(CompactionManager.class)
+                               .method(named("getPendingTasks"))
+                               .intercept(MethodDelegation.to(ByteBuddyHelper.class))
+                               .make()
+                               .load(cl, ClassLoadingStrategy.Default.INJECTION);
+            }
+        }
+
+        public static int getPendingTasks()
+        {
+            return pendingCompactions;
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org