You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2018/12/19 17:51:58 UTC
cassandra git commit: Make antiCompactGroup throw exception on
failures and make anticompaction non cancellable again.
Repository: cassandra
Updated Branches:
refs/heads/trunk a41b861fa -> 0a79f9f5c
Make antiCompactGroup throw exception on failures and make anticompaction
non cancellable again.
Patch by marcuse; reviewed by Blake Eggleston for CASSANDRA-14936
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0a79f9f5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0a79f9f5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0a79f9f5
Branch: refs/heads/trunk
Commit: 0a79f9f5c970dcb8265814cd5dc361eb2d4bec6b
Parents: a41b861
Author: Marcus Eriksson <ma...@apache.org>
Authored: Thu Dec 13 14:31:54 2018 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Dec 19 18:49:58 2018 +0100
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../db/compaction/CompactionIterator.java | 14 +-
.../db/compaction/CompactionManager.java | 15 +-
.../db/repair/PendingAntiCompaction.java | 17 +-
.../db/compaction/CompactionIteratorTest.java | 20 ++
.../AbstractPendingAntiCompactionTest.java | 112 +++++++++
.../PendingAntiCompactionBytemanTest.java | 96 ++++++++
.../db/repair/PendingAntiCompactionTest.java | 237 ++++++++++++++-----
8 files changed, 442 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a79f9f5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bde5b52..66442e9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
4.0
+ * Make antiCompactGroup throw exception on error and anticompaction non cancellable
+ again (CASSANDRA-14936)
* Catch empty/invalid bounds in SelectStatement (CASSANDRA-14849)
* Auto-expand replication_factor for NetworkTopologyStrategy (CASSANDRA-14303)
* Transient Replication: support EACH_QUORUM (CASSANDRA-14727)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a79f9f5/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index f8e32a8..c73520d 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -77,12 +77,17 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId)
{
- this(type, scanners, controller, nowInSec, compactionId, null);
+ this(type, scanners, controller, nowInSec, compactionId, null, true);
}
- @SuppressWarnings("resource") // We make sure to close mergedIterator in close() and CompactionIterator is itself an AutoCloseable
public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId, CompactionMetrics metrics)
{
+ this(type, scanners, controller, nowInSec, compactionId, metrics, true);
+ }
+
+ @SuppressWarnings("resource") // We make sure to close mergedIterator in close() and CompactionIterator is itself an AutoCloseable
+ public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId, CompactionMetrics metrics, boolean abortable)
+ {
this.controller = controller;
this.type = type;
this.scanners = scanners;
@@ -105,7 +110,10 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
: UnfilteredPartitionIterators.merge(scanners, listener());
merged = Transformation.apply(merged, new GarbageSkipper(controller));
merged = Transformation.apply(merged, new Purger(controller, nowInSec));
- compacted = Transformation.apply(merged, new AbortableUnfilteredPartitionTransformation(this));
+ if (abortable)
+ compacted = Transformation.apply(merged, new AbortableUnfilteredPartitionTransformation(this));
+ else
+ compacted = merged;
}
public TableMetadata metadata()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a79f9f5/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index bc5a883..3eebd75 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1432,7 +1432,8 @@ public class CompactionManager implements CompactionManagerMBean
LifecycleTransaction txn,
UUID pendingRepair)
{
- logger.info("Performing anticompaction on {} sstables", txn.originals().size());
+ int originalCount = txn.originals().size();
+ logger.info("Performing anticompaction on {} sstables", originalCount);
//Group SSTables
Set<SSTableReader> sstables = txn.originals();
@@ -1457,7 +1458,7 @@ public class CompactionManager implements CompactionManagerMBean
}
String format = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s).";
- logger.info(format, txn.originals().size(), antiCompactedSSTableCount);
+ logger.info(format, originalCount, antiCompactedSSTableCount);
}
private int antiCompactGroup(ColumnFamilyStore cfs,
@@ -1494,7 +1495,7 @@ public class CompactionManager implements CompactionManagerMBean
AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(txn.originals());
CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs, nowInSec));
- CompactionIterator ci = new CompactionIterator(OperationType.ANTICOMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID(), metrics))
+ CompactionIterator ci = getAntiCompactionIterator(scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID(), metrics))
{
int expectedBloomFilterSize = Math.max(cfs.metadata().params.minIndexInterval, (int)(SSTableReader.getApproximateKeyCount(sstableAsSet)));
@@ -1550,8 +1551,14 @@ public class CompactionManager implements CompactionManagerMBean
{
JVMStabilityInspector.inspectThrowable(e);
logger.error("Error anticompacting " + txn, e);
+ throw e;
}
- return 0;
+ }
+
+ @VisibleForTesting
+ public static CompactionIterator getAntiCompactionIterator(List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID timeUUID, CompactionMetrics metrics)
+ {
+ return new CompactionIterator(OperationType.ANTICOMPACTION, scanners, controller, nowInSec, timeUUID, metrics, false);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a79f9f5/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
index 2829f0f..1bc2fce 100644
--- a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
+++ b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
@@ -28,6 +28,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AsyncFunction;
@@ -267,12 +268,24 @@ public class PendingAntiCompaction
for (ColumnFamilyStore cfs : tables)
{
cfs.forceBlockingFlush();
- ListenableFutureTask<AcquireResult> task = ListenableFutureTask.create(new AcquisitionCallable(cfs, tokenRanges.ranges(), prsId));
+ ListenableFutureTask<AcquireResult> task = ListenableFutureTask.create(getAcquisitionCallable(cfs, tokenRanges.ranges(), prsId));
executor.submit(task);
tasks.add(task);
}
ListenableFuture<List<AcquireResult>> acquisitionResults = Futures.successfulAsList(tasks);
- ListenableFuture compactionResult = Futures.transformAsync(acquisitionResults, new AcquisitionCallback(prsId, tokenRanges), MoreExecutors.directExecutor());
+ ListenableFuture compactionResult = Futures.transformAsync(acquisitionResults, getAcquisitionCallback(prsId, tokenRanges), MoreExecutors.directExecutor());
return compactionResult;
}
+
+ @VisibleForTesting
+ protected AcquisitionCallable getAcquisitionCallable(ColumnFamilyStore cfs, Set<Range<Token>> ranges, UUID prsId)
+ {
+ return new AcquisitionCallable(cfs, ranges, prsId);
+ }
+
+ @VisibleForTesting
+ protected AcquisitionCallback getAcquisitionCallback(UUID prsId, RangesAtEndpoint tokenRanges)
+ {
+ return new AcquisitionCallback(prsId, tokenRanges);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a79f9f5/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
index d5ea56c..864ef3e 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
@@ -371,6 +371,26 @@ public class CompactionIteratorTest
}
}
+ @Test
+ public void noTransformPartitionTest()
+ {
+ UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(metadata.comparator, false);
+ List<List<Unfiltered>> inputLists = parse(new String[] {"10[100] 11[100] 12[100]"}, generator);
+ List<List<Unfiltered>> tombstoneLists = parse(new String[] {}, generator);
+ List<Iterable<UnfilteredRowIterator>> content = ImmutableList.copyOf(Iterables.transform(inputLists, list -> ImmutableList.of(listToIterator(list, kk))));
+ Map<DecoratedKey, Iterable<UnfilteredRowIterator>> transformedSources = new TreeMap<>();
+ transformedSources.put(kk, Iterables.transform(tombstoneLists, list -> listToIterator(list, kk)));
+ try (CompactionController controller = new Controller(Keyspace.openAndGetStore(metadata), transformedSources, GC_BEFORE);
+ CompactionIterator iter = new CompactionIterator(OperationType.COMPACTION,
+ Lists.transform(content, x -> new Scanner(x)),
+ controller, NOW, null, null, false))
+ {
+ iter.stop();
+ // a non-abortable CompactionIterator keeps iterating even after stop() has been called
+ assertTrue(iter.hasNext());
+ }
+ }
+
class Controller extends CompactionController
{
private final Map<DecoratedKey, Iterable<UnfilteredRowIterator>> tombstoneSources;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a79f9f5/test/unit/org/apache/cassandra/db/repair/AbstractPendingAntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/repair/AbstractPendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/db/repair/AbstractPendingAntiCompactionTest.java
new file mode 100644
index 0000000..5adb7d6
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/repair/AbstractPendingAntiCompactionTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.repair;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.repair.AbstractRepairTest;
+import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ActiveRepairService;
+
+@Ignore
+public abstract class AbstractPendingAntiCompactionTest
+{
+
+ static final Collection<Range<Token>> FULL_RANGE;
+ static final Collection<Range<Token>> NO_RANGES = Collections.emptyList();
+ static InetAddressAndPort local;
+
+ static
+ {
+ DatabaseDescriptor.daemonInitialization();
+ Token minToken = DatabaseDescriptor.getPartitioner().getMinimumToken();
+ FULL_RANGE = Collections.singleton(new Range<>(minToken, minToken));
+ }
+
+ String ks;
+ final String tbl = "tbl";
+ final String tbl2 = "tbl2";
+
+ TableMetadata cfm;
+ ColumnFamilyStore cfs;
+ ColumnFamilyStore cfs2;
+
+ @BeforeClass
+ public static void setupClass() throws Throwable
+ {
+ SchemaLoader.prepareServer();
+ local = InetAddressAndPort.getByName("127.0.0.1");
+ ActiveRepairService.instance.consistent.local.start();
+ }
+
+ @Before
+ public void setup()
+ {
+ ks = "ks_" + System.currentTimeMillis();
+ cfm = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k INT PRIMARY KEY, v INT)", ks, tbl), ks).build();
+ TableMetadata cfm2 = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k INT PRIMARY KEY, v INT)", ks, tbl2), ks).build();
+ SchemaLoader.createKeyspace(ks, KeyspaceParams.simple(1), cfm, cfm2);
+ cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id);
+ cfs2 = Schema.instance.getColumnFamilyStoreInstance(cfm2.id);
+ }
+
+ void makeSSTables(int num)
+ {
+ makeSSTables(num, cfs, 2);
+ }
+
+ void makeSSTables(int num, ColumnFamilyStore cfs, int rowsPerSSTable)
+ {
+ for (int i = 0; i < num; i++)
+ {
+ int val = i * rowsPerSSTable; // multiplied to prevent ranges from overlapping
+ for (int j = 0; j < rowsPerSSTable; j++)
+ QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", ks, cfs.getTableName()), val + j, val + j);
+ cfs.forceBlockingFlush();
+ }
+ Assert.assertEquals(num, cfs.getLiveSSTables().size());
+ }
+
+ UUID prepareSession()
+ {
+ UUID sessionID = AbstractRepairTest.registerSession(cfs, true, true);
+ LocalSessionAccessor.prepareUnsafe(sessionID, AbstractRepairTest.COORDINATOR, Sets.newHashSet(AbstractRepairTest.COORDINATOR));
+ return sessionID;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a79f9f5/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionBytemanTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionBytemanTest.java b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionBytemanTest.java
new file mode 100644
index 0000000..2f2612a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionBytemanTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.repair;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.Replica;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@RunWith(BMUnitRunner.class)
+public class PendingAntiCompactionBytemanTest extends AbstractPendingAntiCompactionTest
+{
+ @BMRules(rules = { @BMRule(name = "Throw exception anticompaction",
+ targetClass = "Range$OrderedRangeContainmentChecker",
+ targetMethod = "test",
+ action = "throw new org.apache.cassandra.db.compaction.CompactionInterruptedException(null);")} )
+ @Test
+ public void testExceptionAnticompaction() throws InterruptedException
+ {
+ cfs.disableAutoCompaction();
+ cfs2.disableAutoCompaction();
+ ExecutorService es = Executors.newFixedThreadPool(1);
+ makeSSTables(4, cfs, 5);
+ makeSSTables(4, cfs2, 5);
+ List<Range<Token>> ranges = new ArrayList<>();
+
+ for (SSTableReader sstable : cfs.getLiveSSTables())
+ {
+ ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
+ }
+ UUID prsid = prepareSession();
+ try
+ {
+ PendingAntiCompaction pac = new PendingAntiCompaction(prsid, Lists.newArrayList(cfs, cfs2), atEndpoint(ranges, NO_RANGES), es);
+ pac.run().get();
+ fail("PAC should throw exception when anticompaction throws exception!");
+ }
+ catch (ExecutionException e)
+ {
+ assertTrue(e.getCause() instanceof CompactionInterruptedException);
+ }
+ // and make sure nothing is marked compacting
+ assertTrue(cfs.getTracker().getCompacting().isEmpty());
+ assertTrue(cfs2.getTracker().getCompacting().isEmpty());
+ assertEquals(4, cfs.getLiveSSTables().size());
+ assertEquals(4, cfs2.getLiveSSTables().size());
+ }
+
+ private static RangesAtEndpoint atEndpoint(Collection<Range<Token>> full, Collection<Range<Token>> trans)
+ {
+ RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(local);
+ for (Range<Token> range : full)
+ builder.add(new Replica(local, range, true));
+
+ for (Range<Token> range : trans)
+ builder.add(new Replica(local, range, false));
+
+ return builder.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a79f9f5/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
index ab32e5b..12a429b 100644
--- a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
@@ -25,93 +25,58 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import javax.annotation.Nullable;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.CompactionController;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+import org.apache.cassandra.db.compaction.CompactionIterator;
import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.dht.ByteOrderedPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.repair.AbstractRepairTest;
import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
-import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.cassandra.utils.concurrent.Transactional;
-public class PendingAntiCompactionTest
+public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
{
- private static final Logger logger = LoggerFactory.getLogger(PendingAntiCompactionTest.class);
- private static final Collection<Range<Token>> FULL_RANGE;
- private static final Collection<Range<Token>> NO_RANGES = Collections.emptyList();
- private static InetAddressAndPort local;
-
- static
- {
- DatabaseDescriptor.daemonInitialization();
- Token minToken = DatabaseDescriptor.getPartitioner().getMinimumToken();
- FULL_RANGE = Collections.singleton(new Range<>(minToken, minToken));
- }
-
- private String ks;
- private final String tbl = "tbl";
- private TableMetadata cfm;
- private ColumnFamilyStore cfs;
-
- @BeforeClass
- public static void setupClass() throws Throwable
- {
- SchemaLoader.prepareServer();
- local = InetAddressAndPort.getByName("127.0.0.1");
- ActiveRepairService.instance.consistent.local.start();
- }
-
- @Before
- public void setup()
- {
- ks = "ks_" + System.currentTimeMillis();
- cfm = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k INT PRIMARY KEY, v INT)", ks, tbl), ks).build();
- SchemaLoader.createKeyspace(ks, KeyspaceParams.simple(1), cfm);
- cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id);
-
- }
-
- private void makeSSTables(int num)
- {
- for (int i = 0; i < num; i++)
- {
- int val = i * 2; // multiplied to prevent ranges from overlapping
- QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", ks, tbl), val, val);
- QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", ks, tbl), val+1, val+1);
- cfs.forceBlockingFlush();
- }
- Assert.assertEquals(num, cfs.getLiveSSTables().size());
- }
+ static final Logger logger = LoggerFactory.getLogger(PendingAntiCompactionTest.class);
private static class InstrumentedAcquisitionCallback extends PendingAntiCompaction.AcquisitionCallback
{
@@ -130,13 +95,6 @@ public class PendingAntiCompactionTest
}
}
- private UUID prepareSession()
- {
- UUID sessionID = AbstractRepairTest.registerSession(cfs, true, true);
- LocalSessionAccessor.prepareUnsafe(sessionID, AbstractRepairTest.COORDINATOR, Sets.newHashSet(AbstractRepairTest.COORDINATOR));
- return sessionID;
- }
-
/**
* verify the pending anti compaction happy path
*/
@@ -405,6 +363,161 @@ public class PendingAntiCompactionTest
}
+ /**
+ * Makes sure that PendingAntiCompaction fails when anticompaction throws exception
+ */
+ @Test
+ public void antiCompactionException()
+ {
+ cfs.disableAutoCompaction();
+ makeSSTables(2);
+ UUID prsid = UUID.randomUUID();
+ ListeningExecutorService es = MoreExecutors.listeningDecorator(MoreExecutors.newDirectExecutorService());
+ PendingAntiCompaction pac = new PendingAntiCompaction(prsid, Collections.singleton(cfs), atEndpoint(FULL_RANGE, NO_RANGES), es) {
+ @Override
+ protected AcquisitionCallback getAcquisitionCallback(UUID prsId, RangesAtEndpoint tokenRanges)
+ {
+ return new AcquisitionCallback(prsid, tokenRanges)
+ {
+ @Override
+ ListenableFuture<?> submitPendingAntiCompaction(AcquireResult result)
+ {
+ Runnable r = new WrappedRunnable()
+ {
+ protected void runMayThrow()
+ {
+ throw new CompactionInterruptedException(null);
+ }
+ };
+ return es.submit(r);
+ }
+ };
+ }
+ };
+ ListenableFuture<?> fut = pac.run();
+ try
+ {
+ fut.get();
+ Assert.fail("Should throw exception");
+ }
+ catch(Throwable t)
+ {
+ }
+ }
+
+ @Test
+ public void testBlockedAcquisition() throws ExecutionException, InterruptedException
+ {
+ cfs.disableAutoCompaction();
+ ExecutorService es = Executors.newFixedThreadPool(1);
+
+ makeSSTables(2);
+ UUID prsid = UUID.randomUUID();
+ Set<SSTableReader> sstables = cfs.getLiveSSTables();
+ List<ISSTableScanner> scanners = sstables.stream().map(SSTableReader::getScanner).collect(Collectors.toList());
+ try
+ {
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
+ CompactionController controller = new CompactionController(cfs, sstables, 0);
+ CompactionIterator ci = CompactionManager.getAntiCompactionIterator(scanners, controller, 0, UUID.randomUUID(), CompactionManager.instance.getMetrics()))
+ {
+ // `ci` is our imaginary ongoing anticompaction which makes no progress until after 30s
+ // now we try to start a new AC, which will try to cancel all ongoing compactions
+
+ CompactionManager.instance.getMetrics().beginCompaction(ci);
+ PendingAntiCompaction pac = new PendingAntiCompaction(prsid, Collections.singleton(cfs), atEndpoint(FULL_RANGE, NO_RANGES), es);
+ ListenableFuture fut = pac.run();
+ try
+ {
+ fut.get(30, TimeUnit.SECONDS);
+ }
+ catch (TimeoutException e)
+ {
+ // expected, we wait 1 minute for compactions to get cancelled in runWithCompactionsDisabled
+ }
+ Assert.assertTrue(ci.hasNext());
+ ci.next(); // this would throw exception if the CompactionIterator was abortable
+ try
+ {
+ fut.get();
+ Assert.fail("We should get exception when trying to start a new anticompaction with the same sstables");
+ }
+ catch (Throwable t)
+ {
+
+ }
+ }
+ }
+ finally
+ {
+ es.shutdown();
+ ISSTableScanner.closeAllAndPropagate(scanners, null);
+ }
+ }
+
+ @Test
+ public void testUnblockedAcquisition() throws ExecutionException, InterruptedException
+ {
+ cfs.disableAutoCompaction();
+ ExecutorService es = Executors.newFixedThreadPool(1);
+ makeSSTables(2);
+ UUID prsid = prepareSession();
+ Set<SSTableReader> sstables = cfs.getLiveSSTables();
+ List<ISSTableScanner> scanners = sstables.stream().map(SSTableReader::getScanner).collect(Collectors.toList());
+ try
+ {
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
+ CompactionController controller = new CompactionController(cfs, sstables, 0);
+ CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, scanners, controller, 0, UUID.randomUUID()))
+ {
+ // `ci` is our imaginary ongoing (abortable) compaction which makes no progress until after 5s
+ // now we try to start a new AC, which will try to cancel all ongoing compactions
+
+ CompactionManager.instance.getMetrics().beginCompaction(ci);
+ PendingAntiCompaction pac = new PendingAntiCompaction(prsid, Collections.singleton(cfs), atEndpoint(FULL_RANGE, NO_RANGES), es);
+ ListenableFuture fut = pac.run();
+ try
+ {
+ fut.get(5, TimeUnit.SECONDS);
+ }
+ catch (TimeoutException e)
+ {
+ // expected, we wait 1 minute for compactions to get cancelled in runWithCompactionsDisabled, but we are not iterating
+ // CompactionIterator so the compaction is not actually cancelled
+ }
+ try
+ {
+ Assert.assertTrue(ci.hasNext());
+ ci.next();
+ Assert.fail("CompactionIterator should be abortable");
+ }
+ catch (CompactionInterruptedException e)
+ {
+ CompactionManager.instance.getMetrics().finishCompaction(ci);
+ txn.abort();
+ // expected
+ }
+ CountDownLatch cdl = new CountDownLatch(1);
+ Futures.addCallback(fut, new FutureCallback<Object>()
+ {
+ public void onSuccess(@Nullable Object o)
+ {
+ cdl.countDown();
+ }
+
+ public void onFailure(Throwable throwable)
+ {
+ }
+ });
+ Assert.assertTrue(cdl.await(1, TimeUnit.MINUTES));
+ }
+ }
+ finally
+ {
+ es.shutdown();
+ }
+ }
+
private static RangesAtEndpoint atEndpoint(Collection<Range<Token>> full, Collection<Range<Token>> trans)
{
RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(local);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org