You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2018/12/19 17:51:58 UTC

cassandra git commit: Make antiCompactGroup throw exception on failures and make anticompaction non cancellable again.

Repository: cassandra
Updated Branches:
  refs/heads/trunk a41b861fa -> 0a79f9f5c


Make antiCompactGroup throw exception on failures and make anticompaction
non cancellable again.

Patch by marcuse; reviewed by Blake Eggleston for CASSANDRA-14936


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0a79f9f5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0a79f9f5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0a79f9f5

Branch: refs/heads/trunk
Commit: 0a79f9f5c970dcb8265814cd5dc361eb2d4bec6b
Parents: a41b861
Author: Marcus Eriksson <ma...@apache.org>
Authored: Thu Dec 13 14:31:54 2018 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Dec 19 18:49:58 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../db/compaction/CompactionIterator.java       |  14 +-
 .../db/compaction/CompactionManager.java        |  15 +-
 .../db/repair/PendingAntiCompaction.java        |  17 +-
 .../db/compaction/CompactionIteratorTest.java   |  20 ++
 .../AbstractPendingAntiCompactionTest.java      | 112 +++++++++
 .../PendingAntiCompactionBytemanTest.java       |  96 ++++++++
 .../db/repair/PendingAntiCompactionTest.java    | 237 ++++++++++++++-----
 8 files changed, 442 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a79f9f5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bde5b52..66442e9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 4.0
+ * Make antiCompactGroup throw exception on error and anticompaction non cancellable
+   again (CASSANDRA-14936)
  * Catch empty/invalid bounds in SelectStatement (CASSANDRA-14849)
  * Auto-expand replication_factor for NetworkTopologyStrategy (CASSANDRA-14303)
  * Transient Replication: support EACH_QUORUM (CASSANDRA-14727)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a79f9f5/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index f8e32a8..c73520d 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -77,12 +77,17 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
 
     public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId)
     {
-        this(type, scanners, controller, nowInSec, compactionId, null);
+        this(type, scanners, controller, nowInSec, compactionId, null, true);
     }
 
-    @SuppressWarnings("resource") // We make sure to close mergedIterator in close() and CompactionIterator is itself an AutoCloseable
     public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId, CompactionMetrics metrics)
     {
+        this(type, scanners, controller, nowInSec, compactionId, metrics, true);
+    }
+
+    @SuppressWarnings("resource") // We make sure to close mergedIterator in close() and CompactionIterator is itself an AutoCloseable
+    public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId, CompactionMetrics metrics, boolean abortable)
+    {
         this.controller = controller;
         this.type = type;
         this.scanners = scanners;
@@ -105,7 +110,10 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
                                            : UnfilteredPartitionIterators.merge(scanners, listener());
         merged = Transformation.apply(merged, new GarbageSkipper(controller));
         merged = Transformation.apply(merged, new Purger(controller, nowInSec));
-        compacted = Transformation.apply(merged, new AbortableUnfilteredPartitionTransformation(this));
+        if (abortable)
+            compacted = Transformation.apply(merged, new AbortableUnfilteredPartitionTransformation(this));
+        else
+            compacted = merged;
     }
 
     public TableMetadata metadata()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a79f9f5/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index bc5a883..3eebd75 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1432,7 +1432,8 @@ public class CompactionManager implements CompactionManagerMBean
                                   LifecycleTransaction txn,
                                   UUID pendingRepair)
     {
-        logger.info("Performing anticompaction on {} sstables", txn.originals().size());
+        int originalCount = txn.originals().size();
+        logger.info("Performing anticompaction on {} sstables", originalCount);
 
         //Group SSTables
         Set<SSTableReader> sstables = txn.originals();
@@ -1457,7 +1458,7 @@ public class CompactionManager implements CompactionManagerMBean
         }
 
         String format = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s).";
-        logger.info(format, txn.originals().size(), antiCompactedSSTableCount);
+        logger.info(format, originalCount, antiCompactedSSTableCount);
     }
 
     private int antiCompactGroup(ColumnFamilyStore cfs,
@@ -1494,7 +1495,7 @@ public class CompactionManager implements CompactionManagerMBean
 
              AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(txn.originals());
              CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs, nowInSec));
