You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dc...@apache.org on 2021/11/13 01:00:00 UTC

[cassandra] branch trunk updated: Refactor normal/preview/IR repair to standardize repair cleanup and error handling of failed RepairJobs

This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b84ec51  Refactor normal/preview/IR repair to standardize repair cleanup and error handling of failed RepairJobs
b84ec51 is described below

commit b84ec51b4c73d7b9b58ea8a1708ae2f330972970
Author: David Capwell <dc...@apache.org>
AuthorDate: Fri Nov 12 15:47:44 2021 -0800

    Refactor normal/preview/IR repair to standardize repair cleanup and error handling of failed RepairJobs
    
    patch by David Capwell; reviewed by Marcus Eriksson for CASSANDRA-17069
---
 CHANGES.txt                                        |   1 +
 checkstyle.xml                                     |  22 +-
 .../org/apache/cassandra/cache/CaffeineCache.java  |   3 +-
 .../org/apache/cassandra/cache/ChunkCache.java     |   3 +-
 .../apache/cassandra/cache/SerializingCache.java   |   3 +-
 .../org/apache/cassandra/cql3/QueryProcessor.java  |   3 +-
 .../cassandra/db/repair/PendingAntiCompaction.java |   2 +-
 .../index/sasi/analyzer/filter/StemmerFactory.java |   3 +-
 .../sasi/analyzer/filter/StopWordFactory.java      |   3 +-
 .../cassandra/metrics/HintedHandoffMetrics.java    |   5 +-
 .../cassandra/metrics/HintsServiceMetrics.java     |   3 +-
 .../cassandra/repair/AbstractRepairTask.java       | 122 +++++++
 .../cassandra/repair/CoordinatedRepairResult.java  | 108 ++++++
 .../cassandra/repair/IncrementalRepairTask.java    |  75 ++++
 .../apache/cassandra/repair/NormalRepairTask.java  |  57 +++
 .../apache/cassandra/repair/PreviewRepairTask.java | 145 ++++++++
 .../org/apache/cassandra/repair/RepairJob.java     |   4 +-
 ...airFailedException.java => RepairNotifier.java} |  14 +-
 .../apache/cassandra/repair/RepairRunnable.java    | 391 +++------------------
 ...eRepairFailedException.java => RepairTask.java} |  32 +-
 .../repair/SomeRepairFailedException.java          |   5 +
 .../repair/consistent/CoordinatorSession.java      | 138 +++-----
 .../repair/consistent/SyncStatSummary.java         |   7 +-
 .../apache/cassandra/security/CipherFactory.java   |   3 +-
 .../cassandra/service/ActiveRepairService.java     |  15 +-
 .../cassandra/utils/concurrent/AbstractFuture.java |  44 ++-
 .../cassandra/utils/concurrent/AsyncFuture.java    |  15 +-
 .../apache/cassandra/utils/concurrent/Future.java  |  30 +-
 .../cassandra/utils/concurrent/ListenerList.java   |  33 +-
 .../cassandra/utils/concurrent/SyncFuture.java     |  15 +-
 .../test/AbstractNetstatsBootstrapStreaming.java   |   4 +-
 .../distributed/test/repair/ForceRepairTest.java   | 170 +++++++++
 .../consistent/CoordinatorMessagingTest.java       |  36 +-
 .../repair/consistent/CoordinatorSessionTest.java  |  61 ++--
 .../utils/concurrent/AbstractTestAsyncPromise.java |  83 +++--
 35 files changed, 1082 insertions(+), 576 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index fb97fec..12848af 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * Refactor normal/preview/IR repair to standardize repair cleanup and error handling of failed RepairJobs (CASSANDRA-17069)
  * Log missing peers in StartupClusterConnectivityChecker (CASSANDRA-17130)
  * Introduce separate rate limiting settings for entire SSTable streaming (CASSANDRA-17065)
  * Implement Virtual Tables for Auth Caches (CASSANDRA-16914)
diff --git a/checkstyle.xml b/checkstyle.xml
index 4bc31dd..19bfa86 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -51,14 +51,30 @@
  
     <module name="SuppressWithNearbyCommentFilter">
        <property name="commentFormat" value="checkstyle: permit system clock"/>
-       <property name="checkFormat" value="RegexpSinglelineJava"/>
+       <property name="idFormat" value="blockSystemClock"/>
        <property name="influenceFormat" value="0"/>
     </module>
  
     <module name="RegexpSinglelineJava">
-      <!-- To prevent static imports and System.nanoTime or System.currentTimeMillis -->
-      <property name="format" value="(newSingleThreadExecutor|newFixedThreadPool|newCachedThreadPool|newSingleThreadScheduledExecutor|newWorkStealingPool|newScheduledThreadPool|defaultThreadFactory)\(|System\.(currentTimeMillis|nanoTime)"/>
+      <!-- block system time -->
+      <property name="id" value="blockSystemClock"/>
+      <property name="format" value="System\.(currentTimeMillis|nanoTime)"/>
       <property name="ignoreComments" value="true"/>
+      <property name="message" value="Avoid System for time, should use org.apache.cassandra.utils.Clock.Global or org.apache.cassandra.utils.Clock interface" />
+    </module>
+    <module name="RegexpSinglelineJava">
+      <!-- block normal executors -->
+      <property name="id" value="blockExecutors"/>
+      <property name="format" value="newSingleThreadExecutor|newFixedThreadPool|newCachedThreadPool|newSingleThreadScheduledExecutor|newWorkStealingPool|newScheduledThreadPool|defaultThreadFactory"/>
+      <property name="ignoreComments" value="true"/>
+      <property name="message" value="Avoid creating an executor directly, should use org.apache.cassandra.concurrent.ExecutorFactory.Global#executorFactory" />
+    </module>
+    <module name="RegexpSinglelineJava">
+      <!-- block guavas directExecutor -->
+      <property name="id" value="blockGuavaDirectExecutor"/>
+      <property name="format" value="MoreExecutors\.directExecutor"/>
+      <property name="ignoreComments" value="true"/>
+      <property name="message" value="Avoid MoreExecutors.directExecutor() in favor of ImmediateExecutor.INSTANCE" />
     </module>
     <module name="IllegalImport">
       <property name="illegalPkgs" value=""/>
diff --git a/src/java/org/apache/cassandra/cache/CaffeineCache.java b/src/java/org/apache/cassandra/cache/CaffeineCache.java
index d51ea84..b01093f 100644
--- a/src/java/org/apache/cassandra/cache/CaffeineCache.java
+++ b/src/java/org/apache/cassandra/cache/CaffeineCache.java
@@ -27,6 +27,7 @@ import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.Policy.Eviction;
 import com.github.benmanes.caffeine.cache.Weigher;
+import org.apache.cassandra.concurrent.ImmediateExecutor;
 
 /**
  * An adapter from a Caffeine cache to the ICache interface. This provides an on-heap cache using
@@ -54,7 +55,7 @@ public class CaffeineCache<K extends IMeasurableMemory, V extends IMeasurableMem
         Cache<K, V> cache = Caffeine.newBuilder()
                 .maximumWeight(weightedCapacity)
                 .weigher(weigher)
-                .executor(MoreExecutors.directExecutor())
+                .executor(ImmediateExecutor.INSTANCE)
                 .build();
         return new CaffeineCache<>(cache);
     }
diff --git a/src/java/org/apache/cassandra/cache/ChunkCache.java b/src/java/org/apache/cassandra/cache/ChunkCache.java
index c53810a..397850a 100644
--- a/src/java/org/apache/cassandra/cache/ChunkCache.java
+++ b/src/java/org/apache/cassandra/cache/ChunkCache.java
@@ -29,6 +29,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.MoreExecutors;
 
 import com.github.benmanes.caffeine.cache.*;
+import org.apache.cassandra.concurrent.ImmediateExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.util.*;
@@ -143,7 +144,7 @@ public class ChunkCache
         metrics = new ChunkCacheMetrics(this);
         cache = Caffeine.newBuilder()
                         .maximumWeight(cacheSize)
-                        .executor(MoreExecutors.directExecutor())
+                        .executor(ImmediateExecutor.INSTANCE)
                         .weigher((key, buffer) -> ((Buffer) buffer).buffer.capacity())
                         .removalListener(this)
                         .recordStats(() -> metrics)
diff --git a/src/java/org/apache/cassandra/cache/SerializingCache.java b/src/java/org/apache/cassandra/cache/SerializingCache.java
index 55c20ec..cd028e2 100644
--- a/src/java/org/apache/cassandra/cache/SerializingCache.java
+++ b/src/java/org/apache/cassandra/cache/SerializingCache.java
@@ -21,6 +21,7 @@ import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.Weigher;
 
+import org.apache.cassandra.concurrent.ImmediateExecutor;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.util.MemoryInputStream;
 import org.apache.cassandra.io.util.MemoryOutputStream;
@@ -51,7 +52,7 @@ public class SerializingCache<K, V> implements ICache<K, V>
         this.cache = Caffeine.newBuilder()
                    .weigher(weigher)
                    .maximumWeight(capacity)
-                   .executor(MoreExecutors.directExecutor())
+                   .executor(ImmediateExecutor.INSTANCE)
                    .removalListener((key, mem, cause) -> {
                        if (cause.wasEvicted()) {
                            mem.unreference();
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index eea8336..ed99861 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.antlr.runtime.*;
+import org.apache.cassandra.concurrent.ImmediateExecutor;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.metrics.ClientRequestMetrics;
@@ -97,7 +98,7 @@ public class QueryProcessor implements QueryHandler
     static
     {
         preparedStatements = Caffeine.newBuilder()
-                             .executor(MoreExecutors.directExecutor())
+                             .executor(ImmediateExecutor.INSTANCE)
                              .maximumWeight(capacityToBytes(DatabaseDescriptor.getPreparedStatementsCacheSizeMB()))
                              .weigher(QueryProcessor::measure)
                              .removalListener((key, prepared, cause) -> {
diff --git a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
index 59eff55..0007bbe 100644
--- a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
+++ b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
@@ -372,7 +372,7 @@ public class PendingAntiCompaction
         }
 
         Future<List<AcquireResult>> acquisitionResults = FutureCombiner.successfulOf(tasks);
-        return acquisitionResults.andThenAsync(getAcquisitionCallback(prsId, tokenRanges));
+        return acquisitionResults.flatMap(getAcquisitionCallback(prsId, tokenRanges));
     }
 
     @VisibleForTesting
diff --git a/src/java/org/apache/cassandra/index/sasi/analyzer/filter/StemmerFactory.java b/src/java/org/apache/cassandra/index/sasi/analyzer/filter/StemmerFactory.java
index d278c28..17f1512 100644
--- a/src/java/org/apache/cassandra/index/sasi/analyzer/filter/StemmerFactory.java
+++ b/src/java/org/apache/cassandra/index/sasi/analyzer/filter/StemmerFactory.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import com.google.common.util.concurrent.MoreExecutors;
 
+import org.apache.cassandra.concurrent.ImmediateExecutor;
 import org.tartarus.snowball.SnowballStemmer;
 import org.tartarus.snowball.ext.*;
 
@@ -42,7 +43,7 @@ public class StemmerFactory
 {
     private static final Logger logger = LoggerFactory.getLogger(StemmerFactory.class);
     private static final LoadingCache<Class, Constructor<?>> STEMMER_CONSTRUCTOR_CACHE = Caffeine.newBuilder()
-            .executor(MoreExecutors.directExecutor())
+            .executor(ImmediateExecutor.INSTANCE)
             .build(new CacheLoader<Class, Constructor<?>>()
             {
                 public Constructor<?> load(Class aClass) throws Exception
diff --git a/src/java/org/apache/cassandra/index/sasi/analyzer/filter/StopWordFactory.java b/src/java/org/apache/cassandra/index/sasi/analyzer/filter/StopWordFactory.java
index 56c07f3..434fbda 100644
--- a/src/java/org/apache/cassandra/index/sasi/analyzer/filter/StopWordFactory.java
+++ b/src/java/org/apache/cassandra/index/sasi/analyzer/filter/StopWordFactory.java
@@ -32,6 +32,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 
+import org.apache.cassandra.concurrent.ImmediateExecutor;
 import org.apache.cassandra.io.util.File;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,7 +52,7 @@ public class StopWordFactory
             "pl","pt","ro","ru","sv"));
 
     private static final LoadingCache<String, Set<String>> STOP_WORDS_CACHE = Caffeine.newBuilder()
-            .executor(MoreExecutors.directExecutor())
+            .executor(ImmediateExecutor.INSTANCE)
             .build(StopWordFactory::getStopWordsFromResource);
 
     public static Set<String> getStopWordsForLanguage(Locale locale)
diff --git a/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java b/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java
index 0261e8e..2a5fa9a 100644
--- a/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java
@@ -25,6 +25,7 @@ import com.codahale.metrics.Counter;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 
+import org.apache.cassandra.concurrent.ImmediateExecutor;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.utils.UUIDGen;
@@ -44,12 +45,12 @@ public class HintedHandoffMetrics
 
     /** Total number of hints which are not stored, This is not a cache. */
     private final LoadingCache<InetAddressAndPort, DifferencingCounter> notStored = Caffeine.newBuilder()
