You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2017/02/07 03:11:44 UTC

[3/7] cassandra git commit: Fix consistency of incrementally repaired data

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index e7c6640..89e1954 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -18,29 +18,31 @@
 package org.apache.cassandra.service;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
 import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.AbstractFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
+
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.db.lifecycle.View;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -59,18 +61,17 @@ import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.repair.AnticompactionTask;
 import org.apache.cassandra.repair.RepairJobDesc;
 import org.apache.cassandra.repair.RepairParallelism;
 import org.apache.cassandra.repair.RepairSession;
+import org.apache.cassandra.repair.consistent.CoordinatorSessions;
+import org.apache.cassandra.repair.consistent.LocalSessions;
 import org.apache.cassandra.repair.messages.*;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.utils.CassandraVersion;
 import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
-import org.apache.cassandra.utils.concurrent.Ref;
-import org.apache.cassandra.utils.concurrent.Refs;
 
 /**
  * ActiveRepairService is the starting point for manual "active" repairs.
@@ -86,7 +87,7 @@ import org.apache.cassandra.utils.concurrent.Refs;
  * The creation of a repair session is done through the submitRepairSession that
  * returns a future on the completion of that session.
  */
-public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
+public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener, ActiveRepairServiceMBean
 {
     /**
      * @deprecated this statuses are from the previous JMX notification service,
@@ -98,6 +99,15 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
     {
         STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED
     }
+
+    public class ConsistentSessions
+    {
+        public final LocalSessions local = new LocalSessions();
+        public final CoordinatorSessions coordinated = new CoordinatorSessions();
+    }
+
+    public final ConsistentSessions consistent = new ConsistentSessions();
+
     private boolean registeredForEndpointChanges = false;
 
     public static CassandraVersion SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION = new CassandraVersion("2.2.1");
@@ -107,6 +117,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
     public static final ActiveRepairService instance = new ActiveRepairService(FailureDetector.instance, Gossiper.instance);
 
     public static final long UNREPAIRED_SSTABLE = 0;
+    public static final UUID NO_PENDING_REPAIR = null;
 
     /**
      * A map of active coordinator session.
@@ -122,6 +133,37 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
     {
         this.failureDetector = failureDetector;
         this.gossiper = gossiper;
+
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        try
+        {
+            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void start()
+    {
+        consistent.local.start();
+        ScheduledExecutors.optionalTasks.scheduleAtFixedRate(consistent.local::cleanup, 0,
+                                                             LocalSessions.CLEANUP_INTERVAL,
+                                                             TimeUnit.SECONDS);
+    }
+
+    @Override
+    public List<Map<String, String>> getSessions(boolean all)
+    {
+        return consistent.local.sessionInfo(all);
+    }
+
+    @Override
+    public void failSession(String session, boolean force)
+    {
+        UUID sessionID = UUID.fromString(session);
+        consistent.local.cancelSession(sessionID, force);
     }
 
     /**
@@ -135,6 +177,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
                                              RepairParallelism parallelismDegree,
                                              Set<InetAddress> endpoints,
                                              long repairedAt,
+                                             boolean isConsistent,
                                              boolean pullRepair,
                                              ListeningExecutorService executor,
                                              String... cfnames)
@@ -145,7 +188,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         if (cfnames.length == 0)
             return null;
 
-        final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, repairedAt, pullRepair, cfnames);
+        final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, repairedAt, isConsistent, pullRepair, cfnames);
 
         sessions.put(session.getId(), session);
         // register listeners
@@ -283,8 +326,9 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
 
     public synchronized UUID prepareForRepair(UUID parentRepairSession, InetAddress coordinator, Set<InetAddress> endpoints, RepairOption options, List<ColumnFamilyStore> columnFamilyStores)
     {
-        long timestamp = Clock.instance.currentTimeMillis();
-        registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), timestamp, options.isGlobal());
+        // we only want repairedAt for incremental repairs; for non-incremental repairs, UNREPAIRED_SSTABLE will preserve repairedAt on streamed sstables
+        long repairedAt = options.isIncremental() ? Clock.instance.currentTimeMillis() : ActiveRepairService.UNREPAIRED_SSTABLE;
+        registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal());
         final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size());
         final AtomicBoolean status = new AtomicBoolean(true);
         final Set<String> failedNodes = Collections.synchronizedSet(new HashSet<String>());
@@ -316,7 +360,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         {
             if (FailureDetector.instance.isAlive(neighbour))
             {
-                PrepareMessage message = new PrepareMessage(parentRepairSession, tableIds, options.getRanges(), options.isIncremental(), timestamp, options.isGlobal());
+                PrepareMessage message = new PrepareMessage(parentRepairSession, tableIds, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal());
                 MessageOut<RepairMessage> msg = message.createMessage();
                 MessagingService.instance().sendRR(msg, neighbour, callback, TimeUnit.HOURS.toMillis(1), true);
             }
@@ -346,8 +390,9 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         return parentRepairSession;
     }
 
-    public void registerParentRepairSession(UUID parentRepairSession, InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long timestamp, boolean isGlobal)
+    public void registerParentRepairSession(UUID parentRepairSession, InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal)
     {
+        assert isIncremental || repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE;
         if (!registeredForEndpointChanges)
         {
             Gossiper.instance.register(this);
@@ -355,41 +400,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
             registeredForEndpointChanges = true;
         }
 
-        parentRepairSessions.put(parentRepairSession, new ParentRepairSession(coordinator, columnFamilyStores, ranges, isIncremental, timestamp, isGlobal));
-    }
-
-    public Set<SSTableReader> currentlyRepairing(TableId tableId, UUID parentRepairSession)
-    {
-        Set<SSTableReader> repairing = new HashSet<>();
-        for (Map.Entry<UUID, ParentRepairSession> entry : parentRepairSessions.entrySet())
-        {
-            Collection<SSTableReader> sstables = entry.getValue().getActiveSSTables(tableId);
-            if (sstables != null && !entry.getKey().equals(parentRepairSession))
-                repairing.addAll(sstables);
-        }
-        return repairing;
-    }
-
-    /**
-     * Run final process of repair.
-     * This removes all resources held by parent repair session, after performing anti compaction if necessary.
-     *
-     * @param parentSession Parent session ID
-     * @param neighbors Repair participants (not including self)
-     * @param successfulRanges Ranges that repaired successfully
-     */
-    public synchronized ListenableFuture finishParentSession(UUID parentSession, Set<InetAddress> neighbors, Collection<Range<Token>> successfulRanges)
-    {
-        List<ListenableFuture<?>> tasks = new ArrayList<>(neighbors.size() + 1);
-        for (InetAddress neighbor : neighbors)
-        {
-            AnticompactionTask task = new AnticompactionTask(parentSession, neighbor, successfulRanges);
-            registerOnFdAndGossip(task);
-            tasks.add(task);
-            task.run(); // 'run' is just sending message
-        }
-        tasks.add(doAntiCompaction(parentSession, successfulRanges));
-        return Futures.successfulAsList(tasks);
+        parentRepairSessions.put(parentRepairSession, new ParentRepairSession(coordinator, columnFamilyStores, ranges, isIncremental, repairedAt, isGlobal));
     }
 
     public ParentRepairSession getParentRepairSession(UUID parentSessionId)
@@ -422,53 +433,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         return parentRepairSessions.remove(parentSessionId);
     }
 