-             CompactionIterator ci = new CompactionIterator(OperationType.ANTICOMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID(), metrics))
+             CompactionIterator ci = getAntiCompactionIterator(scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID(), metrics))
         {
             int expectedBloomFilterSize = Math.max(cfs.metadata().params.minIndexInterval, (int)(SSTableReader.getApproximateKeyCount(sstableAsSet)));
 
@@ -1550,8 +1551,14 @@ public class CompactionManager implements CompactionManagerMBean
         {
             JVMStabilityInspector.inspectThrowable(e);
             logger.error("Error anticompacting " + txn, e);
+            throw e;
         }
-        return 0;
+    }
+
+    @VisibleForTesting
+    public static CompactionIterator getAntiCompactionIterator(List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID timeUUID, CompactionMetrics metrics)
+    {
+        return new CompactionIterator(OperationType.ANTICOMPACTION, scanners, controller, nowInSec, timeUUID, metrics, false);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a79f9f5/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
index 2829f0f..1bc2fce 100644
--- a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
+++ b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
@@ -28,6 +28,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AsyncFunction;
@@ -267,12 +268,24 @@ public class PendingAntiCompaction
         for (ColumnFamilyStore cfs : tables)
         {
             cfs.forceBlockingFlush();
-            ListenableFutureTask<AcquireResult> task = ListenableFutureTask.create(new AcquisitionCallable(cfs, tokenRanges.ranges(), prsId));
+            ListenableFutureTask<AcquireResult> task = ListenableFutureTask.create(getAcquisitionCallable(cfs, tokenRanges.ranges(), prsId));
             executor.submit(task);
             tasks.add(task);
         }
         ListenableFuture<List<AcquireResult>> acquisitionResults = Futures.successfulAsList(tasks);
-        ListenableFuture compactionResult = Futures.transformAsync(acquisitionResults, new AcquisitionCallback(prsId, tokenRanges), MoreExecutors.directExecutor());
+        ListenableFuture compactionResult = Futures.transformAsync(acquisitionResults, getAcquisitionCallback(prsId, tokenRanges), MoreExecutors.directExecutor());
         return compactionResult;
     }
+
+    @VisibleForTesting
+    protected AcquisitionCallable getAcquisitionCallable(ColumnFamilyStore cfs, Set<Range<Token>> ranges, UUID prsId)
+    {
+        return new AcquisitionCallable(cfs, ranges, prsId);
+    }
+
+    @VisibleForTesting
+    protected AcquisitionCallback getAcquisitionCallback(UUID prsId, RangesAtEndpoint tokenRanges)
+    {
+        return new AcquisitionCallback(prsId, tokenRanges);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a79f9f5/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
index d5ea56c..864ef3e 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
@@ -371,6 +371,26 @@ public class CompactionIteratorTest
         }
     }
 
