You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/11/30 11:32:00 UTC

[jira] [Commented] (KAFKA-6987) Reimplement KafkaFuture with CompletableFuture

    [ https://issues.apache.org/jira/browse/KAFKA-6987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704611#comment-16704611 ] 

ASF GitHub Bot commented on KAFKA-6987:
---------------------------------------

andrasbeni closed pull request #5131: KAFKA-6987 Reimplement KafkaFuture with CompletableFuture
URL: https://github.com/apache/kafka/pull/5131
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
index 9cd2e01dc42..4afe3c530bf 100644
--- a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
+++ b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
@@ -19,14 +19,19 @@
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 /**
- * A flexible future which supports call chaining and other asynchronous programming patterns. This will
- * eventually become a thin shim on top of Java 8's CompletableFuture.
+ * A flexible future which supports call chaining and other asynchronous programming patterns. This
+ * is a thin shim on top of Java 8's CompletableFuture.
+ *
+ * Please note that while this class offers methods similar to CompletableFuture's whenComplete and thenApply,
+ * functions passed to these methods will never be called with CompletionException. If you wish to use
+ * CompletableFuture semantics, use {@link #toCompletableFuture()}.
  *
  * The API for this class is still evolving and we may break compatibility in minor releases, if necessary.
  */
@@ -202,4 +207,13 @@ public abstract T get(long timeout, TimeUnit unit) throws InterruptedException,
      */
     @Override
     public abstract boolean isDone();
+
+    /**
+     * Returns a CompletableFuture equivalent to this Future.
+     *
+     * The default implementation in {@link KafkaFuture} throws UnsupportedOperationException.
+     */
+    public CompletableFuture<T> toCompletableFuture() {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
index 33916ac952a..fd9093d763a 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
@@ -16,124 +16,38 @@
  */
 package org.apache.kafka.common.internals;
 
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.kafka.common.KafkaFuture;
+
+
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.kafka.common.KafkaFuture;
-
 /**
  * A flexible future which supports call chaining and other asynchronous programming patterns.
- * This will eventually become a thin shim on top of Java 8's CompletableFuture.
+ * This is a thin shim on top of Java 8's CompletableFuture.
+ *
+ * Please note that while this class offers methods similar to CompletableFuture's whenComplete and thenApply,
+ * functions passed to these methods will never be called with CompletionException. If you wish to use
+ * CompletableFuture semantics, use {@link #toCompletableFuture()}.
+ *
  */
 public class KafkaFutureImpl<T> extends KafkaFuture<T> {
-    /**
-     * A convenience method that throws the current exception, wrapping it if needed.
-     *
-     * In general, KafkaFuture throws CancellationException and InterruptedException directly, and
-     * wraps all other exceptions in an ExecutionException.
-     */
-    private static void wrapAndThrow(Throwable t) throws InterruptedException, ExecutionException {
-        if (t instanceof CancellationException) {
-            throw (CancellationException) t;
-        } else if (t instanceof InterruptedException) {
-            throw (InterruptedException) t;
-        } else {
-            throw new ExecutionException(t);
-        }
-    }
-
-    private static class Applicant<A, B> implements BiConsumer<A, Throwable> {
-        private final BaseFunction<A, B> function;
-        private final KafkaFutureImpl<B> future;
 
-        Applicant(BaseFunction<A, B> function, KafkaFutureImpl<B> future) {
-            this.function = function;
-            this.future = future;
-        }
+    private CompletableFuture<T> completableFuture;
 
-        @Override
-        public void accept(A a, Throwable exception) {
-            if (exception != null) {
-                future.completeExceptionally(exception);
-            } else {
-                try {
-                    B b = function.apply(a);
-                    future.complete(b);
-                } catch (Throwable t) {
-                    future.completeExceptionally(t);
-                }
-            }
-        }
+    public KafkaFutureImpl() {
+        this(new CompletableFuture<>());
     }
 
-    private static class SingleWaiter<R> implements BiConsumer<R, Throwable> {
-        private R value = null;
-        private Throwable exception = null;
-        private boolean done = false;
-
-        @Override
-        public synchronized void accept(R newValue, Throwable newException) {
-            this.value = newValue;
-            this.exception = newException;
-            this.done = true;
-            this.notifyAll();
-        }
-
-        synchronized R await() throws InterruptedException, ExecutionException {
-            while (true) {
-                if (exception != null)
-                    wrapAndThrow(exception);
-                if (done)
-                    return value;
-                this.wait();
-            }
-        }
-
-        R await(long timeout, TimeUnit unit)
-                throws InterruptedException, ExecutionException, TimeoutException {
-            long startMs = System.currentTimeMillis();
-            long waitTimeMs = unit.toMillis(timeout);
-            long delta = 0;
-            synchronized (this) {
-                while (true) {
-                    if (exception != null)
-                        wrapAndThrow(exception);
-                    if (done)
-                        return value;
-                    if (delta >= waitTimeMs) {
-                        throw new TimeoutException();
-                    }
-                    this.wait(waitTimeMs - delta);
-                    delta = System.currentTimeMillis() - startMs;
-                }
-            }
-        }
+    private KafkaFutureImpl(CompletableFuture<T> future) {
+        this.completableFuture = future;
     }
 
-    /**
-     * True if this future is done.
-     */
-    private boolean done = false;
 
-    /**
-     * The value of this future, or null.  Protected by the object monitor.
-     */
-    private T value = null;
-
-    /**
-     * The exception associated with this future, or null.  Protected by the object monitor.
-     */
-    private Throwable exception = null;
-
-    /**
-     * A list of objects waiting for this future to complete (either successfully or
-     * exceptionally).  Protected by the object monitor.
-     */
-    private List<BiConsumer<? super T, ? super Throwable>> waiters = new ArrayList<>();
 
     /**
      * Returns a new KafkaFuture that, when this future completes normally, is executed with this
@@ -141,14 +55,19 @@ R await(long timeout, TimeUnit unit)
      */
     @Override
     public <R> KafkaFuture<R> thenApply(BaseFunction<T, R> function) {
-        KafkaFutureImpl<R> future = new KafkaFutureImpl<>();
-        addWaiter(new Applicant<>(function, future));
-        return future;
+        return new KafkaFutureImpl<R>(completableFuture.thenApply(function::apply));
     }
 
+    @Deprecated
     public <R> void copyWith(KafkaFuture<R> future, BaseFunction<R, T> function) {
-        KafkaFutureImpl<R> futureImpl = (KafkaFutureImpl<R>) future;
-        futureImpl.addWaiter(new Applicant<>(function, this));
+        ((KafkaFutureImpl<R>) future).completableFuture.thenApply(function::apply).whenComplete((t, throwable) -> {
+            if (throwable != null) {
+                completableFuture.completeExceptionally(throwable);
+            } else {
+                completableFuture.complete(t);
+            }
+        });
+
     }
 
     /**
@@ -156,100 +75,68 @@ R await(long timeout, TimeUnit unit)
      */
     @Override
     public <R> KafkaFuture<R> thenApply(Function<T, R> function) {
-        return thenApply((BaseFunction<T, R>) function);
+        return new KafkaFutureImpl<>(completableFuture.thenApply(function::apply));
     }
 
-    private static class WhenCompleteBiConsumer<T> implements BiConsumer<T, Throwable> {
-        private final KafkaFutureImpl<T> future;
-        private final BiConsumer<? super T, ? super Throwable> biConsumer;
-
-        WhenCompleteBiConsumer(KafkaFutureImpl<T> future, BiConsumer<? super T, ? super Throwable> biConsumer) {
-            this.future = future;
-            this.biConsumer = biConsumer;
-        }
-
-        @Override
-        public void accept(T val, Throwable exception) {
+    @Override
+    public KafkaFuture<T> whenComplete(final BiConsumer<? super T, ? super Throwable> biConsumer) {
+        KafkaFutureImpl<T> dependent = new KafkaFutureImpl<>();
+        completableFuture.whenComplete((t, throwable) -> {
             try {
-                if (exception != null) {
-                    biConsumer.accept(null, exception);
+                if (throwable instanceof CompletionException) {
+                    if (throwable.getCause() instanceof CancellationException) {
+                        biConsumer.accept(null, throwable.getCause());
+                        dependent.cancel(false);
+                    } else {
+                        biConsumer.accept(null, throwable.getCause());
+                        dependent.completeExceptionally(throwable.getCause());
+                    }
+                } else if (throwable != null) {
+                    biConsumer.accept(null, throwable);
+                    dependent.completeExceptionally(throwable);
                 } else {
-                    biConsumer.accept(val, null);
+                    biConsumer.accept(t, null);
+                    dependent.complete(t);
                 }
-            } catch (Throwable e) {
-                if (exception == null) {
-                    exception = e;
+            } catch (Exception e) {
+                Throwable throwableToCompleteWith;
+                if (throwable == null) {
+                    throwableToCompleteWith = e;
+                } else if (throwable instanceof CompletionException) {
+                    throwableToCompleteWith = throwable.getCause();
+                } else {
+                    throwableToCompleteWith = throwable;
                 }
+                dependent.completeExceptionally(throwableToCompleteWith);
             }
-            if (exception != null) {
-                future.completeExceptionally(exception);
-            } else {
-                future.complete(val);
-            }
-        }
-    }
-
-    @Override
-    public KafkaFuture<T> whenComplete(final BiConsumer<? super T, ? super Throwable> biConsumer) {
-        final KafkaFutureImpl<T> future = new KafkaFutureImpl<>();
-        addWaiter(new WhenCompleteBiConsumer<>(future, biConsumer));
-        return future;
+        });
+        return dependent;
     }
 
     protected synchronized void addWaiter(BiConsumer<? super T, ? super Throwable> action) {
-        if (exception != null) {
-            action.accept(null, exception);
-        } else if (done) {
-            action.accept(value, null);
-        } else {
-            waiters.add(action);
-        }
+        completableFuture.whenComplete(action::accept);
     }
 
     @Override
-    public synchronized boolean complete(T newValue) {
-        List<BiConsumer<? super T, ? super Throwable>> oldWaiters = null;
-        synchronized (this) {
-            if (done)
-                return false;
-            value = newValue;
-            done = true;
-            oldWaiters = waiters;
-            waiters = null;
-        }
-        for (BiConsumer<? super T, ? super Throwable> waiter : oldWaiters) {
-            waiter.accept(newValue, null);
-        }
-        return true;
+    public boolean complete(T newValue) {
+        return completableFuture.complete(newValue);
     }
 
     @Override
     public boolean completeExceptionally(Throwable newException) {
-        List<BiConsumer<? super T, ? super Throwable>> oldWaiters = null;
-        synchronized (this) {
-            if (done)
-                return false;
-            exception = newException;
-            done = true;
-            oldWaiters = waiters;
-            waiters = null;
-        }
-        for (BiConsumer<? super T, ? super Throwable> waiter : oldWaiters) {
-            waiter.accept(null, newException);
-        }
-        return true;
+        return completableFuture.completeExceptionally(newException);
     }
 
     /**
      * If not already completed, completes this future with a CancellationException.  Dependent
      * futures that have not already completed will also complete exceptionally, with a
      * CompletionException caused by this CancellationException.
+     *
+     * TODO was this true?
      */
     @Override
-    public synchronized boolean cancel(boolean mayInterruptIfRunning) {
-        if (completeExceptionally(new CancellationException()))
-            return true;
-        return exception instanceof CancellationException;
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        return completableFuture.cancel(mayInterruptIfRunning);
     }
 
     /**
@@ -257,9 +144,7 @@ public synchronized boolean cancel(boolean mayInterruptIfRunning) {
      */
     @Override
     public T get() throws InterruptedException, ExecutionException {
-        SingleWaiter<T> waiter = new SingleWaiter<T>();
-        addWaiter(waiter);
-        return waiter.await();
+        return completableFuture.get();
     }
 
     /**
@@ -269,9 +154,7 @@ public T get() throws InterruptedException, ExecutionException {
     @Override
     public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
             TimeoutException {
-        SingleWaiter<T> waiter = new SingleWaiter<T>();
-        addWaiter(waiter);
-        return waiter.await(timeout, unit);
+        return completableFuture.get(timeout, unit);
     }
 
     /**
@@ -279,40 +162,41 @@ public T get(long timeout, TimeUnit unit) throws InterruptedException, Execution
      * the given valueIfAbsent.
      */
     @Override
-    public synchronized T getNow(T valueIfAbsent) throws InterruptedException, ExecutionException {
-        if (exception != null)
-            wrapAndThrow(exception);
-        if (done)
-            return value;
-        return valueIfAbsent;
+    public T getNow(T valueIfAbsent) throws InterruptedException, ExecutionException {
+        return completableFuture.getNow(valueIfAbsent);
     }
 
     /**
      * Returns true if this CompletableFuture was cancelled before it completed normally.
      */
     @Override
-    public synchronized boolean isCancelled() {
-        return (exception != null) && (exception instanceof CancellationException);
+    public boolean isCancelled() {
+        return completableFuture.isCancelled();
     }
 
     /**
      * Returns true if this CompletableFuture completed exceptionally, in any way.
      */
     @Override
-    public synchronized boolean isCompletedExceptionally() {
-        return exception != null;
+    public boolean isCompletedExceptionally() {
+        return completableFuture.isCompletedExceptionally();
     }
 
     /**
      * Returns true if completed in any fashion: normally, exceptionally, or via cancellation.
      */
     @Override
-    public synchronized boolean isDone() {
-        return done;
+    public boolean isDone() {
+        return completableFuture.isDone();
     }
 
     @Override
     public String toString() {
-        return String.format("KafkaFuture{value=%s,exception=%s,done=%b}", value, exception, done);
+        return String.format("KafkaFuture{future=%s}", completableFuture);
+    }
+
+    @Override
+    public CompletableFuture<T> toCompletableFuture() {
+        return completableFuture;
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
index 6f9efca7c66..4e25abc77e0 100644
--- a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
@@ -24,6 +24,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -124,6 +125,222 @@ public Integer apply(Integer integer) {
         assertTrue(futureAppliedFail.isCompletedExceptionally());
     }
 
+    @Test
+    public void testWhenCompleteNormalCompletion() {
+        String value = "Ready to roll out!";
+        KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
+        QueryableBiConsumer consumer = new QueryableBiConsumer();
+        future.whenComplete(consumer);
+        future.complete(value);
+        assertTrue(consumer.getThrowable() == null);
+        assertEquals(value, consumer.getValue());
+    }
+
+    @Test
+    public void testWhenCompleteExceptionalCompletion() {
+        RuntimeException exception = new RuntimeException("I'm in deep!");
+        KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
+        QueryableBiConsumer consumer = new QueryableBiConsumer();
+        future.whenComplete(consumer);
+        future.completeExceptionally(exception);
+        assertTrue(consumer.getValue() == null);
+        assertEquals(exception, consumer.getThrowable());
+    }
+
+    @Test
+    public void testWhenCompleteChained() {
+        String value = "Fueled up, ready to go!";
+        RuntimeException exceptionOnCompletion = new RuntimeException("I'm about to drop the hammer");
+        KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
+        QueryableBiConsumer firstConsumer = new QueryableBiConsumer() {
+            @Override
+            public void accept(String s, Throwable throwable) {
+                super.accept(s, throwable);
+                throw exceptionOnCompletion;
+            }
+        };
+        QueryableBiConsumer secondConsumer = new QueryableBiConsumer();
+        QueryableBiConsumer thirdConsumer = new QueryableBiConsumer();
+        KafkaFuture<String> futureByWhenComplete = future.whenComplete(firstConsumer);
+        future.whenComplete(secondConsumer);
+        futureByWhenComplete.whenComplete(thirdConsumer);
+        future.complete(value);
+        assertTrue(firstConsumer.getThrowable() == null);
+        assertEquals(value, firstConsumer.getValue());
+        assertTrue(secondConsumer.getThrowable() == null);
+        assertEquals(value, secondConsumer.getValue());
+        assertTrue(thirdConsumer.getValue() == null);
+        assertEquals(exceptionOnCompletion, thirdConsumer.getThrowable());
+    }
+
+    @Test
+    public void testWhenCompleteChainedCompletedExceptionally() {
+        RuntimeException exceptionOnCompletion = new RuntimeException("Can I take your order?");
+        RuntimeException exceptionOnCompleteExceptionally = new RuntimeException("In the pipe, five by five");
+
+        KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
+        QueryableBiConsumer firstConsumer = new QueryableBiConsumer() {
+            @Override
+            public void accept(String s, Throwable throwable) {
+                super.accept(s, throwable);
+                throw exceptionOnCompletion;
+            }
+        };
+        QueryableBiConsumer secondConsumer = new QueryableBiConsumer();
+        QueryableBiConsumer thirdConsumer = new QueryableBiConsumer();
+        KafkaFuture<String> futureByWhenComplete = future.whenComplete(firstConsumer);
+        future.whenComplete(secondConsumer);
+        futureByWhenComplete.whenComplete(thirdConsumer);
+
+        future.completeExceptionally(exceptionOnCompleteExceptionally);
+
+        assertTrue(firstConsumer.getThrowable() == exceptionOnCompleteExceptionally);
+        assertEquals(null, firstConsumer.getValue());
+        assertTrue(secondConsumer.getThrowable() == exceptionOnCompleteExceptionally);
+        assertEquals(null, secondConsumer.getValue());
+        assertTrue(thirdConsumer.getValue() == null);
+        assertEquals(exceptionOnCompleteExceptionally, thirdConsumer.getThrowable());
+    }
+
+    @Test
+    public void testWhenCompleteAfterThenApplyThrows() {
+        String value = "What's our target?";
+        RuntimeException exceptionOnApply = new RuntimeException("What is your major malfunction?");
+        KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
+        QueryableBiConsumer consumer = new QueryableBiConsumer();
+        KafkaFuture.BaseFunction<String, String> throwerFunction = s -> {
+            throw exceptionOnApply;
+        };
+
+        KafkaFuture<String> futureByThenApply = future.thenApply(throwerFunction);
+        KafkaFuture<String> futureByWhenComplete = futureByThenApply.whenComplete(consumer);
+        future.complete(value);
+        assertTrue(consumer.getValue() == null);
+        assertEquals(exceptionOnApply, consumer.getThrowable());
+        assertTrue(futureByWhenComplete.isCompletedExceptionally());
+    }
+
+    @Test
+    public void testWhenCompleteAfterCancel() {
+        KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
+        QueryableBiConsumer first = new QueryableBiConsumer();
+        QueryableBiConsumer second = new QueryableBiConsumer();
+        KafkaFuture<String> futureByFirst = future.whenComplete(first);
+        KafkaFuture<String> futureBySecond = futureByFirst.whenComplete(second);
+
+        future.cancel(false);
+
+        assertTrue(futureByFirst.isDone());
+        assertTrue(futureByFirst.isCancelled());
+        assertTrue(first.getThrowable() instanceof CancellationException);
+        assertTrue(futureBySecond.isDone());
+        assertTrue(futureBySecond.isCancelled());
+        assertTrue(second.getThrowable() instanceof CancellationException);
+
+
+    }
+
+    @Test
+    public void testCopyWith() throws Exception {
+        String newValue = "I have returned";
+        KafkaFutureImpl<String> dependee = new KafkaFutureImpl<>();
+        KafkaFutureImpl<Integer> dependent = new KafkaFutureImpl<>();
+        dependent.copyWith(dependee, String::length);
+
+        dependee.complete(newValue);
+
+        assertEquals(newValue, dependee.get());
+        assertEquals(newValue.length(), (int) dependent.get());
+    }
+
+    @Test
+    public void testCopyWithFunctionFails() throws Exception {
+        String newValue = "Orders received";
+        RuntimeException exception = new RuntimeException("I can't build there");
+        KafkaFutureImpl<String> dependee = new KafkaFutureImpl<>();
+        KafkaFutureImpl<String> dependent = new KafkaFutureImpl<>();
+        dependent.copyWith(dependee, s -> {
+            throw exception;
+        });
+        QueryableBiConsumer consumer = new QueryableBiConsumer();
+        dependent.whenComplete(consumer);
+
+        dependee.complete(newValue);
+
+        assertEquals(newValue, dependee.get());
+        assertTrue(consumer.getValue() == null);
+        assertEquals(exception, consumer.getThrowable());
+        assertEquals(newValue, dependee.get());
+
+    }
+
+    @Test
+    public void testCopyWithCompletedExceptionally() throws Exception {
+        String newValue = "Orders received";
+        RuntimeException exception = new RuntimeException("I can't build it.");
+        KafkaFutureImpl<String> dependee = new KafkaFutureImpl<>();
+        KafkaFutureImpl<String> dependent = new KafkaFutureImpl<>();
+        dependent.copyWith(dependee, s -> s + "Something in the way");
+        QueryableBiConsumer consumer = new QueryableBiConsumer();
+        dependent.whenComplete(consumer);
+
+        dependee.completeExceptionally(exception);
+
+        assertTrue(dependent.isCompletedExceptionally());
+        assertTrue(consumer.getValue() == null);
+        assertEquals(exception, consumer.getThrowable());
+        assertTrue(dependee.isCompletedExceptionally());
+    }
+
+    @Test
+    public void testCopyWithCancelled() throws Exception {
+        KafkaFutureImpl<String> dependee = new KafkaFutureImpl<>();
+        KafkaFutureImpl<String> dependent = new KafkaFutureImpl<>();
+        dependent.copyWith(dependee, s -> "Affirmative");
+        QueryableBiConsumer consumer = new QueryableBiConsumer();
+        dependent.whenComplete(consumer);
+
+        dependee.cancel(false);
+
+        assertTrue(dependee.isCompletedExceptionally());
+        assertTrue(dependent.isDone());
+        assertTrue(dependent.isCompletedExceptionally());
+        assertTrue(consumer.getValue() == null);
+        assertTrue(consumer.getThrowable() instanceof CancellationException);
+    }
+
+
+    @Test
+    public void testCancel() {
+        KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
+
+        assertTrue("Must be able to cancel", future.cancel(false));
+
+        assertTrue(future.isCancelled());
+        assertTrue(future.isCompletedExceptionally());
+        assertTrue(future.isDone());
+    }
+
+    private static class QueryableBiConsumer implements KafkaFuture.BiConsumer<String, Throwable> {
+
+        private String value;
+        private Throwable throwable;
+
+        @Override
+        public void accept(String s, Throwable throwable) {
+            this.value = s;
+            this.throwable = throwable;
+        }
+
+        public String getValue() {
+            return value;
+        }
+
+        public Throwable getThrowable() {
+            return throwable;
+        }
+    }
+
     private static class CompleterThread<T> extends Thread {
 
         private final KafkaFutureImpl<T> future;
@@ -222,4 +439,6 @@ public void testFutureTimeoutWithZeroWait() throws Exception {
         future.get(0, TimeUnit.MILLISECONDS);
     }
 
+
+
 }
diff --git a/gradle/findbugs-exclude.xml b/gradle/findbugs-exclude.xml
index 70c9e770ec3..4af052bc483 100644
--- a/gradle/findbugs-exclude.xml
+++ b/gradle/findbugs-exclude.xml
@@ -94,6 +94,12 @@ For a detailed description of findbugs bug categories, see http://findbugs.sourc
         <Bug pattern="EQ_UNUSUAL"/>
     </Match>
 
+   <Match>
+        <!-- Null is a valid parameter to CompletableFuture.complete(). Exclude false positive. -->
+        <Source name="KafkaAdminClient.java"/>
+        <Bug pattern="NP_NULL_PARAM_DEREF_ALL_TARGETS_DANGEROUS"/>
+    </Match>
+
     <Match>
         <!-- Add a suppression for auto-generated calls to instanceof in kafka.utils.Json -->
         <Source name="Json.scala"/>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Reimplement KafkaFuture with CompletableFuture
> ----------------------------------------------
>
>                 Key: KAFKA-6987
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6987
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 2.0.0
>            Reporter: Andras Beni
>            Priority: Minor
>
> KafkaFuture documentation states:
> {{This will eventually become a thin shim on top of Java 8's CompletableFuture.}}
> With Java 7 support dropped in 2.0, it is time to get rid of custom code.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)