You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2022/06/29 18:49:46 UTC
[accumulo] branch main updated: Improve use of Futures in Gatherer (#2752)
This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 8a517b0d8c Improve use of Futures in Gatherer (#2752)
8a517b0d8c is described below
commit 8a517b0d8c8bf669908afe6dc3713c5dc258e7f9
Author: Nicholas Coltharp <co...@pdx.edu>
AuthorDate: Wed Jun 29 18:49:41 2022 +0000
Improve use of Futures in Gatherer (#2752)
- Add a new utility method `CompletableFutureUtil.iterateUntil` that
returns a future which repeatedly applies an asynchronous step (a
function returning a `CompletableFuture`) until a given condition on
the result is met.
- Change the `PartitionFuture` class to be a thin wrapper around a
future created by `iterateUntil`. (We can't remove the class
entirely, because it still has to hold the `cancelFlag` that is passed
to each `FilesProcessor`.)
---
.../org/apache/accumulo/core/summary/Gatherer.java | 120 +++------------------
.../accumulo/core/util/CompletableFutureUtil.java | 24 +++++
.../core/util/CompletableFutureUtilTest.java | 44 ++++++++
3 files changed, 85 insertions(+), 103 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
index 8435b8ab2c..90f8523173 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
@@ -19,7 +19,6 @@
package org.apache.accumulo.core.summary;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LAST;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
@@ -42,6 +41,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Pattern;
@@ -346,23 +346,11 @@ public class Gatherer {
}
private class PartitionFuture implements Future<SummaryCollection> {
-
- private CompletableFuture<ProcessedFiles> future;
- private int modulus;
- private int remainder;
- private ExecutorService execSrv;
- private TInfo tinfo;
- private AtomicBoolean cancelFlag = new AtomicBoolean(false);
+ private final CompletableFuture<SummaryCollection> future;
+ private final AtomicBoolean cancelFlag = new AtomicBoolean(false);
PartitionFuture(TInfo tinfo, ExecutorService execSrv, int modulus, int remainder) {
- this.tinfo = tinfo;
- this.execSrv = execSrv;
- this.modulus = modulus;
- this.remainder = remainder;
- }
-
- private synchronized void initiateProcessing(ProcessedFiles previousWork) {
- try {
+ Function<ProcessedFiles,CompletableFuture<ProcessedFiles>> go = previousWork -> {
Predicate<TabletFile> fileSelector = file -> Math
.abs(Hashing.murmur3_32_fixed().hashString(file.getPathStr(), UTF_8).asInt()) % modulus
== remainder;
@@ -386,53 +374,17 @@ public class Gatherer {
.supplyAsync(new FilesProcessor(tinfo, location, allFiles, cancelFlag), execSrv));
}
- future = CompletableFutureUtil.merge(futures,
+ return CompletableFutureUtil.merge(futures,
(pf1, pf2) -> ProcessedFiles.merge(pf1, pf2, factory), ProcessedFiles::new);
-
- // when all processing is done, check for failed files... and if found starting processing
- // again
- @SuppressWarnings("unused")
- CompletableFuture<Void> unused = future.thenRun(() -> {
- CompletableFuture<ProcessedFiles> unused2 = this.updateFuture();
- });
- } catch (Exception e) {
- future = CompletableFuture.completedFuture(new ProcessedFiles());
- // force future to have this exception
- future.obtrudeException(e);
- }
- }
-
- private ProcessedFiles _get() {
- try {
- return future.get();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- throw new RuntimeException(e);
- }
- }
-
- private synchronized CompletableFuture<ProcessedFiles> updateFuture() {
- if (future.isDone()) {
- if (!future.isCancelled() && !future.isCompletedExceptionally()) {
- ProcessedFiles pf = _get();
- if (!pf.failedFiles.isEmpty()) {
- initiateProcessing(pf);
- }
- }
- }
-
- return future;
- }
-
- synchronized void initiateProcessing() {
- Preconditions.checkState(future == null);
- initiateProcessing(null);
+ };
+ future = CompletableFutureUtil
+ .iterateUntil(go,
+ previousWork -> previousWork != null && previousWork.failedFiles.isEmpty(), null)
+ .thenApply(pf -> pf.summaries);
}
@Override
- public synchronized boolean cancel(boolean mayInterruptIfRunning) {
+ public boolean cancel(boolean mayInterruptIfRunning) {
boolean canceled = future.cancel(mayInterruptIfRunning);
if (canceled) {
cancelFlag.set(true);
@@ -441,59 +393,24 @@ public class Gatherer {
}
@Override
- public synchronized boolean isCancelled() {
+ public boolean isCancelled() {
return future.isCancelled();
}
@Override
- public synchronized boolean isDone() {
- @SuppressWarnings("unused")
- CompletableFuture<ProcessedFiles> unused = updateFuture();
- if (future.isDone()) {
- if (future.isCancelled() || future.isCompletedExceptionally()) {
- return true;
- }
-
- ProcessedFiles pf = _get();
- if (pf.failedFiles.isEmpty()) {
- return true;
- } else {
- unused = updateFuture();
- }
- }
-
- return false;
+ public boolean isDone() {
+ return future.isDone();
}
@Override
public SummaryCollection get() throws InterruptedException, ExecutionException {
- CompletableFuture<ProcessedFiles> futureRef = updateFuture();
- ProcessedFiles processedFiles = futureRef.get();
- while (!processedFiles.failedFiles.isEmpty()) {
- futureRef = updateFuture();
- processedFiles = futureRef.get();
- }
- return processedFiles.summaries;
+ return future.get();
}
@Override
public SummaryCollection get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
- long nanosLeft = unit.toNanos(timeout);
- long t1, t2;
- CompletableFuture<ProcessedFiles> futureRef = updateFuture();
- t1 = System.nanoTime();
- ProcessedFiles processedFiles = futureRef.get(Long.max(1, nanosLeft), NANOSECONDS);
- t2 = System.nanoTime();
- nanosLeft -= (t2 - t1);
- while (!processedFiles.failedFiles.isEmpty()) {
- futureRef = updateFuture();
- t1 = System.nanoTime();
- processedFiles = futureRef.get(Long.max(1, nanosLeft), NANOSECONDS);
- t2 = System.nanoTime();
- nanosLeft -= (t2 - t1);
- }
- return processedFiles.summaries;
+ return future.get(timeout, unit);
}
}
@@ -504,10 +421,7 @@ public class Gatherer {
*/
public Future<SummaryCollection> processPartition(ExecutorService execSrv, int modulus,
int remainder) {
- PartitionFuture future =
- new PartitionFuture(TraceUtil.traceInfo(), execSrv, modulus, remainder);
- future.initiateProcessing();
- return future;
+ return new PartitionFuture(TraceUtil.traceInfo(), execSrv, modulus, remainder);
}
public interface FileSystemResolver {
diff --git a/core/src/main/java/org/apache/accumulo/core/util/CompletableFutureUtil.java b/core/src/main/java/org/apache/accumulo/core/util/CompletableFutureUtil.java
index dac1494db6..bb686660de 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/CompletableFutureUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/CompletableFutureUtil.java
@@ -22,6 +22,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.function.Supplier;
public class CompletableFutureUtil {
@@ -49,4 +51,26 @@ public class CompletableFutureUtil {
return futures.get(0);
}
+ /**
+ * Repeatedly applies an asynchronous step function until a condition is met.
+ *
+ * <p>
+ * Starting from {@code init}, the current value is tested with {@code isDone}. If the test
+ * passes, the returned future completes with that value. Otherwise {@code step} is applied to it,
+ * and the same test is repeated on the value that the step's future completes with.
+ *
+ * <p>
+ * Because the chain starts from an already-completed future, the first {@code isDone} test (and
+ * the first {@code step} call, if one is needed) runs synchronously in the calling thread.
+ *
+ * <p>
+ * If {@code step} or {@code isDone} throws, or a future returned by {@code step} completes
+ * exceptionally, the returned future completes exceptionally.
+ *
+ * <p>
+ * The step function should always return an asynchronous {@code CompletableFuture}. When a
+ * step's future is already complete, {@code thenCompose} runs the next iteration immediately on
+ * the current call stack, so many synchronously-completed steps in a row can overflow it.
+ *
+ * <p>
+ * NOTE(review): cancelling the returned future does not cancel the future returned by
+ * {@code step}. Per {@code CompletableFuture} semantics, the chain of steps keeps running
+ * until {@code isDone} passes. Callers that need to stop work early must signal that through
+ * {@code step} itself.
+ *
+ * @param step produces a future for the next value, given the current one
+ * @param isDone decides whether the current value is the final result
+ * @param init the starting value; may be {@code null} as long as {@code isDone} accepts it
+ * @return a future that completes with the first value for which {@code isDone} returns true
+ */
+ public static <T> CompletableFuture<T> iterateUntil(Function<T,CompletableFuture<T>> step,
+ Predicate<T> isDone, T init) {
+ // We'd like to use a lambda here, but lambdas don't have
+ // `this`, so we would have to use some clumsy indirection to
+ // achieve self-reference.
+ Function<T,CompletableFuture<T>> go = new Function<>() {
+ @Override
+ public CompletableFuture<T> apply(T x) {
+ if (isDone.test(x)) {
+ return CompletableFuture.completedFuture(x);
+ }
+ return step.apply(x).thenCompose(this);
+ }
+ };
+ return CompletableFuture.completedFuture(init).thenCompose(go);
+ }
}
diff --git a/core/src/test/java/org/apache/accumulo/core/util/CompletableFutureUtilTest.java b/core/src/test/java/org/apache/accumulo/core/util/CompletableFutureUtilTest.java
index 6b29a3e992..2e719540b6 100644
--- a/core/src/test/java/org/apache/accumulo/core/util/CompletableFutureUtilTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/util/CompletableFutureUtilTest.java
@@ -19,13 +19,17 @@
package org.apache.accumulo.core.util;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.function.Function;
+import java.util.function.Predicate;
import org.junit.jupiter.api.Test;
@@ -54,4 +58,44 @@ public class CompletableFutureUtilTest {
es.shutdown();
}
}
+
+  @Test
+  public void testIterateUntil() throws Exception {
+    // Steps run on a separate pool thread, so the futures they return complete asynchronously.
+    ExecutorService es = Executors.newFixedThreadPool(1);
+    try {
+      Function<Integer,CompletableFuture<Integer>> step =
+          n -> CompletableFuture.supplyAsync(() -> n - 1, es);
+      Predicate<Integer> isDone = n -> n == 0;
+      // The call stack should overflow before 10,000 calls, so this
+      // effectively tests whether iterateUntil avoids stack overflows
+      // when given async futures.
+      for (int n : new int[] {0, 1, 2, 3, 100, 10_000}) {
+        assertEquals(0, CompletableFutureUtil.iterateUntil(step, isDone, n).get());
+      }
+      // Test throwing an exception in the step function.
+      {
+        Function<Integer,CompletableFuture<Integer>> badStep = n -> {
+          throw new RuntimeException();
+        };
+        assertThrows(ExecutionException.class,
+            () -> CompletableFutureUtil.iterateUntil(badStep, isDone, 100).get());
+      }
+      // Test throwing an exception in the future returned by the step
+      // function.
+      {
+        Function<Integer,CompletableFuture<Integer>> badStep =
+            n -> CompletableFuture.supplyAsync(() -> {
+              throw new RuntimeException();
+            }, es);
+        assertThrows(ExecutionException.class,
+            () -> CompletableFutureUtil.iterateUntil(badStep, isDone, 100).get());
+      }
+      // Test throwing an exception in the predicate.
+      {
+        Predicate<Integer> badIsDone = n -> {
+          throw new RuntimeException();
+        };
+        assertThrows(ExecutionException.class,
+            () -> CompletableFutureUtil.iterateUntil(step, badIsDone, 100).get());
+      }
+    } finally {
+      // Release the pool's non-daemon worker thread so it does not outlive this test.
+      es.shutdown();
+    }
+  }
}