-                                                                                            .executor(MoreExecutors.directExecutor())
+                                                                                            .executor(ImmediateExecutor.INSTANCE)
                                                                                             .build(DifferencingCounter::new);
 
     /** Total number of hints that have been created, This is not a cache. */
     private final LoadingCache<InetAddressAndPort, Counter> createdHintCounts = Caffeine.newBuilder()
-                                                                                        .executor(MoreExecutors.directExecutor())
+                                                                                        .executor(ImmediateExecutor.INSTANCE)
                                                                                         .build(address -> Metrics.counter(factory.createMetricName("Hints_created-" + address.toString().replace(':', '.'))));
 
     public void incrCreatedHints(InetAddressAndPort address)
diff --git a/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java b/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java
index 424f502..50defd9 100644
--- a/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java
@@ -25,6 +25,7 @@ import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
+import org.apache.cassandra.concurrent.ImmediateExecutor;
 import org.apache.cassandra.locator.InetAddressAndPort;
 
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
@@ -47,7 +48,7 @@ public final class HintsServiceMetrics
 
     /** Histograms per-endpoint of hint delivery delays, This is not a cache. */
     private static final LoadingCache<InetAddressAndPort, Histogram> delayByEndpoint = Caffeine.newBuilder()
-                                                                                               .executor(MoreExecutors.directExecutor())
+                                                                                               .executor(ImmediateExecutor.INSTANCE)
                                                                                                .build(address -> Metrics.histogram(factory.createMetricName("Hint_delays-"+address.toString().replace(':', '.')), false));
 
     public static void updateDelayMetrics(InetAddressAndPort endpoint, long delay)
diff --git a/src/java/org/apache/cassandra/repair/AbstractRepairTask.java b/src/java/org/apache/cassandra/repair/AbstractRepairTask.java
new file mode 100644
index 0000000..c729cda
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/AbstractRepairTask.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.FutureCombiner;
+
+public abstract class AbstractRepairTask implements RepairTask
+{
+    protected static final Logger logger = LoggerFactory.getLogger(AbstractRepairTask.class);
+
+    protected final RepairOption options;
+    protected final String keyspace;
+    protected final RepairNotifier notifier;
+
+    protected AbstractRepairTask(RepairOption options, String keyspace, RepairNotifier notifier)
+    {
+        this.options = Objects.requireNonNull(options);
+        this.keyspace = Objects.requireNonNull(keyspace);
+        this.notifier = Objects.requireNonNull(notifier);
+    }
+
+    private List<RepairSession> submitRepairSessions(UUID parentSession,
+                                                     boolean isIncremental,
+                                                     ExecutorPlus executor,
+                                                     List<CommonRange> commonRanges,
+                                                     String... cfnames)
+    {
+        List<RepairSession> futures = new ArrayList<>(options.getRanges().size());
+
+        for (CommonRange commonRange : commonRanges)
+        {
+            logger.info("Starting RepairSession for {}", commonRange);
+            RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession,
+                                                                                     commonRange,
+                                                                                     keyspace,
+                                                                                     options.getParallelism(),
+                                                                                     isIncremental,
+                                                                                     options.isPullRepair(),
+                                                                                     options.getPreviewKind(),
+                                                                                     options.optimiseStreams(),
+                                                                                     executor,
+                                                                                     cfnames);
+            if (session == null)
+                continue;
+            session.addCallback(new RepairSessionCallback(session));
+            futures.add(session);
+        }
+        return futures;
+    }
+
+    protected Future<CoordinatedRepairResult> runRepair(UUID parentSession,
+                                                        boolean isIncremental,
+                                                        ExecutorPlus executor,
+                                                        List<CommonRange> commonRanges,
+                                                        String... cfnames)
+    {
+        List<RepairSession> allSessions = submitRepairSessions(parentSession, isIncremental, executor, commonRanges, cfnames);
+        List<Collection<Range<Token>>> ranges = Lists.transform(allSessions, RepairSession::ranges);
+        Future<List<RepairSessionResult>> f = FutureCombiner.successfulOf(allSessions);
+        return f.map(results -> {
+            logger.debug("Repair result: {}", results);
+            return CoordinatedRepairResult.create(ranges, results);
+        });
+    }
+
+    private class RepairSessionCallback implements FutureCallback<RepairSessionResult>
+    {
+        private final RepairSession session;
+
+        public RepairSessionCallback(RepairSession session)
+        {
+            this.session = session;
+        }
+
+        public void onSuccess(RepairSessionResult result)
+        {
+            String message = String.format("Repair session %s for range %s finished", session.getId(),
+                                           session.ranges().toString());
+            notifier.notifyProgress(message);
+        }
+
+        public void onFailure(Throwable t)
+        {
+            String message = String.format("Repair session %s for range %s failed with error %s",
+                                           session.getId(), session.ranges().toString(), t.getMessage());
+            notifier.notifyError(new RuntimeException(message, t));
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/repair/CoordinatedRepairResult.java b/src/java/org/apache/cassandra/repair/CoordinatedRepairResult.java
new file mode 100644
index 0000000..9593acc
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/CoordinatedRepairResult.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+public class CoordinatedRepairResult
+{
+    public final Collection<Range<Token>> successfulRanges;
+    public final Collection<Range<Token>> failedRanges;
+    public final Collection<Range<Token>> skippedRanges;
+    public final Optional<List<RepairSessionResult>> results;
+
+    public CoordinatedRepairResult(Collection<Range<Token>> successfulRanges,
+                                   Collection<Range<Token>> failedRanges,
+                                   Collection<Range<Token>> skippedRanges,
+                                   List<RepairSessionResult> results)
+    {
+        this.successfulRanges = successfulRanges != null ? ImmutableList.copyOf(successfulRanges) : Collections.emptyList();
+        this.failedRanges = failedRanges != null ? ImmutableList.copyOf(failedRanges) : Collections.emptyList();
+        this.skippedRanges = skippedRanges != null ? ImmutableList.copyOf(skippedRanges) : Collections.emptyList();
+        this.results = Optional.ofNullable(results);
+    }
+
+    public static CoordinatedRepairResult create(List<Collection<Range<Token>>> ranges, List<RepairSessionResult> results)
+    {
+        if (results == null || results.isEmpty())
+            // something went wrong; assume all sessions failed
+            return failed(ranges);
+
+        assert ranges.size() == results.size() : String.format("range size %d != results size %d;ranges: %s, results: %s", ranges.size(), results.size(), ranges, results);
+        Collection<Range<Token>> successfulRanges = new ArrayList<>();
+        Collection<Range<Token>> failedRanges = new ArrayList<>();
+        Collection<Range<Token>> skippedRanges = new ArrayList<>();
+        int index = 0;
+        for (RepairSessionResult sessionResult : results)
+        {
+            if (sessionResult != null)
+            {
+                // don't record successful repair if we had to skip ranges
+                Collection<Range<Token>> replicas = sessionResult.skippedReplicas ? skippedRanges : successfulRanges;
+                replicas.addAll(sessionResult.ranges);
+            }
+            else
+            {
+                // FutureCombiner.successfulOf doesn't keep track of the original, but maintains order, so
+                // can fetch the original session
+                failedRanges.addAll(Objects.requireNonNull(ranges.get(index)));
+            }
+            index++;
+        }
+        return new CoordinatedRepairResult(successfulRanges, failedRanges, skippedRanges, results);
+    }
+
+    private static CoordinatedRepairResult failed(@Nullable List<Collection<Range<Token>>> ranges)
+    {
+        Collection<Range<Token>> failedRanges = new ArrayList<>(ranges == null ? 0 : ranges.size());
+        if (ranges != null)
+            ranges.forEach(failedRanges::addAll);
+        return new CoordinatedRepairResult(null, failedRanges, null, null);
+    }
+
+    /**
+     * Utility method for tests to produce a success result; should only be used by tests as syntaxtic sugar as all
+     * results must be present else an error is thrown.
+     */
+    @VisibleForTesting
+    public static CoordinatedRepairResult success(List<RepairSessionResult> results)
+    {
+        assert results != null && results.stream().allMatch(a -> a != null) : String.format("results was null or had a null (failed) result: %s", results);
+        List<Collection<Range<Token>>> ranges = Lists.transform(results, a -> a.ranges);
+        return create(ranges, results);
+    }
+
+    public boolean hasFailed()
+    {
+        return !failedRanges.isEmpty();
+    }
+}
diff --git a/src/java/org/apache/cassandra/repair/IncrementalRepairTask.java b/src/java/org/apache/cassandra/repair/IncrementalRepairTask.java
new file mode 100644
index 0000000..c2951d5
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/IncrementalRepairTask.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.repair.consistent.CoordinatorSession;
+import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Future;
+
+public class IncrementalRepairTask extends AbstractRepairTask
+{
+    private final UUID parentSession;
+    private final RepairRunnable.NeighborsAndRanges neighborsAndRanges;
+    private final String[] cfnames;
+
+    protected IncrementalRepairTask(RepairOption options,
+                                    String keyspace,
+                                    RepairNotifier notifier,
+                                    UUID parentSession,
+                                    RepairRunnable.NeighborsAndRanges neighborsAndRanges,
+                                    String[] cfnames)
+    {
+        super(options, keyspace, notifier);
+        this.parentSession = parentSession;
+        this.neighborsAndRanges = neighborsAndRanges;
+        this.cfnames = cfnames;
+    }
+
+    @Override
+    public String name()
+    {
+        return "Repair";
+    }
+
+    @Override
+    public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor) throws Exception
+    {
+        // the local node also needs to be included in the set of participants, since coordinator sessions aren't persisted
+        Set<InetAddressAndPort> allParticipants = ImmutableSet.<InetAddressAndPort>builder()
+                                                  .addAll(neighborsAndRanges.participants)
+                                                  .add(FBUtilities.getBroadcastAddressAndPort())
+                                                  .build();
+        // Not necessary to include self for filtering. The common ranges only contains neighbhor node endpoints.
+        List<CommonRange> allRanges = neighborsAndRanges.filterCommonRanges(keyspace, cfnames);
+
+        CoordinatorSession coordinatorSession = ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession, allParticipants, neighborsAndRanges.shouldExcludeDeadParticipants);
+
+        return coordinatorSession.execute(() -> runRepair(parentSession, true, executor, allRanges, cfnames));
+
+    }
+}
diff --git a/src/java/org/apache/cassandra/repair/NormalRepairTask.java b/src/java/org/apache/cassandra/repair/NormalRepairTask.java
new file mode 100644
index 0000000..532271c
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/NormalRepairTask.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.utils.concurrent.Future;
+
+public class NormalRepairTask extends AbstractRepairTask
+{
+    private final UUID parentSession;
+    private final List<CommonRange> commonRanges;
+    private final String[] cfnames;
+
+    protected NormalRepairTask(RepairOption options,
+                               String keyspace,
+                               RepairNotifier notifier,
+                               UUID parentSession,
+                               List<CommonRange> commonRanges,
+                               String[] cfnames)
+    {
+        super(options, keyspace, notifier);
+        this.parentSession = parentSession;
+        this.commonRanges = commonRanges;
+        this.cfnames = cfnames;
+    }
+
+    @Override
+    public String name()
+    {
+        return "Repair";
+    }
+
+    @Override
+    public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor)
+    {
+        return runRepair(parentSession, false, executor, commonRanges, cfnames);
+    }
+}
diff --git a/src/java/org/apache/cassandra/repair/PreviewRepairTask.java b/src/java/org/apache/cassandra/repair/PreviewRepairTask.java
new file mode 100644
index 0000000..e2eb08c
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/PreviewRepairTask.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.RepairMetrics;
+import org.apache.cassandra.repair.consistent.SyncStatSummary;
+import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.DiagnosticSnapshotService;
+import org.apache.cassandra.utils.concurrent.Future;
+
+public class PreviewRepairTask extends AbstractRepairTask
+{
+    private final UUID parentSession;
+    private final List<CommonRange> commonRanges;
+    private final String[] cfnames;
+
+    protected PreviewRepairTask(RepairOption options, String keyspace, RepairNotifier notifier, UUID parentSession, List<CommonRange> commonRanges, String[] cfnames)
+    {
+        super(options, keyspace, notifier);
+        this.parentSession = parentSession;
+        this.commonRanges = commonRanges;
+        this.cfnames = cfnames;
+    }
+
+    @Override
+    public String name()
+    {
+        return "Repair preview";
+    }
+
+    @Override
+    public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor)
+    {
+        Future<CoordinatedRepairResult> f = runRepair(parentSession, false, executor, commonRanges, cfnames);
+        return f.map(result -> {
+            if (result.hasFailed())
+                return result;
+
+            PreviewKind previewKind = options.getPreviewKind();
+            Preconditions.checkState(previewKind != PreviewKind.NONE, "Preview is NONE");
+            SyncStatSummary summary = new SyncStatSummary(true);
+            summary.consumeSessionResults(result.results);
+
+            final String message;
+            if (summary.isEmpty())
+            {
+                message = previewKind == PreviewKind.REPAIRED ? "Repaired data is in sync" : "Previewed data was in sync";
+            }
+            else
+            {
+                message = (previewKind == PreviewKind.REPAIRED ? "Repaired data is inconsistent\n" : "Preview complete\n") + summary;
+                RepairMetrics.previewFailures.inc();
+                if (previewKind == PreviewKind.REPAIRED)
+                    maybeSnapshotReplicas(parentSession, keyspace, result.results.get()); // we know its present as summary used it
+            }
+            notifier.notification(message);
+
+            return result;
+        });
+    }
+
+    private void maybeSnapshotReplicas(UUID parentSession, String keyspace, List<RepairSessionResult> results)
+    {
+        if (!DatabaseDescriptor.snapshotOnRepairedDataMismatch())
+            return;
+
+        try
+        {
+            Set<String> mismatchingTables = new HashSet<>();
+            Set<InetAddressAndPort> nodes = new HashSet<>();
+            for (RepairSessionResult sessionResult : results)
+            {
+                for (RepairResult repairResult : emptyIfNull(sessionResult.repairJobResults))
+                {
+                    for (SyncStat stat : emptyIfNull(repairResult.stats))
+                    {
+                        if (stat.numberOfDifferences > 0)
+                            mismatchingTables.add(repairResult.desc.columnFamily);
+                        // snapshot all replicas, even if they don't have any differences
+                        nodes.add(stat.nodes.coordinator);
+                        nodes.add(stat.nodes.peer);
+                    }
+                }
+            }
+
+            String snapshotName = DiagnosticSnapshotService.getSnapshotName(DiagnosticSnapshotService.REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX);
+            for (String table : mismatchingTables)
+            {
+                // we can just check snapshot existence locally since the repair coordinator is always a replica (unlike in the read case)
+                if (!Keyspace.open(keyspace).getColumnFamilyStore(table).snapshotExists(snapshotName))
+                {
+                    logger.info("{} Snapshotting {}.{} for preview repair mismatch with tag {} on instances {}",
+                                options.getPreviewKind().logPrefix(parentSession),
+                                keyspace, table, snapshotName, nodes);
+                    DiagnosticSnapshotService.repairedDataMismatch(Keyspace.open(keyspace).getColumnFamilyStore(table).metadata(), nodes);
+                }
+                else
+                {
+                    logger.info("{} Not snapshotting {}.{} - snapshot {} exists",
+                                options.getPreviewKind().logPrefix(parentSession),
+                                keyspace, table, snapshotName);
+                }
+            }
+        }
+        catch (Exception e)
+        {
+            logger.error("{} Failed snapshotting replicas", options.getPreviewKind().logPrefix(parentSession), e);
+        }
+    }
+
+    private static <T> Iterable<T> emptyIfNull(Iterable<T> iter)
+    {
+        if (iter == null)
+            return Collections.emptyList();
+        return iter;
+    }
+}
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index a57ca84..c78cc73 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -135,7 +135,7 @@ public class RepairJob extends AsyncFuture<RepairResult> implements Runnable
             }
 
             // When all snapshot complete, send validation requests
