You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2017/08/11 17:30:24 UTC

cassandra git commit: Fix race / ref leak in anticompaction

Repository: cassandra
Updated Branches:
  refs/heads/trunk f4da90aca -> e9cc805db


Fix race / ref leak in anticompaction

Patch by Blake Eggleston; Reviewed by Ariel Weisberg for CASSANDRA-13688


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e9cc805d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e9cc805d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e9cc805d

Branch: refs/heads/trunk
Commit: e9cc805db1133982c022657f8cab86cd24b3686f
Parents: f4da90a
Author: Blake Eggleston <bd...@gmail.com>
Authored: Wed Jul 12 14:47:48 2017 -0700
Committer: Blake Eggleston <bd...@gmail.com>
Committed: Fri Aug 11 10:24:17 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/compaction/AbstractCompactionTask.java   |  40 +++++
 .../db/compaction/CompactionManager.java        |  45 +++---
 .../db/compaction/PendingRepairManager.java     |  43 ++---
 .../db/compaction/AntiCompactionTest.java       |  40 +++++
 .../db/compaction/CompactionTaskTest.java       | 157 +++++++++++++++++++
 .../consistent/PendingAntiCompactionTest.java   |  23 +++
 7 files changed, 312 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9cc805d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 988f93d..7c9d79a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Fix race / ref leak in anticompaction (CASSANDRA-13688)
  * Expose tasks queue length via JMX (CASSANDRA-12758)
  * Fix race / ref leak in PendingRepairManager (CASSANDRA-13751)
  * Enable ppc64le runtime as unsupported architecture (CASSANDRA-13615)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9cc805d/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
index 430c916..c542a51 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
@@ -17,7 +17,11 @@
  */
 package org.apache.cassandra.db.compaction;
 
+import java.util.Iterator;
 import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.base.Preconditions;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
@@ -49,6 +53,42 @@ public abstract class AbstractCompactionTask extends WrappedRunnable
         Set<SSTableReader> compacting = transaction.tracker.getCompacting();
         for (SSTableReader sstable : transaction.originals())
             assert compacting.contains(sstable) : sstable.getFilename() + " is not correctly marked compacting";
