You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2020/08/25 22:29:15 UTC

[cassandra] branch trunk updated: Add additional incremental repair visibility to nodetool repair_admin

This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c343175  Add additional incremental repair visibility to nodetool repair_admin
c343175 is described below

commit c34317526fc6dbe559beb36cf44e24278656bdf2
Author: Blake Eggleston <bd...@gmail.com>
AuthorDate: Tue Aug 4 16:38:33 2020 -0700

    Add additional incremental repair visibility to nodetool repair_admin
    
    Patch by Blake Eggleston; Reviewed by Marcus Eriksson for CASSANDRA-14939
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  48 +++
 .../db/compaction/CompactionStrategyManager.java   |  25 ++
 .../db/compaction/PendingRepairManager.java        |  70 ++++-
 src/java/org/apache/cassandra/dht/Range.java       |  17 ++
 .../repair/consistent/ConsistentSession.java       |   5 +
 .../cassandra/repair/consistent/LocalSession.java  |   6 +-
 .../cassandra/repair/consistent/LocalSessions.java | 145 ++++++++-
 .../cassandra/repair/consistent/RepairedState.java | 339 +++++++++++++++++++++
 .../repair/consistent/admin/CleanupSummary.java    | 143 +++++++++
 .../repair/consistent/admin/PendingStat.java       | 143 +++++++++
 .../repair/consistent/admin/PendingStats.java      | 104 +++++++
 .../repair/consistent/admin/RepairStats.java       | 187 ++++++++++++
 .../repair/consistent/admin/SchemaArgsParser.java  | 117 +++++++
 .../cassandra/repair/messages/RepairOption.java    |  48 +--
 .../cassandra/service/ActiveRepairService.java     |  81 ++++-
 .../service/ActiveRepairServiceMBean.java          |   7 +-
 src/java/org/apache/cassandra/tools/NodeTool.java  |  10 +-
 .../cassandra/tools/nodetool/RepairAdmin.java      | 322 ++++++++++++++-----
 test/unit/org/apache/cassandra/dht/RangeTest.java  |  17 ++
 .../cassandra/repair/AbstractRepairTest.java       |   5 +
 .../repair/consistent/LocalSessionTest.java        |  22 +-
 .../repair/consistent/PendingRepairStatTest.java   | 184 +++++++++++
 .../repair/consistent/RepairStateTest.java         | 168 ++++++++++
 .../consistent/admin/SchemaArgsParserTest.java     |  91 ++++++
 25 files changed, 2198 insertions(+), 107 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index b8ace5b..123efdf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-beta2
+ * Add additional incremental repair visibility to nodetool repair_admin (CASSANDRA-14939)
  * Always access system properties and environment variables via the new CassandraRelevantProperties and CassandraRelevantEnv classes (CASSANDRA-15876)
  * Remove deprecated HintedHandOffManager (CASSANDRA-15939)
  * Prevent repair from overrunning compaction (CASSANDRA-15817)
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 77a8cdd..824005b 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -75,6 +75,8 @@ import org.apache.cassandra.metrics.Sampler.Sample;
 import org.apache.cassandra.metrics.Sampler.SamplerType;
 import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.repair.TableRepairManager;
+import org.apache.cassandra.repair.consistent.admin.CleanupSummary;
+import org.apache.cassandra.repair.consistent.admin.PendingStat;
 import org.apache.cassandra.schema.*;
 import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
 import org.apache.cassandra.service.CacheService;
@@ -1555,6 +1557,52 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return data.getUncompacting();
     }
 
+    public Map<UUID, PendingStat> getPendingRepairStats()
+    {
+        Map<UUID, PendingStat.Builder> builders = new HashMap<>();
+        for (SSTableReader sstable : getLiveSSTables())
+        {
+            UUID session = sstable.getPendingRepair();
+            if (session == null)
+                continue;
+
+            if (!builders.containsKey(session))
+                builders.put(session, new PendingStat.Builder());
+
+            builders.get(session).addSSTable(sstable);
+        }
+
+        Map<UUID, PendingStat> stats = new HashMap<>();
+        for (Map.Entry<UUID, PendingStat.Builder> entry : builders.entrySet())
+        {
+            stats.put(entry.getKey(), entry.getValue().build());
+        }
+        return stats;
+    }
+
+    /**
+     * promotes (or demotes) data attached to an incremental repair session that has either completed successfully,
+     * or failed
+     *
+     * @return a summary of the session ids whose data was successfully released and those whose data could not be released
+     */
+    public CleanupSummary releaseRepairData(Collection<UUID> sessions, boolean force)
+    {
+        if (force)
+        {
+            Predicate<SSTableReader> predicate = sst -> {
+                UUID session = sst.getPendingRepair();
+                return session != null && sessions.contains(session);
+            };
+            return runWithCompactionsDisabled(() -> compactionStrategyManager.releaseRepairData(sessions),
+                                              predicate, false, true, true);
+        }
+        else
+        {
+            return compactionStrategyManager.releaseRepairData(sessions);
+        }
+    }
+
     public boolean isFilterFullyCoveredBy(ClusteringIndexFilter filter,
                                           DataLimits limits,
                                           CachedPartition cached,
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 708faff..3a05e50 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -39,6 +39,8 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Longs;
+
+import org.apache.cassandra.db.compaction.PendingRepairManager.CleanupTask;
 import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,6 +71,7 @@ import org.apache.cassandra.notifications.SSTableDeletingNotification;
 import org.apache.cassandra.notifications.SSTableListChangedNotification;
 import org.apache.cassandra.notifications.SSTableMetadataChanged;
 import org.apache.cassandra.notifications.SSTableRepairStatusChanged;
+import org.apache.cassandra.repair.consistent.admin.CleanupSummary;
 import org.apache.cassandra.schema.CompactionParams;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -1223,4 +1226,26 @@ public class CompactionStrategyManager implements INotificationConsumer
         if (isTransient != sstable.isTransient())
             throw new IllegalStateException(String.format("Failed setting isTransient to %b on %s (isTransient is %b)", isTransient, sstable, sstable.isTransient()));
     }