-    /**
-     * Submit anti-compaction jobs to CompactionManager.
-     * When all jobs are done, parent repair session is removed whether those are suceeded or not.
-     *
-     * @param parentRepairSession parent repair session ID
-     * @return Future result of all anti-compaction jobs.
-     */
-    @SuppressWarnings("resource")
-    public ListenableFuture<List<Object>> doAntiCompaction(final UUID parentRepairSession, Collection<Range<Token>> successfulRanges)
-    {
-        assert parentRepairSession != null;
-        ParentRepairSession prs = getParentRepairSession(parentRepairSession);
-        //A repair will be marked as not global if it is a subrange repair to avoid many small anti-compactions
-        //in addition to other scenarios such as repairs not involving all DCs or hosts
-        if (!prs.isGlobal)
-        {
-            logger.info("[repair #{}] Not a global repair, will not do anticompaction", parentRepairSession);
-            removeParentRepairSession(parentRepairSession);
-            return Futures.immediateFuture(Collections.emptyList());
-        }
-        assert prs.ranges.containsAll(successfulRanges) : "Trying to perform anticompaction on unknown ranges";
-
-        List<ListenableFuture<?>> futures = new ArrayList<>();
-        // if we don't have successful repair ranges, then just skip anticompaction
-        if (!successfulRanges.isEmpty())
-        {
-            for (Map.Entry<TableId, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet())
-            {
-                Refs<SSTableReader> sstables = prs.getActiveRepairedSSTableRefsForAntiCompaction(columnFamilyStoreEntry.getKey(), parentRepairSession);
-                ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue();
-                futures.add(CompactionManager.instance.submitAntiCompaction(cfs, successfulRanges, sstables, prs.repairedAt, parentRepairSession));
-            }
-        }
-
-        ListenableFuture<List<Object>> allAntiCompactionResults = Futures.successfulAsList(futures);
-        allAntiCompactionResults.addListener(new Runnable()
-        {
-            @Override
-            public void run()
-            {
-                removeParentRepairSession(parentRepairSession);
-            }
-        }, MoreExecutors.directExecutor());
-
-        return allAntiCompactionResults;
-    }
-
     public void handleMessage(InetAddress endpoint, RepairMessage message)
     {
         RepairJobDesc desc = message.desc;
@@ -495,27 +459,15 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
      * We keep a ParentRepairSession around for the duration of the entire repair, for example, on a 256 token vnode rf=3 cluster
      * we would have 768 RepairSession but only one ParentRepairSession. We use the PRS to avoid anticompacting the sstables
      * 768 times, instead we take all repaired ranges at the end of the repair and anticompact once.
-     *
-     * We do an optimistic marking of sstables - when we start an incremental repair we mark all unrepaired sstables as
-     * repairing (@see markSSTablesRepairing), then while the repair is ongoing compactions might remove those sstables,
-     * and when it is time for anticompaction we will only anticompact the sstables that are still on disk.
-     *
-     * Note that validation and streaming do not care about which sstables we have marked as repairing - they operate on
-     * all unrepaired sstables (if it is incremental), otherwise we would not get a correct repair.
      */
     public static class ParentRepairSession
     {
         private final Map<TableId, ColumnFamilyStore> columnFamilyStores = new HashMap<>();
         private final Collection<Range<Token>> ranges;
-        public final Map<TableId, Set<String>> sstableMap = new HashMap<>();
         public final boolean isIncremental;
         public final boolean isGlobal;
         public final long repairedAt;
         public final InetAddress coordinator;
-        /**
-         * Indicates whether we have marked sstables as repairing. Can only be done once per table per ParentRepairSession
-         */
-        private final Set<TableId> marked = new HashSet<>();
 
         public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal)
         {
@@ -523,7 +475,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
             for (ColumnFamilyStore cfs : columnFamilyStores)
             {
                 this.columnFamilyStores.put(cfs.metadata.id, cfs);
-                sstableMap.put(cfs.metadata.id, new HashSet<>());
             }
             this.ranges = ranges;
             this.repairedAt = repairedAt;
@@ -531,97 +482,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
             this.isGlobal = isGlobal;
         }
 
-        /**
-         * Mark sstables repairing - either all sstables or only the unrepaired ones depending on
-         *
-         * whether this is an incremental or full repair
-         *
-         * @param tableId the table
-         * @param parentSessionId the parent repair session id, used to make sure we don't start multiple repairs over the same sstables
-         */
-        public synchronized void markSSTablesRepairing(TableId tableId, UUID parentSessionId)
-        {
-            if (!marked.contains(tableId))
-            {
-                List<SSTableReader> sstables = columnFamilyStores.get(tableId).select(View.select(SSTableSet.CANONICAL, (s) -> !isIncremental || !s.isRepaired())).sstables;
-                Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(tableId, parentSessionId);
-                if (!Sets.intersection(currentlyRepairing, Sets.newHashSet(sstables)).isEmpty())
-                {
-                    logger.error("Cannot start multiple repair sessions over the same sstables");
-                    throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
-                }
-                addSSTables(tableId, sstables);
-                marked.add(tableId);
-            }
-        }
-
-        /**
-         * Get the still active sstables we should run anticompaction on
-         *
-         * note that validation and streaming do not call this method - they have to work on the actual active sstables on the node, we only call this
-         * to know which sstables are still there that were there when we started the repair
-         *
-         * @param tableId
-         * @param parentSessionId for checking if there exists a snapshot for this repair
-         * @return
-         */
-        @SuppressWarnings("resource")
-        public synchronized Refs<SSTableReader> getActiveRepairedSSTableRefsForAntiCompaction(TableId tableId, UUID parentSessionId)
-        {
-            assert marked.contains(tableId);
-            if (!columnFamilyStores.containsKey(tableId))
-                throw new RuntimeException("Not possible to get sstables for anticompaction for " + tableId);
-            boolean isSnapshotRepair = columnFamilyStores.get(tableId).snapshotExists(parentSessionId.toString());
-            ImmutableMap.Builder<SSTableReader, Ref<SSTableReader>> references = ImmutableMap.builder();
-            Iterable<SSTableReader> sstables = isSnapshotRepair ? getSSTablesForSnapshotRepair(tableId, parentSessionId) : getActiveSSTables(tableId);
-            // we check this above - if columnFamilyStores contains the tableId sstables will not be null
-            assert sstables != null;
-            for (SSTableReader sstable : sstables)
-            {
-                Ref<SSTableReader> ref = sstable.tryRef();
-                if (ref == null)
-                    sstableMap.get(tableId).remove(sstable.getFilename());
-                else
-                    references.put(sstable, ref);
-            }
-            return new Refs<>(references.build());
-        }
-
-        /**
-         * If we are running a snapshot repair we need to find the 'real' sstables when we start anticompaction
-         *
-         * We use the generation of the sstables as identifiers instead of the file name to avoid having to parse out the
-         * actual filename.
-         *
-         * @param tableId
-         * @param parentSessionId
-         * @return
-         */
-        private Set<SSTableReader> getSSTablesForSnapshotRepair(TableId tableId, UUID parentSessionId)
-        {
-            Set<SSTableReader> activeSSTables = new HashSet<>();
-            ColumnFamilyStore cfs = columnFamilyStores.get(tableId);
-            if (cfs == null)
-                return null;
-
-            Set<Integer> snapshotGenerations = new HashSet<>();
-            try (Refs<SSTableReader> snapshottedSSTables = cfs.getSnapshotSSTableReader(parentSessionId.toString()))
-            {
-                for (SSTableReader sstable : snapshottedSSTables)
-                {
-                    snapshotGenerations.add(sstable.descriptor.generation);
-                }
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-            for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
-                if (snapshotGenerations.contains(sstable.descriptor.generation))
-                    activeSSTables.add(sstable);
-            return activeSSTables;
-        }
-
         public synchronized void maybeSnapshot(TableId tableId, UUID parentSessionId)
         {
             String snapshotName = parentSessionId.toString();
@@ -637,75 +497,29 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
                                new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges);
                     }
                 }, true, false);