-            treeResponses = allSnapshotTasks.andThenAsync(endpoints -> {
+            treeResponses = allSnapshotTasks.flatMap(endpoints -> {
                 if (parallelismDegree == RepairParallelism.SEQUENTIAL)
                     return sendSequentialValidationRequest(endpoints);
                 else
@@ -149,7 +149,7 @@ public class RepairJob extends AsyncFuture<RepairResult> implements Runnable
         }
 
         // When all validations complete, submit sync tasks
-        Future<List<SyncStat>> syncResults = treeResponses.andThenAsync(session.optimiseStreams && !session.pullRepair ? this::optimisedSyncing : this::standardSyncing, taskExecutor);
+        Future<List<SyncStat>> syncResults = treeResponses.flatMap(session.optimiseStreams && !session.pullRepair ? this::optimisedSyncing : this::standardSyncing, taskExecutor);
 
         // When all sync complete, set the final result
         syncResults.addCallback(new FutureCallback<List<SyncStat>>()
diff --git a/src/java/org/apache/cassandra/repair/SomeRepairFailedException.java b/src/java/org/apache/cassandra/repair/RepairNotifier.java
similarity index 61%
copy from src/java/org/apache/cassandra/repair/SomeRepairFailedException.java
copy to src/java/org/apache/cassandra/repair/RepairNotifier.java
index 4b077b8..977bc4e 100644
--- a/src/java/org/apache/cassandra/repair/SomeRepairFailedException.java
+++ b/src/java/org/apache/cassandra/repair/RepairNotifier.java
@@ -15,17 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.repair;
 
-/**
- * This is a special exception which states "I know something failed but I don't have access to the failure". This
- * is mostly used to make sure the error notifications are clean and the history table has a meaningful exception.
- *
- * The expected behavior is that when this is thrown, this error should be ignored from history table and not used
- * for notifications
- */
-public class SomeRepairFailedException extends RuntimeException
+public interface RepairNotifier
 {
-    public static final SomeRepairFailedException INSTANCE = new SomeRepairFailedException();
+    void notification(String message);
+    void notifyError(Throwable error);
+    void notifyProgress(String message);
 }
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 30244a7..cc9c770 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -19,16 +19,24 @@ package org.apache.cassandra.repair;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.*;
-import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.commons.lang3.time.DurationFormatUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,10 +58,7 @@ import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.locator.EndpointsForRange;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.metrics.RepairMetrics;
 import org.apache.cassandra.metrics.StorageMetrics;
-import org.apache.cassandra.repair.consistent.CoordinatorSession;
-import org.apache.cassandra.repair.consistent.SyncStatSummary;
 import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.SystemDistributedKeyspace;
@@ -61,20 +66,16 @@ import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.tracing.TraceKeyspace;
 import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.DiagnosticSnapshotService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.apache.cassandra.utils.concurrent.Future;
-import org.apache.cassandra.utils.concurrent.FutureCombiner;
-import org.apache.cassandra.utils.concurrent.ImmediateFuture;
 import org.apache.cassandra.utils.progress.ProgressEvent;
 import org.apache.cassandra.utils.progress.ProgressEventNotifier;
 import org.apache.cassandra.utils.progress.ProgressEventType;
@@ -85,7 +86,7 @@ import static org.apache.cassandra.service.QueryState.forInternalCalls;
 import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 
-public class RepairRunnable implements Runnable, ProgressEventNotifier
+public class RepairRunnable implements Runnable, ProgressEventNotifier, RepairNotifier
 {
     private static final Logger logger = LoggerFactory.getLogger(RepairRunnable.class);
 
@@ -141,26 +142,14 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
         }
     }
 
+    @Override
     public void notification(String msg)
     {
         logger.info(msg);
         fireProgressEvent(new ProgressEvent(ProgressEventType.NOTIFICATION, progressCounter.get(), totalProgress, msg));
     }
 
-    private void skip(String msg)
-    {
-        notification("Repair " + parentSession + " skipped: " + msg);
-        success(msg);
-    }
-
-    private void success(String msg)
-    {
-        fireProgressEvent(new ProgressEvent(ProgressEventType.SUCCESS, progressCounter.get(), totalProgress, msg));
-        ActiveRepairService.instance.recordRepairStatus(cmd, ActiveRepairService.ParentRepairStatus.COMPLETED,
-                                                        ImmutableList.of(msg));
-        complete(null);
-    }
-
+    @Override
     public void notifyError(Throwable error)
     {
         // exception should be ignored
@@ -187,6 +176,30 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
         maybeStoreParentRepairFailure(error);
     }
 
+    @Override
+    public void notifyProgress(String message)
+    {
+        logger.info(message);
+        fireProgressEvent(new ProgressEvent(ProgressEventType.PROGRESS,
+                                            progressCounter.incrementAndGet(),
+                                            totalProgress,
+                                            message));
+    }
+
+    private void skip(String msg)
+    {
+        notification("Repair " + parentSession + " skipped: " + msg);
+        success(msg);
+    }
+
+    private void success(String msg)
+    {
+        fireProgressEvent(new ProgressEvent(ProgressEventType.SUCCESS, progressCounter.get(), totalProgress, msg));
+        ActiveRepairService.instance.recordRepairStatus(cmd, ActiveRepairService.ParentRepairStatus.COMPLETED,
+                                                        ImmutableList.of(msg));
+        complete(null);
+    }
+
     private void fail(String reason)
     {
         if (reason == null)
@@ -397,264 +410,51 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
 
     private void repair(String[] cfnames, NeighborsAndRanges neighborsAndRanges)
     {
+        RepairTask task;
         if (options.isPreview())
         {
-            previewRepair(parentSession,
-                          neighborsAndRanges.filterCommonRanges(keyspace, cfnames),
-                          neighborsAndRanges.participants,
-                          cfnames);
+            task = new PreviewRepairTask(options, keyspace, this, parentSession, neighborsAndRanges.filterCommonRanges(keyspace, cfnames), cfnames);
         }
         else if (options.isIncremental())
         {
-            incrementalRepair(parentSession,
-                              traceState,
-                              neighborsAndRanges,
-                              neighborsAndRanges.participants,
-                              cfnames);
+            task = new IncrementalRepairTask(options, keyspace, this, parentSession, neighborsAndRanges, cfnames);
         }
         else
         {
-            normalRepair(parentSession,
-                         traceState,
-                         neighborsAndRanges.filterCommonRanges(keyspace, cfnames),
-                         neighborsAndRanges.participants,
-                         cfnames);
+            task = new NormalRepairTask(options, keyspace, this, parentSession, neighborsAndRanges.filterCommonRanges(keyspace, cfnames), cfnames);
         }
-    }
 
-    @SuppressWarnings("UnstableApiUsage")
-    private void normalRepair(UUID parentSession,
-                              TraceState traceState,
-                              List<CommonRange> commonRanges,
-                              Set<InetAddressAndPort> preparedEndpoints,
-                              String... cfnames)
-    {
-
-        // Set up RepairJob executor for this repair command.
         ExecutorPlus executor = createExecutor();
-
-        // Setting the repairedAt time to UNREPAIRED_SSTABLE causes the repairedAt times to be preserved across streamed sstables
-        final Future<List<RepairSessionResult>> allSessions = submitRepairSessions(parentSession, false, executor, commonRanges, cfnames);
-
-        // After all repair sessions completes(successful or not),
-        // run anticompaction if necessary and send finish notice back to client
-        final Collection<Range<Token>> successfulRanges = new ArrayList<>();
-        final AtomicBoolean hasFailure = new AtomicBoolean();
-        allSessions.andThenAsync(results -> {
-            logger.debug("Repair result: {}", results);
-            // filter out null(=failed) results and get successful ranges
-            for (RepairSessionResult sessionResult : results)
+        Future<CoordinatedRepairResult> f = task.perform(executor);
+        f.addCallback((result, failure) -> {
+            try
             {
-                if (sessionResult != null)
+                if (failure != null)
                 {
-                    // don't record successful repair if we had to skip ranges
-                    if (!sessionResult.skippedReplicas)
-                    {
-                        successfulRanges.addAll(sessionResult.ranges);
-                    }
+                    notifyError(failure);
+                    fail(failure.getMessage());
                 }
                 else
                 {
-                    hasFailure.compareAndSet(false, true);
-                }
-            }
-            return ImmediateFuture.success(null);
-        }).addCallback(new RepairCompleteCallback(parentSession,
-                                                  successfulRanges,
-                                                  preparedEndpoints,
-                                                  traceState,
-                                                  hasFailure,
-                                                  executor));
-    }
-
-    private void incrementalRepair(UUID parentSession,
-                                   TraceState traceState,
-                                   NeighborsAndRanges neighborsAndRanges,
-                                   Set<InetAddressAndPort> preparedEndpoints,
-                                   String... cfnames)
-    {
-        // the local node also needs to be included in the set of participants, since coordinator sessions aren't persisted
-        Set<InetAddressAndPort> allParticipants = ImmutableSet.<InetAddressAndPort>builder()
-                                                  .addAll(neighborsAndRanges.participants)
-                                                  .add(FBUtilities.getBroadcastAddressAndPort())
-                                                  .build();
-        // Not necessary to include self for filtering. The common ranges only contains neighbhor node endpoints.
-        List<CommonRange> allRanges = neighborsAndRanges.filterCommonRanges(keyspace, cfnames);
-
-        CoordinatorSession coordinatorSession;
-        try
-        {
-            coordinatorSession = ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession, allParticipants, neighborsAndRanges.shouldExcludeDeadParticipants);
-        }
-        catch (NoSuchRepairSessionException e)
-        {
-            logger.warn("Aborting repair session: "+e.getMessage());
-            fail(e.getMessage());
-            return;
-        }
-        ExecutorPlus executor = createExecutor();
-        AtomicBoolean hasFailure = new AtomicBoolean(false);
-        Future<?> repairResult = coordinatorSession.execute(() -> submitRepairSessions(parentSession, true, executor, allRanges, cfnames),
-                                                                   hasFailure);
-        Collection<Range<Token>> ranges = new HashSet<>();
-        for (Collection<Range<Token>> range : Iterables.transform(allRanges, cr -> cr.ranges))
-        {
-            ranges.addAll(range);
-        }
-        repairResult.addCallback(new RepairCompleteCallback(parentSession, ranges, preparedEndpoints, traceState, hasFailure, executor));
-    }
-
-    private void previewRepair(UUID parentSession,
-                               List<CommonRange> commonRanges,
-                               Set<InetAddressAndPort> preparedEndpoints,
-                               String... cfnames)
-    {
-
-        logger.debug("Starting preview repair for {}", parentSession);
-        // Set up RepairJob executor for this repair command.
-        ExecutorPlus executor = createExecutor();
-
-        final Future<List<RepairSessionResult>> allSessions = submitRepairSessions(parentSession, false, executor, commonRanges, cfnames);
-
-        allSessions.addCallback(new FutureCallback<List<RepairSessionResult>>()
-        {
-            public void onSuccess(List<RepairSessionResult> results)
-            {
-                try
-                {
-                    if (results == null || results.stream().anyMatch(s -> s == null))
+                    maybeStoreParentRepairSuccess(result.successfulRanges);
+                    if (result.hasFailed())
                     {
-                        // something failed
                         fail(null);
-                        return;
-                    }
-                    PreviewKind previewKind = options.getPreviewKind();
-                    Preconditions.checkState(previewKind != PreviewKind.NONE, "Preview is NONE");
-                    SyncStatSummary summary = new SyncStatSummary(true);
-                    summary.consumeSessionResults(results);
-
-                    final String message;
-                    if (summary.isEmpty())
-                    {
-                        message = previewKind == PreviewKind.REPAIRED ? "Repaired data is in sync" : "Previewed data was in sync";
                     }
                     else
                     {
-                        message = (previewKind == PreviewKind.REPAIRED ? "Repaired data is inconsistent\n" : "Preview complete\n") + summary;
-                        RepairMetrics.previewFailures.inc();
-                        if (previewKind == PreviewKind.REPAIRED)
-                            maybeSnapshotReplicas(parentSession, keyspace, results);
+                        success(task.name() + " completed successfully");
+                        ActiveRepairService.instance.cleanUp(parentSession, neighborsAndRanges.participants);
                     }
-                    notification(message);
-
-                    success("Repair preview completed successfully");
-                    ActiveRepairService.instance.cleanUp(parentSession, preparedEndpoints);
-                }
-                catch (Throwable t)
-                {
-                    logger.error("Error completing preview repair", t);
-                    onFailure(t);
-                }
-                finally
-                {
-                    executor.shutdownNow();
                 }
             }
-
-            public void onFailure(Throwable t)
+            finally
             {
-                notifyError(t);
-                fail("Error completing preview repair: " + t.getMessage());
                 executor.shutdownNow();
             }
         });
     }
 
-    private void maybeSnapshotReplicas(UUID parentSession, String keyspace, List<RepairSessionResult> results)
-    {
-        if (!DatabaseDescriptor.snapshotOnRepairedDataMismatch())
-            return;
-
-        try
-        {
-            Set<String> mismatchingTables = new HashSet<>();
-            Set<InetAddressAndPort> nodes = new HashSet<>();
-            for (RepairSessionResult sessionResult : results)
-            {
-                for (RepairResult repairResult : emptyIfNull(sessionResult.repairJobResults))
-                {
-                    for (SyncStat stat : emptyIfNull(repairResult.stats))
-                    {
-                        if (stat.numberOfDifferences > 0)
-                            mismatchingTables.add(repairResult.desc.columnFamily);
-                        // snapshot all replicas, even if they don't have any differences
-                        nodes.add(stat.nodes.coordinator);
-                        nodes.add(stat.nodes.peer);
-                    }
-                }
-            }
-
-            String snapshotName = DiagnosticSnapshotService.getSnapshotName(DiagnosticSnapshotService.REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX);
-            for (String table : mismatchingTables)
-            {
-                // we can just check snapshot existence locally since the repair coordinator is always a replica (unlike in the read case)
-                if (!Keyspace.open(keyspace).getColumnFamilyStore(table).snapshotExists(snapshotName))
-                {
-                    logger.info("{} Snapshotting {}.{} for preview repair mismatch with tag {} on instances {}",
-                                options.getPreviewKind().logPrefix(parentSession),
-                                keyspace, table, snapshotName, nodes);
-                    DiagnosticSnapshotService.repairedDataMismatch(Keyspace.open(keyspace).getColumnFamilyStore(table).metadata(), nodes);
-                }
-                else
-                {
-                    logger.info("{} Not snapshotting {}.{} - snapshot {} exists",
-                                options.getPreviewKind().logPrefix(parentSession),
-                                keyspace, table, snapshotName);
-                }
-            }
-        }
-        catch (Exception e)
-        {
-            logger.error("{} Failed snapshotting replicas", options.getPreviewKind().logPrefix(parentSession), e);
-        }
-    }
-
-    private static <T> Iterable<T> emptyIfNull(Iterable<T> iter)
-    {
-        if (iter == null)
-            return Collections.emptyList();
-        return iter;
-    }
-
-    private Future<List<RepairSessionResult>> submitRepairSessions(UUID parentSession,
-                                                                   boolean isIncremental,
-                                                                   ExecutorPlus executor,
-                                                                   List<CommonRange> commonRanges,
-                                                                   String... cfnames)
-    {
-        List<Future<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size());
-
-        for (CommonRange commonRange : commonRanges)
-        {
-            logger.info("Starting RepairSession for {}", commonRange);
-            RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession,
-                                                                                     commonRange,
-                                                                                     keyspace,
-                                                                                     options.getParallelism(),
-                                                                                     isIncremental,
-                                                                                     options.isPullRepair(),
-                                                                                     options.getPreviewKind(),
-                                                                                     options.optimiseStreams(),
-                                                                                     executor,
-                                                                                     cfnames);
-            if (session == null)
-                continue;
-            session.addCallback(new RepairSessionCallback(session));
-            futures.add(session);
-        }
-        return FutureCombiner.successfulOf(futures);
-    }
-
     private ExecutorPlus createExecutor()
     {
         return executorFactory()
@@ -663,81 +463,6 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
                 .pooled("Repair#" + cmd, options.getJobThreads());
     }
 
-    private class RepairSessionCallback implements FutureCallback<RepairSessionResult>
-    {
-        private final RepairSession session;
-
-        public RepairSessionCallback(RepairSession session)
-        {
-            this.session = session;
-        }
-
-        public void onSuccess(RepairSessionResult result)
-        {
-            String message = String.format("Repair session %s for range %s finished", session.getId(),
-                                           session.ranges().toString());
-            logger.info(message);
-            fireProgressEvent(new ProgressEvent(ProgressEventType.PROGRESS,
-                                                progressCounter.incrementAndGet(),
-                                                totalProgress,
-                                                message));
-        }
-
-        public void onFailure(Throwable t)
-        {
-            String message = String.format("Repair session %s for range %s failed with error %s",
-                                           session.getId(), session.ranges().toString(), t.getMessage());
-            notifyError(new RuntimeException(message, t));
-        }
-    }
-
-    private class RepairCompleteCallback implements FutureCallback<Object>
-    {
-        final UUID parentSession;
-        final Collection<Range<Token>> successfulRanges;
-        final Set<InetAddressAndPort> preparedEndpoints;
-        final TraceState traceState;
-        final AtomicBoolean hasFailure;
-        final ExecutorService executor;
-
-        public RepairCompleteCallback(UUID parentSession,
-                                      Collection<Range<Token>> successfulRanges,
-                                      Set<InetAddressAndPort> preparedEndpoints,
-                                      TraceState traceState,
-                                      AtomicBoolean hasFailure,
-                                      ExecutorService executor)
-        {
-            this.parentSession = parentSession;
-            this.successfulRanges = successfulRanges;
-            this.preparedEndpoints = preparedEndpoints;
-            this.traceState = traceState;
-            this.hasFailure = hasFailure;
-            this.executor = executor;
-        }
-
-        public void onSuccess(Object result)
-        {
-            maybeStoreParentRepairSuccess(successfulRanges);
-            if (hasFailure.get())
-            {
-                fail(null);
-            }
-            else
-            {
-                success("Repair completed successfully");
-                ActiveRepairService.instance.cleanUp(parentSession, preparedEndpoints);
-            }
-            executor.shutdownNow();
-        }
-
-        public void onFailure(Throwable t)
-        {
-            notifyError(t);
-            fail(t.getMessage());
-            executor.shutdownNow();
-        }
-    }
-
     private static void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, EndpointsForRange neighbors)
     {
         Set<InetAddressAndPort> endpoints = neighbors.endpoints();
@@ -842,9 +567,9 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
 
     static final class NeighborsAndRanges
     {
-        private final boolean shouldExcludeDeadParticipants;
-        private final Set<InetAddressAndPort> participants;
-        private final List<CommonRange> commonRanges;
+        final boolean shouldExcludeDeadParticipants;
+        final Set<InetAddressAndPort> participants;
+        final List<CommonRange> commonRanges;
 
         NeighborsAndRanges(boolean shouldExcludeDeadParticipants, Set<InetAddressAndPort> participants, List<CommonRange> commonRanges)
         {
diff --git a/src/java/org/apache/cassandra/repair/SomeRepairFailedException.java b/src/java/org/apache/cassandra/repair/RepairTask.java
similarity index 55%
copy from src/java/org/apache/cassandra/repair/SomeRepairFailedException.java
copy to src/java/org/apache/cassandra/repair/RepairTask.java
index 4b077b8..12d65c1 100644
--- a/src/java/org/apache/cassandra/repair/SomeRepairFailedException.java
+++ b/src/java/org/apache/cassandra/repair/RepairTask.java
@@ -15,17 +15,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.repair;
 
-/**
- * This is a special exception which states "I know something failed but I don't have access to the failure". This
- * is mostly used to make sure the error notifications are clean and the history table has a meaningful exception.
- *
- * The expected behavior is that when this is thrown, this error should be ignored from history table and not used
- * for notifications
- */
-public class SomeRepairFailedException extends RuntimeException
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.ImmediateFuture;
+
+public interface RepairTask
 {
-    public static final SomeRepairFailedException INSTANCE = new SomeRepairFailedException();
+    String name();
+
+    Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor) throws Exception;
+
+    default Future<CoordinatedRepairResult> perform(ExecutorPlus executor)
+    {
+        try
+        {
+            return performUnsafe(executor);
+        }
+        catch (Exception | Error e)
+        {
+            JVMStabilityInspector.inspectThrowable(e);
+            return ImmediateFuture.failure(e);
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/repair/SomeRepairFailedException.java b/src/java/org/apache/cassandra/repair/SomeRepairFailedException.java
index 4b077b8..13ee0ae 100644
--- a/src/java/org/apache/cassandra/repair/SomeRepairFailedException.java
+++ b/src/java/org/apache/cassandra/repair/SomeRepairFailedException.java
@@ -28,4 +28,9 @@ package org.apache.cassandra.repair;
 public class SomeRepairFailedException extends RuntimeException
 {
     public static final SomeRepairFailedException INSTANCE = new SomeRepairFailedException();
+
+    private SomeRepairFailedException()
+    {
+        super(null, null, false, false);
+    }
 }
diff --git a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
index 1027d0a..24e24fa 100644
--- a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
@@ -19,21 +19,18 @@
 package org.apache.cassandra.repair.consistent;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 
-import javax.annotation.Nullable;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.FutureCallback;
+
+import org.apache.cassandra.concurrent.ImmediateExecutor;
+import org.apache.cassandra.repair.CoordinatedRepairResult;
 import org.apache.cassandra.utils.concurrent.AsyncPromise;
 import org.apache.cassandra.utils.concurrent.Future;
-import org.apache.cassandra.utils.concurrent.ImmediateFuture;
-import org.apache.cassandra.utils.concurrent.Promise;
+
 import org.apache.commons.lang3.time.DurationFormatUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,7 +40,6 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.Verb;
-import org.apache.cassandra.repair.RepairSessionResult;
 import org.apache.cassandra.repair.SomeRepairFailedException;
 import org.apache.cassandra.repair.messages.FailSession;
 import org.apache.cassandra.repair.messages.FinalizeCommit;
@@ -64,8 +60,8 @@ public class CoordinatorSession extends ConsistentSession
     private static final Logger logger = LoggerFactory.getLogger(CoordinatorSession.class);
 
     private final Map<InetAddressAndPort, State> participantStates = new HashMap<>();
-    private final AsyncPromise<Boolean> prepareFuture = AsyncPromise.uncancellable();
-    private final AsyncPromise<Boolean> finalizeProposeFuture = AsyncPromise.uncancellable();
+    private final AsyncPromise<Void> prepareFuture = AsyncPromise.uncancellable();
+    private final AsyncPromise<Void> finalizeProposeFuture = AsyncPromise.uncancellable();
 
     private volatile long sessionStart = Long.MIN_VALUE;
     private volatile long repairStart = Long.MIN_VALUE;
@@ -148,7 +144,7 @@ public class CoordinatorSession extends ConsistentSession
         MessagingService.instance().send(message, destination);
     }
 
-    public Future<Boolean> prepare()
+    public Future<Void> prepare()
     {
         Preconditions.checkArgument(allStates(State.PREPARING));
 
@@ -188,12 +184,11 @@ public class CoordinatorSession extends ConsistentSession
         if (getState() == State.PREPARED)
         {
             logger.info("Incremental repair session {} successfully prepared.", sessionID);
-            prepareFuture.trySuccess(true);
+            prepareFuture.trySuccess(null);
         }
         else
         {
             fail();
-            prepareFuture.trySuccess(false);
         }
     }
 
@@ -202,7 +197,7 @@ public class CoordinatorSession extends ConsistentSession
         setAll(State.REPAIRING);
     }
 
-    public synchronized Future<Boolean> finalizePropose()
+    public synchronized Future<Void> finalizePropose()
     {
         Preconditions.checkArgument(allStates(State.REPAIRING));
         logger.info("Proposing finalization of repair session {}", sessionID);
@@ -224,7 +219,6 @@ public class CoordinatorSession extends ConsistentSession
         {
             logger.warn("Finalization proposal of session {} rejected by {}. Aborting session", sessionID, participant);
             fail();
-            finalizeProposeFuture.trySuccess(false);
         }
         else
         {
@@ -233,7 +227,7 @@ public class CoordinatorSession extends ConsistentSession
             if (getState() == State.FINALIZE_PROMISED)
             {
                 logger.info("Finalization proposal for repair session {} accepted by all participants.", sessionID);
-                finalizeProposeFuture.trySuccess(true);
+                finalizeProposeFuture.trySuccess(null);
             }
         }
     }
@@ -287,103 +281,51 @@ public class CoordinatorSession extends ConsistentSession
     /**
      * Runs the asynchronous consistent repair session. Actual repair sessions are scheduled via a submitter to make unit testing easier
      */
-    public Future execute(Supplier<Future<List<RepairSessionResult>>> sessionSubmitter, AtomicBoolean hasFailure)
+    public Future<CoordinatedRepairResult> execute(Supplier<Future<CoordinatedRepairResult>> sessionSubmitter)
     {
         logger.info("Beginning coordination of incremental repair session {}", sessionID);
 
         sessionStart = currentTimeMillis();
-        Future<Boolean> prepareResult = prepare();
+        Future<Void> prepareResult = prepare();
 
         // run repair sessions normally
-        Future<List<RepairSessionResult>> repairSessionResults = prepareResult.andThenAsync(success ->
-        {
-            if (success)
-            {
-                repairStart = currentTimeMillis();
-                if (logger.isDebugEnabled())
-                {
-                    logger.debug("Incremental repair {} prepare phase completed in {}", sessionID, formatDuration(sessionStart, repairStart));
-                }
-                setRepairing();
-                return sessionSubmitter.get();
-            }
-            else
-            {
-                return ImmediateFuture.success(null);
-            }
+        Future<CoordinatedRepairResult> repairSessionResults = prepareResult.flatMap(ignore -> {
+            repairStart = currentTimeMillis();
+            if (logger.isDebugEnabled())
+                logger.debug("Incremental repair {} prepare phase completed in {}", sessionID, formatDuration(sessionStart, repairStart));
+            setRepairing();
+            return sessionSubmitter.get();
         });
 
-        // mark propose finalization
-        Future<Boolean> proposeFuture = repairSessionResults.andThenAsync(results ->
-        {
-            if (results == null || results.isEmpty() || Iterables.any(results, r -> r == null))
+        // if any session failed, then fail the future
+        Future<CoordinatedRepairResult> onlySuccessSessionResults = repairSessionResults.map(result -> {
+            finalizeStart = currentTimeMillis();
+            if (result.hasFailed())
             {
-                finalizeStart = currentTimeMillis();
                 if (logger.isDebugEnabled())
-                {
                     logger.debug("Incremental repair {} validation/stream phase completed in {}", sessionID, formatDuration(repairStart, finalizeStart));
-                }
-                return ImmediateFuture.failure(SomeRepairFailedException.INSTANCE);
-            }
-            else
-            {
-                return finalizePropose();
+                throw SomeRepairFailedException.INSTANCE;
             }
+            return result;
         });
 
-        // return execution result as set by following callback
-        Promise<Boolean> resultFuture = AsyncPromise.uncancellable();
-
-        // commit repaired data
-        proposeFuture.addCallback(new FutureCallback<Boolean>()
-        {
-            public void onSuccess(@Nullable Boolean result)
-            {
-                try
-                {
-                    if (result != null && result)
-                    {
-                        if (logger.isDebugEnabled())
-                        {
-                            logger.debug("Incremental repair {} finalization phase completed in {}", sessionID, formatDuration(finalizeStart, currentTimeMillis()));
-                        }
-                        finalizeCommit();
-                        if (logger.isDebugEnabled())
-                        {
-                            logger.debug("Incremental repair {} phase completed in {}", sessionID, formatDuration(sessionStart, currentTimeMillis()));
-                        }
-                    }
-                    else
-                    {
-                        hasFailure.set(true);
-                        fail();
-                    }
-                    resultFuture.trySuccess(result);
-                }
-                catch (Exception e)
-                {
-                    resultFuture.tryFailure(e);
-                }
-            }
-
-            public void onFailure(Throwable t)
+        // mark propose finalization and commit
+        Future<CoordinatedRepairResult> proposeFuture = onlySuccessSessionResults.flatMap(results -> finalizePropose().map(ignore -> {
+            if (logger.isDebugEnabled())
+                logger.debug("Incremental repair {} finalization phase completed in {}", sessionID, formatDuration(finalizeStart, currentTimeMillis()));
+            finalizeCommit();
+            if (logger.isDebugEnabled())
+                logger.debug("Incremental repair {} phase completed in {}", sessionID, formatDuration(sessionStart, currentTimeMillis()));
+            return results;
+        }));
+
+        return proposeFuture.addCallback((ignore, failure) -> {
+            if (failure != null)
             {
-                try
-                {
-                    if (logger.isDebugEnabled())
-                    {
-                        logger.debug("Incremental repair {} phase failed in {}", sessionID, formatDuration(sessionStart, currentTimeMillis()));
-                    }
-                    hasFailure.set(true);
-                    fail();
-                }
-                finally
-                {
-                    resultFuture.tryFailure(t);
-                }
+                if (logger.isDebugEnabled())
+                    logger.debug("Incremental repair {} phase failed in {}", sessionID, formatDuration(sessionStart, currentTimeMillis()));
+                fail();
             }
-        });
-
-        return resultFuture;
+        }, ImmediateExecutor.INSTANCE);
     }
 }
diff --git a/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java b/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java
index b145fb6..249d1a4 100644
--- a/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java
+++ b/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 import com.google.common.collect.Lists;
 
@@ -179,11 +180,11 @@ public class SyncStatSummary
         summaries.get(cf).consumeStats(result.stats);
     }
 
-    public void consumeSessionResults(List<RepairSessionResult> results)
+    public void consumeSessionResults(Optional<List<RepairSessionResult>> results)
     {
-        if (results != null)
+        if (results.isPresent())
         {
-            filter(results, Objects::nonNull).forEach(r -> filter(r.repairJobResults, Objects::nonNull).forEach(this::consumeRepairResult));
+            filter(results.get(), Objects::nonNull).forEach(r -> filter(r.repairJobResults, Objects::nonNull).forEach(this::consumeRepairResult));
         }
     }
 
diff --git a/src/java/org/apache/cassandra/security/CipherFactory.java b/src/java/org/apache/cassandra/security/CipherFactory.java
index 3c13629..c42ebca 100644
--- a/src/java/org/apache/cassandra/security/CipherFactory.java
+++ b/src/java/org/apache/cassandra/security/CipherFactory.java
@@ -40,6 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import io.netty.util.concurrent.FastThreadLocal;
+import org.apache.cassandra.concurrent.ImmediateExecutor;
 import org.apache.cassandra.config.TransparentDataEncryptionOptions;
 
 /**
@@ -81,7 +82,7 @@ public class CipherFactory
 
         cache = Caffeine.newBuilder() // by default cache is unbounded
                 .maximumSize(64) // a value large enough that we should never even get close (so nothing gets evicted)
-                .executor(MoreExecutors.directExecutor())
+                .executor(ImmediateExecutor.INSTANCE)
                 .removalListener((key, value, cause) ->
                 {
                     // maybe reload the key? (to avoid the reload being on the user's dime)
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 2532c8b..fbcb745 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -340,17 +340,10 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
             LocalSessions.registerListener(session);
 
         // remove session at completion
-        session.addListener(new Runnable()
-        {
-            /**
-             * When repair finished, do clean up
-             */
-            public void run()
-            {
-                sessions.remove(session.getId());
-                LocalSessions.unregisterListener(session);
-            }
-        }, MoreExecutors.directExecutor());
+        session.addListener(() -> {
+            sessions.remove(session.getId());
+            LocalSessions.unregisterListener(session);
+        });
         session.start(executor);
         return session;
     }
diff --git a/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java b/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java
index b8944f9..86e3c12 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java
@@ -280,7 +280,14 @@ public abstract class AbstractFuture<V> implements Future<V>
     @Override
     public AbstractFuture<V> addCallback(BiConsumer<? super V, Throwable> callback)
     {
-        appendListener(new CallbackBiConsumerListener<>(this, callback));
+        appendListener(new CallbackBiConsumerListener<>(this, callback, null));
+        return this;
+    }
+
+    @Override
+    public Future<V> addCallback(BiConsumer<? super V, Throwable> callback, Executor executor)
+    {
+        appendListener(new CallbackBiConsumerListener<>(this, callback, executor));
         return this;
     }
 
@@ -305,19 +312,42 @@ public abstract class AbstractFuture<V> implements Future<V>
     @Override
     public AbstractFuture<V> addCallback(Consumer<? super V> onSuccess, Consumer<? super Throwable> onFailure)
     {
-        appendListener(new CallbackLambdaListener<>(this, onSuccess, onFailure));
+        appendListener(new CallbackLambdaListener<>(this, onSuccess, onFailure, null));
         return this;
     }
 
     /**
-     * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively
+     * Support more fluid version of {@link com.google.common.util.concurrent.Futures#addCallback}
      *
      * See {@link #addListener(GenericFutureListener)} for ordering semantics.
      */
     @Override
-    public <T> Future<T> andThenAsync(Function<? super V, ? extends Future<T>> andThen)
+    public AbstractFuture<V> addCallback(Consumer<? super V> onSuccess, Consumer<? super Throwable> onFailure, Executor executor)
     {
-        return andThenAsync(andThen, null);
+        appendListener(new CallbackLambdaListener<>(this, onSuccess, onFailure, executor));
+        return this;
+    }
+
+    /**
+     * Support {@link com.google.common.util.concurrent.Futures#transform(ListenableFuture, com.google.common.base.Function, Executor)} natively
+     *
+     * See {@link #addListener(GenericFutureListener)} for ordering semantics.
+     */
+    protected <T> Future<T> map(AbstractFuture<T> result, Function<? super V, ? extends T> mapper, @Nullable Executor executor)
+    {
+        addListener(() -> {
+            try
+            {
+                if (isSuccess()) result.trySet(mapper.apply(getNow()));
+                else result.tryFailure(cause());
+            }
+            catch (Throwable t)
+            {
+                result.tryFailure(t);
+                throw t;
+            }
+        }, executor);
+        return result;
     }
 
     /**
@@ -325,12 +355,12 @@ public abstract class AbstractFuture<V> implements Future<V>
      *
      * See {@link #addListener(GenericFutureListener)} for ordering semantics.
      */
-    protected <T> Future<T> andThenAsync(AbstractFuture<T> result, Function<? super V, ? extends Future<T>> andThen, @Nullable Executor executor)
+    protected <T> Future<T> flatMap(AbstractFuture<T> result, Function<? super V, ? extends Future<T>> flatMapper, @Nullable Executor executor)
     {
         addListener(() -> {
             try
             {
-                if (isSuccess()) andThen.apply(getNow()).addListener(propagate(result));
+                if (isSuccess()) flatMapper.apply(getNow()).addListener(propagate(result));
                 else result.tryFailure(cause());
             }
             catch (Throwable t)
diff --git a/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java b/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java
index a7b7a6a..b09eeb7 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java
@@ -123,14 +123,25 @@ public class AsyncFuture<V> extends AbstractFuture<V>
     }
 
     /**
+     * Support {@link com.google.common.util.concurrent.Futures#transform(ListenableFuture, com.google.common.base.Function, Executor)} natively
+     *
+     * See {@link #addListener(GenericFutureListener)} for ordering semantics.
+     */
+    @Override
+    public <T> Future<T> map(Function<? super V, ? extends T> mapper, Executor executor)
+    {
+        return map(new AsyncFuture<>(), mapper, executor);
+    }
+
+    /**
      * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively
      *
      * See {@link #addListener(GenericFutureListener)} for ordering semantics.
      */
     @Override
-    public <T> Future<T> andThenAsync(Function<? super V, ? extends Future<T>> andThen, @Nullable Executor executor)
+    public <T> Future<T> flatMap(Function<? super V, ? extends Future<T>> flatMapper, @Nullable Executor executor)
     {
-        return andThenAsync(new AsyncFuture<>(), andThen, executor);
+        return flatMap(new AsyncFuture<>(), flatMapper, executor);
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Future.java b/src/java/org/apache/cassandra/utils/concurrent/Future.java
index 22b15cc..69dc83d 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Future.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Future.java
@@ -118,6 +118,11 @@ public interface Future<V> extends io.netty.util.concurrent.Future<V>, Listenabl
     /**
      * Support {@link com.google.common.util.concurrent.Futures#addCallback} natively
      */
+    Future<V> addCallback(BiConsumer<? super V, Throwable> callback, Executor executor);
+
+    /**
+     * Support {@link com.google.common.util.concurrent.Futures#addCallback} natively
+     */
     Future<V> addCallback(FutureCallback<? super V> callback);
 
     /**
@@ -131,14 +136,35 @@ public interface Future<V> extends io.netty.util.concurrent.Future<V>, Listenabl
     Future<V> addCallback(Consumer<? super V> onSuccess, Consumer<? super Throwable> onFailure);
 
     /**
+     * Support {@link com.google.common.util.concurrent.Futures#addCallback} natively
+     */
+    Future<V> addCallback(Consumer<? super V> onSuccess, Consumer<? super Throwable> onFailure, Executor executor);
+
+    /**
+     * Support {@link com.google.common.util.concurrent.Futures#transform(ListenableFuture, com.google.common.base.Function, Executor)} natively
+     */
+    default <T> Future<T> map(Function<? super V, ? extends T> mapper)
+    {
+        return map(mapper, null);
+    }
+
+    /**
+     * Support {@link com.google.common.util.concurrent.Futures#transform(ListenableFuture, com.google.common.base.Function, Executor)} natively
+     */
+    <T> Future<T> map(Function<? super V, ? extends T> mapper, Executor executor);
+
+    /**
      * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively
      */
-    <T> Future<T> andThenAsync(Function<? super V, ? extends Future<T>> andThen);
+    default <T> Future<T> flatMap(Function<? super V, ? extends Future<T>> flatMapper)
+    {
+        return flatMap(flatMapper, null);
+    }
 
     /**
      * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively
      */
-    <T> Future<T> andThenAsync(Function<? super V, ? extends Future<T>> andThen, Executor executor);
+    <T> Future<T> flatMap(Function<? super V, ? extends Future<T>> flatMapper, Executor executor);
 
     /**
      * Invoke {@code runnable} on completion, using {@code executor}.
diff --git a/src/java/org/apache/cassandra/utils/concurrent/ListenerList.java b/src/java/org/apache/cassandra/utils/concurrent/ListenerList.java
index c150ea2..57737ea 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/ListenerList.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/ListenerList.java
@@ -22,7 +22,6 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
-
 import javax.annotation.Nullable;
 
 import com.google.common.util.concurrent.FutureCallback;
@@ -151,7 +150,7 @@ abstract class ListenerList<V> extends IntrusiveStack<ListenerList<V>>
     static <F extends io.netty.util.concurrent.Future<?>> void notifyListener(Executor notifyExecutor, GenericFutureListener<F> listener, F future)
     {
         if (notifyExecutor == null) notifyListener(listener, future);
-        else notifyExecutor.execute(() -> notifyListener(listener, future));
+        else safeExecute(notifyExecutor, () -> notifyListener(listener, future));
     }
 
     /**
@@ -159,8 +158,22 @@ abstract class ListenerList<V> extends IntrusiveStack<ListenerList<V>>
      */
     static void notifyListener(@Nullable Executor notifyExecutor, Runnable listener)
     {
-        if (notifyExecutor == null) ImmediateExecutor.INSTANCE.execute(listener);
-        else notifyExecutor.execute(listener);
+        safeExecute(notifyExecutor, listener);
+    }
+
+    private static void safeExecute(@Nullable Executor notifyExecutor, Runnable runnable)
+    {
+        if (notifyExecutor == null)
+            notifyExecutor = ImmediateExecutor.INSTANCE;
+        try
+        {
+            notifyExecutor.execute(runnable);
+        }
+        catch (Exception | Error e)
+        {
+            // TODO: suboptimal package interdependency - move FutureTask etc here?
+            ExecutionFailure.handle(e);
+        }
     }
 
     /**
@@ -219,11 +232,13 @@ abstract class ListenerList<V> extends IntrusiveStack<ListenerList<V>>
     {
         final Future<V> future;
         final BiConsumer<? super V, Throwable> callback;
+        final Executor executor;
 
-        CallbackBiConsumerListener(Future<V> future, BiConsumer<? super V, Throwable> callback)
+        CallbackBiConsumerListener(Future<V> future, BiConsumer<? super V, Throwable> callback, Executor executor)
         {
             this.future = future;
             this.callback = callback;
+            this.executor = executor;
         }
 
         @Override
@@ -236,7 +251,7 @@ abstract class ListenerList<V> extends IntrusiveStack<ListenerList<V>>
         @Override
         void notifySelf(Executor notifyExecutor, Future<V> future)
         {
-            notifyListener(notifyExecutor, this);
+            notifyListener(executor == null ? notifyExecutor : executor, this);
         }
     }
 
@@ -269,12 +284,14 @@ abstract class ListenerList<V> extends IntrusiveStack<ListenerList<V>>
         final Future<V> future;
         final Consumer<? super V> onSuccess;
         final Consumer<? super Throwable> onFailure;
+        final Executor executor;
 
-        CallbackLambdaListener(Future<V> future, Consumer<? super V> onSuccess, Consumer<? super Throwable> onFailure)
+        CallbackLambdaListener(Future<V> future, Consumer<? super V> onSuccess, Consumer<? super Throwable> onFailure, Executor executor)
         {
             this.future = future;
             this.onSuccess = onSuccess;
             this.onFailure = onFailure;
+            this.executor = executor;
         }
 
         @Override
@@ -287,7 +304,7 @@ abstract class ListenerList<V> extends IntrusiveStack<ListenerList<V>>
         @Override
         void notifySelf(Executor notifyExecutor, Future future)
         {
-            notifyListener(notifyExecutor, this);
+            notifyListener(executor == null ? notifyExecutor : executor, this);
         }
     }
 
diff --git a/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java b/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java
index 9635969..43648c0 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java
@@ -90,14 +90,25 @@ public class SyncFuture<V> extends AbstractFuture<V>
     }
 
     /**
+     * Support {@link com.google.common.util.concurrent.Futures#transform(ListenableFuture, com.google.common.base.Function, Executor)} natively
+     *
+     * See {@link #addListener(GenericFutureListener)} for ordering semantics.
+     */
+    @Override
+    public <T> Future<T> map(Function<? super V, ? extends T> mapper, Executor executor)
+    {
+        return map(new SyncFuture<>(), mapper, executor);
+    }
+
+    /**
      * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively
      *
      * See {@link #addListener(GenericFutureListener)} for ordering semantics.
      */
     @Override
-    public <T> Future<T> andThenAsync(Function<? super V, ? extends Future<T>> andThen, @Nullable Executor executor)
+    public <T> Future<T> flatMap(Function<? super V, ? extends Future<T>> flatMapper, @Nullable Executor executor)
     {
-        return andThenAsync(new SyncFuture<>(), andThen, executor);
+        return flatMap(new SyncFuture<>(), flatMapper, executor);
     }
 
     /**
diff --git a/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java b/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java
index baada12..0bf9b9f 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java
@@ -77,8 +77,10 @@ public abstract class AbstractNetstatsBootstrapStreaming extends AbstractNetstat
             final Future<?> startupRunnable = executorService.submit((Runnable) secondNode::startup);
             final Future<AbstractNetstatsStreaming.NetstatResults> netstatsFuture = executorService.submit(new NetstatsCallable(cluster.get(1)));
 
+            startupRunnable.get(3, MINUTES);
+            // 1m is a bit much, but should be fine on slower environments.  Node2 can't come up without streaming
+            // completing, so if node2 is up 1m is enough time for the nodetool watcher to yield
             final AbstractNetstatsStreaming.NetstatResults results = netstatsFuture.get(1, MINUTES);
-            startupRunnable.get(2, MINUTES);
 
             results.assertSuccessful();
 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/repair/ForceRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/repair/ForceRepairTest.java
new file mode 100644
index 0000000..479dac3
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/repair/ForceRepairTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.repair;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.junit.Test;
+
+import com.carrotsearch.hppc.LongArrayList;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.QueryResults;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.shared.AssertUtils;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.assertj.core.api.Assertions;
+
+public class ForceRepairTest extends TestBaseImpl
+{
+    /**
+     * Port of python dtest "repair_tests/incremental_repair_test.py::TestIncRepair::test_force" but extends to test
+     * all types of repair.
+     */
+    @Test
+    public void force() throws IOException
+    {
+        force(false);
+    }
+
+    @Test
+    public void forceWithDifference() throws IOException
+    {
+        force(true);
+    }
+
+    private void force(boolean includeDifference) throws IOException
+    {
+        long nowInMicro = System.currentTimeMillis() * 1000;
+        try (Cluster cluster = Cluster.build(3)
+                                      .withConfig(c -> c.set("hinted_handoff_enabled", false)
+                                                        .with(Feature.values()))
+                                      .start())
+        {
+            init(cluster);
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (k INT PRIMARY KEY, v INT)"));
+
+            for (int i = 0; i < 10; i++)
+                cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (k,v) VALUES (?, ?) USING TIMESTAMP ?"), ConsistencyLevel.ALL, i, i, nowInMicro++);
+
+            ClusterUtils.stopUnchecked(cluster.get(2));
+
+            // repair should fail because node2 is down
+            IInvokableInstance node1 = cluster.get(1);
+
+            for (String[] args : Arrays.asList(new String[]{ "--full" },
+                                               new String[]{ "--full", "--preview" },
+                                               new String[]{ "--full", "--validate"}, // nothing should be in the repaired set, so shouldn't stream
+                                               new String[]{ "--preview" }, // IR Preview
+                                               new String[]{ "--validate"}, // nothing should be in the repaired set, so shouldn't stream
+                                               new String[0])) // IR
+            {
+                if (includeDifference)
+                    node1.executeInternal(withKeyspace("INSERT INTO %s.tbl (k,v) VALUES (?, ?) USING TIMESTAMP ?"), -1, -1, nowInMicro++); // each loop should have a different timestamp, causing a new difference
+
+                try
+                {
+                    node1.nodetoolResult(ArrayUtils.addAll(new String[] {"repair", KEYSPACE}, args)).asserts().failure();
+                    node1.nodetoolResult(ArrayUtils.addAll(new String[] {"repair", KEYSPACE, "--force"}, args)).asserts().success();
+
+                    assertNoRepairedAt(cluster);
+                }
+                catch (Exception | Error e)
+                {
+                    // tag the error to include which args broke
+                    e.addSuppressed(new AssertionError("Failure for args: " + Arrays.toString(args)));
+                    throw e;
+                }
+            }
+
+            if (includeDifference)
+            {
+                SimpleQueryResult expected = QueryResults.builder()
+                                                         .row(-1, -1)
+                                                         .build();
+                for (IInvokableInstance node : Arrays.asList(node1, cluster.get(3)))
+                {
+                    SimpleQueryResult results = node.executeInternalWithResult(withKeyspace("SELECT * FROM %s.tbl WHERE k=?"), -1);
+                    expected.reset();
+                    AssertUtils.assertRows(results, expected);
+                }
+            }
+        }
+    }
+
+    private static void assertNoRepairedAt(Cluster cluster)
+    {
+        List<long[]> repairedAt = getRepairedAt(cluster, KEYSPACE, "tbl");
+        Assertions.assertThat(repairedAt).hasSize(cluster.size());
+        for (int i = 0; i < repairedAt.size(); i++)
+        {
+            long[] array = repairedAt.get(i);
+            if (array == null)
+            {
+                // ignore downed nodes
+                Assertions.assertThat(cluster.get(i + 1).isShutdown()).isTrue();
+                continue;
+            }
+            Assertions.assertThat(array).isNotEmpty();
+            for (long a : array)
+                Assertions.assertThat(a).describedAs("node%d had a repaired sstable", i + 1).isEqualTo(0);
+        }
+    }
+
+    private static List<long[]> getRepairedAt(Cluster cluster, String keyspace, String table)
+    {
+        return cluster.stream().map(i -> {
+            if (i.isShutdown())
+                return null;
+
+            return i.callOnInstance(() -> {
+                TableMetadata meta = Schema.instance.getTableMetadata(keyspace, table);
+                ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(meta.id);
+
+                View view = cfs.getTracker().getView();
+                LongArrayList list = new LongArrayList();
+                for (SSTableReader sstable : view.liveSSTables())
+                {
+                    try
+                    {
+                        StatsMetadata metadata = sstable.getSSTableMetadata();
+                        list.add(metadata.repairedAt);
+                    }
+                    catch (Exception e)
+                    {
+                        // ignore
+                    }
+                }
+                return list.toArray();
+            });
+        }).collect(Collectors.toList());
+    }
+}
diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java
index ef08022..2bfd4dc 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.repair.consistent;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -31,6 +30,8 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 
 import com.google.common.collect.Lists;
+
+import org.apache.cassandra.repair.CoordinatedRepairResult;
 import org.apache.cassandra.utils.concurrent.AsyncPromise;
 import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.Promise;
@@ -106,29 +107,27 @@ public class CoordinatorMessagingTest extends AbstractRepairTest
         UUID uuid = registerSession(cfs, true, true);
         CoordinatorSession coordinator = ActiveRepairService.instance.consistent.coordinated.registerSession(uuid, PARTICIPANTS, false);
         AtomicBoolean repairSubmitted = new AtomicBoolean(false);
-        Promise<List<RepairSessionResult>> repairFuture = AsyncPromise.uncancellable();
-        Supplier<Future<List<RepairSessionResult>>> sessionSupplier = () ->
+        Promise<CoordinatedRepairResult> repairFuture = AsyncPromise.uncancellable();
+        Supplier<Future<CoordinatedRepairResult>> sessionSupplier = () ->
         {
             repairSubmitted.set(true);
             return repairFuture;
         };
 
         // coordinator sends prepare requests to create local session and perform anticompaction
-        AtomicBoolean hasFailures = new AtomicBoolean(false);
         Assert.assertFalse(repairSubmitted.get());
 
         // execute repair and start prepare phase
-        Future<Boolean> sessionResult = coordinator.execute(sessionSupplier, hasFailures);
+        Future<CoordinatedRepairResult> sessionResult = coordinator.execute(sessionSupplier);
         Assert.assertFalse(sessionResult.isDone());
-        Assert.assertFalse(hasFailures.get());
+
         // prepare completed
         prepareLatch.countDown();
         spyPrepare.interceptMessageOut(3).get(1, TimeUnit.SECONDS);
         Assert.assertFalse(sessionResult.isDone());
-        Assert.assertFalse(hasFailures.get());
 
         // set result from local repair session
-        repairFuture.trySuccess(Lists.newArrayList(createResult(coordinator), createResult(coordinator), createResult(coordinator)));
+        repairFuture.trySuccess(CoordinatedRepairResult.success(Lists.newArrayList(createResult(coordinator), createResult(coordinator), createResult(coordinator))));
 
         // finalize phase
         finalizeLatch.countDown();
@@ -136,8 +135,7 @@ public class CoordinatorMessagingTest extends AbstractRepairTest
 
         // commit phase
         spyCommit.interceptMessageOut(3).get(1, TimeUnit.SECONDS);
-        Assert.assertTrue(sessionResult.get());
-        Assert.assertFalse(hasFailures.get());
+        Assert.assertFalse(sessionResult.get().hasFailed());
 
         // expect no other messages except from intercepted so far
         spyPrepare.interceptNoMsg(100, TimeUnit.MILLISECONDS);
@@ -197,19 +195,18 @@ public class CoordinatorMessagingTest extends AbstractRepairTest
         UUID uuid = registerSession(cfs, true, true);
         CoordinatorSession coordinator = ActiveRepairService.instance.consistent.coordinated.registerSession(uuid, PARTICIPANTS, false);
         AtomicBoolean repairSubmitted = new AtomicBoolean(false);
-        Promise<List<RepairSessionResult>> repairFuture = AsyncPromise.uncancellable();
-        Supplier<Future<List<RepairSessionResult>>> sessionSupplier = () ->
+        Promise<CoordinatedRepairResult> repairFuture = AsyncPromise.uncancellable();
+        Supplier<Future<CoordinatedRepairResult>> sessionSupplier = () ->
         {
             repairSubmitted.set(true);
             return repairFuture;
         };
 
         // coordinator sends prepare requests to create local session and perform anticompaction
-        AtomicBoolean proposeFailed = new AtomicBoolean(false);
         Assert.assertFalse(repairSubmitted.get());
 
         // execute repair and start prepare phase
-        Future<Boolean> sessionResult = coordinator.execute(sessionSupplier, proposeFailed);
+        Future<CoordinatedRepairResult> sessionResult = coordinator.execute(sessionSupplier);
         prepareLatch.countDown();
         // prepare completed
         try
@@ -222,7 +219,8 @@ public class CoordinatorMessagingTest extends AbstractRepairTest
         }
         sendFailSessionExpectedSpy.interceptMessageOut(3).get(1, TimeUnit.SECONDS);
         Assert.assertFalse(repairSubmitted.get());
-        Assert.assertTrue(proposeFailed.get());
+        Assert.assertTrue(sessionResult.isDone());
+        Assert.assertNotNull(sessionResult.cause());
         Assert.assertEquals(ConsistentSession.State.FAILED, coordinator.getState());
         Assert.assertFalse(ActiveRepairService.instance.consistent.local.isSessionInProgress(uuid));
     }
@@ -236,19 +234,18 @@ public class CoordinatorMessagingTest extends AbstractRepairTest
         UUID uuid = registerSession(cfs, true, true);
         CoordinatorSession coordinator = ActiveRepairService.instance.consistent.coordinated.registerSession(uuid, PARTICIPANTS, false);
         AtomicBoolean repairSubmitted = new AtomicBoolean(false);
-        Promise<List<RepairSessionResult>> repairFuture = AsyncPromise.uncancellable();
-        Supplier<Future<List<RepairSessionResult>>> sessionSupplier = () ->
+        Promise<CoordinatedRepairResult> repairFuture = AsyncPromise.uncancellable();
+        Supplier<Future<CoordinatedRepairResult>> sessionSupplier = () ->
         {
             repairSubmitted.set(true);
             return repairFuture;
         };
 
         // coordinator sends prepare requests to create local session and perform anticompaction
-        AtomicBoolean hasFailures = new AtomicBoolean(false);
         Assert.assertFalse(repairSubmitted.get());
 
         // execute repair and start prepare phase
-        Future<Boolean> sessionResult = coordinator.execute(sessionSupplier, hasFailures);
+        Future<CoordinatedRepairResult> sessionResult = coordinator.execute(sessionSupplier);
         try
         {
             sessionResult.get(1, TimeUnit.SECONDS);
@@ -266,7 +263,6 @@ public class CoordinatorMessagingTest extends AbstractRepairTest
         spyPrepare.expectMockedMessage(2).get(100, TimeUnit.MILLISECONDS);
         sendFailSessionUnexpectedSpy.interceptNoMsg(100, TimeUnit.MILLISECONDS);
         Assert.assertFalse(repairSubmitted.get());
-        Assert.assertFalse(hasFailures.get());
         Assert.assertEquals(ConsistentSession.State.PREPARING, coordinator.getState());
         Assert.assertFalse(ActiveRepairService.instance.consistent.local.isSessionInProgress(uuid));
     }
diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
index 42453d4..0421ad3 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
@@ -19,6 +19,8 @@
 package org.apache.cassandra.repair.consistent;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -28,16 +30,15 @@ import java.util.function.Supplier;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-
-import org.apache.cassandra.utils.concurrent.AsyncPromise;
-import org.apache.cassandra.utils.concurrent.Future;
-import org.apache.cassandra.utils.concurrent.Promise;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.repair.AbstractRepairTest;
+import org.apache.cassandra.repair.CoordinatedRepairResult;
 import org.apache.cassandra.repair.RepairSessionResult;
 import org.apache.cassandra.repair.messages.FailSession;
 import org.apache.cassandra.repair.messages.FinalizeCommit;
@@ -45,8 +46,15 @@ import org.apache.cassandra.repair.messages.FinalizePropose;
 import org.apache.cassandra.repair.messages.PrepareConsistentRequest;
 import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
 
-import static org.apache.cassandra.repair.consistent.ConsistentSession.State.*;
+import static org.apache.cassandra.repair.consistent.ConsistentSession.State.FAILED;
+import static org.apache.cassandra.repair.consistent.ConsistentSession.State.FINALIZE_PROMISED;
+import static org.apache.cassandra.repair.consistent.ConsistentSession.State.PREPARED;
+import static org.apache.cassandra.repair.consistent.ConsistentSession.State.PREPARING;
+import static org.apache.cassandra.repair.consistent.ConsistentSession.State.REPAIRING;
 
 public class CoordinatorSessionTest extends AbstractRepairTest
 {
@@ -210,18 +218,17 @@ public class CoordinatorSessionTest extends AbstractRepairTest
     {
         InstrumentedCoordinatorSession coordinator = createInstrumentedSession();
         AtomicBoolean repairSubmitted = new AtomicBoolean(false);
-        Promise<List<RepairSessionResult>> repairFuture = AsyncPromise.uncancellable();
-        Supplier<Future<List<RepairSessionResult>>> sessionSupplier = () ->
+        Promise<CoordinatedRepairResult> repairFuture = AsyncPromise.uncancellable();
+        Supplier<Future<CoordinatedRepairResult>> sessionSupplier = () ->
         {
             repairSubmitted.set(true);
             return repairFuture;
         };
 
         // coordinator sends prepare requests to create local session and perform anticompaction
-        AtomicBoolean hasFailures = new AtomicBoolean(false);
         Assert.assertFalse(repairSubmitted.get());
         Assert.assertTrue(coordinator.sentMessages.isEmpty());
-        Future sessionResult = coordinator.execute(sessionSupplier, hasFailures);
+        Future<CoordinatedRepairResult> sessionResult = coordinator.execute(sessionSupplier);
 
         for (InetAddressAndPort participant : PARTICIPANTS)
         {
@@ -253,7 +260,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
                                                                     createResult(coordinator));
 
         coordinator.sentMessages.clear();
-        repairFuture.trySuccess(results);
+        repairFuture.trySuccess(CoordinatedRepairResult.success(results));
 
         // propose messages should have been sent once all repair sessions completed successfully
         for (InetAddressAndPort participant : PARTICIPANTS)
@@ -286,7 +293,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
         }
 
         Assert.assertTrue(sessionResult.isDone());
-        Assert.assertFalse(hasFailures.get());
+        sessionResult.syncUninterruptibly();
     }
 
     @Test
@@ -294,18 +301,17 @@ public class CoordinatorSessionTest extends AbstractRepairTest
     {
         InstrumentedCoordinatorSession coordinator = createInstrumentedSession();
         AtomicBoolean repairSubmitted = new AtomicBoolean(false);
-        Promise<List<RepairSessionResult>> repairFuture = AsyncPromise.uncancellable();
-        Supplier<Future<List<RepairSessionResult>>> sessionSupplier = () ->
+        Promise<CoordinatedRepairResult> repairFuture = AsyncPromise.uncancellable();
+        Supplier<Future<CoordinatedRepairResult>> sessionSupplier = () ->
         {
             repairSubmitted.set(true);
             return repairFuture;
         };
 
         // coordinator sends prepare requests to create local session and perform anticompaction
-        AtomicBoolean hasFailures = new AtomicBoolean(false);
         Assert.assertFalse(repairSubmitted.get());
         Assert.assertTrue(coordinator.sentMessages.isEmpty());
-        Future sessionResult = coordinator.execute(sessionSupplier, hasFailures);
+        Future<CoordinatedRepairResult> sessionResult = coordinator.execute(sessionSupplier);
         for (InetAddressAndPort participant : PARTICIPANTS)
         {
             PrepareConsistentRequest expected = new PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new HashSet<>(PARTICIPANTS));
@@ -330,6 +336,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
 
         Assert.assertEquals(ConsistentSession.State.REPAIRING, coordinator.getState());
 
+        List<Collection<Range<Token>>> ranges = Arrays.asList(coordinator.ranges, coordinator.ranges, coordinator.ranges);
         ArrayList<RepairSessionResult> results = Lists.newArrayList(createResult(coordinator),
                                                                     null,
                                                                     createResult(coordinator));
@@ -337,7 +344,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
         coordinator.sentMessages.clear();
         Assert.assertFalse(coordinator.failCalled);
         coordinator.onFail = () -> Assert.assertEquals(REPAIRING, coordinator.getState());
-        repairFuture.trySuccess(results);
+        repairFuture.trySuccess(CoordinatedRepairResult.create(ranges, results));
         Assert.assertTrue(coordinator.failCalled);
 
         // all participants should have been notified of session failure
@@ -348,7 +355,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
         }
 
         Assert.assertTrue(sessionResult.isDone());
-        Assert.assertTrue(hasFailures.get());
+        Assert.assertNotNull(sessionResult.cause());
     }
 
     @Test
@@ -356,18 +363,17 @@ public class CoordinatorSessionTest extends AbstractRepairTest
     {
         InstrumentedCoordinatorSession coordinator = createInstrumentedSession();
         AtomicBoolean repairSubmitted = new AtomicBoolean(false);
-        Promise<List<RepairSessionResult>> repairFuture = AsyncPromise.uncancellable();
-        Supplier<Future<List<RepairSessionResult>>> sessionSupplier = () ->
+        Promise<CoordinatedRepairResult> repairFuture = AsyncPromise.uncancellable();
+        Supplier<Future<CoordinatedRepairResult>> sessionSupplier = () ->
         {
             repairSubmitted.set(true);
             return repairFuture;
         };
 
         // coordinator sends prepare requests to create local session and perform anticompaction
-        AtomicBoolean hasFailures = new AtomicBoolean(false);
         Assert.assertFalse(repairSubmitted.get());
         Assert.assertTrue(coordinator.sentMessages.isEmpty());
-        Future sessionResult = coordinator.execute(sessionSupplier, hasFailures);
+        Future<CoordinatedRepairResult> sessionResult = coordinator.execute(sessionSupplier);
         for (InetAddressAndPort participant : PARTICIPANTS)
         {
             PrepareConsistentRequest expected = new PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new HashSet<>(PARTICIPANTS));
@@ -414,7 +420,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
         Assert.assertFalse(coordinator.sentMessages.containsKey(PARTICIPANT2));
 
         Assert.assertTrue(sessionResult.isDone());
-        Assert.assertTrue(hasFailures.get());
+        Assert.assertNotNull(sessionResult.cause());
     }
 
     @Test
@@ -422,18 +428,17 @@ public class CoordinatorSessionTest extends AbstractRepairTest
     {
         InstrumentedCoordinatorSession coordinator = createInstrumentedSession();
         AtomicBoolean repairSubmitted = new AtomicBoolean(false);
-        Promise<List<RepairSessionResult>> repairFuture = AsyncPromise.uncancellable();
-        Supplier<Future<List<RepairSessionResult>>> sessionSupplier = () ->
+        Promise<CoordinatedRepairResult> repairFuture = AsyncPromise.uncancellable();
+        Supplier<Future<CoordinatedRepairResult>> sessionSupplier = () ->
         {
             repairSubmitted.set(true);
             return repairFuture;
         };
 
         // coordinator sends prepare requests to create local session and perform anticompaction
-        AtomicBoolean hasFailures = new AtomicBoolean(false);
         Assert.assertFalse(repairSubmitted.get());
         Assert.assertTrue(coordinator.sentMessages.isEmpty());
-        Future sessionResult = coordinator.execute(sessionSupplier, hasFailures);
+        Future<CoordinatedRepairResult> sessionResult = coordinator.execute(sessionSupplier);
 
         for (InetAddressAndPort participant : PARTICIPANTS)
         {
@@ -465,7 +470,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
                                                                     createResult(coordinator));
 
         coordinator.sentMessages.clear();
-        repairFuture.trySuccess(results);
+        repairFuture.trySuccess(CoordinatedRepairResult.success(results));
 
         // propose messages should have been sent once all repair sessions completed successfully
         for (InetAddressAndPort participant : PARTICIPANTS)
@@ -501,6 +506,6 @@ public class CoordinatorSessionTest extends AbstractRepairTest
         }
 
         Assert.assertTrue(sessionResult.isDone());
-        Assert.assertTrue(hasFailures.get());
+        Assert.assertNotNull(sessionResult.cause());
     }
 }
diff --git a/test/unit/org/apache/cassandra/utils/concurrent/AbstractTestAsyncPromise.java b/test/unit/org/apache/cassandra/utils/concurrent/AbstractTestAsyncPromise.java
index e1c069b..0c297ec 100644
--- a/test/unit/org/apache/cassandra/utils/concurrent/AbstractTestAsyncPromise.java
+++ b/test/unit/org/apache/cassandra/utils/concurrent/AbstractTestAsyncPromise.java
@@ -35,9 +35,15 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.Assert;
 
 import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.cassandra.config.DatabaseDescriptor;
 
 public abstract class AbstractTestAsyncPromise extends AbstractTestPromise
 {
+    static
+    {
+        DatabaseDescriptor.clientInitialization();
+    }
+
     public static <V> Promise<V> cancelSuccess(Promise<V> promise)
     {
         success(promise, Promise::isCancellable, true);
@@ -117,20 +123,30 @@ public abstract class AbstractTestAsyncPromise extends AbstractTestPromise
                 int id = count++;
                 return result -> { results.add(result); order.add(id); return ImmediateFuture.success(result); };
             }
+            public Function<V, V> getFunction()
+            {
+                int id = count++;
+                return result -> { results.add(result); order.add(id); return result; };
+            }
             public Function<V, Future<V>> getRecursiveAsyncFunction(Promise<V> promise)
             {
                 int id = count++;
-                return result -> { promise.andThenAsync(getAsyncFunction()); results.add(result); order.add(id); return ImmediateFuture.success(result); };
+                return result -> { promise.flatMap(getAsyncFunction()); results.add(result); order.add(id); return ImmediateFuture.success(result); };
             }
             public Function<V, Future<V>> getAsyncFailingFunction()
             {
                 int id = count++;
                 return result -> { results.add(result); order.add(id); return ImmediateFuture.failure(new RuntimeException()); };
             }
+            public Function<V, V> getFailingFunction()
+            {
+                int id = count++;
+                return result -> { results.add(result); order.add(id); throw new RuntimeException(); };
+            }
             public Function<V, Future<V>> getRecursiveAsyncFailingFunction(Promise<V> promise)
             {
                 int id = count++;
-                return result -> { promise.andThenAsync(getAsyncFailingFunction()); results.add(result); order.add(id); return ImmediateFuture.failure(new RuntimeException()); };
+                return result -> { promise.flatMap(getAsyncFailingFunction()); results.add(result); order.add(id); return ImmediateFuture.failure(new RuntimeException()); };
             }
             public FutureCallback<V> getCallback(Future<V> p)
             {
@@ -181,17 +197,26 @@ public abstract class AbstractTestAsyncPromise extends AbstractTestPromise
         promise.addListener(listeners.getRecursiveRunnable(promise), MoreExecutors.directExecutor());
         promise.addListener(listeners.getRecursive());
         promise.addCallback(listeners.getCallback(promise));
+        promise.addCallback(listeners.getCallback(promise), MoreExecutors.directExecutor());
         promise.addCallback(listeners.getRecursiveCallback(promise));
+        promise.addCallback(listeners.getRecursiveCallback(promise), MoreExecutors.directExecutor());
         promise.addCallback(listeners.getConsumer(), fail -> Assert.fail());
+        promise.addCallback(listeners.getConsumer(), fail -> Assert.fail(), MoreExecutors.directExecutor());
         promise.addCallback(listeners.getRecursiveConsumer(), fail -> Assert.fail());
-        promise.andThenAsync(listeners.getAsyncFunction()).addListener(listeners.get());
-        promise.andThenAsync(listeners.getAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get());
-        promise.andThenAsync(listeners.getRecursiveAsyncFunction(promise)).addListener(listeners.get());
-        promise.andThenAsync(listeners.getRecursiveAsyncFunction(promise), MoreExecutors.directExecutor()).addListener(listeners.get());
-        promise.andThenAsync(listeners.getAsyncFailingFunction()).addListener(listeners.getListenerToFailure(promise));
-        promise.andThenAsync(listeners.getAsyncFailingFunction(), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise));
-        promise.andThenAsync(listeners.getRecursiveAsyncFailingFunction(promise)).addListener(listeners.getListenerToFailure(promise));
-        promise.andThenAsync(listeners.getRecursiveAsyncFailingFunction(promise), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise));
+        promise.addCallback(listeners.getRecursiveConsumer(), fail -> Assert.fail(), MoreExecutors.directExecutor());
+        promise.map(listeners.getFunction()).addListener(listeners.get());
+        promise.map(listeners.getFunction(), MoreExecutors.directExecutor()).addListener(listeners.get());
+        promise.map(listeners.getFailingFunction()).addListener(listeners.getListenerToFailure(promise));
+        promise.map(listeners.getFailingFunction(), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise));
+        promise.flatMap(listeners.getAsyncFunction()).addListener(listeners.get());
+        promise.flatMap(listeners.getAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get());
+        promise.flatMap(listeners.getRecursiveAsyncFunction(promise)).addListener(listeners.get());
+        promise.flatMap(listeners.getRecursiveAsyncFunction(promise), MoreExecutors.directExecutor()).addListener(listeners.get());
+        promise.flatMap(listeners.getAsyncFailingFunction()).addListener(listeners.getListenerToFailure(promise));
+        promise.flatMap(listeners.getAsyncFailingFunction(), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise));
+        promise.flatMap(listeners.getRecursiveAsyncFailingFunction(promise)).addListener(listeners.getListenerToFailure(promise));
+        promise.flatMap(listeners.getRecursiveAsyncFailingFunction(promise), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise));
+
         success(promise, Promise::getNow, null);
         success(promise, Promise::isSuccess, false);
         success(promise, Promise::isDone, false);
