You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2022/01/17 08:20:42 UTC

[cassandra] branch cassandra-4.0 updated: Don't block gossip when clearing repair snapshots

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
     new 98e798f  Don't block gossip when clearing repair snapshots
98e798f is described below

commit 98e798f567368f826fc3a57ddb6cdc464e741fe3
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Tue Nov 23 15:55:48 2021 +0100

    Don't block gossip when clearing repair snapshots
    
    Patch by marcuse; reviewed by David Capwell for CASSANDRA-17168
---
 CHANGES.txt                                        |   1 +
 .../cassandra/repair/RepairMessageVerbHandler.java |   1 +
 .../repair/SystemDistributedKeyspace.java          |   2 +-
 .../cassandra/service/ActiveRepairService.java     |  37 ++++-
 .../distributed/test/ClearSnapshotTest.java        | 170 +++++++++++++++++++++
 5 files changed, 207 insertions(+), 4 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index c618000..0896d56 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0.2
+ * Don't block gossip when clearing repair snapshots (CASSANDRA-17168)
  * Deduplicate warnings for deprecated parameters (changed names) (CASSANDRA-17160)
  * Update ant-junit to version 1.10.12 (CASSANDRA-17218)
  * Add droppable tombstone metrics to nodetool tablestats (CASSANDRA-16308)
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 2bf6d84..bfc2657 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -109,6 +109,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                     }
 
                     ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId);
