You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/01 12:05:59 UTC
flink git commit: [FLINK-7327] [futures] Replace Flink's future with
Java 8's CompletableFuture in StreamRecordQueueEntry
Repository: flink
Updated Branches:
refs/heads/master fcac882d2 -> f41eb4b1e
[FLINK-7327] [futures] Replace Flink's future with Java 8's CompletableFuture in StreamRecordQueueEntry
This closes #4442.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f41eb4b1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f41eb4b1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f41eb4b1
Branch: refs/heads/master
Commit: f41eb4b1ea0112cce4b4edb4a25037fafa2aac23
Parents: fcac882
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jul 31 21:31:26 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Aug 1 14:04:43 2017 +0200
----------------------------------------------------------------------
.../api/operators/async/AsyncWaitOperator.java | 10 ++++----
.../async/queue/OrderedStreamElementQueue.java | 10 ++++----
.../async/queue/StreamElementQueueEntry.java | 24 ++++++++------------
.../async/queue/StreamRecordQueueEntry.java | 8 +++----
.../queue/UnorderedStreamElementQueue.java | 10 ++++----
.../async/queue/WatermarkQueueEntry.java | 10 ++++----
6 files changed, 29 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f41eb4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index 56c199d..a0f626e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
@@ -217,12 +216,11 @@ public class AsyncWaitOperator<IN, OUT>
// Cancel the timer once we've completed the stream record buffer entry. This will remove
// the register trigger task
- streamRecordBufferEntry.onComplete(new AcceptFunction<StreamElementQueueEntry<Collection<OUT>>>() {
- @Override
- public void accept(StreamElementQueueEntry<Collection<OUT>> value) {
+ streamRecordBufferEntry.onComplete(
+ (StreamElementQueueEntry<Collection<OUT>> value) -> {
timerFuture.cancel(true);
- }
- }, executor);
+ },
+ executor);
}
addAsyncBufferEntry(streamRecordBufferEntry);
http://git-wip-us.apache.org/repos/asf/flink/blob/f41eb4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
index e573fc1..5133809 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.api.operators.async.queue;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.streaming.api.operators.async.OperatorActions;
import org.apache.flink.util.Preconditions;
@@ -193,9 +192,8 @@ public class OrderedStreamElementQueue implements StreamElementQueue {
queue.addLast(streamElementQueueEntry);
- streamElementQueueEntry.onComplete(new AcceptFunction<StreamElementQueueEntry<T>>() {
- @Override
- public void accept(StreamElementQueueEntry<T> value) {
+ streamElementQueueEntry.onComplete(
+ (StreamElementQueueEntry<T> value) -> {
try {
onCompleteHandler(value);
} catch (InterruptedException e) {
@@ -206,8 +204,8 @@ public class OrderedStreamElementQueue implements StreamElementQueue {
operatorActions.failOperator(new Exception("Could not complete the " +
"stream element queue entry: " + value + '.', t));
}
- }
- }, executor);
+ },
+ executor);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/f41eb4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
index 67b1f0f..c59e012 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
@@ -19,13 +19,12 @@
package org.apache.flink.streaming.api.operators.async.queue;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.BiFunction;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.util.Preconditions;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
+import java.util.function.Consumer;
/**
* Entry class for the {@link StreamElementQueue}. The stream element queue entry stores the
@@ -63,23 +62,18 @@ public abstract class StreamElementQueueEntry<T> implements AsyncResult {
* @param executor to run the complete function
*/
public void onComplete(
- final AcceptFunction<StreamElementQueueEntry<T>> completeFunction,
+ final Consumer<StreamElementQueueEntry<T>> completeFunction,
Executor executor) {
final StreamElementQueueEntry<T> thisReference = this;
- getFuture().handleAsync(new BiFunction<T, Throwable, Void>() {
- @Override
- public Void apply(T t, Throwable throwable) {
- // call the complete function for normal completion as well as exceptional completion
- // see FLINK-6435
- completeFunction.accept(thisReference);
-
- return null;
- }
- }, executor);
+ getFuture().whenCompleteAsync(
+ // call the complete function for normal completion as well as exceptional completion
+ // see FLINK-6435
+ (value, throwable) -> completeFunction.accept(thisReference),
+ executor);
}
- protected abstract Future<T> getFuture();
+ protected abstract CompletableFuture<T> getFuture();
@Override
public final boolean isWatermark() {
http://git-wip-us.apache.org/repos/asf/flink/blob/f41eb4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
index 708bf17..2aca10e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
@@ -19,14 +19,12 @@
package org.apache.flink.streaming.api.operators.async.queue;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
/**
* {@link StreamElementQueueEntry} implementation for {@link StreamRecord}. This class also acts
@@ -52,7 +50,7 @@ public class StreamRecordQueueEntry<OUT> extends StreamElementQueueEntry<Collect
hasTimestamp = streamRecord.hasTimestamp();
timestamp = streamRecord.getTimestamp();
- resultFuture = new FlinkCompletableFuture<>();
+ resultFuture = new CompletableFuture<>();
}
@Override
@@ -71,7 +69,7 @@ public class StreamRecordQueueEntry<OUT> extends StreamElementQueueEntry<Collect
}
@Override
- protected Future<Collection<OUT>> getFuture() {
+ protected CompletableFuture<Collection<OUT>> getFuture() {
return resultFuture;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f41eb4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
index e6f71bf..e2c3426 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.api.operators.async.queue;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.streaming.api.operators.async.OperatorActions;
import org.apache.flink.util.Preconditions;
@@ -285,9 +284,8 @@ public class UnorderedStreamElementQueue implements StreamElementQueue {
lastSet.add(streamElementQueueEntry);
}
- streamElementQueueEntry.onComplete(new AcceptFunction<StreamElementQueueEntry<T>>() {
- @Override
- public void accept(StreamElementQueueEntry<T> value) {
+ streamElementQueueEntry.onComplete(
+ (StreamElementQueueEntry<T> value) -> {
try {
onCompleteHandler(value);
} catch (InterruptedException e) {
@@ -299,8 +297,8 @@ public class UnorderedStreamElementQueue implements StreamElementQueue {
operatorActions.failOperator(new Exception("Could not complete the " +
"stream element queue entry: " + value + '.', t));
}
- }
- }, executor);
+ },
+ executor);
numberEntries++;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f41eb4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
index 1f48303..c9c6e74 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
@@ -19,22 +19,22 @@
package org.apache.flink.streaming.api.operators.async.queue;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.streaming.api.watermark.Watermark;
+import java.util.concurrent.CompletableFuture;
+
/**
* {@link StreamElementQueueEntry} implementation for the {@link Watermark}.
*/
@Internal
public class WatermarkQueueEntry extends StreamElementQueueEntry<Watermark> implements AsyncWatermarkResult {
- private final Future<Watermark> future;
+ private final CompletableFuture<Watermark> future;
public WatermarkQueueEntry(Watermark watermark) {
super(watermark);
- this.future = FlinkCompletableFuture.completed(watermark);
+ this.future = CompletableFuture.completedFuture(watermark);
}
@Override
@@ -43,7 +43,7 @@ public class WatermarkQueueEntry extends StreamElementQueueEntry<Watermark> impl
}
@Override
- protected Future<Watermark> getFuture() {
+ protected CompletableFuture<Watermark> getFuture() {
return future;
}
}