You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2022/01/17 08:20:42 UTC
[cassandra] branch cassandra-4.0 updated: Don't block gossip when clearing repair snapshots
This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
new 98e798f Don't block gossip when clearing repair snapshots
98e798f is described below
commit 98e798f567368f826fc3a57ddb6cdc464e741fe3
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Tue Nov 23 15:55:48 2021 +0100
Don't block gossip when clearing repair snapshots
Patch by marcuse; reviewed by David Capwell for CASSANDRA-17168
---
CHANGES.txt | 1 +
.../cassandra/repair/RepairMessageVerbHandler.java | 1 +
.../repair/SystemDistributedKeyspace.java | 2 +-
.../cassandra/service/ActiveRepairService.java | 37 ++++-
.../distributed/test/ClearSnapshotTest.java | 170 +++++++++++++++++++++
5 files changed, 207 insertions(+), 4 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index c618000..0896d56 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0.2
+ * Don't block gossip when clearing repair snapshots (CASSANDRA-17168)
* Deduplicate warnings for deprecated parameters (changed names) (CASSANDRA-17160)
* Update ant-junit to version 1.10.12 (CASSANDRA-17218)
* Add droppable tombstone metrics to nodetool tablestats (CASSANDRA-16308)
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 2bf6d84..bfc2657 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -109,6 +109,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
}
ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId);
+ prs.setHasSnapshots();
TableRepairManager repairManager = cfs.getRepairManager();
if (prs.isGlobal)
{
diff --git a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
index 7e8d8bc..a3f8774 100644
--- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
@@ -365,7 +365,7 @@ public final class SystemDistributedKeyspace
{
valueList.add(bytes(v));
}
- QueryProcessor.process(fmtQry, ConsistencyLevel.ONE, valueList);
+ QueryProcessor.process(fmtQry, ConsistencyLevel.ANY, valueList);
}
catch (Throwable t)
{
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 165582b..f2e8b6e 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -24,6 +24,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.openmbean.CompositeData;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -36,6 +37,7 @@ import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.locator.EndpointsByRange;
import org.apache.cassandra.locator.EndpointsForRange;
@@ -196,6 +198,11 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
private final Gossiper gossiper;
private final Cache<Integer, Pair<ParentRepairStatus, List<String>>> repairStatusByCmd;
+ private final DebuggableThreadPoolExecutor clearSnapshotExecutor = DebuggableThreadPoolExecutor.createWithMaximumPoolSize("RepairClearSnapshot",
+ 1,
+ 1,
+ TimeUnit.HOURS);
+
public ActiveRepairService(IFailureDetector failureDetector, Gossiper gossiper)
{
this.failureDetector = failureDetector;
@@ -699,10 +706,22 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
ParentRepairSession session = parentRepairSessions.remove(parentSessionId);
if (session == null)
return null;
- for (ColumnFamilyStore cfs : session.columnFamilyStores.values())
+
+ if (session.hasSnapshots)
{
- if (cfs.snapshotExists(snapshotName))
- cfs.clearSnapshot(snapshotName);
+ clearSnapshotExecutor.submit(() -> {
+ logger.info("[repair #{}] Clearing snapshots for {}", parentSessionId,
+ session.columnFamilyStores.values()
+ .stream()
+ .map(cfs -> cfs.metadata().toString()).collect(Collectors.joining(", ")));
+ long startNanos = System.nanoTime();
+ for (ColumnFamilyStore cfs : session.columnFamilyStores.values())
+ {
+ if (cfs.snapshotExists(snapshotName))
+ cfs.clearSnapshot(snapshotName);
+ }
+ logger.info("[repair #{}] Cleared snapshots in {}ms", parentSessionId, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
+ });
}
return session;
}
@@ -744,6 +763,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
public final long repairedAt;
public final InetAddressAndPort coordinator;
public final PreviewKind previewKind;
+ public volatile boolean hasSnapshots = false;
public ParentRepairSession(InetAddressAndPort coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal, PreviewKind previewKind)
{
@@ -799,6 +819,11 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
", repairedAt=" + repairedAt +
'}';
}
+
+        /**
+         * Flags this parent repair session as having snapshots, so that removing the
+         * session schedules the snapshot-clearing task instead of skipping it.
+         */
+        public void setHasSnapshots()
+        {
+            this.hasSnapshots = true;
+        }
}
/*
@@ -866,4 +891,10 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
parentSessionsToRemove.forEach(this::removeParentRepairSession);
}
}
+
+    /**
+     * Test hook exposing how many parent repair sessions are currently registered.
+     *
+     * @return the current size of {@code parentRepairSessions}
+     */
+    @VisibleForTesting
+    public int parentRepairSessionCount()
+    {
+        final int registeredSessions = parentRepairSessions.size();
+        return registeredSessions;
+    }
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ClearSnapshotTest.java b/test/distributed/org/apache/cassandra/distributed/test/ClearSnapshotTest.java
new file mode 100644
index 0000000..3a1ba4e
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/ClearSnapshotTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.service.ActiveRepairService;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.junit.Assert.assertFalse;
+
+public class ClearSnapshotTest extends TestBaseImpl
+{
+ private static final Logger logger = LoggerFactory.getLogger(ClearSnapshotTest.class);
+
+    /**
+     * Starts one full repair per keyspace (50 in total) on node 1 while every
+     * {@code Directories.snapshotExists} call is slowed down by {@link BB}, shuts down node 2
+     * once at least 35 parent repair sessions are registered, and verifies that QUORUM reads
+     * coordinated by node 1 never throw while all of this is going on.
+     */
+    @Test
+    public void clearSnapshotSlowTest() throws IOException, InterruptedException, ExecutionException
+    {
+        try (Cluster cluster = init(Cluster.build(3).withConfig(config ->
+                                                                config.with(GOSSIP)
+                                                                      .with(NETWORK))
+                                           .withInstanceInitializer(BB::install)
+                                           .start()))
+        {
+            // one keyspace/table per repair, each holding a single flushed row
+            int tableCount = 50;
+            for (int i = 0; i < tableCount; i++)
+            {
+                String ksname = "ks"+i;
+                cluster.schemaChange("create keyspace "+ksname+" with replication = {'class': 'SimpleStrategy', 'replication_factor': 3}");
+                cluster.schemaChange("create table "+ksname+".tbl (id int primary key, t int)");
+                cluster.get(1).executeInternal("insert into "+ksname+".tbl (id , t) values (?, ?)", i, i);
+                cluster.forEach((node) -> node.flush(ksname));
+            }
+            List<Thread> repairThreads = new ArrayList<>();
+            for (int i = 0; i < tableCount; i++)
+            {
+                String ksname = "ks"+i;
+                // NOTE(review): a failure raised by asserts().success() only terminates this repair thread;
+                // it is not propagated to the test thread - confirm this is intended.
+                Thread t = new Thread(() -> cluster.get(1).nodetoolResult("repair", "-full", ksname).asserts().success());
+                t.start();
+                repairThreads.add(t);
+            }
+            AtomicBoolean gotExc = new AtomicBoolean(false);
+            AtomicBoolean exit = new AtomicBoolean(false);
+            Thread reads = new Thread(() -> {
+                while (!exit.get())
+                {
+                    try
+                    {
+                        cluster.coordinator(1).execute("select * from ks1.tbl where id = 5", ConsistencyLevel.QUORUM);
+                        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
+                    }
+                    catch (Exception e)
+                    {
+                        // record the failure; only the first one is logged to avoid flooding the output
+                        if (gotExc.compareAndSet(false, true))
+                            logger.error("Unexpected exception querying table ks1.tbl", e);
+                    }
+                }
+            });
+
+            reads.start();
+            try
+            {
+                // wait until at least 35 parent repair sessions are registered on node 1
+                long activeRepairs;
+                do
+                {
+                    activeRepairs = cluster.get(1).callOnInstance(() -> ActiveRepairService.instance.parentRepairSessionCount());
+                    Thread.sleep(50);
+                }
+                while (activeRepairs < 35);
+
+                cluster.setUncaughtExceptionsFilter((t) -> t.getMessage() != null && t.getMessage().contains("Parent repair session with id") );
+                cluster.get(2).shutdown().get();
+
+                // the method already declares InterruptedException, so let join() propagate it unwrapped
+                for (Thread repairThread : repairThreads)
+                    repairThread.join();
+            }
+            finally
+            {
+                // always stop the reader, even on failure, so it cannot outlive the cluster
+                exit.set(true);
+                reads.join();
+            }
+
+            assertFalse(gotExc.get());
+        }
+    }
+
+ /**
+ * ByteBuddy instrumentation used by the tests in this class: it intercepts
+ * {@code Directories.snapshotExists} so that every call sleeps for one second before
+ * delegating to the original implementation.
+ */
+ public static class BB
+ {
+ /**
+ * Instance initializer: rebases {@link Directories} in the given class loader so that
+ * methods named {@code snapshotExists} are delegated to this class.
+ *
+ * @param classLoader class loader the rebased class is injected into
+ * @param num not used by this method
+ */
+ public static void install(ClassLoader classLoader, Integer num)
+ {
+ new ByteBuddy().rebase(Directories.class)
+ .method(named("snapshotExists"))
+ .intercept(MethodDelegation.to(BB.class))
+ .make()
+ .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
+
+ }
+
+ /**
+ * Interceptor for {@code Directories.snapshotExists}: sleeps for one second, then returns
+ * the result of the original method.
+ *
+ * @param name argument of the intercepted call (not used here)
+ * @param zuper handle invoking the original, un-instrumented method
+ * @return whatever the original method returns
+ * @throws RuntimeException wrapping any exception thrown by the original method
+ */
+ @SuppressWarnings("unused")
+ public static boolean snapshotExists(String name, @SuperCall Callable<Boolean> zuper)
+ {
+ // artificial delay; presumably simulates slow snapshot lookups on disk
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+ try
+ {
+ return zuper.call();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+    /**
+     * Inserts ten rows through node 1, runs a {@code -seq -full} repair of the test keyspace
+     * from node 1 and then watches node 1's log for the "Clearing snapshot" message.
+     */
+    @Test
+    public void testSeqClearsSnapshot() throws IOException, TimeoutException
+    {
+        try (Cluster cluster = init(Cluster.build(3)
+                                           .withConfig(config -> config.with(GOSSIP).with(NETWORK))
+                                           .withInstanceInitializer(BB::install)
+                                           .start()))
+        {
+            cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key, x int)"));
+
+            final String insertStatement = withKeyspace("insert into %s.tbl (id, x) values (?, ?)");
+            for (int row = 0; row < 10; row++)
+            {
+                cluster.get(1).executeInternal(insertStatement, row, row);
+            }
+
+            cluster.get(1).nodetoolResult("repair", "-seq", "-full", KEYSPACE).asserts().success();
+
+            // presumably the source of the declared TimeoutException if the message never shows up
+            cluster.get(1).logs().watchFor("Clearing snapshot");
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org