+                    prs.setHasSnapshots();
                     TableRepairManager repairManager = cfs.getRepairManager();
                     if (prs.isGlobal)
                     {
diff --git a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
index 7e8d8bc..a3f8774 100644
--- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
@@ -365,7 +365,7 @@ public final class SystemDistributedKeyspace
             {
                 valueList.add(bytes(v));
             }
-            QueryProcessor.process(fmtQry, ConsistencyLevel.ONE, valueList);
+            QueryProcessor.process(fmtQry, ConsistencyLevel.ANY, valueList);
         }
         catch (Throwable t)
         {
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 165582b..f2e8b6e 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -24,6 +24,7 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.management.openmbean.CompositeData;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -36,6 +37,7 @@ import com.google.common.util.concurrent.AbstractFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.locator.EndpointsByRange;
 import org.apache.cassandra.locator.EndpointsForRange;
@@ -196,6 +198,11 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
     private final Gossiper gossiper;
     private final Cache<Integer, Pair<ParentRepairStatus, List<String>>> repairStatusByCmd;
 
+    private final DebuggableThreadPoolExecutor clearSnapshotExecutor = DebuggableThreadPoolExecutor.createWithMaximumPoolSize("RepairClearSnapshot",
+                                                                                                                              1,
+                                                                                                                              1,
+                                                                                                                              TimeUnit.HOURS);
+
     public ActiveRepairService(IFailureDetector failureDetector, Gossiper gossiper)
     {
         this.failureDetector = failureDetector;
@@ -699,10 +706,22 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         ParentRepairSession session = parentRepairSessions.remove(parentSessionId);
         if (session == null)
             return null;
-        for (ColumnFamilyStore cfs : session.columnFamilyStores.values())
+
+        if (session.hasSnapshots)
         {
-            if (cfs.snapshotExists(snapshotName))
-                cfs.clearSnapshot(snapshotName);
+            clearSnapshotExecutor.submit(() -> {
+                logger.info("[repair #{}] Clearing snapshots for {}", parentSessionId,
+                            session.columnFamilyStores.values()
+                                                      .stream()
+                                                      .map(cfs -> cfs.metadata().toString()).collect(Collectors.joining(", ")));
+                long startNanos = System.nanoTime();
+                for (ColumnFamilyStore cfs : session.columnFamilyStores.values())
+                {
+                    if (cfs.snapshotExists(snapshotName))
+                        cfs.clearSnapshot(snapshotName);
+                }
+                logger.info("[repair #{}] Cleared snapshots in {}ms", parentSessionId, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
+            });
         }
         return session;
     }
@@ -744,6 +763,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         public final long repairedAt;
         public final InetAddressAndPort coordinator;
         public final PreviewKind previewKind;
+        public volatile boolean hasSnapshots = false;
 
         public ParentRepairSession(InetAddressAndPort coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal, PreviewKind previewKind)
         {
@@ -799,6 +819,11 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
                     ", repairedAt=" + repairedAt +
                     '}';
         }
+
+        public void setHasSnapshots() // Flags this parent session as having snapshots; removeParentRepairSession only schedules snapshot clearing when this flag is set.
+        {
+            hasSnapshots = true; // volatile field; this method only ever sets it to true
+        }
     }
 
     /*
@@ -866,4 +891,10 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
             parentSessionsToRemove.forEach(this::removeParentRepairSession);
         }
     }
+
+    @VisibleForTesting
+    public int parentRepairSessionCount() // Returns the current number of entries in parentRepairSessions; ClearSnapshotTest polls it to wait for repairs to start.
+    {
+        return parentRepairSessions.size();
+    }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ClearSnapshotTest.java b/test/distributed/org/apache/cassandra/distributed/test/ClearSnapshotTest.java
new file mode 100644
index 0000000..3a1ba4e
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/ClearSnapshotTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.service.ActiveRepairService;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.junit.Assert.assertFalse;
+
+public class ClearSnapshotTest extends TestBaseImpl // Distributed tests for clearing repair snapshots (CASSANDRA-17168).
+{
+    private static final Logger logger = LoggerFactory.getLogger(ClearSnapshotTest.class);
+
+    @Test // Runs many concurrent full repairs with snapshotExists() slowed down, stops node 2 part-way, and asserts that QUORUM reads via node 1 never throw.
+    public void clearSnapshotSlowTest() throws IOException, InterruptedException, ExecutionException
+    {
+        try (Cluster cluster = init(Cluster.build(3).withConfig(config ->
+                                                                config.with(GOSSIP)
+                                                                      .with(NETWORK))
+                                          .withInstanceInitializer(BB::install)
+                                          .start()))
+        {
+            int tableCount = 50; // one keyspace (ks0..ks49) with a single table each
+            for (int i = 0; i < tableCount; i++)
+            {
+                String ksname = "ks"+i;
+                cluster.schemaChange("create keyspace "+ksname+" with replication = {'class': 'SimpleStrategy', 'replication_factor': 3}");
+                cluster.schemaChange("create table "+ksname+".tbl (id int primary key, t int)");
+                cluster.get(1).executeInternal("insert into "+ksname+".tbl (id , t) values (?, ?)", i, i);
+                cluster.forEach((node) -> node.flush(ksname)); // flush the keyspace on every node
+            }
+            List<Thread> repairThreads = new ArrayList<>();
+            for (int i = 0; i < tableCount; i++)
+            {
+                String ksname = "ks"+i;
+                Thread t = new Thread(() -> cluster.get(1).nodetoolResult("repair", "-full", ksname).asserts().success()); // one full repair per keyspace, each issued through node 1 on its own thread
+                t.start();
+                repairThreads.add(t);
+            }
+            AtomicBoolean gotExc = new AtomicBoolean(false); // set once any read throws
+            AtomicBoolean exit = new AtomicBoolean(false); // tells the reader thread to stop
+            Thread reads = new Thread(() -> {
+                while (!exit.get())
+                {
+                    try
+                    {
+                        cluster.coordinator(1).execute("select * from ks1.tbl where id = 5", ConsistencyLevel.QUORUM);
+                        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
+                    }
+                    catch (Exception e)
+                    {
+                        if (!gotExc.get()) // only the first failure is logged
+                            logger.error("Unexpected exception querying table ks1.tbl", e);
+                        gotExc.set(true);
+                    }
+                }
+            });
+
+            reads.start();
+            long activeRepairs;
+            do
+            {
+                activeRepairs = cluster.get(1).callOnInstance(() -> ActiveRepairService.instance.parentRepairSessionCount());
+                Thread.sleep(50);
+            }
+            while (activeRepairs < 35); // poll every 50ms until node 1 reports at least 35 parent repair sessions
+
+            cluster.setUncaughtExceptionsFilter((t) -> t.getMessage() != null && t.getMessage().contains("Parent repair session with id") ); // NOTE(review): presumably lets errors about missing parent repair sessions pass without failing the test - confirm the filter's semantics
+            cluster.get(2).shutdown().get(); // stop node 2 and wait for the shutdown future to complete
+            repairThreads.forEach(t -> {
+                try
+                {
+                    t.join();
+                }
+                catch (InterruptedException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            });
+            exit.set(true);
+            reads.join();
+
+            assertFalse(gotExc.get()); // fails if any read threw while the repairs were running
+        }
+    }
+
+    public static class BB // ByteBuddy helper that adds a delay to Directories.snapshotExists
+    {
+        public static void install(ClassLoader classLoader, Integer num) // passed as the instance initializer above; num is not used, so the same rebase is applied whatever its value
+        {
+            new ByteBuddy().rebase(Directories.class)
+                           .method(named("snapshotExists"))
+                           .intercept(MethodDelegation.to(BB.class)) // delegates to the static snapshotExists method below
+                           .make()
+                           .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
+
+        }
+
+        @SuppressWarnings("unused")
+        public static boolean snapshotExists(String name, @SuperCall Callable<Boolean> zuper)
+        {
+            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); // artificial 1s delay before calling the original method
+            try
+            {
+                return zuper.call();
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @Test // Runs a "-seq -full" repair from node 1 and then watches node 1's log for the snapshot-clearing message.
+    public void testSeqClearsSnapshot() throws IOException, TimeoutException
+    {
+        try(Cluster cluster = init(Cluster.build(3).withConfig(config ->
+                                                               config.with(GOSSIP)
+                                                                     .with(NETWORK))
+                                          .withInstanceInitializer(BB::install)
+                                          .start()))
+        {
+            cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key, x int)"));
+            for (int i = 0; i < 10; i++)
+                cluster.get(1).executeInternal(withKeyspace("insert into %s.tbl (id, x) values (?, ?)"), i, i);
+            cluster.get(1).nodetoolResult("repair", "-seq", "-full", KEYSPACE).asserts().success();
+            cluster.get(1).logs().watchFor("Clearing snapshot"); // corresponds to the "Clearing snapshots for ..." line logged by ActiveRepairService
+        }
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org