You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2022/06/29 18:49:46 UTC

[accumulo] branch main updated: Improve use of Futures in Gatherer (#2752)

This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 8a517b0d8c Improve use of Futures in Gatherer (#2752)
8a517b0d8c is described below

commit 8a517b0d8c8bf669908afe6dc3713c5dc258e7f9
Author: Nicholas Coltharp <co...@pdx.edu>
AuthorDate: Wed Jun 29 18:49:41 2022 +0000

    Improve use of Futures in Gatherer (#2752)
    
    - Add a new utility method `iterateUntil` that creates a future
    that iterates some action (that returns a `CompletableFuture`) until
    some condition is met.
    - Change the `PartitionFuture` class to be a thin wrapper around a
    future created by `iterateUntil`.  (We can't remove the class
    entirely, because we still need to carry around the `cancelFlag` so that
    we can pass it to `FileProcessor`.)
---
 .../org/apache/accumulo/core/summary/Gatherer.java | 120 +++------------------
 .../accumulo/core/util/CompletableFutureUtil.java  |  24 +++++
 .../core/util/CompletableFutureUtilTest.java       |  44 ++++++++
 3 files changed, 85 insertions(+), 103 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
index 8435b8ab2c..90f8523173 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
@@ -19,7 +19,6 @@
 package org.apache.accumulo.core.summary;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LAST;
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
@@ -42,6 +41,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
@@ -346,23 +346,11 @@ public class Gatherer {
   }
 
   private class PartitionFuture implements Future<SummaryCollection> {
-
-    private CompletableFuture<ProcessedFiles> future;
-    private int modulus;
-    private int remainder;
-    private ExecutorService execSrv;
-    private TInfo tinfo;
-    private AtomicBoolean cancelFlag = new AtomicBoolean(false);
+    private final CompletableFuture<SummaryCollection> future;
+    private final AtomicBoolean cancelFlag = new AtomicBoolean(false);
 
     PartitionFuture(TInfo tinfo, ExecutorService execSrv, int modulus, int remainder) {
-      this.tinfo = tinfo;
-      this.execSrv = execSrv;
-      this.modulus = modulus;
-      this.remainder = remainder;
-    }
-
-    private synchronized void initiateProcessing(ProcessedFiles previousWork) {
-      try {
+      Function<ProcessedFiles,CompletableFuture<ProcessedFiles>> go = previousWork -> {
         Predicate<TabletFile> fileSelector = file -> Math
             .abs(Hashing.murmur3_32_fixed().hashString(file.getPathStr(), UTF_8).asInt()) % modulus
             == remainder;
@@ -386,53 +374,17 @@ public class Gatherer {
               .supplyAsync(new FilesProcessor(tinfo, location, allFiles, cancelFlag), execSrv));
         }
 
-        future = CompletableFutureUtil.merge(futures,
+        return CompletableFutureUtil.merge(futures,
             (pf1, pf2) -> ProcessedFiles.merge(pf1, pf2, factory), ProcessedFiles::new);
-
-        // when all processing is done, check for failed files... and if found starting processing
-        // again
-        @SuppressWarnings("unused")
-        CompletableFuture<Void> unused = future.thenRun(() -> {
-          CompletableFuture<ProcessedFiles> unused2 = this.updateFuture();
-        });
-      } catch (Exception e) {
-        future = CompletableFuture.completedFuture(new ProcessedFiles());
-        // force future to have this exception
-        future.obtrudeException(e);
-      }
-    }
-
-    private ProcessedFiles _get() {
-      try {
-        return future.get();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(e);
-      } catch (ExecutionException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    private synchronized CompletableFuture<ProcessedFiles> updateFuture() {
-      if (future.isDone()) {
-        if (!future.isCancelled() && !future.isCompletedExceptionally()) {
-          ProcessedFiles pf = _get();
-          if (!pf.failedFiles.isEmpty()) {
-            initiateProcessing(pf);
-          }
-        }
-      }
-
-      return future;
-    }
-
-    synchronized void initiateProcessing() {
-      Preconditions.checkState(future == null);
-      initiateProcessing(null);
+      };
+      future = CompletableFutureUtil
+          .iterateUntil(go,
+              previousWork -> previousWork != null && previousWork.failedFiles.isEmpty(), null)
+          .thenApply(pf -> pf.summaries);
     }
 
     @Override
-    public synchronized boolean cancel(boolean mayInterruptIfRunning) {
+    public boolean cancel(boolean mayInterruptIfRunning) {
       boolean canceled = future.cancel(mayInterruptIfRunning);
       if (canceled) {
         cancelFlag.set(true);
@@ -441,59 +393,24 @@ public class Gatherer {
     }
 
     @Override
-    public synchronized boolean isCancelled() {
+    public boolean isCancelled() {
       return future.isCancelled();
     }
 
     @Override
-    public synchronized boolean isDone() {
-      @SuppressWarnings("unused")
-      CompletableFuture<ProcessedFiles> unused = updateFuture();
-      if (future.isDone()) {
-        if (future.isCancelled() || future.isCompletedExceptionally()) {
-          return true;
-        }
-
-        ProcessedFiles pf = _get();
-        if (pf.failedFiles.isEmpty()) {
-          return true;
-        } else {
-          unused = updateFuture();
-        }
-      }
-
-      return false;
+    public boolean isDone() {
+      return future.isDone();
     }
 
     @Override
     public SummaryCollection get() throws InterruptedException, ExecutionException {
-      CompletableFuture<ProcessedFiles> futureRef = updateFuture();
-      ProcessedFiles processedFiles = futureRef.get();
-      while (!processedFiles.failedFiles.isEmpty()) {
-        futureRef = updateFuture();
-        processedFiles = futureRef.get();
-      }
-      return processedFiles.summaries;
+      return future.get();
     }
 
     @Override
     public SummaryCollection get(long timeout, TimeUnit unit)
         throws InterruptedException, ExecutionException, TimeoutException {
-      long nanosLeft = unit.toNanos(timeout);
-      long t1, t2;
-      CompletableFuture<ProcessedFiles> futureRef = updateFuture();
-      t1 = System.nanoTime();
-      ProcessedFiles processedFiles = futureRef.get(Long.max(1, nanosLeft), NANOSECONDS);
-      t2 = System.nanoTime();
-      nanosLeft -= (t2 - t1);
-      while (!processedFiles.failedFiles.isEmpty()) {
-        futureRef = updateFuture();
-        t1 = System.nanoTime();
-        processedFiles = futureRef.get(Long.max(1, nanosLeft), NANOSECONDS);
-        t2 = System.nanoTime();
-        nanosLeft -= (t2 - t1);
-      }
-      return processedFiles.summaries;
+      return future.get(timeout, unit);
     }
 
   }
@@ -504,10 +421,7 @@ public class Gatherer {
    */
   public Future<SummaryCollection> processPartition(ExecutorService execSrv, int modulus,
       int remainder) {
-    PartitionFuture future =
-        new PartitionFuture(TraceUtil.traceInfo(), execSrv, modulus, remainder);
-    future.initiateProcessing();
-    return future;
+    return new PartitionFuture(TraceUtil.traceInfo(), execSrv, modulus, remainder);
   }
 
   public interface FileSystemResolver {
diff --git a/core/src/main/java/org/apache/accumulo/core/util/CompletableFutureUtil.java b/core/src/main/java/org/apache/accumulo/core/util/CompletableFutureUtil.java
index dac1494db6..bb686660de 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/CompletableFutureUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/CompletableFutureUtil.java
@@ -22,6 +22,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 
 public class CompletableFutureUtil {
@@ -49,4 +51,26 @@ public class CompletableFutureUtil {
     return futures.get(0);
   }
 
+  /**
+   * Iterate some function until a given condition is met.
+   *
+   * The step function should always return an asynchronous {@code
+   * CompletableFuture} in order to avoid stack overflows.
+   */
+  public static <T> CompletableFuture<T> iterateUntil(Function<T,CompletableFuture<T>> step,
+      Predicate<T> isDone, T init) {
+    // We'd like to use a lambda here, but lambdas don't have
+    // `this`, so we would have to use some clumsy indirection to
+    // achieve self-reference.
+    Function<T,CompletableFuture<T>> go = new Function<>() {
+      @Override
+      public CompletableFuture<T> apply(T x) {
+        if (isDone.test(x)) {
+          return CompletableFuture.completedFuture(x);
+        }
+        return step.apply(x).thenCompose(this);
+      }
+    };
+    return CompletableFuture.completedFuture(init).thenCompose(go);
+  }
 }
diff --git a/core/src/test/java/org/apache/accumulo/core/util/CompletableFutureUtilTest.java b/core/src/test/java/org/apache/accumulo/core/util/CompletableFutureUtilTest.java
index 6b29a3e992..2e719540b6 100644
--- a/core/src/test/java/org/apache/accumulo/core/util/CompletableFutureUtilTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/util/CompletableFutureUtilTest.java
@@ -19,13 +19,17 @@
 package org.apache.accumulo.core.util;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.function.Function;
+import java.util.function.Predicate;
 
 import org.junit.jupiter.api.Test;
 
@@ -54,4 +58,44 @@ public class CompletableFutureUtilTest {
       es.shutdown();
     }
   }
+
+  @Test
+  public void testIterateUntil() throws Exception {
+    ExecutorService es = Executors.newFixedThreadPool(1);
+    Function<Integer,CompletableFuture<Integer>> step =
+        n -> CompletableFuture.supplyAsync(() -> n - 1, es);
+    Predicate<Integer> isDone = n -> n == 0;
+    // The call stack should overflow before 10,000 calls, so this
+    // effectively tests whether iterateUntil avoids stack overflows
+    // when given async futures.
+    for (int n : new int[] {0, 1, 2, 3, 100, 10_000}) {
+      assertEquals(0, CompletableFutureUtil.iterateUntil(step, isDone, n).get());
+    }
+    // Test throwing an exception in the step function.
+    {
+      Function<Integer,CompletableFuture<Integer>> badStep = n -> {
+        throw new RuntimeException();
+      };
+      assertThrows(ExecutionException.class,
+          () -> CompletableFutureUtil.iterateUntil(badStep, isDone, 100).get());
+    }
+    // Test throwing an exception in the future returned by the step
+    // function.
+    {
+      Function<Integer,CompletableFuture<Integer>> badStep =
+          n -> CompletableFuture.supplyAsync(() -> {
+            throw new RuntimeException();
+          }, es);
+      assertThrows(ExecutionException.class,
+          () -> CompletableFutureUtil.iterateUntil(badStep, isDone, 100).get());
+    }
+    // Test throwing an exception in the predicate.
+    {
+      Predicate<Integer> badIsDone = n -> {
+        throw new RuntimeException();
+      };
+      assertThrows(ExecutionException.class,
+          () -> CompletableFutureUtil.iterateUntil(step, badIsDone, 100).get());
+    }
+  }
 }