+    @Test
+    public void noTransformPartitionTest()
+    {
+        UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(metadata.comparator, false);
+        List<List<Unfiltered>> inputLists = parse(new String[] {"10[100] 11[100] 12[100]"}, generator);
+        List<List<Unfiltered>> tombstoneLists = parse(new String[] {}, generator);
+        List<Iterable<UnfilteredRowIterator>> content = ImmutableList.copyOf(Iterables.transform(inputLists, list -> ImmutableList.of(listToIterator(list, kk))));
+        Map<DecoratedKey, Iterable<UnfilteredRowIterator>> transformedSources = new TreeMap<>();
+        transformedSources.put(kk, Iterables.transform(tombstoneLists, list -> listToIterator(list, kk)));
+        try (CompactionController controller = new Controller(Keyspace.openAndGetStore(metadata), transformedSources, GC_BEFORE);
+             CompactionIterator iter = new CompactionIterator(OperationType.COMPACTION,
+                                                              Lists.transform(content, x -> new Scanner(x)),
+                                                              controller, NOW, null, null, false))
+        {
+            iter.stop();
+            // not abortable CompactionIterator
+            assertTrue(iter.hasNext());
+        }
+    }
+
     class Controller extends CompactionController
     {
         private final Map<DecoratedKey, Iterable<UnfilteredRowIterator>> tombstoneSources;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a79f9f5/test/unit/org/apache/cassandra/db/repair/AbstractPendingAntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/repair/AbstractPendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/db/repair/AbstractPendingAntiCompactionTest.java
new file mode 100644
index 0000000..5adb7d6
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/repair/AbstractPendingAntiCompactionTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.repair;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.repair.AbstractRepairTest;
+import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ActiveRepairService;
+
+@Ignore
+public abstract class AbstractPendingAntiCompactionTest
+{
+
+    static final Collection<Range<Token>> FULL_RANGE;
+    static final Collection<Range<Token>> NO_RANGES = Collections.emptyList();
+    static InetAddressAndPort local;
+
+    static
+    {
+        DatabaseDescriptor.daemonInitialization();
+        Token minToken = DatabaseDescriptor.getPartitioner().getMinimumToken();
+        FULL_RANGE = Collections.singleton(new Range<>(minToken, minToken));
+    }
+
+    String ks;
+    final String tbl = "tbl";
+    final String tbl2 = "tbl2";
+
+    TableMetadata cfm;
+    ColumnFamilyStore cfs;
+    ColumnFamilyStore cfs2;
+
+    @BeforeClass
+    public static void setupClass() throws Throwable
+    {
+        SchemaLoader.prepareServer();
+        local = InetAddressAndPort.getByName("127.0.0.1");
+        ActiveRepairService.instance.consistent.local.start();
+    }
+
+    @Before
+    public void setup()
+    {
+        ks = "ks_" + System.currentTimeMillis();
+        cfm = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k INT PRIMARY KEY, v INT)", ks, tbl), ks).build();
+        TableMetadata cfm2 = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k INT PRIMARY KEY, v INT)", ks, tbl2), ks).build();
+        SchemaLoader.createKeyspace(ks, KeyspaceParams.simple(1), cfm, cfm2);
+        cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id);
+        cfs2 = Schema.instance.getColumnFamilyStoreInstance(cfm2.id);
+    }
+
+    void makeSSTables(int num)
+    {
+        makeSSTables(num, cfs, 2);
+    }
+
+    void makeSSTables(int num, ColumnFamilyStore cfs, int rowsPerSSTable)
+    {
+        for (int i = 0; i < num; i++)
+        {
+            int val = i * rowsPerSSTable;  // multiplied to prevent ranges from overlapping
+            for (int j = 0; j < rowsPerSSTable; j++)
+                QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", ks, cfs.getTableName()), val + j, val + j);
+            cfs.forceBlockingFlush();
+        }
+        Assert.assertEquals(num, cfs.getLiveSSTables().size());
+    }
+
+    UUID prepareSession()
+    {
+        UUID sessionID = AbstractRepairTest.registerSession(cfs, true, true);
+        LocalSessionAccessor.prepareUnsafe(sessionID, AbstractRepairTest.COORDINATOR, Sets.newHashSet(AbstractRepairTest.COORDINATOR));
+        return sessionID;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a79f9f5/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionBytemanTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionBytemanTest.java b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionBytemanTest.java
new file mode 100644
index 0000000..2f2612a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionBytemanTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.repair;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.Replica;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@RunWith(BMUnitRunner.class)
+public class PendingAntiCompactionBytemanTest extends AbstractPendingAntiCompactionTest
+{
+    @BMRules(rules = { @BMRule(name = "Throw exception anticompaction",
+                               targetClass = "Range$OrderedRangeContainmentChecker",
+                               targetMethod = "test",
+                               action = "throw new org.apache.cassandra.db.compaction.CompactionInterruptedException(null);")} )
+    @Test
+    public void testExceptionAnticompaction() throws InterruptedException
+    {
+        cfs.disableAutoCompaction();
+        cfs2.disableAutoCompaction();
+        ExecutorService es = Executors.newFixedThreadPool(1);
+        makeSSTables(4, cfs, 5);
+        makeSSTables(4, cfs2, 5);
+        List<Range<Token>> ranges = new ArrayList<>();
+
+        for (SSTableReader sstable : cfs.getLiveSSTables())
+        {
+            ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
+        }
+        UUID prsid = prepareSession();
+        try
+        {
+            PendingAntiCompaction pac = new PendingAntiCompaction(prsid, Lists.newArrayList(cfs, cfs2), atEndpoint(ranges, NO_RANGES), es);
+            pac.run().get();
+            fail("PAC should throw exception when anticompaction throws exception!");
+        }
+        catch (ExecutionException e)
+        {
+            assertTrue(e.getCause() instanceof CompactionInterruptedException);
+        }
+        // and make sure nothing is marked compacting
+        assertTrue(cfs.getTracker().getCompacting().isEmpty());
+        assertTrue(cfs2.getTracker().getCompacting().isEmpty());
+        assertEquals(4, cfs.getLiveSSTables().size());
+        assertEquals(4, cfs2.getLiveSSTables().size());
+    }
+
+    private static RangesAtEndpoint atEndpoint(Collection<Range<Token>> full, Collection<Range<Token>> trans)
+    {
+        RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(local);
+        for (Range<Token> range : full)
+            builder.add(new Replica(local, range, true));
+
+        for (Range<Token> range : trans)
+            builder.add(new Replica(local, range, false));
+
+        return builder.build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a79f9f5/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
index ab32e5b..12a429b 100644
--- a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
@@ -25,93 +25,58 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import javax.annotation.Nullable;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.CompactionController;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+import org.apache.cassandra.db.compaction.CompactionIterator;
 import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.dht.ByteOrderedPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.RangesAtEndpoint;
 import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.repair.AbstractRepairTest;
 import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
-import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.WrappedRunnable;
 import org.apache.cassandra.utils.concurrent.Transactional;
 
-public class PendingAntiCompactionTest
+public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
 {
-    private static final Logger logger = LoggerFactory.getLogger(PendingAntiCompactionTest.class);
-    private static final Collection<Range<Token>> FULL_RANGE;
-    private static final Collection<Range<Token>> NO_RANGES = Collections.emptyList();
-    private static InetAddressAndPort local;
-
-    static
-    {
-        DatabaseDescriptor.daemonInitialization();
-        Token minToken = DatabaseDescriptor.getPartitioner().getMinimumToken();
-        FULL_RANGE = Collections.singleton(new Range<>(minToken, minToken));
-    }
-
-    private String ks;
-    private final String tbl = "tbl";
-    private TableMetadata cfm;
-    private ColumnFamilyStore cfs;
-
-    @BeforeClass
-    public static void setupClass() throws Throwable
-    {
-        SchemaLoader.prepareServer();
-        local = InetAddressAndPort.getByName("127.0.0.1");
-        ActiveRepairService.instance.consistent.local.start();
-    }
-
-    @Before
-    public void setup()
-    {
-        ks = "ks_" + System.currentTimeMillis();
-        cfm = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k INT PRIMARY KEY, v INT)", ks, tbl), ks).build();
-        SchemaLoader.createKeyspace(ks, KeyspaceParams.simple(1), cfm);
-        cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id);
-
-    }
-
-    private void makeSSTables(int num)
-    {
-        for (int i = 0; i < num; i++)
-        {
-            int val = i * 2;  // multiplied to prevent ranges from overlapping
-            QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", ks, tbl), val, val);
-            QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", ks, tbl), val+1, val+1);
-            cfs.forceBlockingFlush();
-        }
-        Assert.assertEquals(num, cfs.getLiveSSTables().size());
-    }
+    static final Logger logger = LoggerFactory.getLogger(PendingAntiCompactionTest.class);
 
     private static class InstrumentedAcquisitionCallback extends PendingAntiCompaction.AcquisitionCallback
     {
@@ -130,13 +95,6 @@ public class PendingAntiCompactionTest
         }
     }
 
-    private UUID prepareSession()
-    {
-        UUID sessionID = AbstractRepairTest.registerSession(cfs, true, true);
-        LocalSessionAccessor.prepareUnsafe(sessionID, AbstractRepairTest.COORDINATOR, Sets.newHashSet(AbstractRepairTest.COORDINATOR));
-        return sessionID;
-    }
-
     /**
      * verify the pending anti compaction happy path
      */
@@ -405,6 +363,161 @@ public class PendingAntiCompactionTest
 
     }
 
