You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/01/12 22:28:00 UTC

[jira] [Commented] (KAFKA-6018) Make KafkaFuture.Function java 8 lambda compatible

    [ https://issues.apache.org/jira/browse/KAFKA-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16324658#comment-16324658 ] 

ASF GitHub Bot commented on KAFKA-6018:
---------------------------------------

ewencp closed pull request #4033: KAFKA-6018: Make KafkaFuture.Future an interface
URL: https://github.com/apache/kafka/pull/4033
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java
index 90bc2970e13..eaa5a0185cd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java
@@ -102,7 +102,7 @@ public ApiException exception() {
      */
     public KafkaFuture<Collection<AclBinding>> all() {
         return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply(
-            new KafkaFuture.Function<Void, Collection<AclBinding>>() {
+            new KafkaFuture.FunctionInterface<Void, Collection<AclBinding>>() {
                 @Override
                 public Collection<AclBinding> apply(Void v) {
                     List<AclBinding> acls = new ArrayList<>();
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java
index 478bf055cfd..343a06af9d5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java
@@ -53,7 +53,7 @@
      */
     public KafkaFuture<Map<ConfigResource, Config>> all() {
         return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
-                thenApply(new KafkaFuture.Function<Void, Map<ConfigResource, Config>>() {
+                thenApply(new KafkaFuture.FunctionInterface<Void, Map<ConfigResource, Config>>() {
                     @Override
                     public Map<ConfigResource, Config> apply(Void v) {
                         Map<ConfigResource, Config> configs = new HashMap<>(futures.size());
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java
index de186fd751d..9bd2d520b6b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java
@@ -51,7 +51,7 @@
      */
     public KafkaFuture<Map<Integer, Map<String, LogDirInfo>>> all() {
         return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
-            thenApply(new KafkaFuture.Function<Void, Map<Integer, Map<String, LogDirInfo>>>() {
+            thenApply(new KafkaFuture.FunctionInterface<Void, Map<Integer, Map<String, LogDirInfo>>>() {
                 @Override
                 public Map<Integer, Map<String, LogDirInfo>> apply(Void v) {
                     Map<Integer, Map<String, LogDirInfo>> descriptions = new HashMap<>(futures.size());
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java
index 401b4aa7b9d..8a73df38b46 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java
@@ -52,7 +52,7 @@
      */
     public KafkaFuture<Map<TopicPartitionReplica, ReplicaLogDirInfo>> all() {
         return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
-            thenApply(new KafkaFuture.Function<Void, Map<TopicPartitionReplica, ReplicaLogDirInfo>>() {
+            thenApply(new KafkaFuture.FunctionInterface<Void, Map<TopicPartitionReplica, ReplicaLogDirInfo>>() {
                 @Override
                 public Map<TopicPartitionReplica, ReplicaLogDirInfo> apply(Void v) {
                     Map<TopicPartitionReplica, ReplicaLogDirInfo> replicaLogDirInfos = new HashMap<>();
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
index 18f5f9d20cd..add317659f6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
@@ -51,7 +51,7 @@
      */
     public KafkaFuture<Map<String, TopicDescription>> all() {
         return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
-            thenApply(new KafkaFuture.Function<Void, Map<String, TopicDescription>>() {
+            thenApply(new KafkaFuture.FunctionInterface<Void, Map<String, TopicDescription>>() {
                 @Override
                 public Map<String, TopicDescription> apply(Void v) {
                     Map<String, TopicDescription> descriptions = new HashMap<>(futures.size());
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
index e54b3defe36..2b0955732c2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
@@ -48,7 +48,7 @@
      * Return a future which yields a collection of TopicListing objects.
      */
     public KafkaFuture<Collection<TopicListing>> listings() {
-        return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Collection<TopicListing>>() {
+        return future.thenApply(new KafkaFuture.FunctionInterface<Map<String, TopicListing>, Collection<TopicListing>>() {
             @Override
             public Collection<TopicListing> apply(Map<String, TopicListing> namesToDescriptions) {
                 return namesToDescriptions.values();
@@ -60,7 +60,7 @@
      * Return a future which yields a collection of topic names.
      */
     public KafkaFuture<Set<String>> names() {
-        return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Set<String>>() {
+        return future.thenApply(new KafkaFuture.FunctionInterface<Map<String, TopicListing>, Set<String>>() {
             @Override
             public Set<String> apply(Map<String, TopicListing> namesToListings) {
                 return namesToListings.keySet();
diff --git a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
index 23e218137a9..2b3e5efce83 100644
--- a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
+++ b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
@@ -35,18 +35,26 @@
     /**
      * A function which takes objects of type A and returns objects of type B.
      */
-    public static abstract class Function<A, B> {
-        public abstract B apply(A a);
+    public interface FunctionInterface<A, B> {
+        B apply(A a);
     }
+    
+    /**
+     * A function which takes objects of type A and returns objects of type B.
+     *
+     * @deprecated use {@link FunctionInterface} instead
+     */
+    @Deprecated
+    public static abstract class Function<A, B> implements FunctionInterface<A, B> { }
 
     /**
      * A consumer of two different types of object.
      */
-    public static abstract class BiConsumer<A, B> {
-        public abstract void accept(A a, B b);
+    public interface BiConsumer<A, B> {
+        void accept(A a, B b);
     }
 
-    private static class AllOfAdapter<R> extends BiConsumer<R, Throwable> {
+    private static class AllOfAdapter<R> implements BiConsumer<R, Throwable> {
         private int remainingResponses;
         private KafkaFuture<?> future;
 
@@ -101,11 +109,43 @@ private void maybeComplete() {
     /**
      * Returns a new KafkaFuture that, when this future completes normally, is executed with this
      * futures's result as the argument to the supplied function.
+     *
+     * The function may be invoked by the thread that calls {@code thenApply} or it may be invoked by the thread that
+     * completes the future.
+     */
+    public abstract <R> KafkaFuture<R> thenApply(FunctionInterface<T, R> function);
+
+    /**
+     * @see KafkaFuture#thenApply(FunctionInterface)
+     *
+     * @deprecated use {@link KafkaFuture#thenApply(FunctionInterface)}
      */
+    @Deprecated
     public abstract <R> KafkaFuture<R> thenApply(Function<T, R> function);
 
-    protected abstract void addWaiter(BiConsumer<? super T, ? super Throwable> action);
+    /**
+     * Returns a new KafkaFuture with the same result or exception as this future, that executes the given action
+     * when this future completes.
+     *
+     * When this future is done, the given action is invoked with the result (or null if none) and the exception
+     * (or null if none) of this future as arguments.
+     *
+     * The returned future is completed when the action returns.
+     * The supplied action should not throw an exception. However, if it does, the following rules apply:
+     * if this future completed normally but the supplied action throws an exception, then the returned future completes
+     * exceptionally with the supplied action's exception.
+     * Or, if this future completed exceptionally and the supplied action throws an exception, then the returned future
+     * completes exceptionally with this future's exception.
+     *
+     * The action may be invoked by the thread that calls {@code whenComplete} or it may be invoked by the thread that
+     * completes the future.
+     *
+     * @param action the action to preform
+     * @return the new future
+     */
+    public abstract KafkaFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
 
+    protected abstract void addWaiter(BiConsumer<? super T, ? super Throwable> action);
     /**
      * If not already completed, sets the value returned by get() and related methods to the given
      * value.
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
index e2dbdf9be92..20cdfca86f2 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
@@ -46,11 +46,11 @@ private static void wrapAndThrow(Throwable t) throws InterruptedException, Execu
         }
     }
 
-    private static class Applicant<A, B> extends BiConsumer<A, Throwable> {
-        private final Function<A, B> function;
+    private static class Applicant<A, B> implements BiConsumer<A, Throwable> {
+        private final FunctionInterface<A, B> function;
         private final KafkaFutureImpl<B> future;
 
-        Applicant(Function<A, B> function, KafkaFutureImpl<B> future) {
+        Applicant(FunctionInterface<A, B> function, KafkaFutureImpl<B> future) {
             this.function = function;
             this.future = future;
         }
@@ -70,7 +70,7 @@ public void accept(A a, Throwable exception) {
         }
     }
 
-    private static class SingleWaiter<R> extends BiConsumer<R, Throwable> {
+    private static class SingleWaiter<R> implements BiConsumer<R, Throwable> {
         private R value = null;
         private Throwable exception = null;
         private boolean done = false;
@@ -140,14 +140,60 @@ R await(long timeout, TimeUnit unit)
      * futures's result as the argument to the supplied function.
      */
     @Override
-    public <R> KafkaFuture<R> thenApply(Function<T, R> function) {
+    public <R> KafkaFuture<R> thenApply(FunctionInterface<T, R> function) {
         KafkaFutureImpl<R> future = new KafkaFutureImpl<R>();
         addWaiter(new Applicant<>(function, future));
         return future;
     }
 
+
+    /**
+     * @See KafkaFutureImpl#thenApply(FunctionInterface)
+     */
+    @Deprecated
+    @Override
+    public <R> KafkaFuture<R> thenApply(Function<T, R> function) {
+        return thenApply((FunctionInterface<T, R>) function);
+    }
+
+    private static class WhenCompleteBiConsumer<T> implements BiConsumer<T, Throwable> {
+        private final KafkaFutureImpl<T> future;
+        private final BiConsumer<? super T, ? super Throwable> biConsumer;
+
+        WhenCompleteBiConsumer(KafkaFutureImpl<T> future, BiConsumer<? super T, ? super Throwable> biConsumer) {
+            this.future = future;
+            this.biConsumer = biConsumer;
+        }
+
+        @Override
+        public void accept(T val, Throwable exception) {
+            try {
+                if (exception != null) {
+                    biConsumer.accept(null, exception);
+                } else {
+                    biConsumer.accept(val, null);
+                }
+            } catch (Throwable e) {
+                if (exception == null) {
+                    exception = e;
+                }
+            }
+            if (exception != null) {
+                future.completeExceptionally(exception);
+            } else {
+                future.complete(val);
+            }
+        }
+    }
+
     @Override
-    protected synchronized void addWaiter(BiConsumer<? super T, ? super Throwable> action) {
+    public KafkaFuture<T> whenComplete(final BiConsumer<? super T, ? super Throwable> biConsumer) {
+        final KafkaFutureImpl<T> future = new KafkaFutureImpl<>();
+        addWaiter(new WhenCompleteBiConsumer<>(future, biConsumer));
+        return future;
+    }
+
+    public synchronized void addWaiter(BiConsumer<? super T, ? super Throwable> action) {
         if (exception != null) {
             action.accept(null, exception);
         } else if (done) {
diff --git a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
index 71f3c3c984d..eac5f9b2e4e 100644
--- a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
@@ -83,6 +83,47 @@ public void testCompletingFutures() throws Exception {
         assertEquals(null, myThread.testException);
     }
 
+    @Test
+    public void testThenApply() throws Exception {
+        KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
+        KafkaFuture<Integer> doubledFuture = future.thenApply(new KafkaFuture.FunctionInterface<Integer, Integer>() {
+            @Override
+            public Integer apply(Integer integer) {
+                return 2 * integer;
+            }
+        });
+        assertFalse(doubledFuture.isDone());
+        KafkaFuture<Integer> tripledFuture = future.thenApply(new KafkaFuture.Function<Integer, Integer>() {
+            @Override
+            public Integer apply(Integer integer) {
+                return 3 * integer;
+            }
+        });
+        assertFalse(tripledFuture.isDone());
+        future.complete(21);
+        assertEquals(Integer.valueOf(21), future.getNow(-1));
+        assertEquals(Integer.valueOf(42), doubledFuture.getNow(-1));
+        assertEquals(Integer.valueOf(63), tripledFuture.getNow(-1));
+        KafkaFuture<Integer> quadrupledFuture = future.thenApply(new KafkaFuture.FunctionInterface<Integer, Integer>() {
+            @Override
+            public Integer apply(Integer integer) {
+                return 4 * integer;
+            }
+        });
+        assertEquals(Integer.valueOf(84), quadrupledFuture.getNow(-1));
+
+        KafkaFutureImpl<Integer> futureFail = new KafkaFutureImpl<>();
+        KafkaFuture<Integer> futureAppliedFail = futureFail.thenApply(new KafkaFuture.FunctionInterface<Integer, Integer>() {
+            @Override
+            public Integer apply(Integer integer) {
+                return 2 * integer;
+            }
+        });
+        futureFail.completeExceptionally(new RuntimeException());
+        assertTrue(futureFail.isCompletedExceptionally());
+        assertTrue(futureAppliedFail.isCompletedExceptionally());
+    }
+
     private static class CompleterThread<T> extends Thread {
 
         private final KafkaFutureImpl<T> future;
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java b/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
index ba5e1ed0c59..5a2dac3c337 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
@@ -87,7 +87,7 @@ public synchronized void addWaiter(long delayMs, KafkaFutureImpl<Long> waiter) {
                                   final Callable<T> callable, long delayMs) {
         final KafkaFutureImpl<T> future = new KafkaFutureImpl<>();
         KafkaFutureImpl<Long> waiter = new KafkaFutureImpl<>();
-        waiter.thenApply(new KafkaFuture.Function<Long, Void>() {
+        waiter.thenApply(new KafkaFuture.FunctionInterface<Long, Void>() {
             @Override
             public Void apply(final Long now) {
                 executor.submit(new Callable<Void>() {
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
index 3c03e1e62ca..d1031bac0c1 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
@@ -290,7 +290,7 @@ public void createWorker(final String id, TaskSpec spec) throws Exception {
                 return;
             }
             KafkaFutureImpl<String> haltFuture = new KafkaFutureImpl<>();
-            haltFuture.thenApply(new KafkaFuture.Function<String, Void>() {
+            haltFuture.thenApply(new KafkaFuture.FunctionInterface<String, Void>() {
                 @Override
                 public Void apply(String errorString) {
                     if (errorString.isEmpty()) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Make KafkaFuture.Function java 8 lambda compatible
> --------------------------------------------------
>
>                 Key: KAFKA-6018
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6018
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>            Reporter: Steven Aerts
>
> KafkaFuture.Function is currently an empty public abstract class.
> This means you cannot implement them as a java lambda.  And you end up with constructs as:
> {code:java}
> new KafkaFuture.Function<Set<String>, Object>() {
>     @Override
>     public Object apply(Set<String> strings) {
>         return foo;
>     }
> }
> {code}
> I propose to define them as interfaces.
> So this code can become in java 8:
> {code:java}
> strings -> foo
> {code}
> I know this change is backwards incompatible (extends becomes implements).
> But as {{KafkaFuture}} is marked as {{@InterfaceStability.Evolving}}.
> And KafkaFuture states in its javadoc:
> {quote}This will eventually become a thin shim on top of Java 8's CompletableFuture.{quote}
> I think this change might be worth considering.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)