You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/01/30 23:56:00 UTC
[kafka] branch trunk updated: KAFKA-6018: Make KafkaFuture.Future
an interface (KIP-218)
This is an automated email from the ASF dual-hosted git repository.
ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ae42cc8 KAFKA-6018: Make KafkaFuture.Future an interface (KIP-218)
ae42cc8 is described below
commit ae42cc803023c795eae00d138ea22a6a7951db31
Author: Steven Aerts <st...@gmail.com>
AuthorDate: Tue Jan 30 15:55:49 2018 -0800
KAFKA-6018: Make KafkaFuture.Future an interface (KIP-218)
Changing KafkaFuture.Future and KafkaFuture.BiConsumer into an interface makes
them a functional interface. This makes them Java 8 lambda compatible.
Author: Colin P. Mccabe <cm...@confluent.io>
Author: Steven Aerts <st...@gmail.com>
Reviewers: Colin P. Mccabe <cm...@confluent.io>, Ismael Juma <is...@juma.me.uk>, Xavier Léauté <xl...@xvrl.net>, Tom Bentley <tb...@redhat.com>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #4033 from steven-aerts/KAFKA-6018
---
.../kafka/clients/admin/DeleteAclsResult.java | 2 +-
.../kafka/clients/admin/DescribeConfigsResult.java | 2 +-
.../kafka/clients/admin/DescribeLogDirsResult.java | 2 +-
.../admin/DescribeReplicaLogDirsResult.java | 2 +-
.../kafka/clients/admin/DescribeTopicsResult.java | 2 +-
.../kafka/clients/admin/ListTopicsResult.java | 4 +-
.../java/org/apache/kafka/common/KafkaFuture.java | 52 +++++++++++++++++---
.../kafka/common/internals/KafkaFutureImpl.java | 57 +++++++++++++++++++---
.../org/apache/kafka/common/KafkaFutureTest.java | 41 ++++++++++++++++
.../apache/kafka/common/utils/MockScheduler.java | 2 +-
.../apache/kafka/trogdor/agent/WorkerManager.java | 2 +-
11 files changed, 147 insertions(+), 21 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java
index 90bc297..19df228 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java
@@ -102,7 +102,7 @@ public class DeleteAclsResult {
*/
public KafkaFuture<Collection<AclBinding>> all() {
return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply(
- new KafkaFuture.Function<Void, Collection<AclBinding>>() {
+ new KafkaFuture.BaseFunction<Void, Collection<AclBinding>>() {
@Override
public Collection<AclBinding> apply(Void v) {
List<AclBinding> acls = new ArrayList<>();
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java
index 478bf05..e5d79e8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java
@@ -53,7 +53,7 @@ public class DescribeConfigsResult {
*/
public KafkaFuture<Map<ConfigResource, Config>> all() {
return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
- thenApply(new KafkaFuture.Function<Void, Map<ConfigResource, Config>>() {
+ thenApply(new KafkaFuture.BaseFunction<Void, Map<ConfigResource, Config>>() {
@Override
public Map<ConfigResource, Config> apply(Void v) {
Map<ConfigResource, Config> configs = new HashMap<>(futures.size());
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java
index de186fd..7c7bde7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java
@@ -51,7 +51,7 @@ public class DescribeLogDirsResult {
*/
public KafkaFuture<Map<Integer, Map<String, LogDirInfo>>> all() {
return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
- thenApply(new KafkaFuture.Function<Void, Map<Integer, Map<String, LogDirInfo>>>() {
+ thenApply(new KafkaFuture.BaseFunction<Void, Map<Integer, Map<String, LogDirInfo>>>() {
@Override
public Map<Integer, Map<String, LogDirInfo>> apply(Void v) {
Map<Integer, Map<String, LogDirInfo>> descriptions = new HashMap<>(futures.size());
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java
index 401b4aa..d4c8479 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java
@@ -52,7 +52,7 @@ public class DescribeReplicaLogDirsResult {
*/
public KafkaFuture<Map<TopicPartitionReplica, ReplicaLogDirInfo>> all() {
return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
- thenApply(new KafkaFuture.Function<Void, Map<TopicPartitionReplica, ReplicaLogDirInfo>>() {
+ thenApply(new KafkaFuture.BaseFunction<Void, Map<TopicPartitionReplica, ReplicaLogDirInfo>>() {
@Override
public Map<TopicPartitionReplica, ReplicaLogDirInfo> apply(Void v) {
Map<TopicPartitionReplica, ReplicaLogDirInfo> replicaLogDirInfos = new HashMap<>();
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
index 18f5f9d..6bb24d9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
@@ -51,7 +51,7 @@ public class DescribeTopicsResult {
*/
public KafkaFuture<Map<String, TopicDescription>> all() {
return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
- thenApply(new KafkaFuture.Function<Void, Map<String, TopicDescription>>() {
+ thenApply(new KafkaFuture.BaseFunction<Void, Map<String, TopicDescription>>() {
@Override
public Map<String, TopicDescription> apply(Void v) {
Map<String, TopicDescription> descriptions = new HashMap<>(futures.size());
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
index e54b3de..a2e17fd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
@@ -48,7 +48,7 @@ public class ListTopicsResult {
* Return a future which yields a collection of TopicListing objects.
*/
public KafkaFuture<Collection<TopicListing>> listings() {
- return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Collection<TopicListing>>() {
+ return future.thenApply(new KafkaFuture.BaseFunction<Map<String, TopicListing>, Collection<TopicListing>>() {
@Override
public Collection<TopicListing> apply(Map<String, TopicListing> namesToDescriptions) {
return namesToDescriptions.values();
@@ -60,7 +60,7 @@ public class ListTopicsResult {
* Return a future which yields a collection of topic names.
*/
public KafkaFuture<Set<String>> names() {
- return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Set<String>>() {
+ return future.thenApply(new KafkaFuture.BaseFunction<Map<String, TopicListing>, Set<String>>() {
@Override
public Set<String> apply(Map<String, TopicListing> namesToListings) {
return namesToListings.keySet();
diff --git a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
index 23e2181..9cd2e01 100644
--- a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
+++ b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
@@ -35,18 +35,26 @@ public abstract class KafkaFuture<T> implements Future<T> {
/**
* A function which takes objects of type A and returns objects of type B.
*/
- public static abstract class Function<A, B> {
- public abstract B apply(A a);
+ public interface BaseFunction<A, B> {
+ B apply(A a);
}
/**
+ * A function which takes objects of type A and returns objects of type B.
+ *
+ * Prefer the functional interface {@link BaseFunction} over the class {@link Function}. This class is here for
+ * backwards compatibility reasons and might be deprecated/removed in a future release.
+ */
+ public static abstract class Function<A, B> implements BaseFunction<A, B> { }
+
+ /**
* A consumer of two different types of object.
*/
- public static abstract class BiConsumer<A, B> {
- public abstract void accept(A a, B b);
+ public interface BiConsumer<A, B> {
+ void accept(A a, B b);
}
- private static class AllOfAdapter<R> extends BiConsumer<R, Throwable> {
+ private static class AllOfAdapter<R> implements BiConsumer<R, Throwable> {
private int remainingResponses;
private KafkaFuture<?> future;
@@ -101,11 +109,43 @@ public abstract class KafkaFuture<T> implements Future<T> {
/**
* Returns a new KafkaFuture that, when this future completes normally, is executed with this
* futures's result as the argument to the supplied function.
+ *
+ * The function may be invoked by the thread that calls {@code thenApply} or it may be invoked by the thread that
+ * completes the future.
+ */
+ public abstract <R> KafkaFuture<R> thenApply(BaseFunction<T, R> function);
+
+ /**
+ * @see KafkaFuture#thenApply(BaseFunction)
+ *
+ * Prefer {@link KafkaFuture#thenApply(BaseFunction)} as this function is here for backwards compatibility reasons
+ * and might be deprecated/removed in a future release.
*/
public abstract <R> KafkaFuture<R> thenApply(Function<T, R> function);
- protected abstract void addWaiter(BiConsumer<? super T, ? super Throwable> action);
+ /**
+ * Returns a new KafkaFuture with the same result or exception as this future, that executes the given action
+ * when this future completes.
+ *
+ * When this future is done, the given action is invoked with the result (or null if none) and the exception
+ * (or null if none) of this future as arguments.
+ *
+ * The returned future is completed when the action returns.
+ * The supplied action should not throw an exception. However, if it does, the following rules apply:
+ * if this future completed normally but the supplied action throws an exception, then the returned future completes
+ * exceptionally with the supplied action's exception.
+ * Or, if this future completed exceptionally and the supplied action throws an exception, then the returned future
+ * completes exceptionally with this future's exception.
+ *
+ * The action may be invoked by the thread that calls {@code whenComplete} or it may be invoked by the thread that
+ * completes the future.
+ *
+ * @param action the action to preform
+ * @return the new future
+ */
+ public abstract KafkaFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
+ protected abstract void addWaiter(BiConsumer<? super T, ? super Throwable> action);
/**
* If not already completed, sets the value returned by get() and related methods to the given
* value.
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
index e2dbdf9..049ed49 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
@@ -46,11 +46,11 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
}
}
- private static class Applicant<A, B> extends BiConsumer<A, Throwable> {
- private final Function<A, B> function;
+ private static class Applicant<A, B> implements BiConsumer<A, Throwable> {
+ private final BaseFunction<A, B> function;
private final KafkaFutureImpl<B> future;
- Applicant(Function<A, B> function, KafkaFutureImpl<B> future) {
+ Applicant(BaseFunction<A, B> function, KafkaFutureImpl<B> future) {
this.function = function;
this.future = future;
}
@@ -70,7 +70,7 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
}
}
- private static class SingleWaiter<R> extends BiConsumer<R, Throwable> {
+ private static class SingleWaiter<R> implements BiConsumer<R, Throwable> {
private R value = null;
private Throwable exception = null;
private boolean done = false;
@@ -140,14 +140,59 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
* futures's result as the argument to the supplied function.
*/
@Override
- public <R> KafkaFuture<R> thenApply(Function<T, R> function) {
+ public <R> KafkaFuture<R> thenApply(BaseFunction<T, R> function) {
KafkaFutureImpl<R> future = new KafkaFutureImpl<R>();
addWaiter(new Applicant<>(function, future));
return future;
}
+
+ /**
+ * @See KafkaFutureImpl#thenApply(BaseFunction)
+ */
+ @Override
+ public <R> KafkaFuture<R> thenApply(Function<T, R> function) {
+ return thenApply((BaseFunction<T, R>) function);
+ }
+
+ private static class WhenCompleteBiConsumer<T> implements BiConsumer<T, Throwable> {
+ private final KafkaFutureImpl<T> future;
+ private final BiConsumer<? super T, ? super Throwable> biConsumer;
+
+ WhenCompleteBiConsumer(KafkaFutureImpl<T> future, BiConsumer<? super T, ? super Throwable> biConsumer) {
+ this.future = future;
+ this.biConsumer = biConsumer;
+ }
+
+ @Override
+ public void accept(T val, Throwable exception) {
+ try {
+ if (exception != null) {
+ biConsumer.accept(null, exception);
+ } else {
+ biConsumer.accept(val, null);
+ }
+ } catch (Throwable e) {
+ if (exception == null) {
+ exception = e;
+ }
+ }
+ if (exception != null) {
+ future.completeExceptionally(exception);
+ } else {
+ future.complete(val);
+ }
+ }
+ }
+
@Override
- protected synchronized void addWaiter(BiConsumer<? super T, ? super Throwable> action) {
+ public KafkaFuture<T> whenComplete(final BiConsumer<? super T, ? super Throwable> biConsumer) {
+ final KafkaFutureImpl<T> future = new KafkaFutureImpl<>();
+ addWaiter(new WhenCompleteBiConsumer<>(future, biConsumer));
+ return future;
+ }
+
+ public synchronized void addWaiter(BiConsumer<? super T, ? super Throwable> action) {
if (exception != null) {
action.accept(null, exception);
} else if (done) {
diff --git a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
index 71f3c3c..6f9efca 100644
--- a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
@@ -83,6 +83,47 @@ public class KafkaFutureTest {
assertEquals(null, myThread.testException);
}
+ @Test
+ public void testThenApply() throws Exception {
+ KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
+ KafkaFuture<Integer> doubledFuture = future.thenApply(new KafkaFuture.BaseFunction<Integer, Integer>() {
+ @Override
+ public Integer apply(Integer integer) {
+ return 2 * integer;
+ }
+ });
+ assertFalse(doubledFuture.isDone());
+ KafkaFuture<Integer> tripledFuture = future.thenApply(new KafkaFuture.Function<Integer, Integer>() {
+ @Override
+ public Integer apply(Integer integer) {
+ return 3 * integer;
+ }
+ });
+ assertFalse(tripledFuture.isDone());
+ future.complete(21);
+ assertEquals(Integer.valueOf(21), future.getNow(-1));
+ assertEquals(Integer.valueOf(42), doubledFuture.getNow(-1));
+ assertEquals(Integer.valueOf(63), tripledFuture.getNow(-1));
+ KafkaFuture<Integer> quadrupledFuture = future.thenApply(new KafkaFuture.BaseFunction<Integer, Integer>() {
+ @Override
+ public Integer apply(Integer integer) {
+ return 4 * integer;
+ }
+ });
+ assertEquals(Integer.valueOf(84), quadrupledFuture.getNow(-1));
+
+ KafkaFutureImpl<Integer> futureFail = new KafkaFutureImpl<>();
+ KafkaFuture<Integer> futureAppliedFail = futureFail.thenApply(new KafkaFuture.BaseFunction<Integer, Integer>() {
+ @Override
+ public Integer apply(Integer integer) {
+ return 2 * integer;
+ }
+ });
+ futureFail.completeExceptionally(new RuntimeException());
+ assertTrue(futureFail.isCompletedExceptionally());
+ assertTrue(futureAppliedFail.isCompletedExceptionally());
+ }
+
private static class CompleterThread<T> extends Thread {
private final KafkaFutureImpl<T> future;
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java b/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
index ba5e1ed..7134f0a 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
@@ -87,7 +87,7 @@ public class MockScheduler implements Scheduler, MockTime.MockTimeListener {
final Callable<T> callable, long delayMs) {
final KafkaFutureImpl<T> future = new KafkaFutureImpl<>();
KafkaFutureImpl<Long> waiter = new KafkaFutureImpl<>();
- waiter.thenApply(new KafkaFuture.Function<Long, Void>() {
+ waiter.thenApply(new KafkaFuture.BaseFunction<Long, Void>() {
@Override
public Void apply(final Long now) {
executor.submit(new Callable<Void>() {
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
index 3c03e1e..cda7773 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
@@ -290,7 +290,7 @@ public final class WorkerManager {
return;
}
KafkaFutureImpl<String> haltFuture = new KafkaFutureImpl<>();
- haltFuture.thenApply(new KafkaFuture.Function<String, Void>() {
+ haltFuture.thenApply(new KafkaFuture.BaseFunction<String, Void>() {
@Override
public Void apply(String errorString) {
if (errorString.isEmpty()) {
--
To stop receiving notification emails like this one, please contact
ewencp@apache.org.