You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/01/30 23:56:00 UTC

[kafka] branch trunk updated: KAFKA-6018: Make KafkaFuture.Future an interface (KIP-218)

This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ae42cc8  KAFKA-6018: Make KafkaFuture.Future an interface (KIP-218)
ae42cc8 is described below

commit ae42cc803023c795eae00d138ea22a6a7951db31
Author: Steven Aerts <st...@gmail.com>
AuthorDate: Tue Jan 30 15:55:49 2018 -0800

    KAFKA-6018: Make KafkaFuture.Future an interface (KIP-218)
    
    Changing KafkaFuture.Function and KafkaFuture.BiConsumer into interfaces makes
    them functional interfaces.  This makes them Java 8 lambda compatible.
    
    Author: Colin P. Mccabe <cm...@confluent.io>
    Author: Steven Aerts <st...@gmail.com>
    
    Reviewers: Colin P. Mccabe <cm...@confluent.io>, Ismael Juma <is...@juma.me.uk>, Xavier Léauté <xl...@xvrl.net>, Tom Bentley <tb...@redhat.com>, Ewen Cheslack-Postava <ew...@confluent.io>
    
    Closes #4033 from steven-aerts/KAFKA-6018
---
 .../kafka/clients/admin/DeleteAclsResult.java      |  2 +-
 .../kafka/clients/admin/DescribeConfigsResult.java |  2 +-
 .../kafka/clients/admin/DescribeLogDirsResult.java |  2 +-
 .../admin/DescribeReplicaLogDirsResult.java        |  2 +-
 .../kafka/clients/admin/DescribeTopicsResult.java  |  2 +-
 .../kafka/clients/admin/ListTopicsResult.java      |  4 +-
 .../java/org/apache/kafka/common/KafkaFuture.java  | 52 +++++++++++++++++---
 .../kafka/common/internals/KafkaFutureImpl.java    | 57 +++++++++++++++++++---
 .../org/apache/kafka/common/KafkaFutureTest.java   | 41 ++++++++++++++++
 .../apache/kafka/common/utils/MockScheduler.java   |  2 +-
 .../apache/kafka/trogdor/agent/WorkerManager.java  |  2 +-
 11 files changed, 147 insertions(+), 21 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java