+    /**
+     * Makes sure that the future returned by PendingAntiCompaction#run fails when anticompaction throws an exception
+     */
+    @Test
+    public void antiCompactionException() throws InterruptedException
+    {
+        cfs.disableAutoCompaction();
+        makeSSTables(2);
+        UUID prsid = UUID.randomUUID();
+        ListeningExecutorService es = MoreExecutors.listeningDecorator(MoreExecutors.newDirectExecutorService());
+        PendingAntiCompaction pac = new PendingAntiCompaction(prsid, Collections.singleton(cfs), atEndpoint(FULL_RANGE, NO_RANGES), es) {
+            @Override
+            protected AcquisitionCallback getAcquisitionCallback(UUID prsId, RangesAtEndpoint tokenRanges)
+            {
+                return new AcquisitionCallback(prsid, tokenRanges)
+                {
+                    @Override
+                    ListenableFuture<?> submitPendingAntiCompaction(AcquireResult result)
+                    {
+                        return es.submit(new WrappedRunnable()
+                        {
+                            protected void runMayThrow()
+                            {
+                                throw new CompactionInterruptedException(null);
+                            }
+                        });
+                    }
+                };
+            }
+        };
+        ListenableFuture<?> fut = pac.run();
+        try
+        {
+            fut.get();
+            Assert.fail("Should throw exception");
+        }
+        catch (ExecutionException e)
+        {
+            // expected; unlike catch(Throwable) this lets the AssertionError from Assert.fail propagate
+        }
+    }
+
+    @Test
+    public void testBlockedAcquisition() throws ExecutionException, InterruptedException
+    {
+        cfs.disableAutoCompaction();
+        ExecutorService es = Executors.newFixedThreadPool(1);
+
+        makeSSTables(2);
+        UUID prsid = UUID.randomUUID();
+        Set<SSTableReader> sstables = cfs.getLiveSSTables();
+        List<ISSTableScanner> scanners = sstables.stream().map(SSTableReader::getScanner).collect(Collectors.toList());
+        try
+        {
+            try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
+                 CompactionController controller = new CompactionController(cfs, sstables, 0);
+                 CompactionIterator ci = CompactionManager.getAntiCompactionIterator(scanners, controller, 0, UUID.randomUUID(), CompactionManager.instance.getMetrics()))
+            {
+                // `ci` is our imaginary ongoing anticompaction which makes no progress until after 30s
+                // now we try to start a new AC, which will try to cancel all ongoing compactions
+
+                CompactionManager.instance.getMetrics().beginCompaction(ci);
+                PendingAntiCompaction pac = new PendingAntiCompaction(prsid, Collections.singleton(cfs), atEndpoint(FULL_RANGE, NO_RANGES), es);
+                ListenableFuture<?> fut = pac.run();
+                try
+                {
+                    fut.get(30, TimeUnit.SECONDS);
+                }
+                catch (TimeoutException e)
+                {
+                    // expected, we wait 1 minute for compactions to get cancelled in runWithCompactionsDisabled
+                }
+                Assert.assertTrue(ci.hasNext());
+                ci.next(); // this would throw exception if the CompactionIterator was abortable
+                try
+                {
+                    fut.get();
+                    Assert.fail("We should get exception when trying to start a new anticompaction with the same sstables");
+                }
+                catch (ExecutionException e)
+                {
+                    // expected; a narrower catch than Throwable so the AssertionError from Assert.fail is not swallowed
+                }
+            }
+        }
+        finally
+        {
+            es.shutdown();
+            ISSTableScanner.closeAllAndPropagate(scanners, null);
+        }
+    }
+
+    @Test
+    public void testUnblockedAcquisition() throws ExecutionException, InterruptedException
+    {
+        cfs.disableAutoCompaction();
+        ExecutorService es = Executors.newFixedThreadPool(1);
+        makeSSTables(2);
+        UUID prsid = prepareSession();
+        Set<SSTableReader> sstables = cfs.getLiveSSTables();
+        List<ISSTableScanner> scanners = sstables.stream().map(SSTableReader::getScanner).collect(Collectors.toList());
+        try
+        {
+            try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
+                 CompactionController controller = new CompactionController(cfs, sstables, 0);
+                 CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, scanners, controller, 0, UUID.randomUUID()))
+            {
+                // `ci` is our imaginary ongoing (abortable) regular compaction which makes no progress until after 5s
+                // now we try to start a new AC, which will try to cancel all ongoing compactions
+
+                CompactionManager.instance.getMetrics().beginCompaction(ci);
+                PendingAntiCompaction pac = new PendingAntiCompaction(prsid, Collections.singleton(cfs), atEndpoint(FULL_RANGE, NO_RANGES), es);
+                ListenableFuture fut = pac.run();
+                try
+                {
+                    fut.get(5, TimeUnit.SECONDS);
+                }
+                catch (TimeoutException e)
+                {
+                    // expected, we wait 1 minute for compactions to get cancelled in runWithCompactionsDisabled, but we are not iterating
+                    // CompactionIterator so the compaction is not actually cancelled
+                }
+                try
+                {
+                    Assert.assertTrue(ci.hasNext());
+                    ci.next();
+                    Assert.fail("CompactionIterator should be abortable");
+                }
+                catch (CompactionInterruptedException e)
+                {
+                    CompactionManager.instance.getMetrics().finishCompaction(ci);
+                    txn.abort();
+                    // expected
+                }
+                CountDownLatch cdl = new CountDownLatch(1);
+                Futures.addCallback(fut, new FutureCallback<Object>()
+                {
+                    public void onSuccess(@Nullable Object o)
+                    {
+                        cdl.countDown();
+                    }
+
+                    public void onFailure(Throwable throwable)
+                    {
+                    }
+                });
+                Assert.assertTrue(cdl.await(1, TimeUnit.MINUTES));
+            }
+        }
+        finally
+        {
+            es.shutdown();
+        }
+    }
+
     private static RangesAtEndpoint atEndpoint(Collection<Range<Token>> full, Collection<Range<Token>> trans)
     {
         RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(local);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org