@@ -224,17 +249,25 @@ public abstract class AbstractTestAsyncPromise extends AbstractTestPromise
         promise.addListener(listeners.getRecursiveRunnable(promise), MoreExecutors.directExecutor());
         promise.addListener(listeners.getRecursive());
         promise.addCallback(listeners.getCallback(promise));
+        promise.addCallback(listeners.getCallback(promise), MoreExecutors.directExecutor());
         promise.addCallback(listeners.getRecursiveCallback(promise));
+        promise.addCallback(listeners.getRecursiveCallback(promise), MoreExecutors.directExecutor());
         promise.addCallback(listeners.getConsumer(), fail -> Assert.fail());
+        promise.addCallback(listeners.getConsumer(), fail -> Assert.fail(), MoreExecutors.directExecutor());
         promise.addCallback(listeners.getRecursiveConsumer(), fail -> Assert.fail());
-        promise.andThenAsync(listeners.getAsyncFunction()).addListener(listeners.get());
-        promise.andThenAsync(listeners.getAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get());
-        promise.andThenAsync(listeners.getRecursiveAsyncFunction(promise)).addListener(listeners.get());
-        promise.andThenAsync(listeners.getRecursiveAsyncFunction(promise), MoreExecutors.directExecutor()).addListener(listeners.get());
-        promise.andThenAsync(listeners.getAsyncFailingFunction()).addListener(listeners.getListenerToFailure(promise));
-        promise.andThenAsync(listeners.getAsyncFailingFunction(), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise));
-        promise.andThenAsync(listeners.getRecursiveAsyncFailingFunction(promise)).addListener(listeners.getListenerToFailure(promise));
-        promise.andThenAsync(listeners.getRecursiveAsyncFailingFunction(promise), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise));
+        promise.addCallback(listeners.getRecursiveConsumer(), fail -> Assert.fail(), MoreExecutors.directExecutor());
+        promise.map(listeners.getFunction()).addListener(listeners.get());
+        promise.map(listeners.getFunction(), MoreExecutors.directExecutor()).addListener(listeners.get());
+        promise.map(listeners.getFailingFunction()).addListener(listeners.getListenerToFailure(promise));
+        promise.map(listeners.getFailingFunction(), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise));
+        promise.flatMap(listeners.getAsyncFunction()).addListener(listeners.get());
+        promise.flatMap(listeners.getAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get());
+        promise.flatMap(listeners.getRecursiveAsyncFunction(promise)).addListener(listeners.get());
+        promise.flatMap(listeners.getRecursiveAsyncFunction(promise), MoreExecutors.directExecutor()).addListener(listeners.get());
+        promise.flatMap(listeners.getAsyncFailingFunction()).addListener(listeners.getListenerToFailure(promise));
+        promise.flatMap(listeners.getAsyncFailingFunction(), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise));
+        promise.flatMap(listeners.getRecursiveAsyncFailingFunction(promise)).addListener(listeners.getListenerToFailure(promise));
+        promise.flatMap(listeners.getRecursiveAsyncFailingFunction(promise), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise));
         success(promise, Promise::isSuccess, true);
         success(promise, Promise::isDone, true);
         success(promise, Promise::isCancelled, false);
