You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/12/31 23:02:41 UTC
[pulsar] branch master updated: Rely on CompletableFuture utils
(#3270)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new adc6fc0 Rely on CompletableFuture utils (#3270)
adc6fc0 is described below
commit adc6fc0ebeaca514b38de23373edfc72737ddb38
Author: Ezequiel Lovelle <ez...@gmail.com>
AuthorDate: Mon Dec 31 20:02:36 2018 -0300
Rely on CompletableFuture utils (#3270)
Use the static method CompletableFuture.allOf() instead of custom logic to wait
for every future in a collection to complete.
---
.../org/apache/pulsar/common/util/FutureUtil.java | 30 ++--------------------
1 file changed, 2 insertions(+), 28 deletions(-)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index e07941a..0aa6a96 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -20,8 +20,6 @@ package org.apache.pulsar.common.util;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
public class FutureUtil {
@@ -31,32 +29,8 @@ public class FutureUtil {
* @param futures
* @return
*/
- public static <T> CompletableFuture<T> waitForAll(List<CompletableFuture<T>> futures) {
- if (futures.isEmpty()) {
- return CompletableFuture.completedFuture(null);
- }
-
- final CompletableFuture<T> compositeFuture = new CompletableFuture<>();
- final AtomicInteger count = new AtomicInteger(futures.size());
- final AtomicReference<Throwable> exception = new AtomicReference<>();
-
- for (CompletableFuture<T> future : futures) {
- future.whenComplete((r, ex) -> {
- if (ex != null) {
- exception.compareAndSet(null, ex);
- }
- if (count.decrementAndGet() == 0) {
- // All the pending futures did complete
- if (exception.get() != null) {
- compositeFuture.completeExceptionally(exception.get());
- } else {
- compositeFuture.complete(null);
- }
- }
- });
- }
-
- return compositeFuture;
+ public static <T> CompletableFuture<Void> waitForAll(List<CompletableFuture<T>> futures) {
+ return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}
public static <T> CompletableFuture<T> failedFuture(Throwable t) {