index 90bc297..19df228 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java
@@ -102,7 +102,7 @@ public class DeleteAclsResult {
      */
     public KafkaFuture<Collection<AclBinding>> all() {
         return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply(
-            new KafkaFuture.Function<Void, Collection<AclBinding>>() {
+            new KafkaFuture.BaseFunction<Void, Collection<AclBinding>>() {
                 @Override
                 public Collection<AclBinding> apply(Void v) {
                     List<AclBinding> acls = new ArrayList<>();
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java
index 478bf05..e5d79e8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java
@@ -53,7 +53,7 @@ public class DescribeConfigsResult {
      */
     public KafkaFuture<Map<ConfigResource, Config>> all() {
         return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
-                thenApply(new KafkaFuture.Function<Void, Map<ConfigResource, Config>>() {
+                thenApply(new KafkaFuture.BaseFunction<Void, Map<ConfigResource, Config>>() {
                     @Override
                     public Map<ConfigResource, Config> apply(Void v) {
                         Map<ConfigResource, Config> configs = new HashMap<>(futures.size());
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java
index de186fd..7c7bde7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java
@@ -51,7 +51,7 @@ public class DescribeLogDirsResult {
      */
     public KafkaFuture<Map<Integer, Map<String, LogDirInfo>>> all() {
         return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
-            thenApply(new KafkaFuture.Function<Void, Map<Integer, Map<String, LogDirInfo>>>() {
+            thenApply(new KafkaFuture.BaseFunction<Void, Map<Integer, Map<String, LogDirInfo>>>() {
                 @Override
                 public Map<Integer, Map<String, LogDirInfo>> apply(Void v) {
                     Map<Integer, Map<String, LogDirInfo>> descriptions = new HashMap<>(futures.size());
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java
index 401b4aa..d4c8479 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java
@@ -52,7 +52,7 @@ public class DescribeReplicaLogDirsResult {
      */
     public KafkaFuture<Map<TopicPartitionReplica, ReplicaLogDirInfo>> all() {
         return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
-            thenApply(new KafkaFuture.Function<Void, Map<TopicPartitionReplica, ReplicaLogDirInfo>>() {
+            thenApply(new KafkaFuture.BaseFunction<Void, Map<TopicPartitionReplica, ReplicaLogDirInfo>>() {
                 @Override
                 public Map<TopicPartitionReplica, ReplicaLogDirInfo> apply(Void v) {
                     Map<TopicPartitionReplica, ReplicaLogDirInfo> replicaLogDirInfos = new HashMap<>();
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
index 18f5f9d..6bb24d9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
@@ -51,7 +51,7 @@ public class DescribeTopicsResult {
      */
     public KafkaFuture<Map<String, TopicDescription>> all() {
         return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
-            thenApply(new KafkaFuture.Function<Void, Map<String, TopicDescription>>() {
+            thenApply(new KafkaFuture.BaseFunction<Void, Map<String, TopicDescription>>() {
                 @Override
                 public Map<String, TopicDescription> apply(Void v) {
                     Map<String, TopicDescription> descriptions = new HashMap<>(futures.size());
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
index e54b3de..a2e17fd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
@@ -48,7 +48,7 @@ public class ListTopicsResult {
      * Return a future which yields a collection of TopicListing objects.
      */
     public KafkaFuture<Collection<TopicListing>> listings() {
-        return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Collection<TopicListing>>() {
+        return future.thenApply(new KafkaFuture.BaseFunction<Map<String, TopicListing>, Collection<TopicListing>>() {
             @Override
             public Collection<TopicListing> apply(Map<String, TopicListing> namesToDescriptions) {
                 return namesToDescriptions.values();
@@ -60,7 +60,7 @@ public class ListTopicsResult {
      * Return a future which yields a collection of topic names.
      */
     public KafkaFuture<Set<String>> names() {
-        return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Set<String>>() {
+        return future.thenApply(new KafkaFuture.BaseFunction<Map<String, TopicListing>, Set<String>>() {
             @Override
             public Set<String> apply(Map<String, TopicListing> namesToListings) {
                 return namesToListings.keySet();
diff --git a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
index 23e2181..9cd2e01 100644
--- a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
+++ b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
@@ -35,18 +35,26 @@ public abstract class KafkaFuture<T> implements Future<T> {
     /**
      * A function which takes objects of type A and returns objects of type B.
      */
-    public static abstract class Function<A, B> {
-        public abstract B apply(A a);
+    public interface BaseFunction<A, B> {
+        B apply(A a);
     }
 
     /**
+     * A function which takes objects of type A and returns objects of type B.
+     *
+     * Prefer the functional interface {@link BaseFunction} over the class {@link Function}.  This class is here for
+     * backwards compatibility reasons and might be deprecated/removed in a future release.
+     */
+    public static abstract class Function<A, B> implements BaseFunction<A, B> { }
+
+    /**
      * A consumer of two different types of object.
      */
-    public static abstract class BiConsumer<A, B> {
-        public abstract void accept(A a, B b);
+    public interface BiConsumer<A, B> {
+        void accept(A a, B b);
     }
 
-    private static class AllOfAdapter<R> extends BiConsumer<R, Throwable> {
+    private static class AllOfAdapter<R> implements BiConsumer<R, Throwable> {
         private int remainingResponses;
         private KafkaFuture<?> future;
 
@@ -101,11 +109,43 @@ public abstract class KafkaFuture<T> implements Future<T> {
     /**
      * Returns a new KafkaFuture that, when this future completes normally, is executed with this
      * futures's result as the argument to the supplied function.
+     *
+     * The function may be invoked by the thread that calls {@code thenApply} or it may be invoked by the thread that
+     * completes the future.
+     */
+    public abstract <R> KafkaFuture<R> thenApply(BaseFunction<T, R> function);
+
+    /**
+     * @see KafkaFuture#thenApply(BaseFunction)
+     *
+     * Prefer {@link KafkaFuture#thenApply(BaseFunction)} as this function is here for backwards compatibility reasons
+     * and might be deprecated/removed in a future release.
      */
     public abstract <R> KafkaFuture<R> thenApply(Function<T, R> function);
 
-    protected abstract void addWaiter(BiConsumer<? super T, ? super Throwable> action);
+    /**
+     * Returns a new KafkaFuture with the same result or exception as this future, that executes the given action
+     * when this future completes.
+     *
+     * When this future is done, the given action is invoked with the result (or null if none) and the exception
+     * (or null if none) of this future as arguments.
+     *
+     * The returned future is completed when the action returns.
+     * The supplied action should not throw an exception. However, if it does, the following rules apply:
+     * if this future completed normally but the supplied action throws an exception, then the returned future completes
+     * exceptionally with the supplied action's exception.
+     * Or, if this future completed exceptionally and the supplied action throws an exception, then the returned future
+     * completes exceptionally with this future's exception.
+     *
+     * The action may be invoked by the thread that calls {@code whenComplete} or it may be invoked by the thread that
+     * completes the future.
+     *
+     * @param action the action to perform
+     * @return the new future
+     */
+    public abstract KafkaFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
 
+    protected abstract void addWaiter(BiConsumer<? super T, ? super Throwable> action);
     /**
      * If not already completed, sets the value returned by get() and related methods to the given
      * value.
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
index e2dbdf9..049ed49 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
@@ -46,11 +46,11 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
         }
     }
 
-    private static class Applicant<A, B> extends BiConsumer<A, Throwable> {
-        private final Function<A, B> function;
+    private static class Applicant<A, B> implements BiConsumer<A, Throwable> {
+        private final BaseFunction<A, B> function;
         private final KafkaFutureImpl<B> future;
 
-        Applicant(Function<A, B> function, KafkaFutureImpl<B> future) {
+        Applicant(BaseFunction<A, B> function, KafkaFutureImpl<B> future) {
             this.function = function;
             this.future = future;
         }
@@ -70,7 +70,7 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
         }
     }
 
-    private static class SingleWaiter<R> extends BiConsumer<R, Throwable> {
+    private static class SingleWaiter<R> implements BiConsumer<R, Throwable> {
         private R value = null;
         private Throwable exception = null;
         private boolean done = false;
@@ -140,14 +140,59 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
      * futures's result as the argument to the supplied function.
      */
     @Override
-    public <R> KafkaFuture<R> thenApply(Function<T, R> function) {
+    public <R> KafkaFuture<R> thenApply(BaseFunction<T, R> function) {
         KafkaFutureImpl<R> future = new KafkaFutureImpl<R>();
         addWaiter(new Applicant<>(function, future));
         return future;
     }
 
+
+    /**
+     * @see KafkaFutureImpl#thenApply(BaseFunction)
+     */
+    @Override
+    public <R> KafkaFuture<R> thenApply(Function<T, R> function) {
+        return thenApply((BaseFunction<T, R>) function);
+    }
+
+    private static class WhenCompleteBiConsumer<T> implements BiConsumer<T, Throwable> {
+        private final KafkaFutureImpl<T> future;
+        private final BiConsumer<? super T, ? super Throwable> biConsumer;
+
+        WhenCompleteBiConsumer(KafkaFutureImpl<T> future, BiConsumer<? super T, ? super Throwable> biConsumer) {
+            this.future = future;
+            this.biConsumer = biConsumer;
+        }
+
+        @Override
+        public void accept(T val, Throwable exception) {
+            try {
+                if (exception != null) {
+                    biConsumer.accept(null, exception);
+                } else {
+                    biConsumer.accept(val, null);
+                }
+            } catch (Throwable e) {
+                if (exception == null) {
+                    exception = e;
+                }
+            }
+            if (exception != null) {
+                future.completeExceptionally(exception);
+            } else {
+                future.complete(val);
+            }
+        }
+    }
+
     @Override
-    protected synchronized void addWaiter(BiConsumer<? super T, ? super Throwable> action) {
+    public KafkaFuture<T> whenComplete(final BiConsumer<? super T, ? super Throwable> biConsumer) {
+        final KafkaFutureImpl<T> future = new KafkaFutureImpl<>();
+        addWaiter(new WhenCompleteBiConsumer<>(future, biConsumer));
+        return future;
+    }
+
+    protected synchronized void addWaiter(BiConsumer<? super T, ? super Throwable> action) {
         if (exception != null) {
             action.accept(null, exception);
         } else if (done) {
diff --git a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
index 71f3c3c..6f9efca 100644
--- a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
@@ -83,6 +83,47 @@ public class KafkaFutureTest {
         assertEquals(null, myThread.testException);
     }
 
+    @Test
+    public void testThenApply() throws Exception {
+        KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
+        KafkaFuture<Integer> doubledFuture = future.thenApply(new KafkaFuture.BaseFunction<Integer, Integer>() {
+            @Override
+            public Integer apply(Integer integer) {
+                return 2 * integer;
+            }
+        });
+        assertFalse(doubledFuture.isDone());
+        KafkaFuture<Integer> tripledFuture = future.thenApply(new KafkaFuture.Function<Integer, Integer>() {
+            @Override
+            public Integer apply(Integer integer) {
+                return 3 * integer;
+            }
+        });
+        assertFalse(tripledFuture.isDone());
+        future.complete(21);
+        assertEquals(Integer.valueOf(21), future.getNow(-1));
+        assertEquals(Integer.valueOf(42), doubledFuture.getNow(-1));
+        assertEquals(Integer.valueOf(63), tripledFuture.getNow(-1));
+        KafkaFuture<Integer> quadrupledFuture = future.thenApply(new KafkaFuture.BaseFunction<Integer, Integer>() {
+            @Override
+            public Integer apply(Integer integer) {
+                return 4 * integer;
+            }
+        });
+        assertEquals(Integer.valueOf(84), quadrupledFuture.getNow(-1));
+
+        KafkaFutureImpl<Integer> futureFail = new KafkaFutureImpl<>();
+        KafkaFuture<Integer> futureAppliedFail = futureFail.thenApply(new KafkaFuture.BaseFunction<Integer, Integer>() {
+            @Override
+            public Integer apply(Integer integer) {
+                return 2 * integer;
+            }
+        });
+        futureFail.completeExceptionally(new RuntimeException());
+        assertTrue(futureFail.isCompletedExceptionally());
+        assertTrue(futureAppliedFail.isCompletedExceptionally());
+    }
+
     private static class CompleterThread<T> extends Thread {
 
         private final KafkaFutureImpl<T> future;
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java b/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
index ba5e1ed..7134f0a 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
@@ -87,7 +87,7 @@ public class MockScheduler implements Scheduler, MockTime.MockTimeListener {
                                   final Callable<T> callable, long delayMs) {
         final KafkaFutureImpl<T> future = new KafkaFutureImpl<>();
         KafkaFutureImpl<Long> waiter = new KafkaFutureImpl<>();
-        waiter.thenApply(new KafkaFuture.Function<Long, Void>() {
+        waiter.thenApply(new KafkaFuture.BaseFunction<Long, Void>() {
             @Override
             public Void apply(final Long now) {
                 executor.submit(new Callable<Void>() {
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
index 3c03e1e..cda7773 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
@@ -290,7 +290,7 @@ public final class WorkerManager {
                 return;
             }
             KafkaFutureImpl<String> haltFuture = new KafkaFutureImpl<>();
-            haltFuture.thenApply(new KafkaFuture.Function<String, Void>() {
+            haltFuture.thenApply(new KafkaFuture.BaseFunction<String, Void>() {
                 @Override
                 public Void apply(String errorString) {
                     if (errorString.isEmpty()) {

-- 
To stop receiving notification emails like this one, please contact
ewencp@apache.org.