-
-                if (isAlreadyRepairing(tableId, parentSessionId, snapshottedSSTables))
-                {
-                    columnFamilyStores.get(tableId).clearSnapshot(snapshotName);
-                    logger.error("Cannot start multiple repair sessions over the same sstables");
-                    throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
-                }
-                addSSTables(tableId, snapshottedSSTables);
-                marked.add(tableId);
             }
         }
 
-
-        /**
-         * Compares other repairing sstables *generation* to the ones we just snapshotted
-         *
-         * we compare generations since the sstables have different paths due to snapshot names
-         *
-         * @param tableId id of table store
-         * @param parentSessionId parent repair session
-         * @param sstables the newly snapshotted sstables
-         * @return
-         */
-        private boolean isAlreadyRepairing(TableId tableId, UUID parentSessionId, Collection<SSTableReader> sstables)
+        public long getRepairedAt()
         {
-            Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(tableId, parentSessionId);
-            Set<Integer> currentlyRepairingGenerations = new HashSet<>();
-            Set<Integer> newRepairingGenerations = new HashSet<>();
-            for (SSTableReader sstable : currentlyRepairing)
-                currentlyRepairingGenerations.add(sstable.descriptor.generation);
-            for (SSTableReader sstable : sstables)
-                newRepairingGenerations.add(sstable.descriptor.generation);
-
-            return !Sets.intersection(currentlyRepairingGenerations, newRepairingGenerations).isEmpty();
+            if (isGlobal)
+                return repairedAt;
+            return ActiveRepairService.UNREPAIRED_SSTABLE;
         }
 
-        private Set<SSTableReader> getActiveSSTables(TableId tableId)
+        public Collection<ColumnFamilyStore> getColumnFamilyStores()
         {
-            if (!columnFamilyStores.containsKey(tableId))
-                return null;
-
-            Set<String> repairedSSTables = sstableMap.get(tableId);
-            Set<SSTableReader> activeSSTables = new HashSet<>();
-            Set<String> activeSSTableNames = new HashSet<>();
-            ColumnFamilyStore cfs = columnFamilyStores.get(tableId);
-            for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
-            {
-                if (repairedSSTables.contains(sstable.getFilename()))
-                {
-                    activeSSTables.add(sstable);
-                    activeSSTableNames.add(sstable.getFilename());
-                }
-            }
-            sstableMap.put(tableId, activeSSTableNames);
-            return activeSSTables;
+            return ImmutableSet.<ColumnFamilyStore>builder().addAll(columnFamilyStores.values()).build();
         }
 
-        private void addSSTables(TableId tableId, Collection<SSTableReader> sstables)
+        public Set<TableId> getTableIds()
         {
-            for (SSTableReader sstable : sstables)
-                sstableMap.get(tableId).add(sstable.getFilename());
+            return ImmutableSet.copyOf(Iterables.transform(getColumnFamilyStores(), cfs -> cfs.metadata.id));
         }
 
-
-        public long getRepairedAt()
+        public Collection<Range<Token>> getRanges()
         {
-            if (isGlobal)
-                return repairedAt;
-            return ActiveRepairService.UNREPAIRED_SSTABLE;
+            return ImmutableSet.copyOf(ranges);
         }
 
         @Override
@@ -714,7 +528,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
             return "ParentRepairSession{" +
                     "columnFamilyStores=" + columnFamilyStores +
                     ", ranges=" + ranges +
-                    ", sstableMap=" + sstableMap +
                     ", repairedAt=" + repairedAt +
                     '}';
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java b/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
new file mode 100644
index 0000000..53b0acb
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.util.List;
+import java.util.Map;
+
+public interface ActiveRepairServiceMBean
+{
+    public static final String MBEAN_NAME = "org.apache.cassandra.db:type=RepairService";
+
+    public List<Map<String, String>> getSessions(boolean all);
+    public void failSession(String session, boolean force);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index fe84082..03156ae 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -322,6 +322,7 @@ public class CassandraDaemon
         }
 
         SystemKeyspace.finishStartup();
+        ActiveRepairService.instance.start();
 
         // Prepared statements
         QueryProcessor.preloadPreparedStatement();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 556748d..10c5827 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -198,7 +198,8 @@ public class ConnectionHandler
                     session.description(),
                     !isOutgoingHandler,
                     session.keepSSTableLevel(),
-                    session.isIncremental());
+                    session.isIncremental(),
+                    session.getPendingRepair());
             ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
             DataOutputStreamPlus out = getWriteChannel(socket);
             out.write(messageBuf);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index 2cb75f7..81d0498 100644
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -49,15 +49,17 @@ public class StreamCoordinator
     private final boolean keepSSTableLevel;
     private final boolean isIncremental;
     private Iterator<StreamSession> sessionsToConnect = null;
+    private final UUID pendingRepair;
 
     public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, boolean isIncremental,
-                             StreamConnectionFactory factory, boolean connectSequentially)
+                             StreamConnectionFactory factory, boolean connectSequentially, UUID pendingRepair)
     {
         this.connectionsPerHost = connectionsPerHost;
         this.factory = factory;
         this.keepSSTableLevel = keepSSTableLevel;
         this.isIncremental = isIncremental;
         this.connectSequentially = connectSequentially;
+        this.pendingRepair = pendingRepair;
     }
 
     public void setConnectionFactory(StreamConnectionFactory factory)
@@ -288,7 +290,7 @@ public class StreamCoordinator
             // create
             if (streamSessions.size() < connectionsPerHost)
             {
-                StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size(), keepSSTableLevel, isIncremental);
+                StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size(), keepSSTableLevel, isIncremental, pendingRepair);
                 streamSessions.put(++lastReturned, session);
                 return session;
             }
