You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/01 12:05:59 UTC

flink git commit: [FLINK-7327] [futures] Replace Flink's future with Java 8's CompletableFuture in StreamRecordQueueEntry

Repository: flink
Updated Branches:
  refs/heads/master fcac882d2 -> f41eb4b1e


[FLINK-7327] [futures] Replace Flink's future with Java 8's CompletableFuture in StreamRecordQueueEntry

This closes #4442.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f41eb4b1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f41eb4b1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f41eb4b1

Branch: refs/heads/master
Commit: f41eb4b1ea0112cce4b4edb4a25037fafa2aac23
Parents: fcac882
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jul 31 21:31:26 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Aug 1 14:04:43 2017 +0200

----------------------------------------------------------------------
 .../api/operators/async/AsyncWaitOperator.java  | 10 ++++----
 .../async/queue/OrderedStreamElementQueue.java  | 10 ++++----
 .../async/queue/StreamElementQueueEntry.java    | 24 ++++++++------------
 .../async/queue/StreamRecordQueueEntry.java     |  8 +++----
 .../queue/UnorderedStreamElementQueue.java      | 10 ++++----
 .../async/queue/WatermarkQueueEntry.java        | 10 ++++----
 6 files changed, 29 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f41eb4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index 56c199d..a0f626e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.datastream.AsyncDataStream;
@@ -217,12 +216,11 @@ public class AsyncWaitOperator<IN, OUT>
 
 			// Cancel the timer once we've completed the stream record buffer entry. This will remove
 			// the register trigger task
-			streamRecordBufferEntry.onComplete(new AcceptFunction<StreamElementQueueEntry<Collection<OUT>>>() {
-				@Override
-				public void accept(StreamElementQueueEntry<Collection<OUT>> value) {
+			streamRecordBufferEntry.onComplete(
+				(StreamElementQueueEntry<Collection<OUT>> value) -> {
 					timerFuture.cancel(true);
-				}
-			}, executor);
+				},
+				executor);
 		}
 
 		addAsyncBufferEntry(streamRecordBufferEntry);

http://git-wip-us.apache.org/repos/asf/flink/blob/f41eb4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
index e573fc1..5133809 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.streaming.api.operators.async.OperatorActions;
 import org.apache.flink.util.Preconditions;
 
@@ -193,9 +192,8 @@ public class OrderedStreamElementQueue implements StreamElementQueue {
 
 		queue.addLast(streamElementQueueEntry);
 
-		streamElementQueueEntry.onComplete(new AcceptFunction<StreamElementQueueEntry<T>>() {
-			@Override
-			public void accept(StreamElementQueueEntry<T> value) {
+		streamElementQueueEntry.onComplete(
+			(StreamElementQueueEntry<T> value) -> {
 				try {
 					onCompleteHandler(value);
 				} catch (InterruptedException e) {
@@ -206,8 +204,8 @@ public class OrderedStreamElementQueue implements StreamElementQueue {
 					operatorActions.failOperator(new Exception("Could not complete the " +
 						"stream element queue entry: " + value + '.', t));
 				}
-			}
-		}, executor);
+			},
+			executor);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/f41eb4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
index 67b1f0f..c59e012 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
@@ -19,13 +19,12 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.BiFunction;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.util.Preconditions;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.function.Consumer;
 
 /**
  * Entry class for the {@link StreamElementQueue}. The stream element queue entry stores the
@@ -63,23 +62,18 @@ public abstract class StreamElementQueueEntry<T> implements AsyncResult {
 	 * @param executor to run the complete function
 	 */
 	public void onComplete(
-			final AcceptFunction<StreamElementQueueEntry<T>> completeFunction,
+			final Consumer<StreamElementQueueEntry<T>> completeFunction,
 			Executor executor) {
 		final StreamElementQueueEntry<T> thisReference = this;
 
-		getFuture().handleAsync(new BiFunction<T, Throwable, Void>() {
-			@Override
-			public Void apply(T t, Throwable throwable) {
-				// call the complete function for normal completion as well as exceptional completion
-				// see FLINK-6435
-				completeFunction.accept(thisReference);
-
-				return null;
-			}
-		}, executor);
+		getFuture().whenCompleteAsync(
+			// call the complete function for normal completion as well as exceptional completion
+			// see FLINK-6435
+			(value, throwable) -> completeFunction.accept(thisReference),
+			executor);
 	}
 
-	protected abstract Future<T> getFuture();
+	protected abstract CompletableFuture<T> getFuture();
 
 	@Override
 	public final boolean isWatermark() {

http://git-wip-us.apache.org/repos/asf/flink/blob/f41eb4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
index 708bf17..2aca10e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
@@ -19,14 +19,12 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.streaming.api.functions.async.AsyncFunction;
 import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * {@link StreamElementQueueEntry} implementation for {@link StreamRecord}. This class also acts
@@ -52,7 +50,7 @@ public class StreamRecordQueueEntry<OUT> extends StreamElementQueueEntry<Collect
 		hasTimestamp = streamRecord.hasTimestamp();
 		timestamp = streamRecord.getTimestamp();
 
-		resultFuture = new FlinkCompletableFuture<>();
+		resultFuture = new CompletableFuture<>();
 	}
 
 	@Override
@@ -71,7 +69,7 @@ public class StreamRecordQueueEntry<OUT> extends StreamElementQueueEntry<Collect
 	}
 
 	@Override
-	protected Future<Collection<OUT>> getFuture() {
+	protected CompletableFuture<Collection<OUT>> getFuture() {
 		return resultFuture;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f41eb4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
index e6f71bf..e2c3426 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.streaming.api.operators.async.OperatorActions;
 import org.apache.flink.util.Preconditions;
 
@@ -285,9 +284,8 @@ public class UnorderedStreamElementQueue implements StreamElementQueue {
 			lastSet.add(streamElementQueueEntry);
 		}
 
-		streamElementQueueEntry.onComplete(new AcceptFunction<StreamElementQueueEntry<T>>() {
-			@Override
-			public void accept(StreamElementQueueEntry<T> value) {
+		streamElementQueueEntry.onComplete(
+			(StreamElementQueueEntry<T> value) -> {
 				try {
 					onCompleteHandler(value);
 				} catch (InterruptedException e) {
@@ -299,8 +297,8 @@ public class UnorderedStreamElementQueue implements StreamElementQueue {
 					operatorActions.failOperator(new Exception("Could not complete the " +
 						"stream element queue entry: " + value + '.', t));
 				}
-			}
-		}, executor);
+			},
+			executor);
 
 		numberEntries++;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f41eb4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
index 1f48303..c9c6e74 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
@@ -19,22 +19,22 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.streaming.api.watermark.Watermark;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
  * {@link StreamElementQueueEntry} implementation for the {@link Watermark}.
  */
 @Internal
 public class WatermarkQueueEntry extends StreamElementQueueEntry<Watermark> implements AsyncWatermarkResult {
 
-	private final Future<Watermark> future;
+	private final CompletableFuture<Watermark> future;
 
 	public WatermarkQueueEntry(Watermark watermark) {
 		super(watermark);
 
-		this.future = FlinkCompletableFuture.completed(watermark);
+		this.future = CompletableFuture.completedFuture(watermark);
 	}
 
 	@Override
@@ -43,7 +43,7 @@ public class WatermarkQueueEntry extends StreamElementQueueEntry<Watermark> impl
 	}
 
 	@Override
-	protected Future<Watermark> getFuture() {
+	protected CompletableFuture<Watermark> getFuture() {
 		return future;
 	}
 }