You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dc...@apache.org on 2021/11/13 01:00:00 UTC
[cassandra] branch trunk updated: Refactor normal/preview/IR repair
to standardize repair cleanup and error handling of failed RepairJobs
This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new b84ec51 Refactor normal/preview/IR repair to standardize repair cleanup and error handling of failed RepairJobs
b84ec51 is described below
commit b84ec51b4c73d7b9b58ea8a1708ae2f330972970
Author: David Capwell <dc...@apache.org>
AuthorDate: Fri Nov 12 15:47:44 2021 -0800
Refactor normal/preview/IR repair to standardize repair cleanup and error handling of failed RepairJobs
patch by David Capwell; reviewed by Marcus Eriksson for CASSANDRA-17069
---
CHANGES.txt | 1 +
checkstyle.xml | 22 +-
.../org/apache/cassandra/cache/CaffeineCache.java | 3 +-
.../org/apache/cassandra/cache/ChunkCache.java | 3 +-
.../apache/cassandra/cache/SerializingCache.java | 3 +-
.../org/apache/cassandra/cql3/QueryProcessor.java | 3 +-
.../cassandra/db/repair/PendingAntiCompaction.java | 2 +-
.../index/sasi/analyzer/filter/StemmerFactory.java | 3 +-
.../sasi/analyzer/filter/StopWordFactory.java | 3 +-
.../cassandra/metrics/HintedHandoffMetrics.java | 5 +-
.../cassandra/metrics/HintsServiceMetrics.java | 3 +-
.../cassandra/repair/AbstractRepairTask.java | 122 +++++++
.../cassandra/repair/CoordinatedRepairResult.java | 108 ++++++
.../cassandra/repair/IncrementalRepairTask.java | 75 ++++
.../apache/cassandra/repair/NormalRepairTask.java | 57 +++
.../apache/cassandra/repair/PreviewRepairTask.java | 145 ++++++++
.../org/apache/cassandra/repair/RepairJob.java | 4 +-
...airFailedException.java => RepairNotifier.java} | 14 +-
.../apache/cassandra/repair/RepairRunnable.java | 391 +++------------------
...eRepairFailedException.java => RepairTask.java} | 32 +-
.../repair/SomeRepairFailedException.java | 5 +
.../repair/consistent/CoordinatorSession.java | 138 +++-----
.../repair/consistent/SyncStatSummary.java | 7 +-
.../apache/cassandra/security/CipherFactory.java | 3 +-
.../cassandra/service/ActiveRepairService.java | 15 +-
.../cassandra/utils/concurrent/AbstractFuture.java | 44 ++-
.../cassandra/utils/concurrent/AsyncFuture.java | 15 +-
.../apache/cassandra/utils/concurrent/Future.java | 30 +-
.../cassandra/utils/concurrent/ListenerList.java | 33 +-
.../cassandra/utils/concurrent/SyncFuture.java | 15 +-
.../test/AbstractNetstatsBootstrapStreaming.java | 4 +-
.../distributed/test/repair/ForceRepairTest.java | 170 +++++++++
.../consistent/CoordinatorMessagingTest.java | 36 +-
.../repair/consistent/CoordinatorSessionTest.java | 61 ++--
.../utils/concurrent/AbstractTestAsyncPromise.java | 83 +++--
35 files changed, 1082 insertions(+), 576 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index fb97fec..12848af 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1
+ * Refactor normal/preview/IR repair to standardize repair cleanup and error handling of failed RepairJobs (CASSANDRA-17069)
* Log missing peers in StartupClusterConnectivityChecker (CASSANDRA-17130)
* Introduce separate rate limiting settings for entire SSTable streaming (CASSANDRA-17065)
* Implement Virtual Tables for Auth Caches (CASSANDRA-16914)
diff --git a/checkstyle.xml b/checkstyle.xml
index 4bc31dd..19bfa86 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -51,14 +51,30 @@
<module name="SuppressWithNearbyCommentFilter">
<property name="commentFormat" value="checkstyle: permit system clock"/>
- <property name="checkFormat" value="RegexpSinglelineJava"/>
+ <property name="idFormat" value="blockSystemClock"/>
<property name="influenceFormat" value="0"/>
</module>
<module name="RegexpSinglelineJava">
- <!-- To prevent static imports and System.nanoTime or System.currentTimeMillis -->
- <property name="format" value="(newSingleThreadExecutor|newFixedThreadPool|newCachedThreadPool|newSingleThreadScheduledExecutor|newWorkStealingPool|newScheduledThreadPool|defaultThreadFactory)\(|System\.(currentTimeMillis|nanoTime)"/>
+ <!-- block system time -->
+ <property name="id" value="blockSystemClock"/>
+ <property name="format" value="System\.(currentTimeMillis|nanoTime)"/>
<property name="ignoreComments" value="true"/>
+ <property name="message" value="Avoid System for time, should use org.apache.cassandra.utils.Clock.Global or org.apache.cassandra.utils.Clock interface" />
+ </module>
+ <module name="RegexpSinglelineJava">
+ <!-- block normal executors -->
+ <property name="id" value="blockExecutors"/>
+ <property name="format" value="newSingleThreadExecutor|newFixedThreadPool|newCachedThreadPool|newSingleThreadScheduledExecutor|newWorkStealingPool|newScheduledThreadPool|defaultThreadFactory"/>
+ <property name="ignoreComments" value="true"/>
+ <property name="message" value="Avoid creating an executor directly, should use org.apache.cassandra.concurrent.ExecutorFactory.Global#executorFactory" />
+ </module>
+ <module name="RegexpSinglelineJava">
+ <!-- block guavas directExecutor -->
+ <property name="id" value="blockGuavaDirectExecutor"/>
+ <property name="format" value="MoreExecutors\.directExecutor"/>
+ <property name="ignoreComments" value="true"/>
+ <property name="message" value="Avoid MoreExecutors.directExecutor() in favor of ImmediateExecutor.INSTANCE" />
</module>
<module name="IllegalImport">
<property name="illegalPkgs" value=""/>
diff --git a/src/java/org/apache/cassandra/cache/CaffeineCache.java b/src/java/org/apache/cassandra/cache/CaffeineCache.java
index d51ea84..b01093f 100644
--- a/src/java/org/apache/cassandra/cache/CaffeineCache.java
+++ b/src/java/org/apache/cassandra/cache/CaffeineCache.java
@@ -27,6 +27,7 @@ import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Policy.Eviction;
import com.github.benmanes.caffeine.cache.Weigher;
+import org.apache.cassandra.concurrent.ImmediateExecutor;
/**
* An adapter from a Caffeine cache to the ICache interface. This provides an on-heap cache using
@@ -54,7 +55,7 @@ public class CaffeineCache<K extends IMeasurableMemory, V extends IMeasurableMem
Cache<K, V> cache = Caffeine.newBuilder()
.maximumWeight(weightedCapacity)
.weigher(weigher)
- .executor(MoreExecutors.directExecutor())
+ .executor(ImmediateExecutor.INSTANCE)
.build();
return new CaffeineCache<>(cache);
}
diff --git a/src/java/org/apache/cassandra/cache/ChunkCache.java b/src/java/org/apache/cassandra/cache/ChunkCache.java
index c53810a..397850a 100644
--- a/src/java/org/apache/cassandra/cache/ChunkCache.java
+++ b/src/java/org/apache/cassandra/cache/ChunkCache.java
@@ -29,6 +29,7 @@ import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import com.github.benmanes.caffeine.cache.*;
+import org.apache.cassandra.concurrent.ImmediateExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.util.*;
@@ -143,7 +144,7 @@ public class ChunkCache
metrics = new ChunkCacheMetrics(this);
cache = Caffeine.newBuilder()
.maximumWeight(cacheSize)
- .executor(MoreExecutors.directExecutor())
+ .executor(ImmediateExecutor.INSTANCE)
.weigher((key, buffer) -> ((Buffer) buffer).buffer.capacity())
.removalListener(this)
.recordStats(() -> metrics)
diff --git a/src/java/org/apache/cassandra/cache/SerializingCache.java b/src/java/org/apache/cassandra/cache/SerializingCache.java
index 55c20ec..cd028e2 100644
--- a/src/java/org/apache/cassandra/cache/SerializingCache.java
+++ b/src/java/org/apache/cassandra/cache/SerializingCache.java
@@ -21,6 +21,7 @@ import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Weigher;
+import org.apache.cassandra.concurrent.ImmediateExecutor;
import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.io.util.MemoryInputStream;
import org.apache.cassandra.io.util.MemoryOutputStream;
@@ -51,7 +52,7 @@ public class SerializingCache<K, V> implements ICache<K, V>
this.cache = Caffeine.newBuilder()
.weigher(weigher)
.maximumWeight(capacity)
- .executor(MoreExecutors.directExecutor())
+ .executor(ImmediateExecutor.INSTANCE)
.removalListener((key, mem, cause) -> {
if (cause.wasEvicted()) {
mem.unreference();
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index eea8336..ed99861 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.antlr.runtime.*;
+import org.apache.cassandra.concurrent.ImmediateExecutor;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.metrics.ClientRequestMetrics;
@@ -97,7 +98,7 @@ public class QueryProcessor implements QueryHandler
static
{
preparedStatements = Caffeine.newBuilder()
- .executor(MoreExecutors.directExecutor())
+ .executor(ImmediateExecutor.INSTANCE)
.maximumWeight(capacityToBytes(DatabaseDescriptor.getPreparedStatementsCacheSizeMB()))
.weigher(QueryProcessor::measure)
.removalListener((key, prepared, cause) -> {
diff --git a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
index 59eff55..0007bbe 100644
--- a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
+++ b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
@@ -372,7 +372,7 @@ public class PendingAntiCompaction
}
Future<List<AcquireResult>> acquisitionResults = FutureCombiner.successfulOf(tasks);
- return acquisitionResults.andThenAsync(getAcquisitionCallback(prsId, tokenRanges));
+ return acquisitionResults.flatMap(getAcquisitionCallback(prsId, tokenRanges));
}
@VisibleForTesting
diff --git a/src/java/org/apache/cassandra/index/sasi/analyzer/filter/StemmerFactory.java b/src/java/org/apache/cassandra/index/sasi/analyzer/filter/StemmerFactory.java
index d278c28..17f1512 100644
--- a/src/java/org/apache/cassandra/index/sasi/analyzer/filter/StemmerFactory.java
+++ b/src/java/org/apache/cassandra/index/sasi/analyzer/filter/StemmerFactory.java
@@ -24,6 +24,7 @@ import java.util.Map;
import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.cassandra.concurrent.ImmediateExecutor;
import org.tartarus.snowball.SnowballStemmer;
import org.tartarus.snowball.ext.*;
@@ -42,7 +43,7 @@ public class StemmerFactory
{
private static final Logger logger = LoggerFactory.getLogger(StemmerFactory.class);
private static final LoadingCache<Class, Constructor<?>> STEMMER_CONSTRUCTOR_CACHE = Caffeine.newBuilder()
- .executor(MoreExecutors.directExecutor())
+ .executor(ImmediateExecutor.INSTANCE)
.build(new CacheLoader<Class, Constructor<?>>()
{
public Constructor<?> load(Class aClass) throws Exception
diff --git a/src/java/org/apache/cassandra/index/sasi/analyzer/filter/StopWordFactory.java b/src/java/org/apache/cassandra/index/sasi/analyzer/filter/StopWordFactory.java
index 56c07f3..434fbda 100644
--- a/src/java/org/apache/cassandra/index/sasi/analyzer/filter/StopWordFactory.java
+++ b/src/java/org/apache/cassandra/index/sasi/analyzer/filter/StopWordFactory.java
@@ -32,6 +32,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
+import org.apache.cassandra.concurrent.ImmediateExecutor;
import org.apache.cassandra.io.util.File;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,7 +52,7 @@ public class StopWordFactory
"pl","pt","ro","ru","sv"));
private static final LoadingCache<String, Set<String>> STOP_WORDS_CACHE = Caffeine.newBuilder()
- .executor(MoreExecutors.directExecutor())
+ .executor(ImmediateExecutor.INSTANCE)
.build(StopWordFactory::getStopWordsFromResource);
public static Set<String> getStopWordsForLanguage(Locale locale)
diff --git a/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java b/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java
index 0261e8e..2a5fa9a 100644
--- a/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java
@@ -25,6 +25,7 @@ import com.codahale.metrics.Counter;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
+import org.apache.cassandra.concurrent.ImmediateExecutor;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.utils.UUIDGen;
@@ -44,12 +45,12 @@ public class HintedHandoffMetrics
/** Total number of hints which are not stored, This is not a cache. */
private final LoadingCache<InetAddressAndPort, DifferencingCounter> notStored = Caffeine.newBuilder()
- .executor(MoreExecutors.directExecutor())
+ .executor(ImmediateExecutor.INSTANCE)
.build(DifferencingCounter::new);
/** Total number of hints that have been created, This is not a cache. */
private final LoadingCache<InetAddressAndPort, Counter> createdHintCounts = Caffeine.newBuilder()
- .executor(MoreExecutors.directExecutor())
+ .executor(ImmediateExecutor.INSTANCE)
.build(address -> Metrics.counter(factory.createMetricName("Hints_created-" + address.toString().replace(':', '.'))));
public void incrCreatedHints(InetAddressAndPort address)
diff --git a/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java b/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java
index 424f502..50defd9 100644
--- a/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java
@@ -25,6 +25,7 @@ import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
+import org.apache.cassandra.concurrent.ImmediateExecutor;
import org.apache.cassandra.locator.InetAddressAndPort;
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
@@ -47,7 +48,7 @@ public final class HintsServiceMetrics
/** Histograms per-endpoint of hint delivery delays, This is not a cache. */
private static final LoadingCache<InetAddressAndPort, Histogram> delayByEndpoint = Caffeine.newBuilder()
- .executor(MoreExecutors.directExecutor())
+ .executor(ImmediateExecutor.INSTANCE)
.build(address -> Metrics.histogram(factory.createMetricName("Hint_delays-"+address.toString().replace(':', '.')), false));
public static void updateDelayMetrics(InetAddressAndPort endpoint, long delay)
diff --git a/src/java/org/apache/cassandra/repair/AbstractRepairTask.java b/src/java/org/apache/cassandra/repair/AbstractRepairTask.java
new file mode 100644
index 0000000..c729cda
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/AbstractRepairTask.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.FutureCombiner;
+
+public abstract class AbstractRepairTask implements RepairTask
+{
+ protected static final Logger logger = LoggerFactory.getLogger(AbstractRepairTask.class);
+
+ protected final RepairOption options;
+ protected final String keyspace;
+ protected final RepairNotifier notifier;
+
+ protected AbstractRepairTask(RepairOption options, String keyspace, RepairNotifier notifier)
+ {
+ this.options = Objects.requireNonNull(options);
+ this.keyspace = Objects.requireNonNull(keyspace);
+ this.notifier = Objects.requireNonNull(notifier);
+ }
+
+ private List<RepairSession> submitRepairSessions(UUID parentSession,
+ boolean isIncremental,
+ ExecutorPlus executor,
+ List<CommonRange> commonRanges,
+ String... cfnames)
+ {
+ List<RepairSession> futures = new ArrayList<>(options.getRanges().size());
+
+ for (CommonRange commonRange : commonRanges)
+ {
+ logger.info("Starting RepairSession for {}", commonRange);
+ RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession,
+ commonRange,
+ keyspace,
+ options.getParallelism(),
+ isIncremental,
+ options.isPullRepair(),
+ options.getPreviewKind(),
+ options.optimiseStreams(),
+ executor,
+ cfnames);
+ if (session == null)
+ continue;
+ session.addCallback(new RepairSessionCallback(session));
+ futures.add(session);
+ }
+ return futures;
+ }
+
+ protected Future<CoordinatedRepairResult> runRepair(UUID parentSession,
+ boolean isIncremental,
+ ExecutorPlus executor,
+ List<CommonRange> commonRanges,
+ String... cfnames)
+ {
+ List<RepairSession> allSessions = submitRepairSessions(parentSession, isIncremental, executor, commonRanges, cfnames);
+ List<Collection<Range<Token>>> ranges = Lists.transform(allSessions, RepairSession::ranges);
+ Future<List<RepairSessionResult>> f = FutureCombiner.successfulOf(allSessions);
+ return f.map(results -> {
+ logger.debug("Repair result: {}", results);
+ return CoordinatedRepairResult.create(ranges, results);
+ });
+ }
+
+ private class RepairSessionCallback implements FutureCallback<RepairSessionResult>
+ {
+ private final RepairSession session;
+
+ public RepairSessionCallback(RepairSession session)
+ {
+ this.session = session;
+ }
+
+ public void onSuccess(RepairSessionResult result)
+ {
+ String message = String.format("Repair session %s for range %s finished", session.getId(),
+ session.ranges().toString());
+ notifier.notifyProgress(message);
+ }
+
+ public void onFailure(Throwable t)
+ {
+ String message = String.format("Repair session %s for range %s failed with error %s",
+ session.getId(), session.ranges().toString(), t.getMessage());
+ notifier.notifyError(new RuntimeException(message, t));
+ }
+ }
+}
diff --git a/src/java/org/apache/cassandra/repair/CoordinatedRepairResult.java b/src/java/org/apache/cassandra/repair/CoordinatedRepairResult.java
new file mode 100644
index 0000000..9593acc
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/CoordinatedRepairResult.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+public class CoordinatedRepairResult
+{
+ public final Collection<Range<Token>> successfulRanges;
+ public final Collection<Range<Token>> failedRanges;
+ public final Collection<Range<Token>> skippedRanges;
+ public final Optional<List<RepairSessionResult>> results;
+
+ public CoordinatedRepairResult(Collection<Range<Token>> successfulRanges,
+ Collection<Range<Token>> failedRanges,
+ Collection<Range<Token>> skippedRanges,
+ List<RepairSessionResult> results)
+ {
+ this.successfulRanges = successfulRanges != null ? ImmutableList.copyOf(successfulRanges) : Collections.emptyList();
+ this.failedRanges = failedRanges != null ? ImmutableList.copyOf(failedRanges) : Collections.emptyList();
+ this.skippedRanges = skippedRanges != null ? ImmutableList.copyOf(skippedRanges) : Collections.emptyList();
+ this.results = Optional.ofNullable(results);
+ }
+
+ public static CoordinatedRepairResult create(List<Collection<Range<Token>>> ranges, List<RepairSessionResult> results)
+ {
+ if (results == null || results.isEmpty())
+ // something went wrong; assume all sessions failed
+ return failed(ranges);
+
+ assert ranges.size() == results.size() : String.format("range size %d != results size %d;ranges: %s, results: %s", ranges.size(), results.size(), ranges, results);
+ Collection<Range<Token>> successfulRanges = new ArrayList<>();
+ Collection<Range<Token>> failedRanges = new ArrayList<>();
+ Collection<Range<Token>> skippedRanges = new ArrayList<>();
+ int index = 0;
+ for (RepairSessionResult sessionResult : results)
+ {
+ if (sessionResult != null)
+ {
+ // don't record successful repair if we had to skip ranges
+ Collection<Range<Token>> replicas = sessionResult.skippedReplicas ? skippedRanges : successfulRanges;
+ replicas.addAll(sessionResult.ranges);
+ }
+ else
+ {
+ // FutureCombiner.successfulOf doesn't keep track of the original, but maintains order, so
+ // can fetch the original session
+ failedRanges.addAll(Objects.requireNonNull(ranges.get(index)));
+ }
+ index++;
+ }
+ return new CoordinatedRepairResult(successfulRanges, failedRanges, skippedRanges, results);
+ }
+
+ private static CoordinatedRepairResult failed(@Nullable List<Collection<Range<Token>>> ranges)
+ {
+ Collection<Range<Token>> failedRanges = new ArrayList<>(ranges == null ? 0 : ranges.size());
+ if (ranges != null)
+ ranges.forEach(failedRanges::addAll);
+ return new CoordinatedRepairResult(null, failedRanges, null, null);
+ }
+
+ /**
+ * Utility method for tests to produce a success result; should only be used by tests as syntactic sugar as all
+ * results must be present else an error is thrown.
+ */
+ @VisibleForTesting
+ public static CoordinatedRepairResult success(List<RepairSessionResult> results)
+ {
+ assert results != null && results.stream().allMatch(a -> a != null) : String.format("results was null or had a null (failed) result: %s", results);
+ List<Collection<Range<Token>>> ranges = Lists.transform(results, a -> a.ranges);
+ return create(ranges, results);
+ }
+
+ public boolean hasFailed()
+ {
+ return !failedRanges.isEmpty();
+ }
+}
diff --git a/src/java/org/apache/cassandra/repair/IncrementalRepairTask.java b/src/java/org/apache/cassandra/repair/IncrementalRepairTask.java
new file mode 100644
index 0000000..c2951d5
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/IncrementalRepairTask.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.repair.consistent.CoordinatorSession;
+import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Future;
+
+public class IncrementalRepairTask extends AbstractRepairTask
+{
+ private final UUID parentSession;
+ private final RepairRunnable.NeighborsAndRanges neighborsAndRanges;
+ private final String[] cfnames;
+
+ protected IncrementalRepairTask(RepairOption options,
+ String keyspace,
+ RepairNotifier notifier,
+ UUID parentSession,
+ RepairRunnable.NeighborsAndRanges neighborsAndRanges,
+ String[] cfnames)
+ {
+ super(options, keyspace, notifier);
+ this.parentSession = parentSession;
+ this.neighborsAndRanges = neighborsAndRanges;
+ this.cfnames = cfnames;
+ }
+
+ @Override
+ public String name()
+ {
+ return "Repair";
+ }
+
+ @Override
+ public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor) throws Exception
+ {
+ // the local node also needs to be included in the set of participants, since coordinator sessions aren't persisted
+ Set<InetAddressAndPort> allParticipants = ImmutableSet.<InetAddressAndPort>builder()
+ .addAll(neighborsAndRanges.participants)
+ .add(FBUtilities.getBroadcastAddressAndPort())
+ .build();
+ // Not necessary to include self for filtering. The common ranges only contain neighbor node endpoints.
+ List<CommonRange> allRanges = neighborsAndRanges.filterCommonRanges(keyspace, cfnames);
+
+ CoordinatorSession coordinatorSession = ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession, allParticipants, neighborsAndRanges.shouldExcludeDeadParticipants);
+
+ return coordinatorSession.execute(() -> runRepair(parentSession, true, executor, allRanges, cfnames));
+
+ }
+}
diff --git a/src/java/org/apache/cassandra/repair/NormalRepairTask.java b/src/java/org/apache/cassandra/repair/NormalRepairTask.java
new file mode 100644
index 0000000..532271c
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/NormalRepairTask.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.utils.concurrent.Future;
+
+public class NormalRepairTask extends AbstractRepairTask
+{
+ private final UUID parentSession;
+ private final List<CommonRange> commonRanges;
+ private final String[] cfnames;
+
+ protected NormalRepairTask(RepairOption options,
+ String keyspace,
+ RepairNotifier notifier,
+ UUID parentSession,
+ List<CommonRange> commonRanges,
+ String[] cfnames)
+ {
+ super(options, keyspace, notifier);
+ this.parentSession = parentSession;
+ this.commonRanges = commonRanges;
+ this.cfnames = cfnames;
+ }
+
+ @Override
+ public String name()
+ {
+ return "Repair";
+ }
+
+ @Override
+ public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor)
+ {
+ return runRepair(parentSession, false, executor, commonRanges, cfnames);
+ }
+}
diff --git a/src/java/org/apache/cassandra/repair/PreviewRepairTask.java b/src/java/org/apache/cassandra/repair/PreviewRepairTask.java
new file mode 100644
index 0000000..e2eb08c
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/PreviewRepairTask.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.RepairMetrics;
+import org.apache.cassandra.repair.consistent.SyncStatSummary;
+import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.DiagnosticSnapshotService;
+import org.apache.cassandra.utils.concurrent.Future;
+
+public class PreviewRepairTask extends AbstractRepairTask // task RepairRunnable.repair builds when options.isPreview() is true
+{
+ private final UUID parentSession; // parent repair session id; also used for log prefixes
+ private final List<CommonRange> commonRanges; // ranges to preview, passed through to runRepair
+ private final String[] cfnames; // table (column family) names, passed through to runRepair
+
+ protected PreviewRepairTask(RepairOption options, String keyspace, RepairNotifier notifier, UUID parentSession, List<CommonRange> commonRanges, String[] cfnames)
+ {
+ super(options, keyspace, notifier); // handled by AbstractRepairTask, which is not shown in this part of the patch
+ this.parentSession = parentSession;
+ this.commonRanges = commonRanges;
+ this.cfnames = cfnames;
+ }
+
+ @Override
+ public String name() // label RepairRunnable prepends to " completed successfully"
+ {
+ return "Repair preview";
+ }
+
+ @Override
+ public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor) // runs the sessions, then reports a sync summary through the notifier
+ {
+ Future<CoordinatedRepairResult> f = runRepair(parentSession, false, executor, commonRanges, cfnames);
+ return f.map(result -> { // summarise the result; the same result object is always returned
+ if (result.hasFailed()) // failed runs are passed through without building a summary
+ return result;
+
+ PreviewKind previewKind = options.getPreviewKind();
+ Preconditions.checkState(previewKind != PreviewKind.NONE, "Preview is NONE"); // a preview task must carry a real preview kind
+ SyncStatSummary summary = new SyncStatSummary(true);
+ summary.consumeSessionResults(result.results);
+
+ final String message;
+ if (summary.isEmpty()) // nothing in the summary: report the data as in sync
+ {
+ message = previewKind == PreviewKind.REPAIRED ? "Repaired data is in sync" : "Previewed data was in sync";
+ }
+ else
+ {
+ message = (previewKind == PreviewKind.REPAIRED ? "Repaired data is inconsistent\n" : "Preview complete\n") + summary;
+ RepairMetrics.previewFailures.inc(); // every non-empty summary is counted as a preview failure
+ if (previewKind == PreviewKind.REPAIRED) // snapshots are only considered for REPAIRED previews
+ maybeSnapshotReplicas(parentSession, keyspace, result.results.get()); // we know it's present as the summary used it
+ }
+ notifier.notification(message);
+
+ return result;
+ });
+ }
+
+ private void maybeSnapshotReplicas(UUID parentSession, String keyspace, List<RepairSessionResult> results) // snapshots tables with differences, if enabled; never throws Exception
+ {
+ if (!DatabaseDescriptor.snapshotOnRepairedDataMismatch()) // gated by configuration
+ return;
+
+ try
+ {
+ Set<String> mismatchingTables = new HashSet<>(); // tables with at least one stat reporting differences
+ Set<InetAddressAndPort> nodes = new HashSet<>(); // every coordinator and peer seen in the sync stats
+ for (RepairSessionResult sessionResult : results)
+ {
+ for (RepairResult repairResult : emptyIfNull(sessionResult.repairJobResults))
+ {
+ for (SyncStat stat : emptyIfNull(repairResult.stats))
+ {
+ if (stat.numberOfDifferences > 0)
+ mismatchingTables.add(repairResult.desc.columnFamily);
+ // snapshot all replicas, even if they don't have any differences
+ nodes.add(stat.nodes.coordinator);
+ nodes.add(stat.nodes.peer);
+ }
+ }
+ }
+
+ String snapshotName = DiagnosticSnapshotService.getSnapshotName(DiagnosticSnapshotService.REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX);
+ for (String table : mismatchingTables)
+ {
+ // we can just check snapshot existence locally since the repair coordinator is always a replica (unlike in the read case)
+ if (!Keyspace.open(keyspace).getColumnFamilyStore(table).snapshotExists(snapshotName))
+ {
+ logger.info("{} Snapshotting {}.{} for preview repair mismatch with tag {} on instances {}",
+ options.getPreviewKind().logPrefix(parentSession),
+ keyspace, table, snapshotName, nodes);
+ DiagnosticSnapshotService.repairedDataMismatch(Keyspace.open(keyspace).getColumnFamilyStore(table).metadata(), nodes);
+ }
+ else
+ {
+ logger.info("{} Not snapshotting {}.{} - snapshot {} exists",
+ options.getPreviewKind().logPrefix(parentSession),
+ keyspace, table, snapshotName);
+ }
+ }
+ }
+ catch (Exception e) // best effort: a snapshot problem is logged and not propagated to the caller
+ {
+ logger.error("{} Failed snapshotting replicas", options.getPreviewKind().logPrefix(parentSession), e);
+ }
+ }
+
+ private static <T> Iterable<T> emptyIfNull(Iterable<T> iter) // null-safe iteration: null becomes an empty list
+ {
+ if (iter == null)
+ return Collections.emptyList();
+ return iter;
+ }
+}
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index a57ca84..c78cc73 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -135,7 +135,7 @@ public class RepairJob extends AsyncFuture<RepairResult> implements Runnable
}
// When all snapshot complete, send validation requests
- treeResponses = allSnapshotTasks.andThenAsync(endpoints -> {
+ treeResponses = allSnapshotTasks.flatMap(endpoints -> {
if (parallelismDegree == RepairParallelism.SEQUENTIAL)
return sendSequentialValidationRequest(endpoints);
else
@@ -149,7 +149,7 @@ public class RepairJob extends AsyncFuture<RepairResult> implements Runnable
}
// When all validations complete, submit sync tasks
- Future<List<SyncStat>> syncResults = treeResponses.andThenAsync(session.optimiseStreams && !session.pullRepair ? this::optimisedSyncing : this::standardSyncing, taskExecutor);
+ Future<List<SyncStat>> syncResults = treeResponses.flatMap(session.optimiseStreams && !session.pullRepair ? this::optimisedSyncing : this::standardSyncing, taskExecutor);
// When all sync complete, set the final result
syncResults.addCallback(new FutureCallback<List<SyncStat>>()
diff --git a/src/java/org/apache/cassandra/repair/SomeRepairFailedException.java b/src/java/org/apache/cassandra/repair/RepairNotifier.java
similarity index 61%
copy from src/java/org/apache/cassandra/repair/SomeRepairFailedException.java
copy to src/java/org/apache/cassandra/repair/RepairNotifier.java
index 4b077b8..977bc4e 100644
--- a/src/java/org/apache/cassandra/repair/SomeRepairFailedException.java
+++ b/src/java/org/apache/cassandra/repair/RepairNotifier.java
@@ -15,17 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.cassandra.repair;
-/**
- * This is a special exception which states "I know something failed but I don't have access to the failure". This
- * is mostly used to make sure the error notifications are clean and the history table has a meaningful exception.
- *
- * The expected behavior is that when this is thrown, this error should be ignored from history table and not used
- * for notifications
- */
-public class SomeRepairFailedException extends RuntimeException
+public interface RepairNotifier // reporting callbacks handed to the repair tasks; RepairRunnable implements it in this patch
{
- public static final SomeRepairFailedException INSTANCE = new SomeRepairFailedException();
+ void notification(String message); // general message; RepairRunnable logs it and fires a NOTIFICATION progress event
+ void notifyError(Throwable error); // report a failure
+ void notifyProgress(String message); // progress message; RepairRunnable increments its progress counter and fires a PROGRESS event
}
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 30244a7..cc9c770 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -19,16 +19,24 @@ package org.apache.cassandra.repair;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.base.Preconditions;
-import com.google.common.collect.*;
-import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,10 +58,7 @@ import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.locator.EndpointsForRange;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.metrics.RepairMetrics;
import org.apache.cassandra.metrics.StorageMetrics;
-import org.apache.cassandra.repair.consistent.CoordinatorSession;
-import org.apache.cassandra.repair.consistent.SyncStatSummary;
import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.SystemDistributedKeyspace;
@@ -61,20 +66,16 @@ import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.tracing.TraceKeyspace;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.DiagnosticSnapshotService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.cassandra.utils.concurrent.Future;
-import org.apache.cassandra.utils.concurrent.FutureCombiner;
-import org.apache.cassandra.utils.concurrent.ImmediateFuture;
import org.apache.cassandra.utils.progress.ProgressEvent;
import org.apache.cassandra.utils.progress.ProgressEventNotifier;
import org.apache.cassandra.utils.progress.ProgressEventType;
@@ -85,7 +86,7 @@ import static org.apache.cassandra.service.QueryState.forInternalCalls;
import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
-public class RepairRunnable implements Runnable, ProgressEventNotifier
+public class RepairRunnable implements Runnable, ProgressEventNotifier, RepairNotifier
{
private static final Logger logger = LoggerFactory.getLogger(RepairRunnable.class);
@@ -141,26 +142,14 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
}
}
+ @Override // RepairNotifier: log the message and publish it to progress listeners
public void notification(String msg)
{
logger.info(msg);
fireProgressEvent(new ProgressEvent(ProgressEventType.NOTIFICATION, progressCounter.get(), totalProgress, msg)); // reads the counter without advancing it
}
- private void skip(String msg)
- {
- notification("Repair " + parentSession + " skipped: " + msg);
- success(msg);
- }
-
- private void success(String msg)
- {
- fireProgressEvent(new ProgressEvent(ProgressEventType.SUCCESS, progressCounter.get(), totalProgress, msg));
- ActiveRepairService.instance.recordRepairStatus(cmd, ActiveRepairService.ParentRepairStatus.COMPLETED,
- ImmutableList.of(msg));
- complete(null);
- }
-
+ @Override
public void notifyError(Throwable error)
{
// exception should be ignored
@@ -187,6 +176,30 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
maybeStoreParentRepairFailure(error);
}
+ @Override // RepairNotifier: log the message and publish it as a PROGRESS event
+ public void notifyProgress(String message)
+ {
+ logger.info(message);
+ fireProgressEvent(new ProgressEvent(ProgressEventType.PROGRESS,
+ progressCounter.incrementAndGet(), // unlike notification(), each call advances the progress counter
+ totalProgress,
+ message));
+ }
+
+ private void skip(String msg) // a skipped repair is announced and then finished through success()
+ {
+ notification("Repair " + parentSession + " skipped: " + msg);
+ success(msg); // the raw reason, without the "skipped" prefix, becomes the success message
+ }
+
+ private void success(String msg) // fires the SUCCESS event, records COMPLETED status for this command, then calls complete(null)
+ {
+ fireProgressEvent(new ProgressEvent(ProgressEventType.SUCCESS, progressCounter.get(), totalProgress, msg));
+ ActiveRepairService.instance.recordRepairStatus(cmd, ActiveRepairService.ParentRepairStatus.COMPLETED,
+ ImmutableList.of(msg)); // the message is stored as the single status entry
+ complete(null);
+ }
+
private void fail(String reason)
{
if (reason == null)
@@ -397,264 +410,51 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
private void repair(String[] cfnames, NeighborsAndRanges neighborsAndRanges)
{
+ RepairTask task; // one task type per repair mode; chosen below, then run on the executor from createExecutor()
if (options.isPreview())
{
- previewRepair(parentSession,
- neighborsAndRanges.filterCommonRanges(keyspace, cfnames),
- neighborsAndRanges.participants,
- cfnames);
+ task = new PreviewRepairTask(options, keyspace, this, parentSession, neighborsAndRanges.filterCommonRanges(keyspace, cfnames), cfnames);
}
else if (options.isIncremental())
{
- incrementalRepair(parentSession,
- traceState,
- neighborsAndRanges,
- neighborsAndRanges.participants,
- cfnames);
+ task = new IncrementalRepairTask(options, keyspace, this, parentSession, neighborsAndRanges, cfnames); // receives the unfiltered NeighborsAndRanges, unlike the other two tasks
}
else
{
- normalRepair(parentSession,
- traceState,
- neighborsAndRanges.filterCommonRanges(keyspace, cfnames),
- neighborsAndRanges.participants,
- cfnames);
+ task = new NormalRepairTask(options, keyspace, this, parentSession, neighborsAndRanges.filterCommonRanges(keyspace, cfnames), cfnames);
}
- }
- @SuppressWarnings("UnstableApiUsage")
- private void normalRepair(UUID parentSession,
- TraceState traceState,
- List<CommonRange> commonRanges,
- Set<InetAddressAndPort> preparedEndpoints,
- String... cfnames)
- {
-
- // Set up RepairJob executor for this repair command.
ExecutorPlus executor = createExecutor();
-
- // Setting the repairedAt time to UNREPAIRED_SSTABLE causes the repairedAt times to be preserved across streamed sstables
- final Future<List<RepairSessionResult>> allSessions = submitRepairSessions(parentSession, false, executor, commonRanges, cfnames);
-
- // After all repair sessions completes(successful or not),
- // run anticompaction if necessary and send finish notice back to client
- final Collection<Range<Token>> successfulRanges = new ArrayList<>();
- final AtomicBoolean hasFailure = new AtomicBoolean();
- allSessions.andThenAsync(results -> {
- logger.debug("Repair result: {}", results);
- // filter out null(=failed) results and get successful ranges
- for (RepairSessionResult sessionResult : results)
+ Future<CoordinatedRepairResult> f = task.perform(executor); // RepairTask.perform turns an Exception/Error from performUnsafe into a failed future
+ f.addCallback((result, failure) -> { // single completion handler shared by all three repair modes
+ try
{
- if (sessionResult != null)
+ if (failure != null) // the task's future itself failed
{
- // don't record successful repair if we had to skip ranges
- if (!sessionResult.skippedReplicas)
- {
- successfulRanges.addAll(sessionResult.ranges);
- }
+ notifyError(failure);
+ fail(failure.getMessage());
}
else
{
- hasFailure.compareAndSet(false, true);
- }
- }
- return ImmediateFuture.success(null);
- }).addCallback(new RepairCompleteCallback(parentSession,
- successfulRanges,
- preparedEndpoints,
- traceState,
- hasFailure,
- executor));
- }
-
- private void incrementalRepair(UUID parentSession,
- TraceState traceState,
- NeighborsAndRanges neighborsAndRanges,
- Set<InetAddressAndPort> preparedEndpoints,
- String... cfnames)
- {
- // the local node also needs to be included in the set of participants, since coordinator sessions aren't persisted
- Set<InetAddressAndPort> allParticipants = ImmutableSet.<InetAddressAndPort>builder()
- .addAll(neighborsAndRanges.participants)
- .add(FBUtilities.getBroadcastAddressAndPort())
- .build();
- // Not necessary to include self for filtering. The common ranges only contains neighbhor node endpoints.
- List<CommonRange> allRanges = neighborsAndRanges.filterCommonRanges(keyspace, cfnames);
-
- CoordinatorSession coordinatorSession;
- try
- {
- coordinatorSession = ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession, allParticipants, neighborsAndRanges.shouldExcludeDeadParticipants);
- }
- catch (NoSuchRepairSessionException e)
- {
- logger.warn("Aborting repair session: "+e.getMessage());
- fail(e.getMessage());
- return;
- }
- ExecutorPlus executor = createExecutor();
- AtomicBoolean hasFailure = new AtomicBoolean(false);
- Future<?> repairResult = coordinatorSession.execute(() -> submitRepairSessions(parentSession, true, executor, allRanges, cfnames),
- hasFailure);
- Collection<Range<Token>> ranges = new HashSet<>();
- for (Collection<Range<Token>> range : Iterables.transform(allRanges, cr -> cr.ranges))
- {
- ranges.addAll(range);
- }
- repairResult.addCallback(new RepairCompleteCallback(parentSession, ranges, preparedEndpoints, traceState, hasFailure, executor));
- }
-
- private void previewRepair(UUID parentSession,
- List<CommonRange> commonRanges,
- Set<InetAddressAndPort> preparedEndpoints,
- String... cfnames)
- {
-
- logger.debug("Starting preview repair for {}", parentSession);
- // Set up RepairJob executor for this repair command.
- ExecutorPlus executor = createExecutor();
-
- final Future<List<RepairSessionResult>> allSessions = submitRepairSessions(parentSession, false, executor, commonRanges, cfnames);
-
- allSessions.addCallback(new FutureCallback<List<RepairSessionResult>>()
- {
- public void onSuccess(List<RepairSessionResult> results)
- {
- try
- {
- if (results == null || results.stream().anyMatch(s -> s == null))
+ maybeStoreParentRepairSuccess(result.successfulRanges); // called before the hasFailed() check, so it also runs for failed results
+ if (result.hasFailed()) // the future completed, but the result reports a failure
{
- // something failed
fail(null);
- return;
- }
- PreviewKind previewKind = options.getPreviewKind();
- Preconditions.checkState(previewKind != PreviewKind.NONE, "Preview is NONE");
- SyncStatSummary summary = new SyncStatSummary(true);
- summary.consumeSessionResults(results);
-
- final String message;
- if (summary.isEmpty())
- {
- message = previewKind == PreviewKind.REPAIRED ? "Repaired data is in sync" : "Previewed data was in sync";
}
else
{
- message = (previewKind == PreviewKind.REPAIRED ? "Repaired data is inconsistent\n" : "Preview complete\n") + summary;
- RepairMetrics.previewFailures.inc();
- if (previewKind == PreviewKind.REPAIRED)
- maybeSnapshotReplicas(parentSession, keyspace, results);
+ success(task.name() + " completed successfully");
+ ActiveRepairService.instance.cleanUp(parentSession, neighborsAndRanges.participants); // cleanUp is only called on this success path
}
- notification(message);
-
- success("Repair preview completed successfully");
- ActiveRepairService.instance.cleanUp(parentSession, preparedEndpoints);
- }
- catch (Throwable t)
- {
- logger.error("Error completing preview repair", t);
- onFailure(t);
- }
- finally
- {
- executor.shutdownNow();
}
}
-
- public void onFailure(Throwable t)
+ finally // the executor is shut down whatever the outcome
{
- notifyError(t);
- fail("Error completing preview repair: " + t.getMessage());
executor.shutdownNow();
}
});
}
- private void maybeSnapshotReplicas(UUID parentSession, String keyspace, List<RepairSessionResult> results)
- {
- if (!DatabaseDescriptor.snapshotOnRepairedDataMismatch())
- return;
-
- try
- {
- Set<String> mismatchingTables = new HashSet<>();
- Set<InetAddressAndPort> nodes = new HashSet<>();
- for (RepairSessionResult sessionResult : results)
- {
- for (RepairResult repairResult : emptyIfNull(sessionResult.repairJobResults))
- {
- for (SyncStat stat : emptyIfNull(repairResult.stats))
- {
- if (stat.numberOfDifferences > 0)
- mismatchingTables.add(repairResult.desc.columnFamily);
- // snapshot all replicas, even if they don't have any differences
- nodes.add(stat.nodes.coordinator);
- nodes.add(stat.nodes.peer);
- }
- }
- }
-
- String snapshotName = DiagnosticSnapshotService.getSnapshotName(DiagnosticSnapshotService.REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX);
- for (String table : mismatchingTables)
- {
- // we can just check snapshot existence locally since the repair coordinator is always a replica (unlike in the read case)
- if (!Keyspace.open(keyspace).getColumnFamilyStore(table).snapshotExists(snapshotName))
- {
- logger.info("{} Snapshotting {}.{} for preview repair mismatch with tag {} on instances {}",
- options.getPreviewKind().logPrefix(parentSession),
- keyspace, table, snapshotName, nodes);
- DiagnosticSnapshotService.repairedDataMismatch(Keyspace.open(keyspace).getColumnFamilyStore(table).metadata(), nodes);
- }
- else
- {
- logger.info("{} Not snapshotting {}.{} - snapshot {} exists",
- options.getPreviewKind().logPrefix(parentSession),
- keyspace, table, snapshotName);
- }
- }
- }
- catch (Exception e)
- {
- logger.error("{} Failed snapshotting replicas", options.getPreviewKind().logPrefix(parentSession), e);
- }
- }
-
- private static <T> Iterable<T> emptyIfNull(Iterable<T> iter)
- {
- if (iter == null)
- return Collections.emptyList();
- return iter;
- }
-
- private Future<List<RepairSessionResult>> submitRepairSessions(UUID parentSession,
- boolean isIncremental,
- ExecutorPlus executor,
- List<CommonRange> commonRanges,
- String... cfnames)
- {
- List<Future<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size());
-
- for (CommonRange commonRange : commonRanges)
- {
- logger.info("Starting RepairSession for {}", commonRange);
- RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession,
- commonRange,
- keyspace,
- options.getParallelism(),
- isIncremental,
- options.isPullRepair(),
- options.getPreviewKind(),
- options.optimiseStreams(),
- executor,
- cfnames);
- if (session == null)
- continue;
- session.addCallback(new RepairSessionCallback(session));
- futures.add(session);
- }
- return FutureCombiner.successfulOf(futures);
- }
-
private ExecutorPlus createExecutor()
{
return executorFactory()
@@ -663,81 +463,6 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
.pooled("Repair#" + cmd, options.getJobThreads());
}
- private class RepairSessionCallback implements FutureCallback<RepairSessionResult>
- {
- private final RepairSession session;
-
- public RepairSessionCallback(RepairSession session)
- {
- this.session = session;
- }
-
- public void onSuccess(RepairSessionResult result)
- {
- String message = String.format("Repair session %s for range %s finished", session.getId(),
- session.ranges().toString());
- logger.info(message);
- fireProgressEvent(new ProgressEvent(ProgressEventType.PROGRESS,
- progressCounter.incrementAndGet(),
- totalProgress,
- message));
- }
-
- public void onFailure(Throwable t)
- {
- String message = String.format("Repair session %s for range %s failed with error %s",
- session.getId(), session.ranges().toString(), t.getMessage());
- notifyError(new RuntimeException(message, t));
- }
- }
-
- private class RepairCompleteCallback implements FutureCallback<Object>
- {
- final UUID parentSession;
- final Collection<Range<Token>> successfulRanges;
- final Set<InetAddressAndPort> preparedEndpoints;
- final TraceState traceState;
- final AtomicBoolean hasFailure;
- final ExecutorService executor;
-
- public RepairCompleteCallback(UUID parentSession,
- Collection<Range<Token>> successfulRanges,
- Set<InetAddressAndPort> preparedEndpoints,
- TraceState traceState,
- AtomicBoolean hasFailure,
- ExecutorService executor)
- {
- this.parentSession = parentSession;
- this.successfulRanges = successfulRanges;
- this.preparedEndpoints = preparedEndpoints;
- this.traceState = traceState;
- this.hasFailure = hasFailure;
- this.executor = executor;
- }
-
- public void onSuccess(Object result)
- {
- maybeStoreParentRepairSuccess(successfulRanges);
- if (hasFailure.get())
- {
- fail(null);
- }
- else
- {
- success("Repair completed successfully");
- ActiveRepairService.instance.cleanUp(parentSession, preparedEndpoints);
- }
- executor.shutdownNow();
- }
-
- public void onFailure(Throwable t)
- {
- notifyError(t);
- fail(t.getMessage());
- executor.shutdownNow();
- }
- }
-
private static void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, EndpointsForRange neighbors)
{
Set<InetAddressAndPort> endpoints = neighbors.endpoints();
@@ -842,9 +567,9 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
static final class NeighborsAndRanges
{
- private final boolean shouldExcludeDeadParticipants;
- private final Set<InetAddressAndPort> participants;
- private final List<CommonRange> commonRanges;
+ final boolean shouldExcludeDeadParticipants;
+ final Set<InetAddressAndPort> participants;
+ final List<CommonRange> commonRanges;
NeighborsAndRanges(boolean shouldExcludeDeadParticipants, Set<InetAddressAndPort> participants, List<CommonRange> commonRanges)
{
diff --git a/src/java/org/apache/cassandra/repair/SomeRepairFailedException.java b/src/java/org/apache/cassandra/repair/RepairTask.java
similarity index 55%
copy from src/java/org/apache/cassandra/repair/SomeRepairFailedException.java
copy to src/java/org/apache/cassandra/repair/RepairTask.java
index 4b077b8..12d65c1 100644
--- a/src/java/org/apache/cassandra/repair/SomeRepairFailedException.java
+++ b/src/java/org/apache/cassandra/repair/RepairTask.java
@@ -15,17 +15,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.cassandra.repair;
-/**
- * This is a special exception which states "I know something failed but I don't have access to the failure". This
- * is mostly used to make sure the error notifications are clean and the history table has a meaningful exception.
- *
- * The expected behavior is that when this is thrown, this error should be ignored from history table and not used
- * for notifications
- */
-public class SomeRepairFailedException extends RuntimeException
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.ImmediateFuture;
+
+public interface RepairTask // a unit of repair work; RepairRunnable.repair creates one and calls perform()
{
- public static final SomeRepairFailedException INSTANCE = new SomeRepairFailedException();
+ String name(); // label used by RepairRunnable in its "<name> completed successfully" message
+
+ Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor) throws Exception; // the actual work; may throw instead of returning a future
+
+ default Future<CoordinatedRepairResult> perform(ExecutorPlus executor) // wrapper that reports a thrown Exception or Error as a failed future
+ {
+ try
+ {
+ return performUnsafe(executor);
+ }
+ catch (Exception | Error e)
+ {
+ JVMStabilityInspector.inspectThrowable(e); // the inspector sees the throwable before it is converted into a failed future
+ return ImmediateFuture.failure(e);
+ }
+ }
}
diff --git a/src/java/org/apache/cassandra/repair/SomeRepairFailedException.java b/src/java/org/apache/cassandra/repair/SomeRepairFailedException.java
index 4b077b8..13ee0ae 100644
--- a/src/java/org/apache/cassandra/repair/SomeRepairFailedException.java
+++ b/src/java/org/apache/cassandra/repair/SomeRepairFailedException.java
@@ -28,4 +28,9 @@ package org.apache.cassandra.repair;
public class SomeRepairFailedException extends RuntimeException
{
public static final SomeRepairFailedException INSTANCE = new SomeRepairFailedException();
+
+ private SomeRepairFailedException() // private: only this class can instantiate it, and it does so once for INSTANCE
+ {
+ super(null, null, false, false); // no message, no cause, suppression disabled, stack trace not writable
+ }
}
diff --git a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
index 1027d0a..24e24fa 100644
--- a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
@@ -19,21 +19,18 @@
package org.apache.cassandra.repair.consistent;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
-import javax.annotation.Nullable;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.FutureCallback;
+
+import org.apache.cassandra.concurrent.ImmediateExecutor;
+import org.apache.cassandra.repair.CoordinatedRepairResult;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Future;
-import org.apache.cassandra.utils.concurrent.ImmediateFuture;
-import org.apache.cassandra.utils.concurrent.Promise;
+
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +40,6 @@ import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.Verb;
-import org.apache.cassandra.repair.RepairSessionResult;
import org.apache.cassandra.repair.SomeRepairFailedException;
import org.apache.cassandra.repair.messages.FailSession;
import org.apache.cassandra.repair.messages.FinalizeCommit;
@@ -64,8 +60,8 @@ public class CoordinatorSession extends ConsistentSession
private static final Logger logger = LoggerFactory.getLogger(CoordinatorSession.class);
private final Map<InetAddressAndPort, State> participantStates = new HashMap<>();
- private final AsyncPromise<Boolean> prepareFuture = AsyncPromise.uncancellable();
- private final AsyncPromise<Boolean> finalizeProposeFuture = AsyncPromise.uncancellable();
+ private final AsyncPromise<Void> prepareFuture = AsyncPromise.uncancellable();
+ private final AsyncPromise<Void> finalizeProposeFuture = AsyncPromise.uncancellable();
private volatile long sessionStart = Long.MIN_VALUE;
private volatile long repairStart = Long.MIN_VALUE;
@@ -148,7 +144,7 @@ public class CoordinatorSession extends ConsistentSession
MessagingService.instance().send(message, destination);
}
- public Future<Boolean> prepare()
+ public Future<Void> prepare()
{
Preconditions.checkArgument(allStates(State.PREPARING));
@@ -188,12 +184,11 @@ public class CoordinatorSession extends ConsistentSession
if (getState() == State.PREPARED)
{
logger.info("Incremental repair session {} successfully prepared.", sessionID);
- prepareFuture.trySuccess(true);
+ prepareFuture.trySuccess(null);
}
else
{
fail();
- prepareFuture.trySuccess(false);
}
}
@@ -202,7 +197,7 @@ public class CoordinatorSession extends ConsistentSession
setAll(State.REPAIRING);
}
- public synchronized Future<Boolean> finalizePropose()
+ public synchronized Future<Void> finalizePropose()
{
Preconditions.checkArgument(allStates(State.REPAIRING));
logger.info("Proposing finalization of repair session {}", sessionID);
@@ -224,7 +219,6 @@ public class CoordinatorSession extends ConsistentSession
{
logger.warn("Finalization proposal of session {} rejected by {}. Aborting session", sessionID, participant);
fail();
- finalizeProposeFuture.trySuccess(false);
}
else
{
@@ -233,7 +227,7 @@ public class CoordinatorSession extends ConsistentSession
if (getState() == State.FINALIZE_PROMISED)
{
logger.info("Finalization proposal for repair session {} accepted by all participants.", sessionID);
- finalizeProposeFuture.trySuccess(true);
+ finalizeProposeFuture.trySuccess(null);
}
}
}
@@ -287,103 +281,51 @@ public class CoordinatorSession extends ConsistentSession
/**
* Runs the asynchronous consistent repair session. Actual repair sessions are scheduled via a submitter to make unit testing easier
*/
- public Future execute(Supplier<Future<List<RepairSessionResult>>> sessionSubmitter, AtomicBoolean hasFailure)
+ public Future<CoordinatedRepairResult> execute(Supplier<Future<CoordinatedRepairResult>> sessionSubmitter)
{
logger.info("Beginning coordination of incremental repair session {}", sessionID);
sessionStart = currentTimeMillis();
- Future<Boolean> prepareResult = prepare();
+ Future<Void> prepareResult = prepare();
// run repair sessions normally
- Future<List<RepairSessionResult>> repairSessionResults = prepareResult.andThenAsync(success ->
- {
- if (success)
- {
- repairStart = currentTimeMillis();
- if (logger.isDebugEnabled())
- {
- logger.debug("Incremental repair {} prepare phase completed in {}", sessionID, formatDuration(sessionStart, repairStart));
- }
- setRepairing();
- return sessionSubmitter.get();
- }
- else
- {
- return ImmediateFuture.success(null);
- }
+ Future<CoordinatedRepairResult> repairSessionResults = prepareResult.flatMap(ignore -> {
+ repairStart = currentTimeMillis();
+ if (logger.isDebugEnabled())
+ logger.debug("Incremental repair {} prepare phase completed in {}", sessionID, formatDuration(sessionStart, repairStart));
+ setRepairing();
+ return sessionSubmitter.get();
});
- // mark propose finalization
- Future<Boolean> proposeFuture = repairSessionResults.andThenAsync(results ->
- {
- if (results == null || results.isEmpty() || Iterables.any(results, r -> r == null))
+ // if any session failed, then fail the future
+ Future<CoordinatedRepairResult> onlySuccessSessionResults = repairSessionResults.map(result -> {
+ finalizeStart = currentTimeMillis();
+ if (result.hasFailed())
{
- finalizeStart = currentTimeMillis();
if (logger.isDebugEnabled())
- {
logger.debug("Incremental repair {} validation/stream phase completed in {}", sessionID, formatDuration(repairStart, finalizeStart));
- }
- return ImmediateFuture.failure(SomeRepairFailedException.INSTANCE);
- }
- else
- {
- return finalizePropose();
+ throw SomeRepairFailedException.INSTANCE;
}
+ return result;
});
- // return execution result as set by following callback
- Promise<Boolean> resultFuture = AsyncPromise.uncancellable();
-
- // commit repaired data
- proposeFuture.addCallback(new FutureCallback<Boolean>()
- {
- public void onSuccess(@Nullable Boolean result)
- {
- try
- {
- if (result != null && result)
- {
- if (logger.isDebugEnabled())
- {
- logger.debug("Incremental repair {} finalization phase completed in {}", sessionID, formatDuration(finalizeStart, currentTimeMillis()));
- }
- finalizeCommit();
- if (logger.isDebugEnabled())
- {
- logger.debug("Incremental repair {} phase completed in {}", sessionID, formatDuration(sessionStart, currentTimeMillis()));
- }
- }
- else
- {
- hasFailure.set(true);
- fail();
- }
- resultFuture.trySuccess(result);
- }
- catch (Exception e)
- {
- resultFuture.tryFailure(e);
- }
- }
-
- public void onFailure(Throwable t)
+ // mark propose finalization and commit
+ Future<CoordinatedRepairResult> proposeFuture = onlySuccessSessionResults.flatMap(results -> finalizePropose().map(ignore -> {
+ if (logger.isDebugEnabled())
+ logger.debug("Incremental repair {} finalization phase completed in {}", sessionID, formatDuration(finalizeStart, currentTimeMillis()));
+ finalizeCommit();
+ if (logger.isDebugEnabled())
+ logger.debug("Incremental repair {} phase completed in {}", sessionID, formatDuration(sessionStart, currentTimeMillis()));
+ return results;
+ }));
+
+ return proposeFuture.addCallback((ignore, failure) -> {
+ if (failure != null)
{
- try
- {
- if (logger.isDebugEnabled())
- {
- logger.debug("Incremental repair {} phase failed in {}", sessionID, formatDuration(sessionStart, currentTimeMillis()));
- }
- hasFailure.set(true);
- fail();
- }
- finally
- {
- resultFuture.tryFailure(t);
- }
+ if (logger.isDebugEnabled())
+ logger.debug("Incremental repair {} phase failed in {}", sessionID, formatDuration(sessionStart, currentTimeMillis()));
+ fail();
}
- });
-
- return resultFuture;
+ }, ImmediateExecutor.INSTANCE);
}
}
diff --git a/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java b/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java
index b145fb6..249d1a4 100644
--- a/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java
+++ b/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import com.google.common.collect.Lists;
@@ -179,11 +180,11 @@ public class SyncStatSummary
summaries.get(cf).consumeStats(result.stats);
}
- public void consumeSessionResults(List<RepairSessionResult> results)
+ public void consumeSessionResults(Optional<List<RepairSessionResult>> results)
{
- if (results != null)
+ if (results.isPresent())
{
- filter(results, Objects::nonNull).forEach(r -> filter(r.repairJobResults, Objects::nonNull).forEach(this::consumeRepairResult));
+ filter(results.get(), Objects::nonNull).forEach(r -> filter(r.repairJobResults, Objects::nonNull).forEach(this::consumeRepairResult));
}
}
diff --git a/src/java/org/apache/cassandra/security/CipherFactory.java b/src/java/org/apache/cassandra/security/CipherFactory.java
index 3c13629..c42ebca 100644
--- a/src/java/org/apache/cassandra/security/CipherFactory.java
+++ b/src/java/org/apache/cassandra/security/CipherFactory.java
@@ -40,6 +40,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.FastThreadLocal;
+import org.apache.cassandra.concurrent.ImmediateExecutor;
import org.apache.cassandra.config.TransparentDataEncryptionOptions;
/**
@@ -81,7 +82,7 @@ public class CipherFactory
cache = Caffeine.newBuilder() // by default cache is unbounded
.maximumSize(64) // a value large enough that we should never even get close (so nothing gets evicted)
- .executor(MoreExecutors.directExecutor())
+ .executor(ImmediateExecutor.INSTANCE)
.removalListener((key, value, cause) ->
{
// maybe reload the key? (to avoid the reload being on the user's dime)
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 2532c8b..fbcb745 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -340,17 +340,10 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
LocalSessions.registerListener(session);
// remove session at completion
- session.addListener(new Runnable()
- {
- /**
- * When repair finished, do clean up
- */
- public void run()
- {
- sessions.remove(session.getId());
- LocalSessions.unregisterListener(session);
- }
- }, MoreExecutors.directExecutor());
+ session.addListener(() -> {
+ sessions.remove(session.getId());
+ LocalSessions.unregisterListener(session);
+ });
session.start(executor);
return session;
}
diff --git a/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java b/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java
index b8944f9..86e3c12 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java
@@ -280,7 +280,14 @@ public abstract class AbstractFuture<V> implements Future<V>
@Override
public AbstractFuture<V> addCallback(BiConsumer<? super V, Throwable> callback)
{
- appendListener(new CallbackBiConsumerListener<>(this, callback));
+ appendListener(new CallbackBiConsumerListener<>(this, callback, null));
+ return this;
+ }
+
+ @Override
+ public Future<V> addCallback(BiConsumer<? super V, Throwable> callback, Executor executor)
+ {
+ appendListener(new CallbackBiConsumerListener<>(this, callback, executor));
return this;
}
@@ -305,19 +312,42 @@ public abstract class AbstractFuture<V> implements Future<V>
@Override
public AbstractFuture<V> addCallback(Consumer<? super V> onSuccess, Consumer<? super Throwable> onFailure)
{
- appendListener(new CallbackLambdaListener<>(this, onSuccess, onFailure));
+ appendListener(new CallbackLambdaListener<>(this, onSuccess, onFailure, null));
return this;
}
/**
- * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively
+ * Support a more fluent version of {@link com.google.common.util.concurrent.Futures#addCallback}
*
* See {@link #addListener(GenericFutureListener)} for ordering semantics.
*/
@Override
- public <T> Future<T> andThenAsync(Function<? super V, ? extends Future<T>> andThen)
+ public AbstractFuture<V> addCallback(Consumer<? super V> onSuccess, Consumer<? super Throwable> onFailure, Executor executor)
{
- return andThenAsync(andThen, null);
+ appendListener(new CallbackLambdaListener<>(this, onSuccess, onFailure, executor));
+ return this;
+ }
+
+ /**
+ * Support {@link com.google.common.util.concurrent.Futures#transform(ListenableFuture, com.google.common.base.Function, Executor)} natively
+ *
+ * See {@link #addListener(GenericFutureListener)} for ordering semantics.
+ */
+ protected <T> Future<T> map(AbstractFuture<T> result, Function<? super V, ? extends T> mapper, @Nullable Executor executor)
+ {
+ addListener(() -> {
+ try
+ {
+ if (isSuccess()) result.trySet(mapper.apply(getNow()));
+ else result.tryFailure(cause());
+ }
+ catch (Throwable t)
+ {
+ result.tryFailure(t);
+ throw t;
+ }
+ }, executor);
+ return result;
}
/**
@@ -325,12 +355,12 @@ public abstract class AbstractFuture<V> implements Future<V>
*
* See {@link #addListener(GenericFutureListener)} for ordering semantics.
*/
- protected <T> Future<T> andThenAsync(AbstractFuture<T> result, Function<? super V, ? extends Future<T>> andThen, @Nullable Executor executor)
+ protected <T> Future<T> flatMap(AbstractFuture<T> result, Function<? super V, ? extends Future<T>> flatMapper, @Nullable Executor executor)
{
addListener(() -> {
try
{
- if (isSuccess()) andThen.apply(getNow()).addListener(propagate(result));
+ if (isSuccess()) flatMapper.apply(getNow()).addListener(propagate(result));
else result.tryFailure(cause());
}
catch (Throwable t)
diff --git a/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java b/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java
index a7b7a6a..b09eeb7 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java
@@ -123,14 +123,25 @@ public class AsyncFuture<V> extends AbstractFuture<V>
}
/**
+ * Support {@link com.google.common.util.concurrent.Futures#transform(ListenableFuture, com.google.common.base.Function, Executor)} natively
+ *
+ * See {@link #addListener(GenericFutureListener)} for ordering semantics.
+ */
+ @Override
+ public <T> Future<T> map(Function<? super V, ? extends T> mapper, Executor executor)
+ {
+ return map(new AsyncFuture<>(), mapper, executor);
+ }
+
+ /**
* Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively
*
* See {@link #addListener(GenericFutureListener)} for ordering semantics.
*/
@Override
- public <T> Future<T> andThenAsync(Function<? super V, ? extends Future<T>> andThen, @Nullable Executor executor)
+ public <T> Future<T> flatMap(Function<? super V, ? extends Future<T>> flatMapper, @Nullable Executor executor)
{
- return andThenAsync(new AsyncFuture<>(), andThen, executor);
+ return flatMap(new AsyncFuture<>(), flatMapper, executor);
}
/**
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Future.java b/src/java/org/apache/cassandra/utils/concurrent/Future.java
index 22b15cc..69dc83d 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Future.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Future.java
@@ -118,6 +118,11 @@ public interface Future<V> extends io.netty.util.concurrent.Future<V>, Listenabl
/**
* Support {@link com.google.common.util.concurrent.Futures#addCallback} natively
*/
+ Future<V> addCallback(BiConsumer<? super V, Throwable> callback, Executor executor);
+
+ /**
+ * Support {@link com.google.common.util.concurrent.Futures#addCallback} natively
+ */
Future<V> addCallback(FutureCallback<? super V> callback);
/**
@@ -131,14 +136,35 @@ public interface Future<V> extends io.netty.util.concurrent.Future<V>, Listenabl
Future<V> addCallback(Consumer<? super V> onSuccess, Consumer<? super Throwable> onFailure);
/**
+ * Support {@link com.google.common.util.concurrent.Futures#addCallback} natively
+ */
+ Future<V> addCallback(Consumer<? super V> onSuccess, Consumer<? super Throwable> onFailure, Executor executor);
+
+ /**
+ * Support {@link com.google.common.util.concurrent.Futures#transform(ListenableFuture, com.google.common.base.Function, Executor)} natively
+ */
+ default <T> Future<T> map(Function<? super V, ? extends T> mapper)
+ {
+ return map(mapper, null);
+ }
+
+ /**
+ * Support {@link com.google.common.util.concurrent.Futures#transform(ListenableFuture, com.google.common.base.Function, Executor)} natively
+ */
+ <T> Future<T> map(Function<? super V, ? extends T> mapper, Executor executor);
+
+ /**
* Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively
*/
- <T> Future<T> andThenAsync(Function<? super V, ? extends Future<T>> andThen);
+ default <T> Future<T> flatMap(Function<? super V, ? extends Future<T>> flatMapper)
+ {
+ return flatMap(flatMapper, null);
+ }
/**
* Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively
*/
- <T> Future<T> andThenAsync(Function<? super V, ? extends Future<T>> andThen, Executor executor);
+ <T> Future<T> flatMap(Function<? super V, ? extends Future<T>> flatMapper, Executor executor);
/**
* Invoke {@code runnable} on completion, using {@code executor}.
diff --git a/src/java/org/apache/cassandra/utils/concurrent/ListenerList.java b/src/java/org/apache/cassandra/utils/concurrent/ListenerList.java
index c150ea2..57737ea 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/ListenerList.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/ListenerList.java
@@ -22,7 +22,6 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
-
import javax.annotation.Nullable;
import com.google.common.util.concurrent.FutureCallback;
@@ -151,7 +150,7 @@ abstract class ListenerList<V> extends IntrusiveStack<ListenerList<V>>
static <F extends io.netty.util.concurrent.Future<?>> void notifyListener(Executor notifyExecutor, GenericFutureListener<F> listener, F future)
{
if (notifyExecutor == null) notifyListener(listener, future);
- else notifyExecutor.execute(() -> notifyListener(listener, future));
+ else safeExecute(notifyExecutor, () -> notifyListener(listener, future));
}
/**
@@ -159,8 +158,22 @@ abstract class ListenerList<V> extends IntrusiveStack<ListenerList<V>>
*/
static void notifyListener(@Nullable Executor notifyExecutor, Runnable listener)
{
- if (notifyExecutor == null) ImmediateExecutor.INSTANCE.execute(listener);
- else notifyExecutor.execute(listener);
+ safeExecute(notifyExecutor, listener);
+ }
+
+ private static void safeExecute(@Nullable Executor notifyExecutor, Runnable runnable)
+ {
+ if (notifyExecutor == null)
+ notifyExecutor = ImmediateExecutor.INSTANCE;
+ try
+ {
+ notifyExecutor.execute(runnable);
+ }
+ catch (Exception | Error e)
+ {
+ // TODO: suboptimal package interdependency - move FutureTask etc here?
+ ExecutionFailure.handle(e);
+ }
}
/**
@@ -219,11 +232,13 @@ abstract class ListenerList<V> extends IntrusiveStack<ListenerList<V>>
{
final Future<V> future;
final BiConsumer<? super V, Throwable> callback;
+ final Executor executor;
- CallbackBiConsumerListener(Future<V> future, BiConsumer<? super V, Throwable> callback)
+ CallbackBiConsumerListener(Future<V> future, BiConsumer<? super V, Throwable> callback, Executor executor)
{
this.future = future;
this.callback = callback;
+ this.executor = executor;
}
@Override
@@ -236,7 +251,7 @@ abstract class ListenerList<V> extends IntrusiveStack<ListenerList<V>>
@Override
void notifySelf(Executor notifyExecutor, Future<V> future)
{
- notifyListener(notifyExecutor, this);
+ notifyListener(executor == null ? notifyExecutor : executor, this);
}
}
@@ -269,12 +284,14 @@ abstract class ListenerList<V> extends IntrusiveStack<ListenerList<V>>
final Future<V> future;
final Consumer<? super V> onSuccess;
final Consumer<? super Throwable> onFailure;
+ final Executor executor;
- CallbackLambdaListener(Future<V> future, Consumer<? super V> onSuccess, Consumer<? super Throwable> onFailure)
+ CallbackLambdaListener(Future<V> future, Consumer<? super V> onSuccess, Consumer<? super Throwable> onFailure, Executor executor)
{
this.future = future;
this.onSuccess = onSuccess;
this.onFailure = onFailure;
+ this.executor = executor;
}
@Override
@@ -287,7 +304,7 @@ abstract class ListenerList<V> extends IntrusiveStack<ListenerList<V>>
@Override
void notifySelf(Executor notifyExecutor, Future future)
{
- notifyListener(notifyExecutor, this);
+ notifyListener(executor == null ? notifyExecutor : executor, this);
}
}
diff --git a/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java b/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java
index 9635969..43648c0 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java
@@ -90,14 +90,25 @@ public class SyncFuture<V> extends AbstractFuture<V>
}
/**
+ * Support {@link com.google.common.util.concurrent.Futures#transform(ListenableFuture, com.google.common.base.Function, Executor)} natively
+ *
+ * See {@link #addListener(GenericFutureListener)} for ordering semantics.
+ */
+ @Override
+ public <T> Future<T> map(Function<? super V, ? extends T> mapper, Executor executor)
+ {
+ return map(new SyncFuture<>(), mapper, executor);
+ }
+
+ /**
* Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively
*
* See {@link #addListener(GenericFutureListener)} for ordering semantics.
*/
@Override
- public <T> Future<T> andThenAsync(Function<? super V, ? extends Future<T>> andThen, @Nullable Executor executor)
+ public <T> Future<T> flatMap(Function<? super V, ? extends Future<T>> flatMapper, @Nullable Executor executor)
{
- return andThenAsync(new SyncFuture<>(), andThen, executor);
+ return flatMap(new SyncFuture<>(), flatMapper, executor);
}
/**
diff --git a/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java b/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java
index baada12..0bf9b9f 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java
@@ -77,8 +77,10 @@ public abstract class AbstractNetstatsBootstrapStreaming extends AbstractNetstat
final Future<?> startupRunnable = executorService.submit((Runnable) secondNode::startup);
final Future<AbstractNetstatsStreaming.NetstatResults> netstatsFuture = executorService.submit(new NetstatsCallable(cluster.get(1)));
+ startupRunnable.get(3, MINUTES);
+ // 1m is a bit much, but should be fine on slower environments. Node2 can't come up without streaming
+ // completing, so if node2 is up 1m is enough time for the nodetool watcher to yield
final AbstractNetstatsStreaming.NetstatResults results = netstatsFuture.get(1, MINUTES);
- startupRunnable.get(2, MINUTES);
results.assertSuccessful();
diff --git a/test/distributed/org/apache/cassandra/distributed/test/repair/ForceRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/repair/ForceRepairTest.java
new file mode 100644
index 0000000..479dac3
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/repair/ForceRepairTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.repair;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.junit.Test;
+
+import com.carrotsearch.hppc.LongArrayList;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.QueryResults;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.shared.AssertUtils;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.assertj.core.api.Assertions;
+
+public class ForceRepairTest extends TestBaseImpl
+{
+ /**
+ * Port of the python dtest "repair_tests/incremental_repair_test.py::TestIncRepair::test_force", extended to cover
+ * every repair type (full, preview, validate and incremental) without writing new differences between runs.
+ */
+ @Test
+ public void force() throws IOException
+ {
+ force(false);
+ }
+
+ @Test
+ public void forceWithDifference() throws IOException
+ {
+ force(true);
+ }
+
+ private void force(boolean includeDifference) throws IOException
+ {
+ long nowInMicro = System.currentTimeMillis() * 1000;
+ try (Cluster cluster = Cluster.build(3)
+ .withConfig(c -> c.set("hinted_handoff_enabled", false)
+ .with(Feature.values()))
+ .start())
+ {
+ init(cluster);
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (k INT PRIMARY KEY, v INT)"));
+
+ for (int i = 0; i < 10; i++)
+ cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (k,v) VALUES (?, ?) USING TIMESTAMP ?"), ConsistencyLevel.ALL, i, i, nowInMicro++);
+
+ ClusterUtils.stopUnchecked(cluster.get(2));
+
+ // node2 is now down: a plain repair must fail, while the same repair with --force must succeed
+ IInvokableInstance node1 = cluster.get(1);
+
+ for (String[] args : Arrays.asList(new String[]{ "--full" },
+ new String[]{ "--full", "--preview" },
+ new String[]{ "--full", "--validate"}, // nothing should be in the repaired set, so shouldn't stream
+ new String[]{ "--preview" }, // IR Preview
+ new String[]{ "--validate"}, // nothing should be in the repaired set, so shouldn't stream
+ new String[0])) // IR
+ {
+ if (includeDifference)
+ node1.executeInternal(withKeyspace("INSERT INTO %s.tbl (k,v) VALUES (?, ?) USING TIMESTAMP ?"), -1, -1, nowInMicro++); // each loop should have a different timestamp, causing a new difference
+
+ try
+ {
+ node1.nodetoolResult(ArrayUtils.addAll(new String[] {"repair", KEYSPACE}, args)).asserts().failure();
+ node1.nodetoolResult(ArrayUtils.addAll(new String[] {"repair", KEYSPACE, "--force"}, args)).asserts().success();
+
+ assertNoRepairedAt(cluster);
+ }
+ catch (Exception | Error e)
+ {
+ // attach the failing nodetool args as a suppressed error so the report shows which variant broke
+ e.addSuppressed(new AssertionError("Failure for args: " + Arrays.toString(args)));
+ throw e;
+ }
+ }
+
+ if (includeDifference)
+ {
+ SimpleQueryResult expected = QueryResults.builder()
+ .row(-1, -1)
+ .build();
+ for (IInvokableInstance node : Arrays.asList(node1, cluster.get(3)))
+ {
+ SimpleQueryResult results = node.executeInternalWithResult(withKeyspace("SELECT * FROM %s.tbl WHERE k=?"), -1);
+ expected.reset();
+ AssertUtils.assertRows(results, expected);
+ }
+ }
+ }
+ }
+
+ private static void assertNoRepairedAt(Cluster cluster)
+ {
+ List<long[]> repairedAt = getRepairedAt(cluster, KEYSPACE, "tbl");
+ Assertions.assertThat(repairedAt).hasSize(cluster.size());
+ for (int i = 0; i < repairedAt.size(); i++)
+ {
+ long[] array = repairedAt.get(i);
+ if (array == null)
+ {
+ // a null entry marks a node that was skipped; verify it really is shut down
+ Assertions.assertThat(cluster.get(i + 1).isShutdown()).isTrue();
+ continue;
+ }
+ Assertions.assertThat(array).isNotEmpty();
+ for (long a : array)
+ Assertions.assertThat(a).describedAs("node%d had a repaired sstable", i + 1).isEqualTo(0);
+ }
+ }
+
+ private static List<long[]> getRepairedAt(Cluster cluster, String keyspace, String table)
+ {
+ return cluster.stream().map(i -> {
+ if (i.isShutdown())
+ return null;
+
+ return i.callOnInstance(() -> {
+ TableMetadata meta = Schema.instance.getTableMetadata(keyspace, table);
+ ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(meta.id);
+
+ View view = cfs.getTracker().getView();
+ LongArrayList list = new LongArrayList();
+ for (SSTableReader sstable : view.liveSSTables())
+ {
+ try
+ {
+ StatsMetadata metadata = sstable.getSSTableMetadata();
+ list.add(metadata.repairedAt);
+ }
+ catch (Exception e)
+ {
+ // ignore: sstables whose metadata cannot be read are left out of the result
+ }
+ }
+ return list.toArray();
+ });
+ }).collect(Collectors.toList());
+ }
+}
diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java
index ef08022..2bfd4dc 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.repair.consistent;
import java.util.Collection;
import java.util.Collections;
-import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@@ -31,6 +30,8 @@ import java.util.function.Function;
import java.util.function.Supplier;
import com.google.common.collect.Lists;
+
+import org.apache.cassandra.repair.CoordinatedRepairResult;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.Promise;
@@ -106,29 +107,27 @@ public class CoordinatorMessagingTest extends AbstractRepairTest
UUID uuid = registerSession(cfs, true, true);
CoordinatorSession coordinator = ActiveRepairService.instance.consistent.coordinated.registerSession(uuid, PARTICIPANTS, false);
AtomicBoolean repairSubmitted = new AtomicBoolean(false);
- Promise<List<RepairSessionResult>> repairFuture = AsyncPromise.uncancellable();
- Supplier<Future<List<RepairSessionResult>>> sessionSupplier = () ->
+ Promise<CoordinatedRepairResult> repairFuture = AsyncPromise.uncancellable();
+ Supplier<Future<CoordinatedRepairResult>> sessionSupplier = () ->
{
repairSubmitted.set(true);
return repairFuture;
};
// coordinator sends prepare requests to create local session and perform anticompaction
- AtomicBoolean hasFailures = new AtomicBoolean(false);
Assert.assertFalse(repairSubmitted.get());
// execute repair and start prepare phase
- Future<Boolean> sessionResult = coordinator.execute(sessionSupplier, hasFailures);
+ Future<CoordinatedRepairResult> sessionResult = coordinator.execute(sessionSupplier);
Assert.assertFalse(sessionResult.isDone());
- Assert.assertFalse(hasFailures.get());
+
// prepare completed
prepareLatch.countDown();
spyPrepare.interceptMessageOut(3).get(1, TimeUnit.SECONDS);
Assert.assertFalse(sessionResult.isDone());
- Assert.assertFalse(hasFailures.get());
// set result from local repair session
- repairFuture.trySuccess(Lists.newArrayList(createResult(coordinator), createResult(coordinator), createResult(coordinator)));
+ repairFuture.trySuccess(CoordinatedRepairResult.success(Lists.newArrayList(createResult(coordinator), createResult(coordinator), createResult(coordinator))));
// finalize phase
finalizeLatch.countDown();
@@ -136,8 +135,7 @@ public class CoordinatorMessagingTest extends AbstractRepairTest
// commit phase
spyCommit.interceptMessageOut(3).get(1, TimeUnit.SECONDS);
- Assert.assertTrue(sessionResult.get());
- Assert.assertFalse(hasFailures.get());
+ Assert.assertFalse(sessionResult.get().hasFailed());
// expect no other messages except from intercepted so far
spyPrepare.interceptNoMsg(100, TimeUnit.MILLISECONDS);
@@ -197,19 +195,18 @@ public class CoordinatorMessagingTest extends AbstractRepairTest
UUID uuid = registerSession(cfs, true, true);
CoordinatorSession coordinator = ActiveRepairService.instance.consistent.coordinated.registerSession(uuid, PARTICIPANTS, false);
AtomicBoolean repairSubmitted = new AtomicBoolean(false);
- Promise<List<RepairSessionResult>> repairFuture = AsyncPromise.uncancellable();
- Supplier<Future<List<RepairSessionResult>>> sessionSupplier = () ->
+ Promise<CoordinatedRepairResult> repairFuture = AsyncPromise.uncancellable();
+ Supplier<Future<CoordinatedRepairResult>> sessionSupplier = () ->
{
repairSubmitted.set(true);
return repairFuture;
};
// coordinator sends prepare requests to create local session and perform anticompaction
- AtomicBoolean proposeFailed = new AtomicBoolean(false);
Assert.assertFalse(repairSubmitted.get());
// execute repair and start prepare phase
- Future<Boolean> sessionResult = coordinator.execute(sessionSupplier, proposeFailed);
+ Future<CoordinatedRepairResult> sessionResult = coordinator.execute(sessionSupplier);
prepareLatch.countDown();
// prepare completed
try
@@ -222,7 +219,8 @@ public class CoordinatorMessagingTest extends AbstractRepairTest
}
sendFailSessionExpectedSpy.interceptMessageOut(3).get(1, TimeUnit.SECONDS);
Assert.assertFalse(repairSubmitted.get());
- Assert.assertTrue(proposeFailed.get());
+ Assert.assertTrue(sessionResult.isDone());
+ Assert.assertNotNull(sessionResult.cause());
Assert.assertEquals(ConsistentSession.State.FAILED, coordinator.getState());
Assert.assertFalse(ActiveRepairService.instance.consistent.local.isSessionInProgress(uuid));
}
@@ -236,19 +234,18 @@ public class CoordinatorMessagingTest extends AbstractRepairTest
UUID uuid = registerSession(cfs, true, true);
CoordinatorSession coordinator = ActiveRepairService.instance.consistent.coordinated.registerSession(uuid, PARTICIPANTS, false);
AtomicBoolean repairSubmitted = new AtomicBoolean(false);
- Promise<List<RepairSessionResult>> repairFuture = AsyncPromise.uncancellable();
- Supplier<Future<List<RepairSessionResult>>> sessionSupplier = () ->
+ Promise<CoordinatedRepairResult> repairFuture = AsyncPromise.uncancellable();
+ Supplier<Future<CoordinatedRepairResult>> sessionSupplier = () ->
{
repairSubmitted.set(true);
return repairFuture;
};
// coordinator sends prepare requests to create local session and perform anticompaction
- AtomicBoolean hasFailures = new AtomicBoolean(false);
Assert.assertFalse(repairSubmitted.get());
// execute repair and start prepare phase
- Future<Boolean> sessionResult = coordinator.execute(sessionSupplier, hasFailures);
+ Future<CoordinatedRepairResult> sessionResult = coordinator.execute(sessionSupplier);
try
{
sessionResult.get(1, TimeUnit.SECONDS);
@@ -266,7 +263,6 @@ public class CoordinatorMessagingTest extends AbstractRepairTest
spyPrepare.expectMockedMessage(2).get(100, TimeUnit.MILLISECONDS);
sendFailSessionUnexpectedSpy.interceptNoMsg(100, TimeUnit.MILLISECONDS);
Assert.assertFalse(repairSubmitted.get());
- Assert.assertFalse(hasFailures.get());
Assert.assertEquals(ConsistentSession.State.PREPARING, coordinator.getState());
Assert.assertFalse(ActiveRepairService.instance.consistent.local.isSessionInProgress(uuid));
}
diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
index 42453d4..0421ad3 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
@@ -19,6 +19,8 @@
package org.apache.cassandra.repair.consistent;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -28,16 +30,15 @@ import java.util.function.Supplier;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-
-import org.apache.cassandra.utils.concurrent.AsyncPromise;
-import org.apache.cassandra.utils.concurrent.Future;
-import org.apache.cassandra.utils.concurrent.Promise;
import org.junit.Assert;
import org.junit.Test;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.repair.AbstractRepairTest;
+import org.apache.cassandra.repair.CoordinatedRepairResult;
import org.apache.cassandra.repair.RepairSessionResult;
import org.apache.cassandra.repair.messages.FailSession;
import org.apache.cassandra.repair.messages.FinalizeCommit;
@@ -45,8 +46,15 @@ import org.apache.cassandra.repair.messages.FinalizePropose;
import org.apache.cassandra.repair.messages.PrepareConsistentRequest;
import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.Promise;
-import static org.apache.cassandra.repair.consistent.ConsistentSession.State.*;
+import static org.apache.cassandra.repair.consistent.ConsistentSession.State.FAILED;
+import static org.apache.cassandra.repair.consistent.ConsistentSession.State.FINALIZE_PROMISED;
+import static org.apache.cassandra.repair.consistent.ConsistentSession.State.PREPARED;
+import static org.apache.cassandra.repair.consistent.ConsistentSession.State.PREPARING;
+import static org.apache.cassandra.repair.consistent.ConsistentSession.State.REPAIRING;
public class CoordinatorSessionTest extends AbstractRepairTest
{
@@ -210,18 +218,17 @@ public class CoordinatorSessionTest extends AbstractRepairTest
{
InstrumentedCoordinatorSession coordinator = createInstrumentedSession();
AtomicBoolean repairSubmitted = new AtomicBoolean(false);
- Promise<List<RepairSessionResult>> repairFuture = AsyncPromise.uncancellable();
- Supplier<Future<List<RepairSessionResult>>> sessionSupplier = () ->
+ Promise<CoordinatedRepairResult> repairFuture = AsyncPromise.uncancellable();
+ Supplier<Future<CoordinatedRepairResult>> sessionSupplier = () ->
{
repairSubmitted.set(true);
return repairFuture;
};
// coordinator sends prepare requests to create local session and perform anticompaction
- AtomicBoolean hasFailures = new AtomicBoolean(false);
Assert.assertFalse(repairSubmitted.get());
Assert.assertTrue(coordinator.sentMessages.isEmpty());
- Future sessionResult = coordinator.execute(sessionSupplier, hasFailures);
+ Future<CoordinatedRepairResult> sessionResult = coordinator.execute(sessionSupplier);
for (InetAddressAndPort participant : PARTICIPANTS)
{
@@ -253,7 +260,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
createResult(coordinator));
coordinator.sentMessages.clear();
- repairFuture.trySuccess(results);
+ repairFuture.trySuccess(CoordinatedRepairResult.success(results));
// propose messages should have been sent once all repair sessions completed successfully
for (InetAddressAndPort participant : PARTICIPANTS)
@@ -286,7 +293,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
}
Assert.assertTrue(sessionResult.isDone());
- Assert.assertFalse(hasFailures.get());
+ sessionResult.syncUninterruptibly();
}
@Test
@@ -294,18 +301,17 @@ public class CoordinatorSessionTest extends AbstractRepairTest
{
InstrumentedCoordinatorSession coordinator = createInstrumentedSession();
AtomicBoolean repairSubmitted = new AtomicBoolean(false);
- Promise<List<RepairSessionResult>> repairFuture = AsyncPromise.uncancellable();
- Supplier<Future<List<RepairSessionResult>>> sessionSupplier = () ->
+ Promise<CoordinatedRepairResult> repairFuture = AsyncPromise.uncancellable();
+ Supplier<Future<CoordinatedRepairResult>> sessionSupplier = () ->
{
repairSubmitted.set(true);
return repairFuture;
};
// coordinator sends prepare requests to create local session and perform anticompaction
- AtomicBoolean hasFailures = new AtomicBoolean(false);
Assert.assertFalse(repairSubmitted.get());
Assert.assertTrue(coordinator.sentMessages.isEmpty());
- Future sessionResult = coordinator.execute(sessionSupplier, hasFailures);
+ Future<CoordinatedRepairResult> sessionResult = coordinator.execute(sessionSupplier);
for (InetAddressAndPort participant : PARTICIPANTS)
{
PrepareConsistentRequest expected = new PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new HashSet<>(PARTICIPANTS));
@@ -330,6 +336,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
Assert.assertEquals(ConsistentSession.State.REPAIRING, coordinator.getState());
+ List<Collection<Range<Token>>> ranges = Arrays.asList(coordinator.ranges, coordinator.ranges, coordinator.ranges);
ArrayList<RepairSessionResult> results = Lists.newArrayList(createResult(coordinator),
null,
createResult(coordinator));
@@ -337,7 +344,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
coordinator.sentMessages.clear();
Assert.assertFalse(coordinator.failCalled);
coordinator.onFail = () -> Assert.assertEquals(REPAIRING, coordinator.getState());
- repairFuture.trySuccess(results);
+ repairFuture.trySuccess(CoordinatedRepairResult.create(ranges, results));
Assert.assertTrue(coordinator.failCalled);
// all participants should have been notified of session failure
@@ -348,7 +355,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
}
Assert.assertTrue(sessionResult.isDone());
- Assert.assertTrue(hasFailures.get());
+ Assert.assertNotNull(sessionResult.cause());
}
@Test
@@ -356,18 +363,17 @@ public class CoordinatorSessionTest extends AbstractRepairTest
{
InstrumentedCoordinatorSession coordinator = createInstrumentedSession();
AtomicBoolean repairSubmitted = new AtomicBoolean(false);
- Promise<List<RepairSessionResult>> repairFuture = AsyncPromise.uncancellable();
- Supplier<Future<List<RepairSessionResult>>> sessionSupplier = () ->
+ Promise<CoordinatedRepairResult> repairFuture = AsyncPromise.uncancellable();
+ Supplier<Future<CoordinatedRepairResult>> sessionSupplier = () ->
{
repairSubmitted.set(true);
return repairFuture;
};
// coordinator sends prepare requests to create local session and perform anticompaction
- AtomicBoolean hasFailures = new AtomicBoolean(false);
Assert.assertFalse(repairSubmitted.get());
Assert.assertTrue(coordinator.sentMessages.isEmpty());
- Future sessionResult = coordinator.execute(sessionSupplier, hasFailures);
+ Future<CoordinatedRepairResult> sessionResult = coordinator.execute(sessionSupplier);
for (InetAddressAndPort participant : PARTICIPANTS)
{
PrepareConsistentRequest expected = new PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new HashSet<>(PARTICIPANTS));
@@ -414,7 +420,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
Assert.assertFalse(coordinator.sentMessages.containsKey(PARTICIPANT2));
Assert.assertTrue(sessionResult.isDone());
- Assert.assertTrue(hasFailures.get());
+ Assert.assertNotNull(sessionResult.cause());
}
@Test
@@ -422,18 +428,17 @@ public class CoordinatorSessionTest extends AbstractRepairTest
{
InstrumentedCoordinatorSession coordinator = createInstrumentedSession();
AtomicBoolean repairSubmitted = new AtomicBoolean(false);
- Promise<List<RepairSessionResult>> repairFuture = AsyncPromise.uncancellable();
- Supplier<Future<List<RepairSessionResult>>> sessionSupplier = () ->
+ Promise<CoordinatedRepairResult> repairFuture = AsyncPromise.uncancellable();
+ Supplier<Future<CoordinatedRepairResult>> sessionSupplier = () ->
{
repairSubmitted.set(true);
return repairFuture;
};
// coordinator sends prepare requests to create local session and perform anticompaction
- AtomicBoolean hasFailures = new AtomicBoolean(false);
Assert.assertFalse(repairSubmitted.get());
Assert.assertTrue(coordinator.sentMessages.isEmpty());
- Future sessionResult = coordinator.execute(sessionSupplier, hasFailures);
+ Future<CoordinatedRepairResult> sessionResult = coordinator.execute(sessionSupplier);
for (InetAddressAndPort participant : PARTICIPANTS)
{
@@ -465,7 +470,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
createResult(coordinator));
coordinator.sentMessages.clear();
- repairFuture.trySuccess(results);
+ repairFuture.trySuccess(CoordinatedRepairResult.success(results));
// propose messages should have been sent once all repair sessions completed successfully
for (InetAddressAndPort participant : PARTICIPANTS)
@@ -501,6 +506,6 @@ public class CoordinatorSessionTest extends AbstractRepairTest
}
Assert.assertTrue(sessionResult.isDone());
- Assert.assertTrue(hasFailures.get());
+ Assert.assertNotNull(sessionResult.cause());
}
}
diff --git a/test/unit/org/apache/cassandra/utils/concurrent/AbstractTestAsyncPromise.java b/test/unit/org/apache/cassandra/utils/concurrent/AbstractTestAsyncPromise.java
index e1c069b..0c297ec 100644
--- a/test/unit/org/apache/cassandra/utils/concurrent/AbstractTestAsyncPromise.java
+++ b/test/unit/org/apache/cassandra/utils/concurrent/AbstractTestAsyncPromise.java
@@ -35,9 +35,15 @@ import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.Assert;
import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.cassandra.config.DatabaseDescriptor;
public abstract class AbstractTestAsyncPromise extends AbstractTestPromise
{
+ static
+ {
+ DatabaseDescriptor.clientInitialization();
+ }
+
public static <V> Promise<V> cancelSuccess(Promise<V> promise)
{
success(promise, Promise::isCancellable, true);
@@ -117,20 +123,30 @@ public abstract class AbstractTestAsyncPromise extends AbstractTestPromise
int id = count++;
return result -> { results.add(result); order.add(id); return ImmediateFuture.success(result); };
}
+ public Function<V, V> getFunction()
+ {
+ int id = count++;
+ return result -> { results.add(result); order.add(id); return result; };
+ }
public Function<V, Future<V>> getRecursiveAsyncFunction(Promise<V> promise)
{
int id = count++;
- return result -> { promise.andThenAsync(getAsyncFunction()); results.add(result); order.add(id); return ImmediateFuture.success(result); };
+ return result -> { promise.flatMap(getAsyncFunction()); results.add(result); order.add(id); return ImmediateFuture.success(result); };
}
public Function<V, Future<V>> getAsyncFailingFunction()
{
int id = count++;
return result -> { results.add(result); order.add(id); return ImmediateFuture.failure(new RuntimeException()); };
}
+ public Function<V, V> getFailingFunction()
+ {
+ int id = count++;
+ return result -> { results.add(result); order.add(id); throw new RuntimeException(); };
+ }
public Function<V, Future<V>> getRecursiveAsyncFailingFunction(Promise<V> promise)
{
int id = count++;
- return result -> { promise.andThenAsync(getAsyncFailingFunction()); results.add(result); order.add(id); return ImmediateFuture.failure(new RuntimeException()); };
+ return result -> { promise.flatMap(getAsyncFailingFunction()); results.add(result); order.add(id); return ImmediateFuture.failure(new RuntimeException()); };
}
public FutureCallback<V> getCallback(Future<V> p)
{
@@ -181,17 +197,26 @@ public abstract class AbstractTestAsyncPromise extends AbstractTestPromise
promise.addListener(listeners.getRecursiveRunnable(promise), MoreExecutors.directExecutor());
promise.addListener(listeners.getRecursive());
promise.addCallback(listeners.getCallback(promise));
+ promise.addCallback(listeners.getCallback(promise), MoreExecutors.directExecutor());
promise.addCallback(listeners.getRecursiveCallback(promise));
+ promise.addCallback(listeners.getRecursiveCallback(promise), MoreExecutors.directExecutor());
promise.addCallback(listeners.getConsumer(), fail -> Assert.fail());
+ promise.addCallback(listeners.getConsumer(), fail -> Assert.fail(), MoreExecutors.directExecutor());
promise.addCallback(listeners.getRecursiveConsumer(), fail -> Assert.fail());
- promise.andThenAsync(listeners.getAsyncFunction()).addListener(listeners.get());
- promise.andThenAsync(listeners.getAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get());
- promise.andThenAsync(listeners.getRecursiveAsyncFunction(promise)).addListener(listeners.get());
- promise.andThenAsync(listeners.getRecursiveAsyncFunction(promise), MoreExecutors.directExecutor()).addListener(listeners.get());
- promise.andThenAsync(listeners.getAsyncFailingFunction()).addListener(listeners.getListenerToFailure(promise));
- promise.andThenAsync(listeners.getAsyncFailingFunction(), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise));
- promise.andThenAsync(listeners.getRecursiveAsyncFailingFunction(promise)).addListener(listeners.getListenerToFailure(promise));
- promise.andThenAsync(listeners.getRecursiveAsyncFailingFunction(promise), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise));
+ promise.addCallback(listeners.getRecursiveConsumer(), fail -> Assert.fail(), MoreExecutors.directExecutor());
+ promise.map(listeners.getFunction()).addListener(listeners.get());
+ promise.map(listeners.getFunction(), MoreExecutors.directExecutor()).addListener(listeners.get());
+ promise.map(listeners.getFailingFunction()).addListener(listeners.getListenerToFailure(promise));
+ promise.map(listeners.getFailingFunction(), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise));
+ promise.flatMap(listeners.getAsyncFunction()).addListener(listeners.get());
+ promise.flatMap(listeners.getAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get());
+ promise.flatMap(listeners.getRecursiveAsyncFunction(promise)).addListener(listeners.get());
+ promise.flatMap(listeners.getRecursiveAsyncFunction(promise), MoreExecutors.directExecutor()).addListener(listeners.get());
+ promise.flatMap(listeners.getAsyncFailingFunction()).addListener(listeners.getListenerToFailure(promise));
+ promise.flatMap(listeners.getAsyncFailingFunction(), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise));
+ promise.flatMap(listeners.getRecursiveAsyncFailingFunction(promise)).addListener(listeners.getListenerToFailure(promise));
+ promise.flatMap(listeners.getRecursiveAsyncFailingFunction(promise), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise));
+
success(promise, Promise::getNow, null);
success(promise, Promise::isSuccess, false);
success(promise, Promise::isDone, false);
@@ -224,17 +249,25 @@ public abstract class AbstractTestAsyncPromise extends AbstractTestPromise
promise.addListener(listeners.getRecursiveRunnable(promise), MoreExecutors.directExecutor());
promise.addListener(listeners.getRecursive());
promise.addCallback(listeners.getCallback(promise));
+ promise.addCallback(listeners.getCallback(promise), MoreExecutors.directExecutor());
promise.addCallback(listeners.getRecursiveCallback(promise));
+ promise.addCallback(listeners.getRecursiveCallback(promise), MoreExecutors.directExecutor());
promise.addCallback(listeners.getConsumer(), fail -> Assert.fail());
+ promise.addCallback(listeners.getConsumer(), fail -> Assert.fail(), MoreExecutors.directExecutor());
promise.addCallback(listeners.getRecursiveConsumer(), fail -> Assert.fail());
- promise.andThenAsync(listeners.getAsyncFunction()).addListener(listeners.get());
- promise.andThenAsync(listeners.getAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get());
- promise.andThenAsync(listeners.getRecursiveAsyncFunction(promise)).addListener(listeners.get());
- promise.andThenAsync(listeners.getRecursiveAsyncFunction(promise), MoreExecutors.directExecutor()).addListener(listeners.get());
- promise.andThenAsync(listeners.getAsyncFailingFunction()).addListener(listeners.getListenerToFailure(promise));
- promise.andThenAsync(listeners.getAsyncFailingFunction(), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise));
- promise.andThenAsync(listeners.getRecursiveAsyncFailingFunction(promise)).addListener(listeners.getListenerToFailure(promise));
- promise.andThenAsync(listeners.getRecursiveAsyncFailingFunction(promise), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise));
+ promise.addCallback(listeners.getRecursiveConsumer(), fail -> Assert.fail(), MoreExecutors.directExecutor());
+ promise.map(listeners.getFunction()).addListener(listeners.get());
+ promise.map(listeners.getFunction(), MoreExecutors.directExecutor()).addListener(listeners.get());
+ promise.map(listeners.getFailingFunction()).addListener(listeners.getListenerToFailure(promise));
+ promise.map(listeners.getFailingFunction(), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise));
+ promise.flatMap(listeners.getAsyncFunction()).addListener(listeners.get());
+ promise.flatMap(listeners.getAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get());
+ promise.flatMap(listeners.getRecursiveAsyncFunction(promise)).addListener(listeners.get());
+ promise.flatMap(listeners.getRecursiveAsyncFunction(promise), MoreExecutors.directExecutor()).addListener(listeners.get());
+ promise.flatMap(listeners.getAsyncFailingFunction()).addListener(listeners.getListenerToFailure(promise));
+ promise.flatMap(listeners.getAsyncFailingFunction(), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise));
+ promise.flatMap(listeners.getRecursiveAsyncFailingFunction(promise)).addListener(listeners.getListenerToFailure(promise));
+ promise.flatMap(listeners.getRecursiveAsyncFailingFunction(promise), MoreExecutors.directExecutor()).addListener(listeners.getListenerToFailure(promise));
success(promise, Promise::isSuccess, true);
success(promise, Promise::isDone, true);
success(promise, Promise::isCancelled, false);
@@ -292,7 +325,7 @@ public abstract class AbstractTestAsyncPromise extends AbstractTestPromise
}
public Function<V, Future<V>> getRecursiveAsyncFunction()
{
- return result -> { promise.andThenAsync(getAsyncFunction()); return ImmediateFuture.success(result); };
+ return result -> { promise.flatMap(getAsyncFunction()); return ImmediateFuture.success(result); };
}
public FutureCallback<V> getCallback(Future<V> p)
{
@@ -346,10 +379,10 @@ public abstract class AbstractTestAsyncPromise extends AbstractTestPromise
promise.addCallback(listeners.getRecursiveCallback(promise));
promise.addCallback(fail -> Assert.fail(), listeners.getConsumer());
promise.addCallback(fail -> Assert.fail(), listeners.getRecursiveConsumer());
- promise.andThenAsync(listeners.getAsyncFunction()).addListener(listeners.get());
- promise.andThenAsync(listeners.getAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get());
- promise.andThenAsync(listeners.getRecursiveAsyncFunction()).addListener(listeners.get());
- promise.andThenAsync(listeners.getRecursiveAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get());
+ promise.flatMap(listeners.getAsyncFunction()).addListener(listeners.get());
+ promise.flatMap(listeners.getAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get());
+ promise.flatMap(listeners.getRecursiveAsyncFunction()).addListener(listeners.get());
+ promise.flatMap(listeners.getRecursiveAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get());
success(promise, Promise::isSuccess, false);
success(promise, Promise::isDone, false);
success(promise, Promise::isCancelled, false);
@@ -393,8 +426,8 @@ public abstract class AbstractTestAsyncPromise extends AbstractTestPromise
promise.addCallback(listeners.getRecursiveCallback(promise));
promise.addCallback(fail -> Assert.fail(), listeners.getConsumer());
promise.addCallback(fail -> Assert.fail(), listeners.getRecursiveConsumer());
- promise.andThenAsync(listeners.getAsyncFunction()).addListener(listeners.get());
- promise.andThenAsync(listeners.getAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get());
+ promise.flatMap(listeners.getAsyncFunction()).addListener(listeners.get());
+ promise.flatMap(listeners.getAsyncFunction(), MoreExecutors.directExecutor()).addListener(listeners.get());
success(promise, Promise::isSuccess, false);
success(promise, Promise::isDone, true);
success(promise, Promise::isCancelled, false);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org