@@ -320,7 +322,7 @@ public class StreamCoordinator
             StreamSession session = streamSessions.get(id);
             if (session == null)
             {
-                session = new StreamSession(peer, connecting, factory, id, keepSSTableLevel, isIncremental);
+                session = new StreamSession(peer, connecting, factory, id, keepSSTableLevel, isIncremental, pendingRepair);
                 streamSessions.put(id, session);
             }
             return session;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index e9d43cb..5526da8 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -48,21 +48,21 @@ public class StreamPlan
      */
     public StreamPlan(String description)
     {
-        this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false, false, false);
+        this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false, false, false, null);
     }
 
     public StreamPlan(String description, boolean keepSSTableLevels, boolean connectSequentially)
     {
-        this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, keepSSTableLevels, false, connectSequentially);
+        this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, keepSSTableLevels, false, connectSequentially, null);
     }
 
     public StreamPlan(String description, long repairedAt, int connectionsPerHost, boolean keepSSTableLevels,
-                      boolean isIncremental, boolean connectSequentially)
+                      boolean isIncremental, boolean connectSequentially, UUID pendingRepair)
     {
         this.description = description;
         this.repairedAt = repairedAt;
         this.coordinator = new StreamCoordinator(connectionsPerHost, keepSSTableLevels, isIncremental, new DefaultConnectionFactory(),
-                                                 connectSequentially);
+                                                 connectSequentially, pendingRepair);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index baf5ec9..fdc2ae2 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -21,6 +21,7 @@ import java.io.*;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.util.Collection;
+import java.util.UUID;
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.UnmodifiableIterator;
@@ -95,16 +96,16 @@ public class StreamReader
             throw new IOException("CF " + tableId + " was dropped during streaming");
         }
 
-        logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.",
+        logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}', pendingRepair = '{}'.",
                      session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
-                     cfs.getTableName());
+                     cfs.getTableName(), session.getPendingRepair());
 
         TrackedInputStream in = new TrackedInputStream(new LZFInputStream(Channels.newInputStream(channel)));
         StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata()));
         SSTableMultiWriter writer = null;
         try
         {
-            writer = createWriter(cfs, totalSize, repairedAt, format);
+            writer = createWriter(cfs, totalSize, repairedAt, session.getPendingRepair(), format);
             while (in.getBytesRead() < totalSize)
             {
                 writePartition(deserializer, writer);
@@ -138,13 +139,13 @@ public class StreamReader
         return header != null? header.toHeader(metadata) : null; //pre-3.0 sstable have no SerializationHeader
     }
 
-    protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException
+    protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, UUID pendingRepair, SSTableFormat.Type format) throws IOException
     {
         Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
         if (localDir == null)
             throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize)));
 
-        RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, session.getTransaction(tableId), getHeader(cfs.metadata()));
+        RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, pendingRepair, format, sstableLevel, totalSize, session.getTransaction(tableId), getHeader(cfs.metadata()));
         StreamHook.instance.reportIncomingFile(cfs, writer, session, fileSeqNum);
         return writer;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 61a1c8c..6d0c03b 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -71,10 +71,9 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
             set(getCurrentState());
     }
 
-    private StreamResultFuture(UUID planId, String description, boolean keepSSTableLevels, boolean isIncremental)
+    private StreamResultFuture(UUID planId, String description, boolean keepSSTableLevels, boolean isIncremental, UUID pendingRepair)
     {
-        this(planId, description, new StreamCoordinator(0, keepSSTableLevels, isIncremental,
-                                                        new DefaultConnectionFactory(), false));
+        this(planId, description, new StreamCoordinator(0, keepSSTableLevels, isIncremental, new DefaultConnectionFactory(), false, pendingRepair));
     }
 
     static StreamResultFuture init(UUID planId, String description, Collection<StreamEventHandler> listeners,
@@ -108,7 +107,8 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
                                                                     boolean isForOutgoing,
                                                                     int version,
                                                                     boolean keepSSTableLevel,
-                                                                    boolean isIncremental) throws IOException
+                                                                    boolean isIncremental,
+                                                                    UUID pendingRepair) throws IOException
     {
         StreamResultFuture future = StreamManager.instance.getReceivingStream(planId);
         if (future == null)
@@ -116,7 +116,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
             logger.info("[Stream #{} ID#{}] Creating new streaming plan for {}", planId, sessionIndex, description);
 
             // The main reason we create a StreamResultFuture on the receiving side is for JMX exposure.
-            future = new StreamResultFuture(planId, description, keepSSTableLevel, isIncremental);
+            future = new StreamResultFuture(planId, description, keepSSTableLevel, isIncremental, pendingRepair);
             StreamManager.instance.registerReceiving(future);
         }
         future.attachConnection(from, sessionIndex, connection, isForOutgoing, version);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index faa05d1..b7db2b2 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -163,6 +163,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     private final boolean keepSSTableLevel;
     private final boolean isIncremental;
     private ScheduledFuture<?> keepAliveFuture = null;
+    private final UUID pendingRepair;
 
     public static enum State
     {
@@ -184,7 +185,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
      * @param connecting Actual connecting address
      * @param factory is used for establishing connection
      */
-    public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel, boolean isIncremental)
+    public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel, boolean isIncremental, UUID pendingRepair)
     {
         this.peer = peer;
         this.connecting = connecting;
@@ -196,6 +197,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         this.metrics = StreamingMetrics.get(connecting);
         this.keepSSTableLevel = keepSSTableLevel;
         this.isIncremental = isIncremental;
+        this.pendingRepair = pendingRepair;
     }
 
     public UUID planId()
@@ -223,6 +225,10 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         return isIncremental;
     }
 
+    public UUID getPendingRepair()
+    {
+        return pendingRepair;
+    }
 
     public LifecycleTransaction getTransaction(TableId tableId)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index a15d2ff..d8e329c 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -73,7 +73,7 @@ public class CompressedStreamReader extends StreamReader
         }
 
         logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.",
-                     session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
+                     session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(), session.getPendingRepair(),
                      cfs.getTableName());
 
         CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo,