@@ -292,7 +325,7 @@ public abstract class AbstractTestAsyncPromise extends AbstractTestPromise
             }
             public Function<V, Future<V>> getRecursiveAsyncFunction()
             {
-                return result -> { promise.andThenAsync(getAsyncFunction()); return ImmediateFuture.success(result); };
+                return result -> { promise.flatMap(getAsyncFunction()); return ImmediateFuture.success(result); };
             }
             public FutureCallback<V> getCallback(Future<V> p)
             {
@@ -346,10 +379,10 @@ public abstract class AbstractTestAsyncPromise extends AbstractTestPromise
         promise.addCallback(listeners.getRecursiveCallback(promise));
         promise.addCallback(fail -> Assert.fail(), listeners.getConsumer());
         promise.addCallback(fail -> Assert.fail(), listeners.getRecursiveConsumer());
-        promise.andThenAsync(listeners.getAsyncFunction()).addListener(listeners.get());
-        promise.andThenAsync(listeners.getAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get());
-        promise.andThenAsync(listeners.getRecursiveAsyncFunction()).addListener(listeners.get());
-        promise.andThenAsync(listeners.getRecursiveAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get());
+        promise.flatMap(listeners.getAsyncFunction()).addListener(listeners.get());
+        promise.flatMap(listeners.getAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get());
+        promise.flatMap(listeners.getRecursiveAsyncFunction()).addListener(listeners.get());
+        promise.flatMap(listeners.getRecursiveAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get());
         success(promise, Promise::isSuccess, false);
         success(promise, Promise::isDone, false);
         success(promise, Promise::isCancelled, false);
@@ -393,8 +426,8 @@ public abstract class AbstractTestAsyncPromise extends AbstractTestPromise
         promise.addCallback(listeners.getRecursiveCallback(promise));
         promise.addCallback(fail -> Assert.fail(), listeners.getConsumer());
         promise.addCallback(fail -> Assert.fail(), listeners.getRecursiveConsumer());
-        promise.andThenAsync(listeners.getAsyncFunction()).addListener(listeners.get());
-        promise.andThenAsync(listeners.getAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get());
+        promise.flatMap(listeners.getAsyncFunction()).addListener(listeners.get());
+        promise.flatMap(listeners.getAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get());
         success(promise, Promise::isSuccess, false);
         success(promise, Promise::isDone, true);
         success(promise, Promise::isCancelled, false);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org