+
+        validateSSTables(transaction.originals());
+    }
+
+    /**
+     * Confirm that we're not attempting to compact repaired/unrepaired/pending repair sstables together
+     */
+    private void validateSSTables(Set<SSTableReader> sstables)
+    {
+        // do not allow repaired, unrepaired, and pending repair sstables to be compacted together
+        if (!sstables.isEmpty())
+        {
+            Iterator<SSTableReader> iter = sstables.iterator();
+            SSTableReader first = iter.next();
+            boolean isRepaired = first.isRepaired();
+            UUID pendingRepair = first.getPendingRepair();
+            while (iter.hasNext())
+            {
+                SSTableReader next = iter.next();
+                Preconditions.checkArgument(isRepaired == next.isRepaired(),
+                                            "Cannot compact repaired and unrepaired sstables");
+
+                if (pendingRepair == null)
+                {
+                    Preconditions.checkArgument(!next.isPendingRepair(),
+                                                "Cannot compact pending repair and non-pending repair sstables");
+                }
+                else
+                {
+                    Preconditions.checkArgument(next.isPendingRepair(),
+                                                "Cannot compact pending repair and non-pending repair sstables");
+                    Preconditions.checkArgument(pendingRepair.equals(next.getPendingRepair()),
+                                                "Cannot compact sstables from different pending repairs");
+                }
+            }
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9cc805d/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index bc372f5..c7c86c6 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -129,6 +129,12 @@ public class CompactionManager implements CompactionManagerMBean
 
     private final RateLimiter compactionRateLimiter = RateLimiter.create(Double.MAX_VALUE);
 
+    @VisibleForTesting
+    CompactionMetrics getMetrics()
+    {
+        return metrics;
+    }
+
     /**
      * Gets compaction rate limiter.
      * Rate unit is bytes per sec.
@@ -592,16 +598,19 @@ public class CompactionManager implements CompactionManagerMBean
             }
         };
 
-        ListenableFutureTask<?> task = ListenableFutureTask.create(runnable, null);
+        ListenableFuture<?> task = null;
         try
         {
-            executor.submitIfRunning(task, "pending anticompaction");
+            task = executor.submitIfRunning(runnable, "pending anticompaction");
             return task;
         }
         finally
         {
-            if (task.isCancelled())
+            if (task == null || task.isCancelled())
+            {
                 sstables.release();
+                txn.abort();
+            }
         }
     }
 
@@ -625,24 +634,24 @@ public class CompactionManager implements CompactionManagerMBean
                                       UUID pendingRepair,
                                       UUID parentRepairSession) throws InterruptedException, IOException
     {
-        ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(parentRepairSession);
-        Preconditions.checkArgument(!prs.isPreview(), "Cannot anticompact for previews");
+        try
+        {
+            ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(parentRepairSession);
+            Preconditions.checkArgument(!prs.isPreview(), "Cannot anticompact for previews");
 
-        logger.info("{} Starting anticompaction for {}.{} on {}/{} sstables", PreviewKind.NONE.logPrefix(parentRepairSession), cfs.keyspace.getName(), cfs.getTableName(), validatedForRepair.size(), cfs.getLiveSSTables().size());
-        logger.trace("{} Starting anticompaction for ranges {}", PreviewKind.NONE.logPrefix(parentRepairSession), ranges);
-        Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
-        Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();
-        // we should only notify that repair status changed if it actually did:
-        Set<SSTableReader> mutatedRepairStatusToNotify = new HashSet<>();
-        Map<SSTableReader, Boolean> wasRepairedBefore = new HashMap<>();
-        for (SSTableReader sstable : sstables)
-            wasRepairedBefore.put(sstable, sstable.isRepaired());
+            logger.info("{} Starting anticompaction for {}.{} on {}/{} sstables", PreviewKind.NONE.logPrefix(parentRepairSession), cfs.keyspace.getName(), cfs.getTableName(), validatedForRepair.size(), cfs.getLiveSSTables().size());
+            logger.trace("{} Starting anticompaction for ranges {}", PreviewKind.NONE.logPrefix(parentRepairSession), ranges);
+            Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
+            Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();
+            // we should only notify that repair status changed if it actually did:
+            Set<SSTableReader> mutatedRepairStatusToNotify = new HashSet<>();
+            Map<SSTableReader, Boolean> wasRepairedBefore = new HashMap<>();
+            for (SSTableReader sstable : sstables)
+                wasRepairedBefore.put(sstable, sstable.isRepaired());
 
-        Set<SSTableReader> nonAnticompacting = new HashSet<>();
+            Set<SSTableReader> nonAnticompacting = new HashSet<>();
 
-        Iterator<SSTableReader> sstableIterator = sstables.iterator();
-        try
-        {
+            Iterator<SSTableReader> sstableIterator = sstables.iterator();
             List<Range<Token>> normalizedRanges = Range.normalize(ranges);
 
             while (sstableIterator.hasNext())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9cc805d/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
index 183af7a..2786396 100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
@@ -134,7 +134,7 @@ class PendingRepairManager
 
     private synchronized void removeSession(UUID sessionID)
     {
-        if (!strategies.containsKey(sessionID))
+        if (!strategies.containsKey(sessionID) || !strategies.get(sessionID).getSSTables().isEmpty())
             return;
 
         logger.debug("Removing compaction strategy for pending repair {} on  {}.{}", sessionID, cfs.metadata.keyspace, cfs.metadata.name);
@@ -424,14 +424,31 @@ class PendingRepairManager
 
         protected void runMayThrow() throws Exception
         {
-            for (SSTableReader sstable : transaction.originals())
+            boolean completed = false;
+            try
             {
-                logger.debug("Setting repairedAt to {} on {} for {}", repairedAt, sstable, sessionID);
-                sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, repairedAt, ActiveRepairService.NO_PENDING_REPAIR);
-                sstable.reloadSSTableMetadata();
+                logger.debug("Setting repairedAt to {} on {} for {}", repairedAt, transaction.originals(), sessionID);
+                for (SSTableReader sstable : transaction.originals())
+                {
+                    sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, repairedAt, ActiveRepairService.NO_PENDING_REPAIR);
+                    sstable.reloadSSTableMetadata();
+                }
+                completed = true;
+            }
+            finally
+            {
+                // even if we weren't able to rewrite all the sstable metadata, we should still move the ones that were
+                cfs.getTracker().notifySSTableRepairedStatusChanged(transaction.originals());
+
+                // we always abort because mutating metadata isn't guarded by LifecycleTransaction, so this won't roll
+                // anything back. Also, we don't want to obsolete the originals. We're only using the transaction to prevent
+                // other compactions from marking these sstables compacting, and to unmark them when we're done
+                transaction.abort();
+                if (completed)
+                {
+                    removeSession(sessionID);
+                }
             }
-            cfs.getTracker().notifySSTableRepairedStatusChanged(transaction.originals());
-            transaction.abort();
         }
 
         public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
@@ -444,18 +461,6 @@ class PendingRepairManager
             run();
             return transaction.originals().size();
         }
-
-        public int execute(CompactionManager.CompactionExecutorStatsCollector collector)
-        {
-            try
-            {
-                return super.execute(collector);
-            }
-            finally
-            {
-                removeSession(sessionID);
-            }
-        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9cc805d/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 8db194b..5f05fab 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -29,6 +29,7 @@ import java.util.UUID;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.After;
 import org.junit.Ignore;
@@ -55,6 +56,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.concurrent.Refs;
 import org.apache.cassandra.UpdateBuilder;
+import org.apache.cassandra.utils.concurrent.Transactional;
 
 import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
 import static org.hamcrest.CoreMatchers.is;
@@ -397,4 +399,42 @@ public class AntiCompactionTest
         return ImmutableSet.copyOf(cfs.getTracker().getView().sstables(SSTableSet.LIVE, (s) -> !s.isRepaired()));
     }
 
+    /**
+     * If the parent repair session is missing, we should still clean up
+     */
+    @Test
+    public void missingParentRepairSession() throws Exception
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
+        store.disableAutoCompaction();
+
+        for (int table = 0; table < 10; table++)
+        {
+            generateSStable(store,Integer.toString(table));
+        }
+        Collection<SSTableReader> sstables = getUnrepairedSSTables(store);
+        assertEquals(10, sstables.size());
+
+        Range<Token> range = new Range<Token>(new BytesToken("-1".getBytes()), new BytesToken("-10".getBytes()));
+        List<Range<Token>> ranges = Arrays.asList(range);
+
+        UUID missingRepairSession = UUIDGen.getTimeUUID();
+        try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
+             Refs<SSTableReader> refs = Refs.ref(sstables))
+        {
+            Assert.assertFalse(refs.isEmpty());
+            try
+            {
+                CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, missingRepairSession, missingRepairSession);
+                Assert.fail("expected RuntimeException");
+            }
+            catch (RuntimeException e)
+            {
+                // expected
+            }
+            Assert.assertEquals(Transactional.AbstractTransactional.State.ABORTED, txn.state());
+            Assert.assertTrue(refs.isEmpty());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9cc805d/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java
new file mode 100644
index 0000000..4640248
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+public class CompactionTaskTest
+{
+    private static TableMetadata cfm;
+    private static ColumnFamilyStore cfs;
+
+    @BeforeClass
+    public static void setUpClass() throws Exception
+    {
+        SchemaLoader.prepareServer();
+        cfm = CreateTableStatement.parse("CREATE TABLE tbl (k INT PRIMARY KEY, v INT)", "coordinatorsessiontest").build();
+        SchemaLoader.createKeyspace("ks", KeyspaceParams.simple(1), cfm);
+        cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id);
+    }
+
+    @Before
+    public void setUp() throws Exception
+    {
+        cfs.getCompactionStrategyManager().enable();
+        cfs.truncateBlocking();
+    }
+
+    @Test
+    public void compactionInterruption() throws Exception
+    {
+        cfs.getCompactionStrategyManager().disable();
+        QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, v) VALUES (1, 1);");
+        QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, v) VALUES (2, 2);");
+        cfs.forceBlockingFlush();
+        QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, v) VALUES (3, 3);");
+        QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, v) VALUES (4, 4);");
+        cfs.forceBlockingFlush();
+        Set<SSTableReader> sstables = cfs.getLiveSSTables();
+
+        Assert.assertEquals(2, sstables.size());
+
+        LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
+        Assert.assertNotNull(txn);
+        CompactionTask task = new CompactionTask(cfs, txn, 0);
+        Assert.assertNotNull(task);
+        cfs.getCompactionStrategyManager().pause();
+        try
+        {
+            task.execute(CompactionManager.instance.getMetrics());
+            Assert.fail("Expected CompactionInterruptedException");
+        }
+        catch (CompactionInterruptedException e)
+        {
+            // expected
+        }
+        Assert.assertEquals(Transactional.AbstractTransactional.State.ABORTED, txn.state());
+    }
+
+    private static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair) throws IOException
+    {
+        sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, repairedAt, pendingRepair);
+        sstable.reloadSSTableMetadata();
+    }
+
+    /**
+     * If we try to create a compaction task that will mix
+     * repaired/unrepaired/pending repair sstables, it should fail
+     */
+    @Test
+    public void mixedSSTableFailure() throws Exception
+    {
+        cfs.getCompactionStrategyManager().disable();
+        QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, v) VALUES (1, 1);");
+        cfs.forceBlockingFlush();
+        QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, v) VALUES (2, 2);");
+        cfs.forceBlockingFlush();
+        QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, v) VALUES (3, 3);");
+        cfs.forceBlockingFlush();
+        QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, v) VALUES (4, 4);");
+        cfs.forceBlockingFlush();
+
+        List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
+        Assert.assertEquals(4, sstables.size());
+
+        SSTableReader unrepaired = sstables.get(0);
+        SSTableReader repaired = sstables.get(1);
+        SSTableReader pending1 = sstables.get(2);
+        SSTableReader pending2 = sstables.get(3);
+
+        mutateRepaired(repaired, FBUtilities.nowInSeconds(), ActiveRepairService.NO_PENDING_REPAIR);
+        mutateRepaired(pending1, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID());
+        mutateRepaired(pending2, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID());
+
+        LifecycleTransaction txn = null;
+        List<SSTableReader> toCompact = new ArrayList<>(sstables);
+        for (int i=0; i<sstables.size(); i++)
+        {
+            try
+            {
+                txn = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
+                Assert.assertNotNull(txn);
+                CompactionTask task = new CompactionTask(cfs, txn, 0);
+                Assert.fail("Expected IllegalArgumentException");
+            }
+            catch (IllegalArgumentException e)
+            {
+                // expected
+            }
+            finally
+            {
+                if (txn != null)
+                    txn.abort();
+            }
+            Collections.rotate(toCompact, 1);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9cc805d/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java
index 3119453..5aeab3e 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.Schema;
@@ -339,4 +340,26 @@ public class PendingAntiCompactionTest
         Assert.assertTrue(cb.submittedCompactions.contains(cfm.id));
         Assert.assertFalse(cb.submittedCompactions.contains(cfs2.metadata.id));
     }
+
+
+    @Test
+    public void singleAnticompaction() throws Exception
+    {
+        cfs.disableAutoCompaction();
+        makeSSTables(2);
+
+        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID());
+        PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
+        UUID sessionID = UUIDGen.getTimeUUID();
+        ActiveRepairService.instance.registerParentRepairSession(sessionID,
+                                                                 InetAddress.getByName("127.0.0.1"),
+                                                                 Lists.newArrayList(cfs),
+                                                                 FULL_RANGE,
+                                                                 true,0,
+                                                                 true,
+                                                                 PreviewKind.NONE);
+        CompactionManager.instance.performAnticompaction(result.cfs, FULL_RANGE, result.refs, result.txn,
+                                                         ActiveRepairService.UNREPAIRED_SSTABLE, sessionID, sessionID);
+
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org