@@ -84,7 +84,7 @@ public class CompressedStreamReader extends StreamReader
         SSTableMultiWriter writer = null;
         try
         {
-            writer = createWriter(cfs, totalSize, repairedAt, format);
+            writer = createWriter(cfs, totalSize, repairedAt, session.getPendingRepair(), format);
             String filename = writer.getFilename();
             int sectionIdx = 0;
             for (Pair<Long, Long> section : sections)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index 6d807e9..3b4b512 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -49,8 +49,9 @@ public class StreamInitMessage
     public final boolean isForOutgoing;
     public final boolean keepSSTableLevel;
     public final boolean isIncremental;
+    public final UUID pendingRepair;
 
-    public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, String description, boolean isForOutgoing, boolean keepSSTableLevel, boolean isIncremental)
+    public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, String description, boolean isForOutgoing, boolean keepSSTableLevel, boolean isIncremental, UUID pendingRepair)
     {
         this.from = from;
         this.sessionIndex = sessionIndex;
@@ -59,6 +60,7 @@ public class StreamInitMessage
         this.isForOutgoing = isForOutgoing;
         this.keepSSTableLevel = keepSSTableLevel;
         this.isIncremental = isIncremental;
+        this.pendingRepair = pendingRepair;
     }
 
     /**
@@ -114,6 +116,12 @@ public class StreamInitMessage
             out.writeBoolean(message.isForOutgoing);
             out.writeBoolean(message.keepSSTableLevel);
             out.writeBoolean(message.isIncremental);
+
+            out.writeBoolean(message.pendingRepair != null);
+            if (message.pendingRepair != null)
+            {
+                UUIDSerializer.serializer.serialize(message.pendingRepair, out, MessagingService.current_version);
+            }
         }
 
         public StreamInitMessage deserialize(DataInputPlus in, int version) throws IOException
@@ -124,8 +132,10 @@ public class StreamInitMessage
             String description = in.readUTF();
             boolean sentByInitiator = in.readBoolean();
             boolean keepSSTableLevel = in.readBoolean();
+
             boolean isIncremental = in.readBoolean();
-            return new StreamInitMessage(from, sessionIndex, planId, description, sentByInitiator, keepSSTableLevel, isIncremental);
+            UUID pendingRepair = in.readBoolean() ? UUIDSerializer.serializer.deserialize(in, version) : null;
+            return new StreamInitMessage(from, sessionIndex, planId, description, sentByInitiator, keepSSTableLevel, isIncremental, pendingRepair);
         }
 
         public long serializedSize(StreamInitMessage message, int version)
@@ -137,6 +147,11 @@ public class StreamInitMessage
             size += TypeSizes.sizeof(message.isForOutgoing);
             size += TypeSizes.sizeof(message.keepSSTableLevel);
             size += TypeSizes.sizeof(message.isIncremental);
+            size += TypeSizes.sizeof(message.pendingRepair != null);
+            if (message.pendingRepair != null)
+            {
+                size += UUIDSerializer.serializer.serializedSize(message.pendingRepair, MessagingService.current_version);
+            }
             return size;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 5463255..865665c 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -74,6 +74,7 @@ import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.metrics.ThreadPoolMetrics;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.MessagingServiceMBean;
+import org.apache.cassandra.service.ActiveRepairServiceMBean;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.CacheServiceMBean;
 import org.apache.cassandra.service.GCInspector;
@@ -122,6 +123,7 @@ public class NodeProbe implements AutoCloseable
     private StorageProxyMBean spProxy;
     private HintedHandOffManagerMBean hhProxy;
     private BatchlogManagerMBean bmProxy;
+    private ActiveRepairServiceMBean arsProxy;
     private boolean failed;
 
     /**
@@ -214,6 +216,8 @@ public class NodeProbe implements AutoCloseable
             gossProxy = JMX.newMBeanProxy(mbeanServerConn, name, GossiperMBean.class);
             name = new ObjectName(BatchlogManager.MBEAN_NAME);
             bmProxy = JMX.newMBeanProxy(mbeanServerConn, name, BatchlogManagerMBean.class);
+            name = new ObjectName(ActiveRepairServiceMBean.MBEAN_NAME);
+            arsProxy = JMX.newMBeanProxy(mbeanServerConn, name, ActiveRepairServiceMBean.class);
         }
         catch (MalformedObjectNameException e)
         {
@@ -1511,6 +1515,11 @@ public class NodeProbe implements AutoCloseable
             throw new RuntimeException(e);
         }
     }
+
+    public ActiveRepairServiceMBean getRepairServiceProxy()
+    {
+        return arsProxy;
+    }
 }
 
 class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, ColumnFamilyStoreMBean>>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 0c55f76..6812a27 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -99,6 +99,7 @@ public class NodeTool
                 RemoveNode.class,
                 Assassinate.class,
                 Repair.class,
+                RepairAdmin.class,
                 ReplayBatchlog.class,
                 SetCacheCapacity.class,
                 SetHintedHandoffThrottleInKB.class,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
index 019e053..63c7f96 100644
--- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
+++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
@@ -135,6 +135,7 @@ public class SSTableMetadataViewer
                     out.printf("Estimated droppable tombstones: %s%n", stats.getEstimatedDroppableTombstoneRatio((int) (System.currentTimeMillis() / 1000) - gcgs));
                     out.printf("SSTable Level: %d%n", stats.sstableLevel);
                     out.printf("Repaired at: %d%n", stats.repairedAt);
+                    out.printf("Pending repair: %s%n", stats.pendingRepair);
                     out.printf("Replay positions covered: %s%n", stats.commitLogIntervals);
                     out.printf("totalColumnsSet: %s%n", stats.totalColumnsSet);
                     out.printf("totalRows: %s%n", stats.totalRows);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java b/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java
index a130177..8056ff8 100644
--- a/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java
+++ b/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java
@@ -27,7 +27,6 @@ import java.util.List;
 
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.service.ActiveRepairService;
 
 /**
  * Set repairedAt status on a given set of sstables.
@@ -89,11 +88,11 @@ public class SSTableRepairedAtSetter
             if (setIsRepaired)
             {
                 FileTime f = Files.getLastModifiedTime(new File(descriptor.filenameFor(Component.DATA)).toPath());
-                descriptor.getMetadataSerializer().mutateRepairedAt(descriptor, f.toMillis());
+                descriptor.getMetadataSerializer().mutateRepaired(descriptor, f.toMillis(), null);
             }
             else
             {
-                descriptor.getMetadataSerializer().mutateRepairedAt(descriptor, ActiveRepairService.UNREPAIRED_SSTABLE);
+                descriptor.getMetadataSerializer().mutateRepaired(descriptor, 0, null);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java b/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java
new file mode 100644
index 0000000..bb201a2
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.nodetool;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import io.airlift.command.Command;
+import io.airlift.command.Option;
+import org.apache.cassandra.repair.consistent.LocalSessionInfo;
+import org.apache.cassandra.service.ActiveRepairServiceMBean;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Nodetool command that lists the incremental repair sessions reported by the node, or cancels one of them.
+ */
+@Command(name = "repair_admin", description = "list and fail incremental repair sessions")
+public class RepairAdmin extends NodeTool.NodeToolCmd
+{
+    @Option(title = "list", name = {"-l", "--list"}, description = "list repair sessions (default behavior)")
+    private boolean list = false;
+
+    @Option(title = "all", name = {"-a", "--all"}, description = "include completed and failed sessions")
+    private boolean all = false;
+
+    @Option(title = "cancel", name = {"-x", "--cancel"}, description = "cancel an incremental repair session")
+    private String cancel = null;
+
+    @Option(title = "force", name = {"-f", "--force"}, description = "cancel repair session from a node other than the repair coordinator." +
+                                                                     " Attempting to cancel FINALIZED or FAILED sessions is an error.")
+    private boolean force = false;
+
+    // column titles, in the same order as the values built by sessionValues()
+    private static final List<String> header = Lists.newArrayList("id",
+                                                                  "state",
+                                                                  "last activity",
+                                                                  "coordinator",
+                                                                  "participants");
+
+    /**
+     * Builds the table row for one session.
+     * @param session session attributes keyed by the LocalSessionInfo constants
+     * @param now current time in seconds, used to show how long ago the session was last updated
+     */
+    private List<String> sessionValues(Map<String, String> session, int now)
+    {
+        int updated = Integer.parseInt(session.get(LocalSessionInfo.LAST_UPDATE));
+        return Lists.newArrayList(session.get(LocalSessionInfo.SESSION_ID),
+                                  session.get(LocalSessionInfo.STATE),
+                                  (now - updated) + " (s)",
+                                  session.get(LocalSessionInfo.COORDINATOR),
+                                  session.get(LocalSessionInfo.PARTICIPANTS));
+    }
+
+    /**
+     * Prints the sessions returned by the repair service as a table, each column padded to its widest cell.
+     */
+    private void listSessions(ActiveRepairServiceMBean repairServiceProxy)
+    {
+        Preconditions.checkArgument(cancel == null, "-x/--cancel not valid for session list");
+        Preconditions.checkArgument(!force, "-f/--force only valid for session cancel");
+        List<Map<String, String>> sessions = repairServiceProxy.getSessions(all);
+        if (sessions.isEmpty())
+        {
+            System.out.println("no sessions");
+            return;
+        }
+
+        List<List<String>> rows = new ArrayList<>(sessions.size() + 1);
+        rows.add(header);
+        int now = FBUtilities.nowInSeconds();
+        for (Map<String, String> session : sessions)
+        {
+            rows.add(sessionValues(session, now));
+        }
+
+        // get max col widths
+        int[] widths = new int[header.size()];
+        for (List<String> row : rows)
+        {
+            assert row.size() == widths.length;
+            for (int i = 0; i < widths.length; i++)
+            {
+                widths[i] = Math.max(widths[i], row.get(i).length());
+            }
+        }
+
+        String[] fmts = new String[widths.length];
+        for (int i = 0; i < widths.length; i++)
+            fmts[i] = "%-" + widths[i] + "s";
+
+        // print, left-justifying every cell to its column width
+        for (List<String> row : rows)
+        {
+            List<String> formatted = new ArrayList<>(row.size());
+            for (int i = 0; i < widths.length; i++)
+            {
+                formatted.add(String.format(fmts[i], row.get(i)));
+            }
+            System.out.println(Joiner.on(" | ").join(formatted));
+        }
+    }
+
+    /** Asks the repair service to fail the session named by -x/--cancel. */
+    private void cancelSession(ActiveRepairServiceMBean repairServiceProxy)
+    {
+        Preconditions.checkArgument(!list, "-l/--list not valid for session cancel");
+        Preconditions.checkArgument(!all, "-a/--all only valid for session list");
+        repairServiceProxy.failSession(cancel, force);
+    }
+
+    /** Lists sessions unless -x/--cancel was given; requesting both is rejected. */
+    @Override
+    protected void execute(NodeProbe probe)
+    {
+        if (list && cancel != null)
+            throw new RuntimeException("Can either list, or cancel sessions, not both");
+
+        if (cancel != null)
+            cancelSession(probe.getRepairServiceProxy());
+        else
+            listSessions(probe.getRepairServiceProxy()); // default
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java b/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java
index b814ea6..e01088d 100644
--- a/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java
+++ b/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java
@@ -308,7 +308,7 @@ public class RepairedDataTombstonesTest extends CQLTester
 
     public static void repair(ColumnFamilyStore cfs, SSTableReader sstable) throws IOException
     {
-        sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, 1);
+        sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, 1, null);
         sstable.reloadSSTableMetadata();
         cfs.getTracker().notifySSTableRepairedStatusChanged(Collections.singleton(sstable));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index d9f6433..f0873b9 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -642,7 +642,7 @@ public class ScrubTest
     {
         SerializationHeader header = new SerializationHeader(true, metadata.get(), metadata.get().regularAndStaticColumns(), EncodingStats.NO_STATS);
         MetadataCollector collector = new MetadataCollector(metadata.get().comparator).sstableLevel(0);
-        return new TestMultiWriter(new TestWriter(descriptor, keyCount, 0, metadata, collector, header, txn), txn);
+        return new TestMultiWriter(new TestWriter(descriptor, keyCount, 0, null, metadata, collector, header, txn), txn);
     }
 
     private static class TestMultiWriter extends SimpleSSTableMultiWriter
@@ -658,10 +658,10 @@ public class ScrubTest
      */
     private static class TestWriter extends BigTableWriter
     {
-        TestWriter(Descriptor descriptor, long keyCount, long repairedAt, TableMetadataRef metadata,
+        TestWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, TableMetadataRef metadata,
                    MetadataCollector collector, SerializationHeader header, LifecycleTransaction txn)
         {
-            super(descriptor, keyCount, repairedAt, metadata, collector, header, Collections.emptySet(), txn);
+            super(descriptor, keyCount, repairedAt, pendingRepair, metadata, collector, header, Collections.emptySet(), txn);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java b/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
new file mode 100644
index 0000000..08be550
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.collect.Iterables;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.net.IMessageSink;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.consistent.AbstractConsistentSessionTest;
+import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.ActiveRepairService;
+
+/**
+ * Shared fixture for tests that need sstables carrying repairedAt / pendingRepair metadata.
+ *
+ * Before each test method a new keyspace holding {@code tbl (k INT PRIMARY KEY, v INT)} is created.
+ * Carries {@code @Ignore}, presumably so that test runners do not execute this base class directly.
+ */
+@Ignore
+public class AbstractPendingRepairTest extends AbstractConsistentSessionTest
+{
+    protected String ks;
+    protected final String tbl = "tbl";
+    protected TableMetadata cfm;
+    protected ColumnFamilyStore cfs;
+    protected CompactionStrategyManager csm;
+    protected static ActiveRepairService ARS;
+
+    // partition key (and value) of the row written by the next makeSSTable call; reset in setup()
+    private int nextSSTableKey = 0;
+
+    /**
+     * Prepares the server, starts the local session accessor and installs a message sink that
+     * rejects every outgoing and incoming message.
+     */
+    @BeforeClass
+    public static void setupClass()
+    {
+        SchemaLoader.prepareServer();
+        ARS = ActiveRepairService.instance;
+        LocalSessionAccessor.startup();
+
+        // cutoff messaging service
+        MessagingService.instance().addMessageSink(new IMessageSink()
+        {
+            public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
+            {
+                return false;
+            }
+
+            public boolean allowIncomingMessage(MessageIn message, int id)
+            {
+                return false;
+            }
+        });
+    }
+
+    /**
+     * Creates a new keyspace, named after the current time in millis, that holds the test table and
+     * caches the table's ColumnFamilyStore and CompactionStrategyManager.
+     */
+    @Before
+    public void setup()
+    {
+        ks = "ks_" + System.currentTimeMillis();
+        cfm = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k INT PRIMARY KEY, v INT)", ks, tbl), ks).build();
+        SchemaLoader.createKeyspace(ks, KeyspaceParams.simple(1), cfm);
+        cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id);
+        csm = cfs.getCompactionStrategyManager();
+        nextSSTableKey = 0;
+    }
+
+    /**
+     * Writes one row, flushes, and returns the single sstable the flush added to the live set.
+     *
+     * @param orphan if true, the sstable will be removed from the unrepaired strategy
+     */
+    SSTableReader makeSSTable(boolean orphan)
+    {
+        int pk = nextSSTableKey++;
+        Set<SSTableReader> pre = cfs.getLiveSSTables();
+        QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES(?, ?)", ks, tbl), pk, pk);
+        cfs.forceBlockingFlush();
+        Set<SSTableReader> post = cfs.getLiveSSTables();
+        Set<SSTableReader> diff = new HashSet<>(post);
+        diff.removeAll(pre);
+        assert diff.size() == 1 : "expected exactly one new sstable after flush, found " + diff.size();
+        SSTableReader sstable = diff.iterator().next();
+        if (orphan)
+        {
+            // sanity check: the new sstable must be tracked by an unrepaired strategy before it is detached
+            assert Iterables.any(csm.getUnrepaired(), s -> s.getSSTables().contains(sstable))
+                : "flushed sstable is not tracked by any unrepaired strategy";
+            csm.getUnrepaired().forEach(s -> s.removeSSTable(sstable));
+        }
+        return sstable;
+    }
+
+    /**
+     * Rewrites the repairedAt and pendingRepair values in the sstable's metadata and reloads it.
+     *
+     * @throws AssertionError wrapping any IOException raised while mutating or reloading
+     */
+    protected static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair)
+    {
+        try
+        {
+            sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, repairedAt, pendingRepair);
+            sstable.reloadSSTableMetadata();
+        }
+        catch (IOException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
+    /** Sets repairedAt and passes {@link ActiveRepairService#NO_PENDING_REPAIR} as the pending repair. */
+    protected static void mutateRepaired(SSTableReader sstable, long repairedAt)
+    {
+        mutateRepaired(sstable, repairedAt, ActiveRepairService.NO_PENDING_REPAIR);
+    }
+
+    /** Sets the pending repair and passes {@link ActiveRepairService#UNREPAIRED_SSTABLE} as repairedAt. */
+    protected static void mutateRepaired(SSTableReader sstable, UUID pendingRepair)
+    {
+        mutateRepaired(sstable, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 5a7bfed..41c090e 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import org.junit.BeforeClass;
 import org.junit.After;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.cassandra.schema.TableMetadata;
@@ -47,9 +48,11 @@ import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.concurrent.Refs;
 import org.apache.cassandra.UpdateBuilder;
 
+import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -57,6 +60,8 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
+import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
+
 
 public class AntiCompactionTest
 {
@@ -80,9 +85,10 @@ public class AntiCompactionTest
         store.truncateBlocking();
     }
 
-    @Test
-    public void antiCompactOne() throws Exception
+    private void antiCompactOne(long repairedAt, UUID pendingRepair) throws Exception
     {
+        assert repairedAt != UNREPAIRED_SSTABLE || pendingRepair != null;
+
         ColumnFamilyStore store = prepareColumnFamilyStore();
         Collection<SSTableReader> sstables = getUnrepairedSSTables(store);
         assertEquals(store.getLiveSSTables().size(), sstables.size());
@@ -90,15 +96,15 @@ public class AntiCompactionTest
         List<Range<Token>> ranges = Arrays.asList(range);
 
         int repairedKeys = 0;
+        int pendingKeys = 0;
         int nonRepairedKeys = 0;
         try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
              Refs<SSTableReader> refs = Refs.ref(sstables))
         {
             if (txn == null)
                 throw new IllegalStateException();
-            long repairedAt = 1000;
             UUID parentRepairSession = UUID.randomUUID();
-            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt, parentRepairSession);
+            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt, pendingRepair, parentRepairSession);
         }
 
         assertEquals(2, store.getLiveSSTables().size());
@@ -109,10 +115,11 @@ public class AntiCompactionTest
                 while (scanner.hasNext())
                 {
                     UnfilteredRowIterator row = scanner.next();
-                    if (sstable.isRepaired())
+                    if (sstable.isRepaired() || sstable.isPendingRepair())
                     {
                         assertTrue(range.contains(row.partitionKey().getToken()));
-                        repairedKeys++;
+                        repairedKeys += sstable.isRepaired() ? 1 : 0;
+                        pendingKeys += sstable.isPendingRepair() ? 1 : 0;
                     }
                     else
                     {
@@ -128,11 +135,25 @@ public class AntiCompactionTest
             assertEquals(1, sstable.selfRef().globalCount());
         }
         assertEquals(0, store.getTracker().getCompacting().size());
-        assertEquals(repairedKeys, 4);
+        assertEquals(repairedKeys, repairedAt != UNREPAIRED_SSTABLE ? 4 : 0);
+        assertEquals(pendingKeys, pendingRepair != NO_PENDING_REPAIR ? 4 : 0);
         assertEquals(nonRepairedKeys, 6);
     }
 
     @Test
+    public void antiCompactOneRepairedAt() throws Exception
+    {
+        antiCompactOne(1000, NO_PENDING_REPAIR);
+    }
+
+    @Test
+    public void antiCompactOnePendingRepair() throws Exception
+    {
+        antiCompactOne(UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID());
+    }
+
+    @Ignore
+    @Test
     public void antiCompactionSizeTest() throws InterruptedException, IOException
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
@@ -147,7 +168,7 @@ public class AntiCompactionTest
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
              Refs<SSTableReader> refs = Refs.ref(sstables))
         {
-            CompactionManager.instance.performAnticompaction(cfs, Arrays.asList(range), refs, txn, 12345, parentRepairSession);
+            CompactionManager.instance.performAnticompaction(cfs, Arrays.asList(range), refs, txn, 12345, NO_PENDING_REPAIR, parentRepairSession);
         }
         long sum = 0;
         long rows = 0;
@@ -166,7 +187,7 @@ public class AntiCompactionTest
         File dir = cfs.getDirectories().getDirectoryForNewSSTables();
         Descriptor desc = cfs.newSSTableDescriptor(dir);
 
-        try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, desc, 0, 0, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS)))
+        try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, desc, 0, 0, NO_PENDING_REPAIR, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS)))
         {
             for (int i = 0; i < count; i++)
             {
@@ -230,7 +251,7 @@ public class AntiCompactionTest
         try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
              Refs<SSTableReader> refs = Refs.ref(sstables))
         {
-            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt, parentRepairSession);
+            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt, NO_PENDING_REPAIR, parentRepairSession);
         }
         /*
         Anticompaction will be anti-compacting 10 SSTables but will be doing this two at a time
@@ -267,8 +288,7 @@ public class AntiCompactionTest
         assertEquals(nonRepairedKeys, 60);
     }
 
-    @Test
-    public void shouldMutateRepairedAt() throws InterruptedException, IOException
+    private void shouldMutate(long repairedAt, UUID pendingRepair) throws InterruptedException, IOException
     {
         ColumnFamilyStore store = prepareColumnFamilyStore();
         Collection<SSTableReader> sstables = getUnrepairedSSTables(store);
@@ -280,15 +300,27 @@ public class AntiCompactionTest
         try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
              Refs<SSTableReader> refs = Refs.ref(sstables))
         {
-            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, parentRepairSession);
+            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt, pendingRepair, parentRepairSession);
         }
 
         assertThat(store.getLiveSSTables().size(), is(1));
-        assertThat(Iterables.get(store.getLiveSSTables(), 0).isRepaired(), is(true));
+        assertThat(Iterables.get(store.getLiveSSTables(), 0).isRepaired(), is(repairedAt != UNREPAIRED_SSTABLE));
+        assertThat(Iterables.get(store.getLiveSSTables(), 0).isPendingRepair(), is(pendingRepair != NO_PENDING_REPAIR));
         assertThat(Iterables.get(store.getLiveSSTables(), 0).selfRef().globalCount(), is(1));
         assertThat(store.getTracker().getCompacting().size(), is(0));
     }
 
+    @Test
+    public void shouldMutateRepairedAt() throws InterruptedException, IOException
+    {
+        shouldMutate(1, NO_PENDING_REPAIR);
+    }
+
+    @Test
+    public void shouldMutatePendingRepair() throws InterruptedException, IOException
+    {
+        shouldMutate(UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID());
+    }
 
     @Test
     public void shouldSkipAntiCompactionForNonIntersectingRange() throws InterruptedException, IOException
@@ -311,7 +343,7 @@ public class AntiCompactionTest
         try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
              Refs<SSTableReader> refs = Refs.ref(sstables))
         {
-            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, parentRepairSession);
+            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, NO_PENDING_REPAIR, parentRepairSession);
         }
 
         assertThat(store.getLiveSSTables().size(), is(10));
@@ -348,5 +380,4 @@ public class AntiCompactionTest
         return ImmutableSet.copyOf(cfs.getTracker().getView().sstables(SSTableSet.LIVE, (s) -> !s.isRepaired()));
     }
 
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java
new file mode 100644
index 0000000..0ee85c6
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.repair.RepairJobDesc;
+import org.apache.cassandra.repair.Validator;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+/**
+ * Tests correct sstables are returned from CompactionManager.getSSTablesToValidate
+ * for consistent, legacy incremental, and full repairs
+ */
+public class CompactionManagerGetSSTablesForValidationTest
+{
+    private String ks;
+    private static final String tbl = "tbl";
+    private ColumnFamilyStore cfs;
+    private static InetAddress coordinator;
+
+    private static Token MT;
+
+    private SSTableReader repaired;
+    private SSTableReader unrepaired;
+    private SSTableReader pendingRepair;
+
+    private UUID sessionID;
+    private RepairJobDesc desc;
+
+    @BeforeClass
+    public static void setupClass() throws Exception
+    {
+        SchemaLoader.prepareServer();
+        coordinator = InetAddress.getByName("10.0.0.1");
+        MT = DatabaseDescriptor.getPartitioner().getMinimumToken();
+    }
+
+    @Before
+    public void setup() throws Exception
+    {
+        ks = "ks_" + System.currentTimeMillis();
+        TableMetadata cfm = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k INT PRIMARY KEY, v INT)", ks, tbl), ks).build();
+        SchemaLoader.createKeyspace(ks, KeyspaceParams.simple(1), cfm);
+        cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id);
+    }
+
+    /** Writes three rows, flushing after each one, and checks that three live sstables exist. */
+    private void makeSSTables()
+    {
+        for (int i=0; i<3; i++)
+        {
+            QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES(?, ?)", ks, tbl), i, i);
+            cfs.forceBlockingFlush();
+        }
+        Assert.assertEquals(3, cfs.getLiveSSTables().size());
+    }
+
+    /**
+     * Registers a parent repair session for a range from MT to MT and builds the matching job descriptor.
+     *
+     * @param incremental forwarded to the registration; when false, UNREPAIRED_SSTABLE is passed
+     *                    in place of the current time
+     */
+    private void registerRepair(boolean incremental) throws Exception
+    {
+        sessionID = UUIDGen.getTimeUUID();
+        Range<Token> range = new Range<>(MT, MT);
+        ActiveRepairService.instance.registerParentRepairSession(sessionID,
+                                                                 coordinator,
+                                                                 Lists.newArrayList(cfs),
+                                                                 Sets.newHashSet(range),
+                                                                 incremental,
+                                                                 incremental ? System.currentTimeMillis() : ActiveRepairService.UNREPAIRED_SSTABLE,
+                                                                 true);
+        desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), ks, tbl, Collections.singleton(range));
+    }
+
+    /**
+     * Marks one live sstable as repaired and one as pending repair for the current session,
+     * leaving the third one untouched.
+     */
+    private void modifySSTables() throws Exception
+    {
+        Iterator<SSTableReader> iter = cfs.getLiveSSTables().iterator();
+
+        repaired = iter.next();
+        repaired.descriptor.getMetadataSerializer().mutateRepaired(repaired.descriptor, System.currentTimeMillis(), ActiveRepairService.NO_PENDING_REPAIR);
+        repaired.reloadSSTableMetadata();
+
+        pendingRepair = iter.next();
+        pendingRepair.descriptor.getMetadataSerializer().mutateRepaired(pendingRepair.descriptor, ActiveRepairService.UNREPAIRED_SSTABLE, sessionID);
+        pendingRepair.reloadSSTableMetadata();
+
+        unrepaired = iter.next();
+
+        Assert.assertFalse(iter.hasNext());
+    }
+
+    /**
+     * Builds a validator for the registered job and collects the sstables chosen for validation.
+     *
+     * @param isConsistent forwarded as the last Validator constructor argument; presumably selects
+     *                     consistent (pending-repair only) validation - confirm against Validator
+     */
+    private Set<SSTableReader> sstablesToValidate(boolean isConsistent) throws Exception
+    {
+        Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), isConsistent);
+        return Sets.newHashSet(CompactionManager.instance.getSSTablesToValidate(cfs, validator));
+    }
+
+    /** Asserts that {@code actual} has as many entries as {@code expected} and contains each of them. */
+    private static void assertValidated(Set<SSTableReader> actual, SSTableReader... expected)
+    {
+        Assert.assertEquals(expected.length, actual.size());
+        for (SSTableReader sstable : expected)
+            Assert.assertTrue("missing " + sstable, actual.contains(sstable));
+    }
+
+    @Test
+    public void consistentRepair() throws Exception
+    {
+        makeSSTables();
+        registerRepair(true);
+        modifySSTables();
+
+        assertValidated(sstablesToValidate(true), pendingRepair);
+    }
+
+    @Test
+    public void legacyIncrementalRepair() throws Exception
+    {
+        makeSSTables();
+        registerRepair(true);
+        modifySSTables();
+
+        assertValidated(sstablesToValidate(false), pendingRepair, unrepaired);
+    }
+
+    @Test
+    public void fullRepair() throws Exception
+    {
+        makeSSTables();
+        registerRepair(false);
+        modifySSTables();
+
+        assertValidated(sstablesToValidate(false), pendingRepair, unrepaired, repaired);
+    }
+}