+
+    public CleanupSummary releaseRepairData(Collection<UUID> sessions)
+    {
+        List<CleanupTask> cleanupTasks = new ArrayList<>();
+        readLock.lock();
+        try
+        {
+            for (PendingRepairManager prm : Iterables.concat(pendingRepairs.getManagers(), transientRepairs.getManagers()))
+                cleanupTasks.add(prm.releaseSessionData(sessions));
+        }
+        finally
+        {
+            readLock.unlock();
+        }
+
+        CleanupSummary summary = new CleanupSummary(cfs, Collections.emptySet(), Collections.emptySet());
+
+        for (CleanupTask task : cleanupTasks)
+            summary = CleanupSummary.add(summary, task.cleanup());
+
+        return summary;
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
index 764a4dc..aefa40b 100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
@@ -46,6 +46,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.repair.consistent.admin.CleanupSummary;
 import org.apache.cassandra.schema.CompactionParams;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.Pair;
@@ -263,12 +264,79 @@ class PendingRepairManager
     @SuppressWarnings("resource")
     private RepairFinishedCompactionTask getRepairFinishedCompactionTask(UUID sessionID)
     {
-        Set<SSTableReader> sstables = get(sessionID).getSSTables();
+        Preconditions.checkState(canCleanup(sessionID));
+        AbstractCompactionStrategy compactionStrategy = get(sessionID);
+        if (compactionStrategy == null)
+            return null;
+        Set<SSTableReader> sstables = compactionStrategy.getSSTables();
         long repairedAt = ActiveRepairService.instance.consistent.local.getFinalSessionRepairedAt(sessionID);
         LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
         return txn == null ? null : new RepairFinishedCompactionTask(cfs, txn, sessionID, repairedAt);
     }
 
+    public static class CleanupTask
+    {
+        private final ColumnFamilyStore cfs;
+        private final List<Pair<UUID, RepairFinishedCompactionTask>> tasks;
+
+        public CleanupTask(ColumnFamilyStore cfs, List<Pair<UUID, RepairFinishedCompactionTask>> tasks)
+        {
+            this.cfs = cfs;
+            this.tasks = tasks;
+        }
+
+        public CleanupSummary cleanup()
+        {
+            Set<UUID> successful = new HashSet<>();
+            Set<UUID> unsuccessful = new HashSet<>();
+            for (Pair<UUID, RepairFinishedCompactionTask> pair : tasks)
+            {
+                UUID session = pair.left;
+                RepairFinishedCompactionTask task = pair.right;
+
+                if (task != null)
+                {
+                    try
+                    {
+                        task.run();
+                        successful.add(session);
+                    }
+                    catch (Throwable t)
+                    {
+                        t = task.transaction.abort(t);
+                        logger.error("Failed cleaning up " + session, t);
+                        unsuccessful.add(session);
+                    }
+                }
+                else
+                {
+                    unsuccessful.add(session);
+                }
+            }
+            return new CleanupSummary(cfs, successful, unsuccessful);
+        }
+
+        public Throwable abort(Throwable accumulate)
+        {
+            for (Pair<UUID, RepairFinishedCompactionTask> pair : tasks)
+                accumulate = pair.right.transaction.abort(accumulate);
+            return accumulate;
+        }
+    }
+
+    public CleanupTask releaseSessionData(Collection<UUID> sessionIDs)
+    {
+        List<Pair<UUID, RepairFinishedCompactionTask>> tasks = new ArrayList<>(sessionIDs.size());
+        for (UUID session : sessionIDs)
+        {
+            if (hasDataForSession(session))
+            {
+                tasks.add(Pair.create(session, getRepairFinishedCompactionTask(session)));
+            }
+        }
+        return new CleanupTask(cfs, tasks);
+    }
+
     synchronized int getNumPendingRepairFinishedTasks()
     {
         int count = 0;
diff --git a/src/java/org/apache/cassandra/dht/Range.java b/src/java/org/apache/cassandra/dht/Range.java
index 80c9ef1..5b2f3d9 100644
--- a/src/java/org/apache/cassandra/dht/Range.java
+++ b/src/java/org/apache/cassandra/dht/Range.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
 import java.util.*;
 import java.util.function.Predicate;
 
+import com.google.common.collect.Iterables;
 import org.apache.commons.lang3.ObjectUtils;
 
 import org.apache.cassandra.db.PartitionPosition;
@@ -139,6 +140,11 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen
         return contains(that.left) || (!that.left.equals(that.right) && intersects(new Range<T>(that.left, that.right)));
     }
 
+    public static boolean intersects(Iterable<Range<Token>> l, Iterable<Range<Token>> r)
+    {
+        return Iterables.any(l, rng -> rng.intersects(r));
+    }
+
     @SafeVarargs
     public static <T extends RingPosition<T>> Set<Range<T>> rangeSet(Range<T> ... ranges)
     {
@@ -335,6 +341,17 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen
 
         return result;
     }
+
+    public static <T extends RingPosition<T>> Set<Range<T>> subtract(Collection<Range<T>> ranges, Collection<Range<T>> subtract)
+    {
+        Set<Range<T>> result = new HashSet<>();
+        for (Range<T> range : ranges)
+        {
+            result.addAll(range.subtractAll(subtract));
+        }
+        return result;
+    }
+
     /**
      * Calculate set of the difference ranges of given two ranges
      * (as current (A, B] and rhs is (C, D])
diff --git a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
index d9ac927..e4d8ff0 100644
--- a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
@@ -216,6 +216,11 @@ public abstract class ConsistentSession
         this.state = state;
     }
 
+    public boolean intersects(Iterable<Range<Token>> otherRanges)
+    {
+        return Iterables.any(ranges, r -> r.intersects(otherRanges));
+    }
+
     public boolean equals(Object o)
     {
         if (this == o) return true;
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSession.java b/src/java/org/apache/cassandra/repair/consistent/LocalSession.java
index e116a43..a6f81d7 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSession.java
@@ -98,14 +98,16 @@ public class LocalSession extends ConsistentSession
         private int startedAt;
         private int lastUpdate;
 
-        public void withStartedAt(int startedAt)
+        public Builder withStartedAt(int startedAt)
         {
             this.startedAt = startedAt;
+            return this;
         }
 
-        public void withLastUpdate(int lastUpdate)
+        public Builder withLastUpdate(int lastUpdate)
         {
             this.lastUpdate = lastUpdate;
+            return this;
         }
 
         void validate()
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
index a35c50a..55fe2f0 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
@@ -41,10 +41,12 @@ import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -59,6 +61,9 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.repair.KeyspaceRepairManager;
+import org.apache.cassandra.repair.consistent.admin.CleanupSummary;
+import org.apache.cassandra.repair.consistent.admin.PendingStat;
+import org.apache.cassandra.repair.consistent.admin.PendingStats;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -148,6 +153,7 @@ public class LocalSessions
     private final String table = SystemKeyspace.REPAIRS;
     private boolean started = false;
     private volatile ImmutableMap<UUID, LocalSession> sessions = ImmutableMap.of();
+    private volatile ImmutableMap<TableId, RepairedState> repairedStates = ImmutableMap.of();
 
     @VisibleForTesting
     int getNumSessions()
@@ -173,16 +179,137 @@ public class LocalSessions
         return StorageService.instance.isInitialized();
     }
 
-    public List<Map<String, String>> sessionInfo(boolean all)
+    public List<Map<String, String>> sessionInfo(boolean all, Set<Range<Token>> ranges)
     {
         Iterable<LocalSession> currentSessions = sessions.values();
+
         if (!all)
-        {
             currentSessions = Iterables.filter(currentSessions, s -> !s.isCompleted());
-        }
+
+        if (!ranges.isEmpty())
+            currentSessions = Iterables.filter(currentSessions, s -> s.intersects(ranges));
+
         return Lists.newArrayList(Iterables.transform(currentSessions, LocalSessionInfo::sessionToMap));
     }
 
+    private RepairedState getRepairedState(TableId tid)
+    {
+        if (!repairedStates.containsKey(tid))
+        {
+            synchronized (this)
+            {
+                if (!repairedStates.containsKey(tid))
+                {
+                    repairedStates = ImmutableMap.<TableId, RepairedState>builder()
+                                     .putAll(repairedStates)
+                                     .put(tid, new RepairedState())
+                                     .build();
+                }
+            }
+        }
+        return Verify.verifyNotNull(repairedStates.get(tid));
+    }
+
+    private void maybeUpdateRepairedState(LocalSession session)
+    {
+        if (session.getState() != FINALIZED)
+            return;
+
+        // if the session is finalized but has repairedAt set to 0, it was
+        // a forced repair, and we shouldn't update the repaired state
+        if (session.repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
+            return;
+
+        for (TableId tid : session.tableIds)
+        {
+            RepairedState state = getRepairedState(tid);
+            state.add(session.ranges, session.repairedAt);
+        }
+    }
+
+    /**
+     * Determine if all ranges and tables covered by this session
+     * have since been re-repaired by a more recent session
+     */
+    private boolean isSuperseded(LocalSession session)
+    {
+        for (TableId tid : session.tableIds)
+        {
+            RepairedState state = repairedStates.get(tid);
+
+            if (state == null)
+                return false;
+
+            long minRepaired = state.minRepairedAt(session.ranges);
+            if (minRepaired <= session.repairedAt)
+                return false;
+        }
+
+        return true;
+    }
+
+    public RepairedState.Stats getRepairedStats(TableId tid, Collection<Range<Token>> ranges)
+    {
+        RepairedState state = repairedStates.get(tid);
+
+        if (state == null)
+            return RepairedState.Stats.EMPTY;
+
+        return state.getRepairedStats(ranges);
+    }
+
+    public PendingStats getPendingStats(TableId tid, Collection<Range<Token>> ranges)
+    {
+        ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(tid);
+        Preconditions.checkArgument(cfs != null);
+
+        PendingStat.Builder pending = new PendingStat.Builder();
+        PendingStat.Builder finalized = new PendingStat.Builder();
+        PendingStat.Builder failed = new PendingStat.Builder();
+
+        Map<UUID, PendingStat> stats = cfs.getPendingRepairStats();
+        for (Map.Entry<UUID, PendingStat> entry : stats.entrySet())
+        {
+            UUID sessionID = entry.getKey();
+            PendingStat stat = entry.getValue();
+            Verify.verify(sessionID.equals(Iterables.getOnlyElement(stat.sessions)));
+
+            LocalSession session = sessions.get(sessionID);
+            Verify.verifyNotNull(session);
+
+            if (!Iterables.any(ranges, r -> r.intersects(session.ranges)))
+                continue;
+
+            switch (session.getState())
+            {
+                case FINALIZED:
+                    finalized.addStat(stat);
+                    break;
+                case FAILED:
+                    failed.addStat(stat);
+                    break;
+                default:
+                    pending.addStat(stat);
+            }
+        }
+
+        return new PendingStats(cfs.keyspace.getName(), cfs.name, pending.build(), finalized.build(), failed.build());
+    }
+
+    public CleanupSummary cleanup(TableId tid, Collection<Range<Token>> ranges, boolean force)
+    {
+        Iterable<LocalSession> candidates = Iterables.filter(sessions.values(),
+                                                             ls -> ls.isCompleted()
+                                                                   && ls.tableIds.contains(tid)
+                                                                   && Range.intersects(ls.ranges, ranges));
+
+        ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(tid);
+        Set<UUID> sessionIds = Sets.newHashSet(Iterables.transform(candidates, s -> s.sessionID));
+
+
+        return cfs.releaseRepairData(sessionIds, force);
+    }
+
     /**
      * hook for operators to cancel sessions, cancelling from a non-coordinator is an error, unless
      * force is set to true. Messages are sent out to other participants, but we don't wait for a response
@@ -219,6 +346,7 @@ public class LocalSessions
             try
             {
                 LocalSession session = load(row);
+                maybeUpdateRepairedState(session);
                 loadedSessions.put(session.sessionID, session);
             }
             catch (IllegalArgumentException | NullPointerException e)
@@ -275,7 +403,14 @@ public class LocalSessions
                 }
                 else if (shouldDelete(session, now))
                 {
-                    if (!sessionHasData(session))
+                    if (session.getState() == FINALIZED && !isSuperseded(session))
+                    {
+                        // if we delete a non-superseded session, some ranges will be mis-reported as
+                        // not having been repaired in repair_admin after a restart
+                        logger.info("Skipping delete of FINALIZED LocalSession {} because it has " +
+                                    "not been superseded by a more recent session", session.sessionID);
+                    }
+                    else if (!sessionHasData(session))
                     {
                         logger.info("Auto deleting repair session {}", session);
                         deleteSession(session.sessionID);
@@ -370,6 +505,8 @@ public class LocalSessions
                                        session.participants.stream().map(participant -> participant.toString()).collect(Collectors.toSet()),
                                        serializeRanges(session.ranges),
                                        tableIdToUuid(session.tableIds));
+
+        maybeUpdateRepairedState(session);
     }
 
     private static int dateToSeconds(Date d)
diff --git a/src/java/org/apache/cassandra/repair/consistent/RepairedState.java b/src/java/org/apache/cassandra/repair/consistent/RepairedState.java
new file mode 100644
index 0000000..ac0e7cb
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/consistent/RepairedState.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
+
+/**
+ * Tracks the repaired state of token ranges per table, and is effectively an
+ * in memory representation of the on-disk local incremental repair state.
+ *
+ * The main purpose of this class is to provide metrics via nodetool repair_admin. To make sure those metrics
+ * are accurate, it also determines when a completed IR session can be deleted, which is explained in a bit
+ * more detail in LocalSessions#cleanup, by the call to isSuperseded.
+ */
+public class RepairedState
+{
+    static class Level
+    {
+        final List<Range<Token>> ranges;
+        final long repairedAt;
+
+        private static final Comparator<Level> timeComparator = Comparator.comparingLong(l -> -l.repairedAt);
+
+        Level(Collection<Range<Token>> ranges, long repairedAt)
+        {
+            this.ranges = Range.normalize(ranges);
+            this.repairedAt = repairedAt;
+        }
+
+        Level subtract(Collection<Range<Token>> ranges)
+        {
+            if (ranges.isEmpty())
+                return this;
+
+            Set<Range<Token>> difference = Range.subtract(this.ranges, ranges);
+            if (difference.isEmpty())
+                return null;
+
+            return new Level(difference, repairedAt);
+        }
+
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Level level = (Level) o;
+            return repairedAt == level.repairedAt &&
+                   Objects.equals(ranges, level.ranges);
+        }
+
+        public int hashCode()
+        {
+            return Objects.hash(ranges, repairedAt);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Level{" +
+                   "ranges=" + ranges +
+                   ", repairedAt=" + repairedAt +
+                   '}';
+        }
+    }
+
+    public static class Section
+    {
+        public final Range<Token> range;
+        public final long repairedAt;
+        private static final Comparator<Section> tokenComparator = (l, r) -> l.range.left.compareTo(r.range.left);
+
+        Section(Range<Token> range, long repairedAt)
+        {
+            this.range = range;
+            this.repairedAt = repairedAt;
+        }
+
+        Section makeSubsection(Range<Token> subrange)
+        {
+            Preconditions.checkArgument(range.contains(subrange));
+            return new Section(subrange, repairedAt);
+        }
+
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Section section = (Section) o;
+            return repairedAt == section.repairedAt &&
+                   Objects.equals(range, section.range);
+        }
+
+        public int hashCode()
+        {
+
+            return Objects.hash(range, repairedAt);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Section{" +
+                   "range=" + range +
+                   ", repairedAt=" + repairedAt +
+                   '}';
+        }
+    }
+
+    public static class Stats
+    {
+        public static final Stats EMPTY = new Stats(UNREPAIRED_SSTABLE, UNREPAIRED_SSTABLE, Collections.emptyList());
+
+        public final long minRepaired;
+        public final long maxRepaired;
+        public final List<Section> sections;
+
+        public Stats(long minRepaired, long maxRepaired, List<Section> sections)
+        {
+            this.minRepaired = minRepaired;
+            this.maxRepaired = maxRepaired;
+            this.sections = sections;
+        }
+
+
+    }
+
+    static class State
+    {
+        final ImmutableList<Level> levels;
+        final ImmutableList<Range<Token>> covered;
+        final ImmutableList<Section> sections;
+
+        State(List<Level> levels, List<Range<Token>> covered, List<Section> sections)
+        {
+            this.levels = ImmutableList.copyOf(levels);
+            this.covered = ImmutableList.copyOf(covered);
+            this.sections = ImmutableList.copyOf(sections);
+        }
+
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            State state = (State) o;
+            return Objects.equals(levels, state.levels) &&
+                   Objects.equals(covered, state.covered);
+        }
+
+        public int hashCode()
+        {
+            return Objects.hash(levels, covered);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "State{" +
+                   "levels=" + levels +
+                   ", covered=" + covered +
+                   '}';
+        }
+    }
+
+    private volatile State state = new State(Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
+
+    State state()
+    {
+        return state;
+    }
+
+    private static List<Section> levelsToSections(List<Level> levels)
+    {
+        List<Section> sections = new ArrayList<>();
+        for (Level level : levels)
+        {
+            for (Range<Token> range : level.ranges)
+            {
+                sections.add(new Section(range, level.repairedAt));
+            }
+        }
+        sections.sort(Section.tokenComparator);
+        return sections;
+    }
+
+    public synchronized void add(Collection<Range<Token>> ranges, long repairedAt)
+    {
+        Level newLevel = new Level(ranges, repairedAt);
+
+        State lastState = state;
+
+        List<Level> tmp = new ArrayList<>(lastState.levels.size() + 1);
+        tmp.addAll(lastState.levels);
+        tmp.add(newLevel);
+        tmp.sort(Level.timeComparator);
+
+        List<Level> levels = new ArrayList<>(lastState.levels.size() + 1);
+        List<Range<Token>> covered = new ArrayList<>();
+
+        for (Level level : tmp)
+        {
+            Level subtracted = level.subtract(covered);
+            if (subtracted == null)
+                continue;
+
+            levels.add(subtracted);
+
+            covered.addAll(subtracted.ranges);
+            covered = Range.normalize(covered);
+        }
+
+        List<Section> sections = new ArrayList<>();
+        for (Level level : levels)
+        {
+            for (Range<Token> range : level.ranges)
+            {
+                sections.add(new Section(range, level.repairedAt));
+            }
+        }
+        sections.sort(Section.tokenComparator);
+
+        state = new State(levels, covered, sections);
+    }
+
+    public long minRepairedAt(Collection<Range<Token>> ranges)
+    {
+        State current = state;
+
+        Set<Range<Token>> remainingRanges = new HashSet<>(ranges);
+        long minTime = Long.MAX_VALUE;
+        for (Section section : current.sections)
+        {
+            if (section.range.intersects(remainingRanges))
+            {
+                minTime = Math.min(minTime, section.repairedAt);
+                remainingRanges = Range.subtract(remainingRanges, Collections.singleton(section.range));
+            }
+
+            if (remainingRanges.isEmpty())
+                break;
+        }
+        // if there are still ranges we don't have data for, part of the requested ranges is unrepaired
+        return remainingRanges.isEmpty() ? minTime : UNREPAIRED_SSTABLE;
+    }
+
+    static List<Section> getRepairedStats(List<Section> sections, Collection<Range<Token>> ranges)
+    {
+        if (ranges.isEmpty())
+            return Collections.emptyList();
+
+        Set<Range<Token>> remaining = Sets.newHashSet(Range.normalize(ranges));
+        List<Section> results = new ArrayList<>();
+
+        for (Section section : sections)
+        {
+            if (remaining.isEmpty())
+                break;
+
+            Set<Range<Token>> sectionRanges = Range.rangeSet(section.range);
+            for (Range<Token> range : remaining)
+            {
+                if (sectionRanges.isEmpty())
+                    break;
+
+                Set<Range<Token>> intersection = new HashSet<>();
+                sectionRanges.forEach(r -> intersection.addAll(r.intersectionWith(range)));
+
+                if (intersection.isEmpty())
+                    continue;
+
+                intersection.forEach(r -> results.add(section.makeSubsection(r)));
+                sectionRanges = Range.subtract(sectionRanges, intersection);
+            }
+
+            remaining = Range.subtract(remaining, Collections.singleton(section.range));
+        }
+
+        remaining.forEach(r -> results.add(new Section(r, UNREPAIRED_SSTABLE)));
+        results.sort(Section.tokenComparator);
+
+        return results;
+    }
+
+    public Stats getRepairedStats(Collection<Range<Token>> ranges)
+    {
+        List<Section> sections = getRepairedStats(state.sections, ranges);
+
+        if (sections.isEmpty())
+            return new Stats(UNREPAIRED_SSTABLE, UNREPAIRED_SSTABLE, Collections.emptyList());
+
+        long minTime = Long.MAX_VALUE;
+        long maxTime = Long.MIN_VALUE;
+
+        for (Section section : sections)
+        {
+            minTime = Math.min(minTime, section.repairedAt);
+            maxTime = Math.max(maxTime, section.repairedAt);
+        }
+
+        return new Stats(minTime, maxTime, sections);
+    }
+}
diff --git a/src/java/org/apache/cassandra/repair/consistent/admin/CleanupSummary.java b/src/java/org/apache/cassandra/repair/consistent/admin/CleanupSummary.java
new file mode 100644
index 0000000..f984411
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/consistent/admin/CleanupSummary.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent.admin;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import javax.management.openmbean.ArrayType;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+
+/**
+ * Holds, for one table, a set of {@code successful} and a set of {@code unsuccessful} UUIDs
+ * (presumably the ids of repair sessions whose cleanup succeeded or failed), and converts to and
+ * from {@link CompositeData} so the summary can be exchanged over JMX.
+ */
+public class CleanupSummary
+{
+    private static final String[] COMPOSITE_NAMES = new String[] { "keyspace", "table", "successful", "unsuccessful" };
+    private static final OpenType<?>[] COMPOSITE_TYPES;
+    private static final CompositeType COMPOSITE_TYPE;
+
+    static
+    {
+        try
+        {
+            COMPOSITE_TYPES = new OpenType[] { SimpleType.STRING,
+                                               SimpleType.STRING,
+                                               ArrayType.getArrayType(SimpleType.STRING),
+                                               ArrayType.getArrayType(SimpleType.STRING) };
+            // name the open type after this class, as PendingStat and RepairStats do for theirs
+            COMPOSITE_TYPE = new CompositeType(CleanupSummary.class.getName(), CleanupSummary.class.getSimpleName(),
+                                               COMPOSITE_NAMES, COMPOSITE_NAMES, COMPOSITE_TYPES);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public final String keyspace;
+    public final String table;
+
+    public final Set<UUID> successful;
+    public final Set<UUID> unsuccessful;
+
+    public CleanupSummary(String keyspace, String table, Set<UUID> successful, Set<UUID> unsuccessful)
+    {
+        this.keyspace = keyspace;
+        this.table = table;
+        this.successful = successful;
+        this.unsuccessful = unsuccessful;
+    }
+
+    public CleanupSummary(ColumnFamilyStore cfs, Set<UUID> successful, Set<UUID> unsuccessful)
+    {
+        this(cfs.keyspace.getName(), cfs.name, successful, unsuccessful);
+    }
+
+    /**
+     * Merges two summaries of the same table. An id that is unsuccessful in either input is reported
+     * as unsuccessful only, even if the other input lists it as successful.
+     *
+     * @throws IllegalArgumentException if the summaries are for different keyspaces or tables
+     */
+    public static CleanupSummary add(CleanupSummary l, CleanupSummary r)
+    {
+        Preconditions.checkArgument(l.keyspace.equals(r.keyspace));
+        Preconditions.checkArgument(l.table.equals(r.table));
+
+        Set<UUID> unsuccessful = new HashSet<>(l.unsuccessful);
+        unsuccessful.addAll(r.unsuccessful);
+
+        Set<UUID> successful = new HashSet<>(l.successful);
+        successful.addAll(r.successful);
+        successful.removeAll(unsuccessful);
+
+        return new CleanupSummary(l.keyspace, l.table, successful, unsuccessful);
+    }
+
+    private static String[] uuids2Strings(Set<UUID> uuids)
+    {
+        String[] strings = new String[uuids.size()];
+        int idx = 0;
+        for (UUID uuid : uuids)
+            strings[idx++] = uuid.toString();
+        return strings;
+    }
+
+    private static Set<UUID> strings2Uuids(String[] strings)
+    {
+        Set<UUID> uuids = Sets.newHashSetWithExpectedSize(strings.length);
+        for (String string : strings)
+            uuids.add(UUID.fromString(string));
+
+        return uuids;
+    }
+
+    /** Encodes this summary as a {@link CompositeData}; the UUIDs are written as strings. */
+    public CompositeData toComposite()
+    {
+        Map<String, Object> values = new HashMap<>();
+        values.put(COMPOSITE_NAMES[0], keyspace);
+        values.put(COMPOSITE_NAMES[1], table);
+        values.put(COMPOSITE_NAMES[2], uuids2Strings(successful));
+        values.put(COMPOSITE_NAMES[3], uuids2Strings(unsuccessful));
+        try
+        {
+            return new CompositeDataSupport(COMPOSITE_TYPE, values);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    /**
+     * Decodes a summary produced by {@link #toComposite()}.
+     *
+     * @throws IllegalArgumentException if {@code cd} is not of this class's composite type
+     */
+    public static CleanupSummary fromComposite(CompositeData cd)
+    {
+        Preconditions.checkArgument(cd.getCompositeType().equals(COMPOSITE_TYPE));
+        Object[] values = cd.getAll(COMPOSITE_NAMES);
+        return new CleanupSummary((String) values[0],
+                                  (String) values[1],
+                                  strings2Uuids((String[]) values[2]),
+                                  strings2Uuids((String[]) values[3]));
+    }
+}
diff --git a/src/java/org/apache/cassandra/repair/consistent/admin/PendingStat.java b/src/java/org/apache/cassandra/repair/consistent/admin/PendingStat.java
new file mode 100644
index 0000000..0c4424e
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/consistent/admin/PendingStat.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent.admin;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import javax.management.openmbean.ArrayType;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileUtils;
+
+/**
+ * Total on-disk size, sstable count and pending repair session ids of a group of sstables, with a
+ * {@link CompositeData} encoding so it can be exchanged over JMX.
+ */
+public class PendingStat
+{
+    private static final String[] COMPOSITE_NAMES = new String[] {"dataSize", "numSSTables", "sessions"};
+    private static final OpenType<?>[] COMPOSITE_TYPES;
+    public static final CompositeType COMPOSITE_TYPE;
+
+    static
+    {
+        try
+        {
+            COMPOSITE_TYPES = new OpenType[] { SimpleType.LONG, SimpleType.INTEGER, ArrayType.getArrayType(SimpleType.STRING) };
+            COMPOSITE_TYPE = new CompositeType(PendingStat.class.getName(),
+                                               PendingStat.class.getSimpleName(),
+                                               COMPOSITE_NAMES, COMPOSITE_NAMES, COMPOSITE_TYPES);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public final long dataSize;
+    public final int numSSTables;
+    public final Set<UUID> sessions;
+
+    /**
+     * @param sessions session ids; the set is copied, so later changes to it are not reflected here
+     */
+    public PendingStat(long dataSize, int numSSTables, Set<UUID> sessions)
+    {
+        this.dataSize = dataSize;
+        this.numSSTables = numSSTables;
+        // copy before wrapping: Builder.build() hands over the builder's own mutable set, which would
+        // otherwise keep changing under this stat if the builder is used again
+        this.sessions = Collections.unmodifiableSet(new HashSet<>(sessions));
+    }
+
+    public String sizeString()
+    {
+        return String.format("%s (%s sstables / %s sessions)", FileUtils.stringifyFileSize(dataSize), numSSTables, sessions.size());
+    }
+
+    /** Encodes this stat as a {@link CompositeData}; the session ids are written as strings. */
+    public CompositeData toComposite()
+    {
+        Map<String, Object> values = new HashMap<>();
+        values.put(COMPOSITE_NAMES[0], dataSize);
+        values.put(COMPOSITE_NAMES[1], numSSTables);
+        String[] sessionIds = new String[sessions.size()];
+        int idx = 0;
+        for (UUID session : sessions)
+            sessionIds[idx++] = session.toString();
+        values.put(COMPOSITE_NAMES[2], sessionIds);
+
+        try
+        {
+            return new CompositeDataSupport(COMPOSITE_TYPE, values);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    /**
+     * Decodes a stat produced by {@link #toComposite()}.
+     *
+     * @throws IllegalArgumentException if {@code cd} is not of {@link #COMPOSITE_TYPE}
+     */
+    public static PendingStat fromComposite(CompositeData cd)
+    {
+        Preconditions.checkArgument(cd.getCompositeType().equals(COMPOSITE_TYPE));
+        Object[] values = cd.getAll(COMPOSITE_NAMES);
+        Set<UUID> sessions = new HashSet<>();
+        for (String session : (String[]) values[2])
+            sessions.add(UUID.fromString(session));
+        return new PendingStat((long) values[0], (int) values[1], sessions);
+    }
+
+    /** Mutable accumulator of sstables and/or other stats from which a {@link PendingStat} is built. */
+    public static class Builder
+    {
+        public long dataSize = 0;
+        public int numSSTables = 0;
+        public Set<UUID> sessions = new HashSet<>();
+
+        /** Counts the sstable's on-disk length and pending repair id; sstables with no pending repair id are skipped. */
+        public Builder addSSTable(SSTableReader sstable)
+        {
+            UUID sessionID = sstable.getPendingRepair();
+            if (sessionID == null)
+                return this;
+            dataSize += sstable.onDiskLength();
+            sessions.add(sessionID);
+            numSSTables++;
+            return this;
+        }
+
+        public Builder addStat(PendingStat stat)
+        {
+            dataSize += stat.dataSize;
+            numSSTables += stat.numSSTables;
+            sessions.addAll(stat.sessions);
+            return this;
+        }
+
+        public PendingStat build()
+        {
+            return new PendingStat(dataSize, numSSTables, sessions);
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/repair/consistent/admin/PendingStats.java b/src/java/org/apache/cassandra/repair/consistent/admin/PendingStats.java
new file mode 100644
index 0000000..9dbe0df
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/consistent/admin/PendingStats.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent.admin;
+
+import java.util.HashMap;
+import java.util.Map;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+
+/**
+ * Pending repair statistics for one table, split into {@code pending}, {@code finalized} and
+ * {@code failed} stats plus their sum ({@code total}), with a {@link CompositeData} encoding for JMX.
+ */
+public class PendingStats
+{
+    // one name per entry of COMPOSITE_TYPES; total is rebuilt by the constructor and is not encoded
+    private static final String[] COMPOSITE_NAMES = new String[] { "keyspace", "table", "pending", "finalized", "failed" };
+    private static final OpenType<?>[] COMPOSITE_TYPES;
+    private static final CompositeType COMPOSITE_TYPE;
+
+    static
+    {
+        try
+        {
+            COMPOSITE_TYPES = new OpenType[] { SimpleType.STRING,
+                                               SimpleType.STRING,
+                                               PendingStat.COMPOSITE_TYPE,
+                                               PendingStat.COMPOSITE_TYPE,
+                                               PendingStat.COMPOSITE_TYPE};
+            COMPOSITE_TYPE = new CompositeType(PendingStats.class.getName(), PendingStats.class.getSimpleName(),
+                                               COMPOSITE_NAMES, COMPOSITE_NAMES, COMPOSITE_TYPES);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public final String keyspace;
+    public final String table;
+    public final PendingStat total;
+    public final PendingStat pending;
+    public final PendingStat finalized;
+    public final PendingStat failed;
+
+    public PendingStats(String keyspace, String table, PendingStat pending, PendingStat finalized, PendingStat failed)
+    {
+        this.keyspace = keyspace;
+        this.table = table;
+
+        this.total = new PendingStat.Builder().addStat(pending).addStat(finalized).addStat(failed).build();
+        this.pending = pending;
+        this.finalized = finalized;
+        this.failed = failed;
+    }
+
+    /** Encodes keyspace, table and the pending / finalized / failed stats as a {@link CompositeData}. */
+    public CompositeData toComposite()
+    {
+        Map<String, Object> values = new HashMap<>();
+        values.put(COMPOSITE_NAMES[0], keyspace);
+        values.put(COMPOSITE_NAMES[1], table);
+        values.put(COMPOSITE_NAMES[2], pending.toComposite());
+        values.put(COMPOSITE_NAMES[3], finalized.toComposite());
+        values.put(COMPOSITE_NAMES[4], failed.toComposite());
+        try
+        {
+            return new CompositeDataSupport(COMPOSITE_TYPE, values);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    /**
+     * Decodes stats produced by {@link #toComposite()}; {@code total} is recomputed from the three parts.
+     *
+     * @throws IllegalArgumentException if {@code cd} is not of this class's composite type
+     */
+    public static PendingStats fromComposite(CompositeData cd)
+    {
+        Preconditions.checkArgument(cd.getCompositeType().equals(COMPOSITE_TYPE));
+        Object[] values = cd.getAll(COMPOSITE_NAMES);
+        return new PendingStats((String) values[0],
+                                (String) values[1],
+                                PendingStat.fromComposite((CompositeData) values[2]),
+                                PendingStat.fromComposite((CompositeData) values[3]),
+                                PendingStat.fromComposite((CompositeData) values[4]));
+    }
+}
diff --git a/src/java/org/apache/cassandra/repair/consistent/admin/RepairStats.java b/src/java/org/apache/cassandra/repair/consistent/admin/RepairStats.java
new file mode 100644
index 0000000..bbb4778
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/consistent/admin/RepairStats.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent.admin;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.management.openmbean.*;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+
+import org.apache.cassandra.repair.consistent.RepairedState;
+
+/**
+ * Repaired-time statistics for one table: the minimum and maximum repaired values plus a list of
+ * {@link Section}s, convertible to and from {@link CompositeData} so they can be exchanged over JMX.
+ */
+public class RepairStats
+{
+    /** A range, held as the string forms of its two bounds, together with its repairedAt value. */
+    public static class Section
+    {
+        private static final String[] COMPOSITE_NAMES = new String[] { "start", "end", "repairedAt" };
+        private static final OpenType<?>[] COMPOSITE_TYPES;
+        private static final CompositeType COMPOSITE_TYPE;
+
+        static
+        {
+            OpenType<?>[] itemTypes = new OpenType[] { SimpleType.STRING, SimpleType.STRING, SimpleType.LONG };
+            CompositeType type;
+            try
+            {
+                type = new CompositeType(Section.class.getName(), "Section", COMPOSITE_NAMES, COMPOSITE_NAMES, itemTypes);
+            }
+            catch (OpenDataException e)
+            {
+                throw Throwables.propagate(e);
+            }
+            COMPOSITE_TYPES = itemTypes;
+            COMPOSITE_TYPE = type;
+        }
+
+        public final String start;
+        public final String end;
+        public final long time;
+
+        public Section(String start, String end, long time)
+        {
+            this.start = start;
+            this.end = end;
+            this.time = time;
+        }
+
+        // items are listed in COMPOSITE_NAMES order: start, end, repairedAt
+        private CompositeData toComposite()
+        {
+            Object[] items = new Object[] { start, end, time };
+            try
+            {
+                return new CompositeDataSupport(COMPOSITE_TYPE, COMPOSITE_NAMES, items);
+            }
+            catch (OpenDataException e)
+            {
+                throw Throwables.propagate(e);
+            }
+        }
+
+        private static Section fromComposite(CompositeData cd)
+        {
+            Preconditions.checkArgument(cd.getCompositeType().equals(COMPOSITE_TYPE));
+            Object[] items = cd.getAll(COMPOSITE_NAMES);
+            return new Section((String) items[0], (String) items[1], (long) items[2]);
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("(%s,%s]=%s", start, end, time);
+        }
+    }
+
+    private static final String[] COMPOSITE_NAMES = new String[] { "keyspace", "table", "minRepaired", "maxRepaired", "sections" };
+    private static final OpenType<?>[] COMPOSITE_TYPES;
+    private static final CompositeType COMPOSITE_TYPE;
+
+    static
+    {
+        OpenType<?>[] itemTypes;
+        CompositeType type;
+        try
+        {
+            itemTypes = new OpenType[] { SimpleType.STRING, SimpleType.STRING,
+                                         SimpleType.LONG, SimpleType.LONG,
+                                         ArrayType.getArrayType(Section.COMPOSITE_TYPE)};
+            type = new CompositeType(RepairStats.class.getName(), RepairStats.class.getSimpleName(),
+                                     COMPOSITE_NAMES, COMPOSITE_NAMES, itemTypes);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+        COMPOSITE_TYPES = itemTypes;
+        COMPOSITE_TYPE = type;
+    }
+
+    public final String keyspace;
+    public final String table;
+    public final long minRepaired;
+    public final long maxRepaired;
+    public final List<Section> sections;
+
+    private RepairStats(String keyspace, String table, long minRepaired, long maxRepaired, List<Section> sections)
+    {
+        this.keyspace = keyspace;
+        this.table = table;
+        this.minRepaired = minRepaired;
+        this.maxRepaired = maxRepaired;
+        this.sections = sections;
+    }
+
+    /** Converts each {@link RepairedState.Section} to a {@link Section} holding the string forms of its range bounds. */
+    public static List<Section> convertSections(List<RepairedState.Section> from)
+    {
+        List<Section> converted = new ArrayList<>(from.size());
+        from.forEach(s -> converted.add(new Section(s.range.left.toString(), s.range.right.toString(), s.repairedAt)));
+        return converted;
+    }
+
+    public static RepairStats fromRepairState(String keyspace, String table, RepairedState.Stats stats)
+    {
+        List<Section> sections = convertSections(stats.sections);
+        return new RepairStats(keyspace, table, stats.minRepaired, stats.maxRepaired, sections);
+    }
+
+    /** Encodes these stats, including every section, as a {@link CompositeData}. */
+    public CompositeData toComposite()
+    {
+        CompositeData[] sectionData = new CompositeData[sections.size()];
+        int idx = 0;
+        for (Section section : sections)
+            sectionData[idx++] = section.toComposite();
+
+        Map<String, Object> items = new HashMap<>();
+        items.put(COMPOSITE_NAMES[0], keyspace);
+        items.put(COMPOSITE_NAMES[1], table);
+        items.put(COMPOSITE_NAMES[2], minRepaired);
+        items.put(COMPOSITE_NAMES[3], maxRepaired);
+        items.put(COMPOSITE_NAMES[4], sectionData);
+        try
+        {
+            return new CompositeDataSupport(COMPOSITE_TYPE, items);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    /**
+     * Decodes stats produced by {@link #toComposite()}.
+     *
+     * @throws IllegalArgumentException if {@code cd} is not of this class's composite type
+     */
+    public static RepairStats fromComposite(CompositeData cd)
+    {
+        Preconditions.checkArgument(cd.getCompositeType().equals(COMPOSITE_TYPE));
+        Object[] items = cd.getAll(COMPOSITE_NAMES);
+
+        String ksName = (String) items[0];
+        String tableName = (String) items[1];
+        long min = (long) items[2];
+        long max = (long) items[3];
+        CompositeData[] encoded = (CompositeData[]) items[4];
+        List<Section> decoded = new ArrayList<>(encoded.length);
+        for (int i = 0; i < encoded.length; i++)
+            decoded.add(Section.fromComposite(encoded[i]));
+        return new RepairStats(ksName, tableName, min, max, decoded);
+    }
+}
diff --git a/src/java/org/apache/cassandra/repair/consistent/admin/SchemaArgsParser.java b/src/java/org/apache/cassandra/repair/consistent/admin/SchemaArgsParser.java
new file mode 100644
index 0000000..67c0244
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/consistent/admin/SchemaArgsParser.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent.admin;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.utils.AbstractIterator;
+
+/**
+ * Resolves "schema args" to the tables they name. An empty list means every table of every keyspace
+ * returned by {@code Schema.instance.getNonLocalStrategyKeyspaces()}; otherwise the first argument is a
+ * keyspace name and any further arguments are table names in it (none meaning all of its tables).
+ */
+public class SchemaArgsParser implements Iterable<ColumnFamilyStore>
+{
+    private final List<String> schemaArgs;
+
+    private SchemaArgsParser(List<String> schemaArgs)
+    {
+        this.schemaArgs = schemaArgs;
+    }
+
+    /** Iterates the named tables of one keyspace, or all of its tables when no names are given. */
+    private static class TableIterator extends AbstractIterator<ColumnFamilyStore>
+    {
+        private final Iterator<ColumnFamilyStore> tables;
+
+        public TableIterator(String ksName, List<String> tableNames)
+        {
+            // name the offending keyspace so a mistyped argument is easy to diagnose
+            Preconditions.checkArgument(Schema.instance.getKeyspaceMetadata(ksName) != null,
+                                        "Unknown keyspace %s", ksName);
+            Keyspace keyspace = Keyspace.open(ksName);
+
+            if (tableNames.isEmpty())
+            {
+                tables = keyspace.getColumnFamilyStores().iterator();
+            }
+            else
+            {
+                // copied into a list, so every name is looked up now rather than during iteration
+                tables = Lists.newArrayList(Iterables.transform(tableNames, tn -> keyspace.getColumnFamilyStore(tn))).iterator();
+            }
+        }
+
+        @Override
+        protected ColumnFamilyStore computeNext()
+        {
+            return tables.hasNext() ? tables.next() : endOfData();
+        }
+    }
+
+    @Override
+    public Iterator<ColumnFamilyStore> iterator()
+    {
+        if (schemaArgs.isEmpty())
+        {
+            // iterate over everything
+            Iterator<String> ksNames = Schema.instance.getNonLocalStrategyKeyspaces().iterator();
+
+            return new AbstractIterator<ColumnFamilyStore>()
+            {
+                TableIterator current = null;
+
+                @Override
+                protected ColumnFamilyStore computeNext()
+                {
+                    for (;;)
+                    {
+                        if (current != null && current.hasNext())
+                        {
+                            return current.next();
+                        }
+
+                        if (ksNames.hasNext())
+                        {
+                            current = new TableIterator(ksNames.next(), Collections.emptyList());
+                            continue;
+                        }
+
+                        return endOfData();
+                    }
+                }
+            };
+        }
+        else
+        {
+            return new TableIterator(schemaArgs.get(0), schemaArgs.subList(1, schemaArgs.size()));
+        }
+    }
+
+    /**
+     * @param schemaArgs empty for every table of the non-local-strategy keyspaces, otherwise a keyspace name optionally followed by table names
+     */
+    public static Iterable<ColumnFamilyStore> parse(List<String> schemaArgs)
+    {
+        return new SchemaArgsParser(schemaArgs);
+    }
+
+    public static Iterable<ColumnFamilyStore> parse(String... schemaArgs)
+    {
+        return parse(Lists.newArrayList(schemaArgs));
+    }
+}
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
index adcd776..f7ed052 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
@@ -57,6 +57,30 @@ public class RepairOption
 
     private static final Logger logger = LoggerFactory.getLogger(RepairOption.class);
 
+    /**
+     * Parses comma separated {@code start:end} token pairs; entries without a ':' are skipped and a
+     * null or empty string gives an empty set.
+     * @throws IllegalArgumentException if a pair has identical start and end tokens
+     */
+    public static Set<Range<Token>> parseRanges(String rangesStr, IPartitioner partitioner)
+    {
+        if (rangesStr == null || rangesStr.isEmpty())
+            return Collections.emptySet();
+
+        Set<Range<Token>> parsed = new HashSet<>();
+        for (String entry : rangesStr.split(","))
+        {
+            String[] bounds = entry.split(":", 2);
+            if (bounds.length < 2)
+                continue;
+            Token start = partitioner.getTokenFactory().fromString(bounds[0].trim());
+            Token end = partitioner.getTokenFactory().fromString(bounds[1].trim());
+            if (start.equals(end))
+                throw new IllegalArgumentException("Start and end tokens must be different.");
+            parsed.add(new Range<>(start, end));
+        }
+        return parsed;
+    }
     /**
      * Construct RepairOptions object from given map of Strings.
      * <p>
@@ -165,28 +189,10 @@ public class RepairOption
             }
             catch (NumberFormatException ignore) {}
         }
+
         // ranges
-        String rangesStr = options.get(RANGES_KEY);
-        Set<Range<Token>> ranges = new HashSet<>();
-        if (rangesStr != null)
-        {
-            StringTokenizer tokenizer = new StringTokenizer(rangesStr, ",");
-            while (tokenizer.hasMoreTokens())
-            {
-                String[] rangeStr = tokenizer.nextToken().split(":", 2);
-                if (rangeStr.length < 2)
-                {
-                    continue;
-                }
-                Token parsedBeginToken = partitioner.getTokenFactory().fromString(rangeStr[0].trim());
-                Token parsedEndToken = partitioner.getTokenFactory().fromString(rangeStr[1].trim());
-                if (parsedBeginToken.equals(parsedEndToken))
-                {
-                    throw new IllegalArgumentException("Start and end tokens must be different.");
-                }
-                ranges.add(new Range<>(parsedBeginToken, parsedEndToken));
-            }
-        }
+        Set<Range<Token>> ranges = parseRanges(options.get(RANGES_KEY), partitioner);
+
         boolean asymmetricSyncing = Boolean.parseBoolean(options.get(OPTIMISE_STREAMS_KEY));
 
         RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair, force, previewKind, asymmetricSyncing);
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index e61bd38..3b13907 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -22,6 +22,7 @@ import java.net.UnknownHostException;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
+import javax.management.openmbean.CompositeData;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -31,7 +32,6 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
 import com.google.common.util.concurrent.AbstractFuture;
-
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 
@@ -55,9 +55,9 @@ import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.gms.IFailureDetector;
 import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
 import org.apache.cassandra.gms.IFailureDetectionEventListener;
+import org.apache.cassandra.gms.IFailureDetector;
 import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.TokenMetadata;
@@ -66,14 +66,23 @@ import org.apache.cassandra.net.RequestCallback;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.CommonRange;
-import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.repair.RepairJobDesc;
 import org.apache.cassandra.repair.RepairParallelism;
 import org.apache.cassandra.repair.RepairSession;
 import org.apache.cassandra.repair.consistent.CoordinatorSessions;
 import org.apache.cassandra.repair.consistent.LocalSessions;
-import org.apache.cassandra.repair.messages.*;
+import org.apache.cassandra.repair.consistent.admin.CleanupSummary;
+import org.apache.cassandra.repair.consistent.admin.PendingStats;
+import org.apache.cassandra.repair.consistent.admin.RepairStats;
+import org.apache.cassandra.repair.consistent.RepairedState;
+import org.apache.cassandra.repair.consistent.admin.SchemaArgsParser;
+import org.apache.cassandra.repair.messages.PrepareMessage;
+import org.apache.cassandra.repair.messages.RepairMessage;
+import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.repair.messages.SyncResponse;
+import org.apache.cassandra.repair.messages.ValidationResponse;
 import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MBeanWrapper;
 import org.apache.cassandra.utils.Pair;
@@ -210,9 +219,10 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
     }
 
     @Override
-    public List<Map<String, String>> getSessions(boolean all)
+    public List<Map<String, String>> getSessions(boolean all, String rangesStr)
     {
-        return consistent.local.sessionInfo(all);
+        Set<Range<Token>> ranges = RepairOption.parseRanges(rangesStr, DatabaseDescriptor.getPartitioner());
+        return consistent.local.sessionInfo(all, ranges);
     }
 
     @Override
@@ -234,6 +244,65 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         return DatabaseDescriptor.getRepairSessionSpaceInMegabytes();
     }
 
+    // MBean operation: one composite of repaired-state stats per table selected by schemaArgs.
+    @Override
+    public List<CompositeData> getRepairStats(List<String> schemaArgs, String rangeString)
+    {
+        List<CompositeData> stats = new ArrayList<>();
+        Collection<Range<Token>> userRanges = rangeString != null
+                                              ? RepairOption.parseRanges(rangeString, DatabaseDescriptor.getPartitioner())
+                                              : null;
+        for (ColumnFamilyStore cfs : SchemaArgsParser.parse(schemaArgs))
+        {
+            String keyspace = cfs.keyspace.getName();
+            Collection<Range<Token>> ranges = userRanges != null
+                                              ? userRanges
+                                              : StorageService.instance.getLocalReplicas(keyspace).ranges(); // no explicit range: keyspace's local replica ranges
+            RepairedState.Stats cfStats = consistent.local.getRepairedStats(cfs.metadata().id, ranges);
+            stats.add(RepairStats.fromRepairState(keyspace, cfs.name, cfStats).toComposite());
+        }
+        return stats;
+    }
+
+    // MBean operation: one composite of pending-repair stats per table selected by schemaArgs.
+    @Override
+    public List<CompositeData> getPendingStats(List<String> schemaArgs, String rangeString)
+    {
+        List<CompositeData> composites = new ArrayList<>();
+        Collection<Range<Token>> requested = null;
+        if (rangeString != null)
+            requested = RepairOption.parseRanges(rangeString, DatabaseDescriptor.getPartitioner());
+        for (ColumnFamilyStore store : SchemaArgsParser.parse(schemaArgs))
+        {
+            String ksName = store.keyspace.getName();
+            // an explicit range wins; otherwise use the keyspace's local replica ranges
+            Collection<Range<Token>> ranges = requested;
+            if (ranges == null)
+                ranges = StorageService.instance.getLocalReplicas(ksName).ranges();
+            composites.add(consistent.local.getPendingStats(store.metadata().id, ranges).toComposite());
+        }
+        return composites;
+    }
+
+    // MBean operation: runs consistent.local.cleanup per selected table and returns one summary composite each.
+    @Override
+    public List<CompositeData> cleanupPending(List<String> schemaArgs, String rangeString, boolean force)
+    {
+        List<CompositeData> summaries = new ArrayList<>();
+        Collection<Range<Token>> requested = null;
+        if (rangeString != null)
+            requested = RepairOption.parseRanges(rangeString, DatabaseDescriptor.getPartitioner());
+        for (ColumnFamilyStore store : SchemaArgsParser.parse(schemaArgs))
+        {
+            String ksName = store.keyspace.getName();
+            Collection<Range<Token>> ranges = requested;
+            if (ranges == null)
+                ranges = StorageService.instance.getLocalReplicas(ksName).ranges();
+            summaries.add(consistent.local.cleanup(store.metadata().id, ranges, force).toComposite());
+        }
+        return summaries;
+    }
+
     /**
      * Requests repairs for the given keyspace and column families.
      *
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java b/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
index f4d6c48..8cffecc 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
@@ -20,12 +20,13 @@ package org.apache.cassandra.service;
 
 import java.util.List;
 import java.util.Map;
+import javax.management.openmbean.CompositeData;
 
 public interface ActiveRepairServiceMBean
 {
     public static final String MBEAN_NAME = "org.apache.cassandra.db:type=RepairService";
 
-    public List<Map<String, String>> getSessions(boolean all);
+    public List<Map<String, String>> getSessions(boolean all, String rangesStr);
     public void failSession(String session, boolean force);
 
     public void setRepairSessionSpaceInMegabytes(int sizeInMegabytes);
@@ -36,4 +37,8 @@ public interface ActiveRepairServiceMBean
 
     public int getRepairPendingCompactionRejectThreshold();
     public void setRepairPendingCompactionRejectThreshold(int value);
+
+    public List<CompositeData> getRepairStats(List<String> schemaArgs, String rangeString);
+    public List<CompositeData> getPendingStats(List<String> schemaArgs, String rangeString);
+    public List<CompositeData> cleanupPending(List<String> schemaArgs, String rangeString, boolean force);
 }
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index bf5e5cc..69cd04c 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -153,7 +153,6 @@ public class NodeTool
                 ReloadSeeds.class,
                 ResetFullQueryLog.class,
                 Repair.class,
-                RepairAdmin.class,
                 ReplayBatchlog.class,
                 SetCacheCapacity.class,
                 SetConcurrency.class,
@@ -224,6 +223,15 @@ public class NodeTool
                 .withDefaultCommand(CassHelp.class)
                 .withCommand(BootstrapResume.class);
 
+        builder.withGroup("repair_admin")
+               .withDescription("list and fail incremental repair sessions")
+               .withDefaultCommand(RepairAdmin.ListCmd.class)
+               .withCommand(RepairAdmin.ListCmd.class)
+               .withCommand(RepairAdmin.CancelCmd.class)
+               .withCommand(RepairAdmin.CleanupDataCmd.class)
+               .withCommand(RepairAdmin.SummarizePendingCmd.class)
+               .withCommand(RepairAdmin.SummarizeRepairedCmd.class);
+
         Cli<Consumer<INodeProbeFactory>> parser = builder.build();
 
         int status = 0;
diff --git a/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java b/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java
index ba3cf62..b66c32a 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java
@@ -22,13 +22,23 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import javax.management.openmbean.CompositeData;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
+import org.apache.commons.lang3.StringUtils;
+
+import io.airlift.airline.Arguments;
 import io.airlift.airline.Command;
 import io.airlift.airline.Option;
 import org.apache.cassandra.repair.consistent.LocalSessionInfo;
+import org.apache.cassandra.repair.consistent.admin.CleanupSummary;
+import org.apache.cassandra.repair.consistent.admin.PendingStat;
+import org.apache.cassandra.repair.consistent.admin.PendingStats;
+import org.apache.cassandra.repair.consistent.admin.RepairStats;
 import org.apache.cassandra.service.ActiveRepairServiceMBean;
 import org.apache.cassandra.tools.NodeProbe;
 import org.apache.cassandra.tools.NodeTool;
@@ -37,113 +47,287 @@ import org.apache.cassandra.utils.FBUtilities;
 /**
  * Supports listing and failing incremental repair sessions
  */
-@Command(name = "repair_admin", description = "list and fail incremental repair sessions")
-public class RepairAdmin extends NodeTool.NodeToolCmd
+public abstract class RepairAdmin extends NodeTool.NodeToolCmd
 {
-    @Option(title = "list", name = {"-l", "--list"}, description = "list repair sessions (default behavior)")
-    private boolean list = false;
-
-    @Option(title = "all", name = {"-a", "--all"}, description = "include completed and failed sessions")
-    private boolean all = false;
+    @Command(name = "list", description = "list repair sessions")
+    public static class ListCmd extends RepairAdmin
+    {
+        @Option(title = "all", name = {"-a", "--all"}, description = "include completed and failed sessions")
+        private boolean all;
 
-    @Option(title = "cancel", name = {"-x", "--cancel"}, description = "cancel an incremental repair session")
-    private String cancel = null;
+        @Option(title = "start_token", name = {"-st", "--start-token"}, description = "Use -st to specify a token at which the repair range starts")
+        private String startToken = StringUtils.EMPTY;
 
-    @Option(title = "force", name = {"-f", "--force"}, description = "cancel repair session from a node other than the repair coordinator." +
-                                                                     " Attempting to cancel FINALIZED or FAILED sessions is an error.")
-    private boolean force = false;
+        @Option(title = "end_token", name = {"-et", "--end-token"}, description = "Use -et to specify a token at which repair range ends")
+        private String endToken = StringUtils.EMPTY;
 
-    private static final List<String> header = Lists.newArrayList("id",
-                                                                  "state",
-                                                                  "last activity",
-                                                                  "coordinator",
-                                                                  "participants",
-                                                                  "participants_wp");
+        protected void execute(NodeProbe probe)
+        {
+            List<Map<String, String>> sessions = probe.getRepairServiceProxy().getSessions(all, getRangeString(startToken, endToken));
+            if (sessions.isEmpty())
+            {
+                System.out.println("no sessions");
 
+            }
+            else
+            {
+                List<List<String>> rows = new ArrayList<>();
+                rows.add(Lists.newArrayList("id",
+                                            "state",
+                                            "last activity",
+                                            "coordinator",
+                                            "participants",
+                                            "participants_wp"));
+                int now = FBUtilities.nowInSeconds();
+                for (Map<String, String> session : sessions)
+                {
+                    int updated = Integer.parseInt(session.get(LocalSessionInfo.LAST_UPDATE));
+                    List<String> values = Lists.newArrayList(session.get(LocalSessionInfo.SESSION_ID),
+                                                             session.get(LocalSessionInfo.STATE),
+                                                             (now - updated) + " (s)",
+                                                             session.get(LocalSessionInfo.COORDINATOR),
+                                                             session.get(LocalSessionInfo.PARTICIPANTS),
+                                                             session.get(LocalSessionInfo.PARTICIPANTS_WP));
+                    rows.add(values);
+                }
 
-    private List<String> sessionValues(Map<String, String> session, int now)
-    {
-        int updated = Integer.parseInt(session.get(LocalSessionInfo.LAST_UPDATE));
-        return Lists.newArrayList(session.get(LocalSessionInfo.SESSION_ID),
-                                  session.get(LocalSessionInfo.STATE),
-                                  Integer.toString(now - updated) + " (s)",
-                                  session.get(LocalSessionInfo.COORDINATOR),
-                                  session.get(LocalSessionInfo.PARTICIPANTS),
-                                  session.get(LocalSessionInfo.PARTICIPANTS_WP));
+                printTable(rows);
+            }
+        }
     }
-
-    private void listSessions(ActiveRepairServiceMBean repairServiceProxy)
+    @Command(name = "summarize-pending", description = "report the amount of data marked pending repair for the given token " +
+                                                       "range (or all replicated ranges if no tokens are provided)")
+    public static class SummarizePendingCmd extends RepairAdmin
     {
-        Preconditions.checkArgument(cancel == null);
-        Preconditions.checkArgument(!force, "-f/--force only valid for session cancel");
-        List<Map<String, String>> sessions = repairServiceProxy.getSessions(all);
-        if (sessions.isEmpty())
-        {
-            System.out.println("no sessions");
+        @Option(title = "verbose", name = {"-v", "--verbose"}, description = "print additional info ")
+        private boolean verbose;
 
-        }
-        else
+        @Option(title = "start_token", name = {"-st", "--start-token"}, description = "Use -st to specify a token at which the repair range starts")
+        private String startToken = StringUtils.EMPTY;
+
+        @Option(title = "end_token", name = {"-et", "--end-token"}, description = "Use -et to specify a token at which repair range ends")
+        private String endToken = StringUtils.EMPTY;
+
+        @Arguments(usage = "[<keyspace> <tables>...]", description = "The keyspace followed by one or many tables")
+        private List<String> schemaArgs = new ArrayList<>();
+
+        protected void execute(NodeProbe probe)
         {
-            List<List<String>> rows = new ArrayList<>();
-            rows.add(header);
-            int now = FBUtilities.nowInSeconds();
-            for (Map<String, String> session : sessions)
+            List<CompositeData> cds = probe.getRepairServiceProxy().getPendingStats(schemaArgs, getRangeString(startToken, endToken));
+            List<PendingStats> stats = new ArrayList<>(cds.size());
+            cds.forEach(cd -> stats.add(PendingStats.fromComposite(cd))); // decode each composite back into a PendingStats
+
+            stats.sort((l, r) -> { // order rows by keyspace, then by table
+                int cmp = l.keyspace.compareTo(r.keyspace);
+                if (cmp != 0)
+                    return cmp;
+
+                return l.table.compareTo(r.table);
+            });
+
+            List<String> header = Lists.newArrayList("keyspace", "table", "total");
+            if (verbose) // verbose adds the pending/finalized/failed columns
             {
-                rows.add(sessionValues(session, now));
+                header.addAll(Lists.newArrayList("pending", "finalized", "failed"));
             }
 
-            // get max col widths
-            int[] widths = new int[header.size()];
-            for (List<String> row : rows)
+            List<List<String>> rows = new ArrayList<>(stats.size() + 1);
+            rows.add(header);
+
+            for (PendingStats stat : stats)
             {
-                assert row.size() == widths.length;
-                for (int i = 0; i < widths.length; i++)
+                List<String> row = new ArrayList<>(header.size());
+
+                row.add(stat.keyspace);
+                row.add(stat.table);
+                row.add(stat.total.sizeString());
+                if (verbose)
                 {
-                    widths[i] = Math.max(widths[i], row.get(i).length());
+                    row.add(stat.pending.sizeString());
+                    row.add(stat.finalized.sizeString());
+                    row.add(stat.failed.sizeString());
                 }
+                rows.add(row);
             }
 
-            List<String> fmts = new ArrayList<>(widths.length);
-            for (int i = 0; i < widths.length; i++)
+            printTable(rows);
+        }
+    }
+
+    @Command(name = "summarize-repaired", description = "return the most recent repairedAt timestamp for the given token range " +
+                                                        "(or all replicated ranges if no tokens are provided)")
+    public static class SummarizeRepairedCmd extends RepairAdmin
+    {
+        @Option(title = "verbose", name = {"-v", "--verbose"}, description = "print additional info ")
+        private boolean verbose = false;
+
+        @Option(title = "start_token", name = {"-st", "--start-token"}, description = "Use -st to specify a token at which the repair range starts")
+        private String startToken = StringUtils.EMPTY;
+
+        @Option(title = "end_token", name = {"-et", "--end-token"}, description = "Use -et to specify a token at which repair range ends")
+        private String endToken = StringUtils.EMPTY;
+
+        @Arguments(usage = "[<keyspace> <tables>...]", description = "The keyspace followed by one or many tables")
+        private List<String> schemaArgs = new ArrayList<>();
+
+        protected void execute(NodeProbe probe)
+        {
+            List<CompositeData> compositeData = probe.getRepairServiceProxy().getRepairStats(schemaArgs, getRangeString(startToken, endToken));
+
+            if (compositeData.isEmpty())
             {
-                fmts.add("%-" + Integer.toString(widths[i]) + "s");
+                System.out.println("no stats");
+                return;
             }
 
+            List<RepairStats> stats = new ArrayList<>(compositeData.size());
+            compositeData.forEach(cd -> stats.add(RepairStats.fromComposite(cd)));
+
+            stats.sort((l, r) -> {
+                int cmp = l.keyspace.compareTo(r.keyspace);
+                if (cmp != 0)
+                    return cmp;
+
+                return l.table.compareTo(r.table);
+            });
+
+            List<String> header = Lists.newArrayList("keyspace", "table", "min_repaired", "max_repaired");
+            if (verbose)
+                header.add("detail");
 
-            // print
-            for (List<String> row : rows)
+            List<List<String>> rows = new ArrayList<>(stats.size() + 1);
+            rows.add(header);
+
+            for (RepairStats stat : stats)
             {
-                List<String> formatted = new ArrayList<>(row.size());
-                for (int i = 0; i < widths.length; i++)
+                List<String> row = Lists.newArrayList(stat.keyspace,
+                                                      stat.table,
+                                                      Long.toString(stat.minRepaired),
+                                                      Long.toString(stat.maxRepaired));
+                if (verbose)
                 {
-                    formatted.add(String.format(fmts.get(i), row.get(i)));
+                    row.add(Joiner.on(", ").join(Iterables.transform(stat.sections, RepairStats.Section::toString)));
                 }
-                System.out.println(Joiner.on(" | ").join(formatted));
+                rows.add(row);
             }
+
+            printTable(rows);
         }
     }
 
-    private void cancelSession(ActiveRepairServiceMBean repairServiceProxy)
+    @Command(name = "cleanup", description = "cleans up pending data from completed sessions. " +
+                                             "This happens automatically, but the command is provided " +
+                                             "for situations where it needs to be expedited." +
+                                            " Use --force to cancel compactions that are preventing promotion")
+    public static class CleanupDataCmd extends RepairAdmin
     {
-        Preconditions.checkArgument(!list);
-        Preconditions.checkArgument(!all, "-a/--all only valid for session list");
-        repairServiceProxy.failSession(cancel, force);
+        @Option(title = "force", name = {"-f", "--force"}, description = "Force a cleanup.")
+        private boolean force = false;
+
+        @Option(title = "start_token", name = {"-st", "--start-token"}, description = "Use -st to specify a token at which the repair range starts")
+        private String startToken = StringUtils.EMPTY;
+
+        @Option(title = "end_token", name = {"-et", "--end-token"}, description = "Use -et to specify a token at which repair range ends")
+        private String endToken = StringUtils.EMPTY;
+
+        @Arguments(usage = "[<keyspace> <tables>...]", description = "The keyspace followed by one or many tables")
+        private List<String> schemaArgs = new ArrayList<>();
+
+        protected void execute(NodeProbe probe)
+        {
+            System.out.println("Cleaning up data from completed sessions...");
+            List<CompositeData> compositeData = probe.getRepairServiceProxy().cleanupPending(schemaArgs, getRangeString(startToken, endToken), force);
+
+            List<CleanupSummary> summaries = new ArrayList<>(compositeData.size());
+            compositeData.forEach(cd -> summaries.add(CleanupSummary.fromComposite(cd)));
+
+            summaries.sort((l, r) -> {
+                int cmp = l.keyspace.compareTo(r.keyspace);
+                if (cmp != 0)
+                    return cmp;
+
+                return l.table.compareTo(r.table);
+            });
+
+            List<String> header = Lists.newArrayList("keyspace", "table", "successful sessions", "unsuccessful sessions");
+            List<List<String>> rows = new ArrayList<>(summaries.size() + 1);
+            rows.add(header);
+
+            boolean hasFailures = false;
+            for (CleanupSummary summary : summaries)
+            {
+                List<String> row = Lists.newArrayList(summary.keyspace,
+                                                      summary.table,
+                                                      Integer.toString(summary.successful.size()),
+                                                      Integer.toString(summary.unsuccessful.size()));
+
+                hasFailures |= !summary.unsuccessful.isEmpty();
+                rows.add(row);
+            }
+
+            if (hasFailures)
+                System.out.println("Some tables couldn't be cleaned up completely");
+
+            printTable(rows);
+        }
     }
 
-    protected void execute(NodeProbe probe)
+    @Command(name = "cancel", description = "cancel an incremental repair session." +
+                                            " Use --force to cancel from a node other than the repair coordinator." +
+                                            " Attempting to cancel FINALIZED or FAILED sessions is an error.")
+    public static class CancelCmd extends RepairAdmin
     {
-        if (list && cancel != null)
+        @Option(title = "force", name = {"-f", "--force"}, description = "Force a cancellation.")
+        private boolean force = false;
+
+        @Option(title = "session", name = {"-s", "--session"}, description = "The session to cancel", required = true)
+        private String sessionToCancel;
+
+        protected void execute(NodeProbe probe)
         {
-            throw new RuntimeException("Can either list, or cancel sessions, not both");
+            probe.getRepairServiceProxy().failSession(sessionToCancel, force); // force is forwarded unchanged to the repair service proxy
         }
-        else if (cancel != null)
+    }
+
+    private static void printTable(List<List<String>> rows)
+    {
+        if (rows.isEmpty())
+            return;
+
+        // get max col widths
+        int[] widths = new int[rows.get(0).size()];
+        for (List<String> row : rows)
         {
-            cancelSession(probe.getRepairServiceProxy());
+            assert row.size() == widths.length;
+            for (int i = 0; i < widths.length; i++)
+            {
+                widths[i] = Math.max(widths[i], row.get(i).length());
+            }
         }
-        else
+
+        List<String> fmts = new ArrayList<>(widths.length);
+        for (int i = 0; i < widths.length; i++)
         {
-            // default
-            listSessions(probe.getRepairServiceProxy());
+            fmts.add("%-" + widths[i] + "s");
         }
+
+        // print
+        for (List<String> row : rows)
+        {
+            List<String> formatted = new ArrayList<>(row.size());
+            for (int i = 0; i < widths.length; i++)
+            {
+                formatted.add(String.format(fmts.get(i), row.get(i)));
+            }
+            System.out.println(Joiner.on(" | ").join(formatted));
+        }
+    }
+
+    // Joins the two tokens as "start:end"; yields null when both are empty.
+    static String getRangeString(String startToken, String endToken)
+    {
+        if (startToken.isEmpty() && endToken.isEmpty())
+            return null;
+        return startToken + ':' + endToken;
     }
 }
diff --git a/test/unit/org/apache/cassandra/dht/RangeTest.java b/test/unit/org/apache/cassandra/dht/RangeTest.java
index 29b120b..68113f0 100644
--- a/test/unit/org/apache/cassandra/dht/RangeTest.java
+++ b/test/unit/org/apache/cassandra/dht/RangeTest.java
@@ -27,7 +27,9 @@ import java.util.Random;
 import java.util.Set;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -723,4 +725,19 @@ public class RangeTest
         Range<Token> r1 = r(20, -5);
         assertNotSame(r0.compareTo(r1), r1.compareTo(r0));
     }
+
+    @Test
+    public void testGroupIntersect() // exercises Range.intersects on two collections of ranges
+    {
+        assertTrue(Range.intersects(asList(r(1, 5), r(10, 15)), asList(r(4, 6), r(20, 25)))); // r(1, 5) and r(4, 6) are the pair expected to overlap
+        assertFalse(Range.intersects(asList(r(1, 5), r(10, 15)), asList(r(6, 7), r(20, 25)))); // no pair is expected to overlap
+    }
+
+    @Test
+    public void testGroupSubtract() // exercises Range.subtract on two collections of ranges
+    {
+        Collection<Range<Token>> ranges = Sets.newHashSet(r(1, 5), r(10, 15));
+        assertEquals(ranges, Range.subtract(ranges, asList(r(6, 7), r(20, 25)))); // expects the input back unchanged
+        assertEquals(Sets.newHashSet(r(1, 4), r(11, 15)), Range.subtract(ranges, asList(r(4, 7), r(8, 11)))); // expects both ranges trimmed to r(1, 4) and r(11, 15)
+    }
 }
diff --git a/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java b/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
index 2c47137..d57607a 100644
--- a/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
+++ b/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
@@ -71,6 +71,11 @@ public abstract class AbstractRepairTest
         return DatabaseDescriptor.getPartitioner().getToken(ByteBufferUtil.bytes(v));
     }
 
+    protected static Range<Token> r(int l, int r) // test shorthand: wraps t(l) and t(r) in a Range
+    {
+        return new Range<>(t(l), t(r));
+    }
+
     protected static final Range<Token> RANGE1 = new Range<>(t(1), t(2));
     protected static final Range<Token> RANGE2 = new Range<>(t(2), t(3));
     protected static final Range<Token> RANGE3 = new Range<>(t(4), t(5));
diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
index 15fd1fc..cb420b7 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
@@ -72,6 +72,8 @@ import static org.apache.cassandra.repair.consistent.ConsistentSession.State.*;
 
 public class LocalSessionTest extends AbstractRepairTest
 {
+    private static final UUID TID1 = UUIDGen.getTimeUUID();
+    private static final UUID TID2 = UUIDGen.getTimeUUID();
 
     static LocalSession.Builder createBuilder()
     {
@@ -79,7 +81,7 @@ public class LocalSessionTest extends AbstractRepairTest
         builder.withState(PREPARING);
         builder.withSessionID(UUIDGen.getTimeUUID());
         builder.withCoordinator(COORDINATOR);
-        builder.withUUIDTableIds(Sets.newHashSet(UUIDGen.getTimeUUID(), UUIDGen.getTimeUUID()));
+        builder.withUUIDTableIds(Sets.newHashSet(TID1, TID2));
         builder.withRepairedAt(System.currentTimeMillis());
         builder.withRanges(Sets.newHashSet(RANGE1, RANGE2, RANGE3));
         builder.withParticipants(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3));
@@ -870,6 +872,7 @@ public class LocalSessionTest extends AbstractRepairTest
     {
         LocalSession.Builder builder = createBuilder();
         builder.withStartedAt(started);
+        builder.withRepairedAt(started);
         builder.withLastUpdate(updated);
         return builder.build();
     }
@@ -940,11 +943,26 @@ public class LocalSessionTest extends AbstractRepairTest
 
         sessions.cleanup();
 
+        // failed session should be gone, but finalized should not, since it hasn't been superseded
         Assert.assertNull(sessions.getSession(failed.sessionID));
-        Assert.assertNull(sessions.getSession(finalized.sessionID));
+        Assert.assertNotNull(sessions.getSession(finalized.sessionID));
 
         Assert.assertNull(sessions.loadUnsafe(failed.sessionID));
+        Assert.assertNotNull(sessions.loadUnsafe(finalized.sessionID));
+
+        // add a finalized superseding session
+        LocalSession superseding = sessionWithTime(time, time + 1);
+        superseding.setState(FINALIZED);
+        sessions.putSessionUnsafe(superseding);
+
+        sessions.cleanup();
+
+        // old finalized should be removed, superseding should still be there
+        Assert.assertNull(sessions.getSession(finalized.sessionID));
+        Assert.assertNotNull(sessions.getSession(superseding.sessionID));
+
         Assert.assertNull(sessions.loadUnsafe(finalized.sessionID));
+        Assert.assertNotNull(sessions.loadUnsafe(superseding.sessionID));
     }
 
     /**
diff --git a/test/unit/org/apache/cassandra/repair/consistent/PendingRepairStatTest.java b/test/unit/org/apache/cassandra/repair/consistent/PendingRepairStatTest.java
new file mode 100644
index 0000000..6c42724
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/consistent/PendingRepairStatTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.repair.AbstractRepairTest;
+import org.apache.cassandra.repair.consistent.LocalSessionTest.InstrumentedLocalSessions;
+import org.apache.cassandra.repair.consistent.admin.PendingStats;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.apache.cassandra.repair.consistent.ConsistentSession.State.FAILED;
+import static org.apache.cassandra.repair.consistent.ConsistentSession.State.FINALIZED;
+import static org.apache.cassandra.repair.consistent.ConsistentSession.State.PREPARING;
+import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
+import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
+
+public class PendingRepairStatTest extends AbstractRepairTest
+{
+    private static TableMetadata cfm;
+    private static ColumnFamilyStore cfs;
+
+    private static Range<Token> FULL_RANGE;
+    private static IPartitioner partitioner;
+
+    static
+    {
+        DatabaseDescriptor.daemonInitialization();
+        partitioner = DatabaseDescriptor.getPartitioner();
+        assert partitioner instanceof ByteOrderedPartitioner;
+        FULL_RANGE = new Range<>(DatabaseDescriptor.getPartitioner().getMinimumToken(),
+                                 DatabaseDescriptor.getPartitioner().getMinimumToken());
+    }
+
+    @BeforeClass
+    public static void setupClass()
+    {
+        SchemaLoader.prepareServer();
+        cfm = CreateTableStatement.parse("CREATE TABLE tbl (k INT PRIMARY KEY, v INT)", "coordinatorsessiontest").build();
+        SchemaLoader.createKeyspace("coordinatorsessiontest", KeyspaceParams.simple(1), cfm);
+        cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id);
+    }
+
+    @Before
+    public void setUp() throws Exception
+    {
+        cfs.enableAutoCompaction();
+    }
+
+    static LocalSession createSession()
+    {
+        LocalSession.Builder builder = LocalSession.builder();
+        builder.withState(PREPARING);
+        builder.withSessionID(UUIDGen.getTimeUUID());
+        builder.withCoordinator(COORDINATOR);
+        builder.withUUIDTableIds(Sets.newHashSet(cfm.id.asUUID()));
+        builder.withRepairedAt(System.currentTimeMillis());
+        builder.withRanges(Collections.singleton(FULL_RANGE));
+        builder.withParticipants(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3));
+
+        int now = FBUtilities.nowInSeconds();
+        builder.withStartedAt(now);
+        builder.withLastUpdate(now);
+
+        return builder.build();
+    }
+
+    private static SSTableReader createSSTable(int startKey, int keys)
+    {
+        Set<SSTableReader> existing = cfs.getLiveSSTables();
+        assert keys > 0;
+        for (int i=0; i<keys; i++)
+        {
+            int key = startKey + i;
+            QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", cfm.keyspace, cfm.name), key, key);
+        }
+        cfs.forceBlockingFlush();
+        return Iterables.getOnlyElement(Sets.difference(cfs.getLiveSSTables(), existing));
+    }
+
+    private static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair)
+    {
+        try
+        {
+            cfs.getCompactionStrategyManager().mutateRepaired(Collections.singleton(sstable), repairedAt, pendingRepair, false);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void pendingRepairStats()
+    {
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+        sessions.start();
+        cfs.disableAutoCompaction();
+        SSTableReader sstable1 = createSSTable(0, 10);
+        SSTableReader sstable2 = createSSTable(10, 10);
+        SSTableReader sstable3 = createSSTable(10, 20); // NOTE(review): args are (startKey, keys), so this writes keys 10..29 and overlaps sstable2; (20, 10) may have been intended
+
+        LocalSession session1 = createSession();
+        sessions.putSessionUnsafe(session1);
+        LocalSession session2 = createSession();
+        sessions.putSessionUnsafe(session2);
+
+        PendingStats stats;
+        stats = sessions.getPendingStats(cfm.id, Collections.singleton(FULL_RANGE));
+        Assert.assertEquals(0, stats.total.numSSTables);
+
+        // mark every sstable pending: sstable1 for session1, sstable2 and sstable3 for session2
+        mutateRepaired(sstable1, UNREPAIRED_SSTABLE, session1.sessionID);
+        mutateRepaired(sstable2, UNREPAIRED_SSTABLE, session2.sessionID);
+        mutateRepaired(sstable3, UNREPAIRED_SSTABLE, session2.sessionID);
+
+        stats = sessions.getPendingStats(cfm.id, Collections.singleton(FULL_RANGE));
+        Assert.assertEquals(Sets.newHashSet(session1.sessionID, session2.sessionID), stats.total.sessions);
+        Assert.assertEquals(3, stats.total.numSSTables);
+        Assert.assertEquals(3, stats.pending.numSSTables);
+        Assert.assertEquals(0, stats.failed.numSSTables);
+        Assert.assertEquals(0, stats.finalized.numSSTables);
+
+        // fail session1 (1 pending sstable) and finalize session2 (2 pending sstables)
+        session1.setState(FAILED);
+        sessions.save(session1);
+        session2.setState(FINALIZED);
+        sessions.save(session2);
+
+        stats = sessions.getPendingStats(cfm.id, Collections.singleton(FULL_RANGE));
+        Assert.assertEquals(3, stats.total.numSSTables);
+        Assert.assertEquals(0, stats.pending.numSSTables);
+        Assert.assertEquals(1, stats.failed.numSSTables);
+        Assert.assertEquals(2, stats.finalized.numSSTables);
+
+        // clear the pending repair id on every sstable; session2's sstables also take its repairedAt
+        mutateRepaired(sstable1, UNREPAIRED_SSTABLE, NO_PENDING_REPAIR);
+        mutateRepaired(sstable2, session2.repairedAt, NO_PENDING_REPAIR);
+        mutateRepaired(sstable3, session2.repairedAt, NO_PENDING_REPAIR);
+
+        stats = sessions.getPendingStats(cfm.id, Collections.singleton(FULL_RANGE));
+        Assert.assertTrue(stats.total.sessions.isEmpty());
+    }
+}
diff --git a/test/unit/org/apache/cassandra/repair/consistent/RepairStateTest.java b/test/unit/org/apache/cassandra/repair/consistent/RepairStateTest.java
new file mode 100644
index 0000000..db425de
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/consistent/RepairStateTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+import static org.apache.cassandra.repair.consistent.RepairedState.getRepairedStats;
+
+public class RepairStateTest
+{
+    private static Token tk(long t)
+    {
+        return new Murmur3Partitioner.LongToken(t);
+    }
+
+    private static Range<Token> range(long left, long right)
+    {
+        return new Range<>(tk(left), tk(right));
+    }
+
+    private static List<Range<Token>> ranges(long... tokens)
+    {
+        assert tokens.length %2 == 0;
+        List<Range<Token>> ranges = new ArrayList<>();
+        for (int i=0; i<tokens.length; i+=2)
+        {
+            ranges.add(range(tokens[i], tokens[i+1]));
+
+        }
+        return ranges;
+    }
+
+    private static RepairedState.Level level(Collection<Range<Token>> ranges, long repairedAt)
+    {
+        return new RepairedState.Level(ranges, repairedAt);
+    }
+
+    private static RepairedState.Section sect(Range<Token> range, long repairedAt)
+    {
+        return new RepairedState.Section(range, repairedAt);
+    }
+
+    private static RepairedState.Section sect(int l, int r, long time)
+    {
+        return sect(range(l, r), time);
+    }
+
+    private static <T> List<T> l(T... contents)
+    {
+        return Lists.newArrayList(contents);
+    }
+
+    @Test
+    public void mergeOverlapping()
+    {
+        RepairedState repairs = new RepairedState();
+
+        repairs.add(ranges(100, 300), 5);
+        repairs.add(ranges(200, 400), 6);
+
+        RepairedState.State state = repairs.state();
+        Assert.assertEquals(l(level(ranges(200, 400), 6), level(ranges(100, 200), 5)), state.levels);
+        Assert.assertEquals(l(sect(range(100, 200), 5), sect(range(200, 400), 6)), state.sections);
+        Assert.assertEquals(ranges(100, 400), state.covered);
+    }
+
+    @Test
+    public void mergeSameRange()
+    {
+        RepairedState repairs = new RepairedState();
+
+        repairs.add(ranges(100, 400), 5);
+        repairs.add(ranges(100, 400), 6);
+
+        RepairedState.State state = repairs.state();
+        Assert.assertEquals(l(level(ranges(100, 400), 6)), state.levels);
+        Assert.assertEquals(l(sect(range(100, 400), 6)), state.sections);
+        Assert.assertEquals(ranges(100, 400), state.covered);
+    }
+
+    @Test
+    public void mergeLargeRange()
+    {
+        RepairedState repairs = new RepairedState();
+
+        repairs.add(ranges(200, 300), 5);
+        repairs.add(ranges(100, 400), 6);
+
+        RepairedState.State state = repairs.state();
+        Assert.assertEquals(l(level(ranges(100, 400), 6)), state.levels);
+        Assert.assertEquals(l(sect(range(100, 400), 6)), state.sections);
+        Assert.assertEquals(ranges(100, 400), state.covered);
+    }
+
+    @Test
+    public void mergeSmallRange()
+    {
+        RepairedState repairs = new RepairedState();
+
+        repairs.add(ranges(100, 400), 5);
+        repairs.add(ranges(200, 300), 6);
+
+        RepairedState.State state = repairs.state();
+        Assert.assertEquals(l(level(ranges(200, 300), 6), level(ranges(100, 200, 300, 400), 5)), state.levels);
+        Assert.assertEquals(l(sect(range(100, 200), 5), sect(range(200, 300), 6), sect(range(300, 400), 5)), state.sections);
+        Assert.assertEquals(ranges(100, 400), state.covered);
+    }
+
+
+    @Test
+    public void repairedAt()
+    {
+        RepairedState rs;
+
+        // two overlapping repairs: 100-300 recorded at time 5, then 200-400 recorded at time 6
+        rs = new RepairedState();
+        rs.add(ranges(100, 300), 5);
+        rs.add(ranges(200, 400), 6);
+
+        Assert.assertEquals(5, rs.minRepairedAt(ranges(150, 250))); // straddles both repairs: the older time is expected
+        Assert.assertEquals(5, rs.minRepairedAt(ranges(150, 160))); // inside the part only the first repair covers
+        Assert.assertEquals(5, rs.minRepairedAt(ranges(100, 200))); // exactly the part only the first repair covers
+        Assert.assertEquals(6, rs.minRepairedAt(ranges(200, 400))); // exactly the second repair's range
+        Assert.assertEquals(0, rs.minRepairedAt(ranges(200, 401))); // runs past the end of the repaired span: 0 expected
+        Assert.assertEquals(0, rs.minRepairedAt(ranges(99, 200))); // starts before the repaired span: 0 expected
+        Assert.assertEquals(0, rs.minRepairedAt(ranges(50, 450))); // encloses the repaired span but extends beyond it on both sides
+        Assert.assertEquals(0, rs.minRepairedAt(ranges(50, 60))); // entirely before the repaired span
+        Assert.assertEquals(0, rs.minRepairedAt(ranges(450, 460))); // entirely after the repaired span
+    }
+
+    @Test
+    public void stats()
+    {
+        Assert.assertEquals(l(sect(100, 200, 5), sect(200, 300, 0), sect(300, 400, 5)),
+                            getRepairedStats(l(sect(100, 200, 5), sect(300, 400, 5)), ranges(100, 400)));
+
+        Assert.assertEquals(l(sect(100, 200, 0), sect(200, 300, 5), sect(300, 400, 0)),
+                            getRepairedStats(l(sect(200, 300, 5)), ranges(100, 400)));
+
+        Assert.assertEquals(l(sect(200, 300, 5)), getRepairedStats(l(sect(200, 300, 5)), ranges(200, 300)));
+    }
+}
diff --git a/test/unit/org/apache/cassandra/repair/consistent/admin/SchemaArgsParserTest.java b/test/unit/org/apache/cassandra/repair/consistent/admin/SchemaArgsParserTest.java
new file mode 100644
index 0000000..9d98c9d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/consistent/admin/SchemaArgsParserTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent.admin;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.assertj.core.util.Lists;
+
+public class SchemaArgsParserTest
+{
+    private static final String KEYSPACE = "schemaargsparsertest";
+    private static final int NUM_TBL = 3;
+    private static TableMetadata[] cfm = new TableMetadata[NUM_TBL];
+    private static ColumnFamilyStore[] cfs = new ColumnFamilyStore[NUM_TBL];
+
+    @BeforeClass
+    public static void setupClass()
+    {
+        SchemaLoader.prepareServer();
+        for (int i=0; i<NUM_TBL; i++)
+            cfm[i] = CreateTableStatement.parse("CREATE TABLE tbl" + i + " (k INT PRIMARY KEY, v INT)", KEYSPACE).build();
+        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), cfm);
+        for (int i=0; i<NUM_TBL; i++)
+            cfs[i] = Schema.instance.getColumnFamilyStoreInstance(cfm[i].id);
+    }
+
+    /**
+     * Specifying only the keyspace should return all tables in that keyspace
+     */
+    @Test
+    public void keyspaceOnly()
+    {
+        Set<ColumnFamilyStore> tables = Sets.newHashSet(SchemaArgsParser.parse(KEYSPACE));
+        Assert.assertEquals(Sets.newHashSet(cfs), tables);
+    }
+
+    @Test
+    public void someTables()
+    {
+        Set<ColumnFamilyStore> tables = Sets.newHashSet(SchemaArgsParser.parse(KEYSPACE, "tbl1", "tbl2"));
+        Assert.assertEquals(Sets.newHashSet(cfs[1], cfs[2]), tables);
+    }
+
+    @Test
+    public void noKeyspace()
+    {
+        Set<ColumnFamilyStore> allTables = Sets.newHashSet(SchemaArgsParser.parse().iterator());
+        Assert.assertTrue(allTables.containsAll(Sets.newHashSet(cfs)));
+    }
+
+    @Test( expected = IllegalArgumentException.class )
+    public void invalidKeyspace()
+    {
+        Sets.newHashSet(SchemaArgsParser.parse("SOME_KEYSPACE"));
+    }
+
+    @Test( expected = IllegalArgumentException.class )
+    public void invalidTables()
+    {
+        Sets.newHashSet(SchemaArgsParser.parse(KEYSPACE, "sometable")); // result discarded: only the expected exception matters
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org