You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/12/31 23:02:41 UTC

[pulsar] branch master updated: Rely on CompletableFuture utils (#3270)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new adc6fc0  Rely on CompletableFuture utils (#3270)
adc6fc0 is described below

commit adc6fc0ebeaca514b38de23373edfc72737ddb38
Author: Ezequiel Lovelle <ez...@gmail.com>
AuthorDate: Mon Dec 31 20:02:36 2018 -0300

    Rely on CompletableFuture utils (#3270)
    
    Use the static method CompletableFuture.allOf() instead of custom logic to wait
    for every future in a collection to complete.
---
 .../org/apache/pulsar/common/util/FutureUtil.java  | 30 ++--------------------
 1 file changed, 2 insertions(+), 28 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index e07941a..0aa6a96 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -20,8 +20,6 @@ package org.apache.pulsar.common.util;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 
 public class FutureUtil {
 
@@ -31,32 +29,8 @@ public class FutureUtil {
      * @param futures
      * @return
      */
-    public static <T> CompletableFuture<T> waitForAll(List<CompletableFuture<T>> futures) {
-        if (futures.isEmpty()) {
-            return CompletableFuture.completedFuture(null);
-        }
-
-        final CompletableFuture<T> compositeFuture = new CompletableFuture<>();
-        final AtomicInteger count = new AtomicInteger(futures.size());
-        final AtomicReference<Throwable> exception = new AtomicReference<>();
-
-        for (CompletableFuture<T> future : futures) {
-            future.whenComplete((r, ex) -> {
-                if (ex != null) {
-                    exception.compareAndSet(null, ex);
-                }
-                if (count.decrementAndGet() == 0) {
-                    // All the pending futures did complete
-                    if (exception.get() != null) {
-                        compositeFuture.completeExceptionally(exception.get());
-                    } else {
-                        compositeFuture.complete(null);
-                    }
-                }
-            });
-        }
-
-        return compositeFuture;
+    public static <T> CompletableFuture<Void> waitForAll(List<CompletableFuture<T>> futures) {
+        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
     }
 
     public static <T> CompletableFuture<T> failedFuture(Throwable t) {