You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/22 13:05:40 UTC

flink git commit: [FLINK-5851] [streaming API] Rename AsyncCollector into ResultFuture

Repository: flink
Updated Branches:
  refs/heads/master 9077e5169 -> 40cec17f4


[FLINK-5851] [streaming API] Rename AsyncCollector into ResultFuture

Complete renaming AsyncCollector -> ResultFuture

This closes #4243.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40cec17f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40cec17f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40cec17f

Branch: refs/heads/master
Commit: 40cec17f4303b43bbf65d8be542f0646eada57e8
Parents: 9077e51
Author: zhangminglei <zm...@163.com>
Authored: Sun Jul 2 23:14:21 2017 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Aug 22 15:05:13 2017 +0200

----------------------------------------------------------------------
 docs/dev/stream/operators/asyncio.md            | 20 ++++----
 .../examples/async/AsyncIOExample.java          | 10 ++--
 .../scala/examples/async/AsyncIOExample.scala   |  6 +--
 .../api/functions/async/AsyncFunction.java      | 21 ++++----
 .../api/functions/async/ResultFuture.java       | 50 ++++++++++++++++++++
 .../api/functions/async/RichAsyncFunction.java  |  3 +-
 .../async/collector/AsyncCollector.java         | 50 --------------------
 .../api/operators/async/AsyncWaitOperator.java  |  6 +--
 .../streaming/api/operators/async/Emitter.java  |  2 +-
 .../async/queue/StreamRecordQueueEntry.java     | 10 ++--
 .../functions/async/RichAsyncFunctionTest.java  |  5 +-
 .../operators/async/AsyncWaitOperatorTest.java  | 22 ++++-----
 .../api/operators/async/EmitterTest.java        | 14 +++---
 .../queue/OrderedStreamElementQueueTest.java    |  6 +--
 .../async/queue/StreamElementQueueTest.java     |  6 +--
 .../queue/UnorderedStreamElementQueueTest.java  | 12 ++---
 .../streaming/api/scala/AsyncDataStream.scala   | 30 ++++++------
 .../api/scala/async/AsyncCollector.scala        | 50 --------------------
 .../api/scala/async/AsyncFunction.scala         | 10 ++--
 .../scala/async/JavaAsyncCollectorWrapper.scala | 43 -----------------
 .../scala/async/JavaResultFutureWrapper.scala   | 44 +++++++++++++++++
 .../api/scala/async/ResultFuture.scala          | 50 ++++++++++++++++++++
 .../streaming/api/StreamingOperatorsITCase.java |  7 ++-
 23 files changed, 237 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/docs/dev/stream/operators/asyncio.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/asyncio.md b/docs/dev/stream/operators/asyncio.md
index 1ea0792..c5bafa1 100644
--- a/docs/dev/stream/operators/asyncio.md
+++ b/docs/dev/stream/operators/asyncio.md
@@ -74,7 +74,7 @@ Assuming one has an asynchronous client for the target database, three parts are
 with asynchronous I/O against the database:
 
   - An implementation of `AsyncFunction` that dispatches the requests
-  - A *callback* that takes the result of the operation and hands it to the `AsyncCollector`
+  - A *callback* that takes the result of the operation and hands it to the `ResultFuture`
   - Applying the async I/O operation on a DataStream as a transformation
 
 The following code example illustrates the basic pattern:
@@ -104,16 +104,16 @@ class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, Stri
     }
 
     @Override
-    public void asyncInvoke(final String str, final AsyncCollector<Tuple2<String, String>> asyncCollector) throws Exception {
+    public void asyncInvoke(final String str, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
 
         // issue the asynchronous request, receive a future for result
         Future<String> resultFuture = client.query(str);
 
         // set the callback to be executed once the request by the client is complete
-        // the callback simply forwards the result to the collector
+        // the callback simply forwards the result to the result future
         resultFuture.thenAccept( (String result) -> {
 
-            asyncCollector.collect(Collections.singleton(new Tuple2<>(str, result)));
+            resultFuture.complete(Collections.singleton(new Tuple2<>(str, result)));
          
         });
     }
@@ -142,15 +142,15 @@ class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {
     implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())
 
 
-    override def asyncInvoke(str: String, asyncCollector: AsyncCollector[(String, String)]): Unit = {
+    override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {
 
         // issue the asynchronous request, receive a future for the result
         val resultFuture: Future[String] = client.query(str)
 
         // set the callback to be executed once the request by the client is complete
-        // the callback simply forwards the result to the collector
+        // the callback simply forwards the result to the result future
         resultFuture.onSuccess {
-            case result: String => asyncCollector.collect(Iterable((str, result)));
+            case result: String => resultFuture.complete(Iterable((str, result)));
         }
     }
 }
@@ -166,8 +166,8 @@ val resultStream: DataStream[(String, String)] =
 </div>
 </div>
 
-**Important note**: The `AsyncCollector` is completed with the first call of `AsyncCollector.collect`.
-All subsequent `collect` calls will be ignored.
+**Important note**: The `ResultFuture` is completed with the first call of `ResultFuture.complete`.
+All subsequent `complete` calls will be ignored.
 
 The following two parameters control the asynchronous operations:
 
@@ -229,7 +229,7 @@ asynchronous requests in checkpoints and restores/re-triggers the requests when
 
 For implementations with *Futures* that have an *Executor* (or *ExecutionContext* in Scala) for callbacks, we suggets to use a `DirectExecutor`, because the
 callback typically does minimal work, and a `DirectExecutor` avoids an additional thread-to-thread handover overhead. The callback typically only hands
-the result to the `AsyncCollector`, which adds it to the output buffer. From there, the heavy logic that includes record emission and interaction
+the result to the `ResultFuture`, which adds it to the output buffer. From there, the heavy logic that includes record emission and interaction
 with the checkpoint bookkeepting happens in a dedicated thread-pool anyways.
 
 A `DirectExecutor` can be obtained via `org.apache.flink.runtime.concurrent.Executors.directExecutor()` or

http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
index 748cb82..95379e3 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
@@ -29,8 +29,8 @@ import org.apache.flink.streaming.api.datastream.AsyncDataStream;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
-import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.util.Collector;
 
@@ -178,7 +178,7 @@ public class AsyncIOExample {
 		}
 
 		@Override
-		public void asyncInvoke(final Integer input, final AsyncCollector<String> collector) throws Exception {
+		public void asyncInvoke(final Integer input, final ResultFuture<String> resultFuture) throws Exception {
 			this.executorService.submit(new Runnable() {
 				@Override
 				public void run() {
@@ -188,13 +188,13 @@ public class AsyncIOExample {
 						Thread.sleep(sleep);
 
 						if (random.nextFloat() < failRatio) {
-							collector.collect(new Exception("wahahahaha..."));
+							resultFuture.completeExceptionally(new Exception("wahahahaha..."));
 						} else {
-							collector.collect(
+							resultFuture.complete(
 								Collections.singletonList("key-" + (input % 10)));
 						}
 					} catch (InterruptedException e) {
-						collector.collect(new ArrayList<String>(0));
+						resultFuture.complete(new ArrayList<String>(0));
 					}
 				}
 			});

http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala
index 69c4c0a..5808aaa 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
 import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.scala.async.AsyncCollector
+import org.apache.flink.streaming.api.scala.async.ResultFuture
 
 import scala.concurrent.{ExecutionContext, Future}
 
@@ -38,9 +38,9 @@ object AsyncIOExample {
     val input = env.addSource(new SimpleSource())
 
     val asyncMapped = AsyncDataStream.orderedWait(input, timeout, TimeUnit.MILLISECONDS, 10) {
-      (input, collector: AsyncCollector[Int]) =>
+      (input, collector: ResultFuture[Int]) =>
         Future {
-          collector.collect(Seq(input))
+          collector.complete(Seq(input))
         } (ExecutionContext.global)
     }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
index 5bb4459..2ac218d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.functions.async;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
 
 import java.io.Serializable;
 
@@ -28,21 +27,21 @@ import java.io.Serializable;
  * A function to trigger Async I/O operation.
  *
  * <p>For each #asyncInvoke, an async io operation can be triggered, and once it has been done,
- * the result can be collected by calling {@link AsyncCollector#collect}. For each async
+ * the result can be collected by calling {@link ResultFuture#complete}. For each async
  * operation, its context is stored in the operator immediately after invoking
  * #asyncInvoke, avoiding blocking for each stream input as long as the internal buffer is not full.
  *
- * <p>{@link AsyncCollector} can be passed into callbacks or futures to collect the result data.
+ * <p>{@link ResultFuture} can be passed into callbacks or futures to collect the result data.
  * An error can also be propagate to the async IO operator by
- * {@link AsyncCollector#collect(Throwable)}.
+ * {@link ResultFuture#completeExceptionally(Throwable)}.
  *
  * <p>Callback example usage:
  *
  * <pre>{@code
  * public class HBaseAsyncFunc implements AsyncFunction<String, String> {
  *
- *   public void asyncInvoke(String row, AsyncCollector<String> collector) throws Exception {
- *     HBaseCallback cb = new HBaseCallback(collector);
+ *   public void asyncInvoke(String row, ResultFuture<String> result) throws Exception {
+ *     HBaseCallback cb = new HBaseCallback(result);
  *     Get get = new Get(Bytes.toBytes(row));
  *     hbase.asyncGet(get, cb);
  *   }
@@ -54,16 +53,16 @@ import java.io.Serializable;
  * <pre>{@code
  * public class HBaseAsyncFunc implements AsyncFunction<String, String> {
  *
- *   public void asyncInvoke(String row, final AsyncCollector<String> collector) throws Exception {
+ *   public void asyncInvoke(String row, final ResultFuture<String> result) throws Exception {
  *     Get get = new Get(Bytes.toBytes(row));
  *     ListenableFuture<Result> future = hbase.asyncGet(get);
  *     Futures.addCallback(future, new FutureCallback<Result>() {
  *       public void onSuccess(Result result) {
  *         List<String> ret = process(result);
- *         collector.collect(ret);
+ *         result.complete(ret);
  *       }
  *       public void onFailure(Throwable thrown) {
- *         collector.collect(thrown);
+ *         result.completeExceptionally(thrown);
  *       }
  *     });
  *   }
@@ -80,9 +79,9 @@ public interface AsyncFunction<IN, OUT> extends Function, Serializable {
 	 * Trigger async operation for each stream input.
 	 *
 	 * @param input element coming from an upstream task
-	 * @param collector to collect the result data
+	 * @param resultFuture to be completed with the result data
 	 * @exception Exception in case of a user code error. An exception will make the task fail and
 	 * trigger fail-over process.
 	 */
-	void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception;
+	void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/ResultFuture.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/ResultFuture.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/ResultFuture.java
new file mode 100644
index 0000000..934341e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/ResultFuture.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Collection;
+
+/**
+ * {@link ResultFuture} collects data / errors in user code while processing async I/O.
+ *
+ * @param <OUT> Output type
+ */
+@PublicEvolving
+public interface ResultFuture<OUT> {
+	/**
+	 * Completes the result future with a collection of result objects.
+	 *
+	 * <p>Note that it should be called exactly once in the user code.
+	 * Calling this function multiple times will cause data loss.
+	 *
+	 * <p>Put all results in a {@link Collection} and then emit output.
+	 *
+	 * @param result A list of results.
+	 */
+	void complete(Collection<OUT> result);
+
+	/**
+	 * Completes the result future exceptionally with an exception.
+	 *
+	 * @param error A Throwable object.
+	 */
+	void completeExceptionally(Throwable error);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
index 84f9e53..b6ce862 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
@@ -43,7 +43,6 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Preconditions;
 
@@ -85,7 +84,7 @@ public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction im
 	}
 
 	@Override
-	public abstract void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception;
+	public abstract void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;
 
 	// -----------------------------------------------------------------------------------------
 	// Wrapper classes

http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java
deleted file mode 100644
index 964c13a..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.async.collector;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-import java.util.Collection;
-
-/**
- * {@link AsyncCollector} collects data / error in user codes while processing async i/o.
- *
- * @param <OUT> Output type
- */
-@PublicEvolving
-public interface AsyncCollector<OUT> {
-	/**
-	 * Set result.
-	 *
-	 * <p>Note that it should be called for exactly one time in the user code.
-	 * Calling this function for multiple times will cause data lose.
-	 *
-	 * <p>Put all results in a {@link Collection} and then emit output.
-	 *
-	 * @param result A list of results.
-	 */
-	void collect(Collection<OUT> result);
-
-	/**
-	 * Set error.
-	 *
-	 * @param error A Throwable object.
-	 */
-	void collect(Throwable error);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index a0f626e..3dfa8aa 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.datastream.AsyncDataStream;
 import org.apache.flink.streaming.api.datastream.AsyncDataStream.OutputMode;
 import org.apache.flink.streaming.api.functions.async.AsyncFunction;
-import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -57,7 +57,7 @@ import java.util.concurrent.TimeoutException;
 
 /**
  * The {@link AsyncWaitOperator} allows to asynchronously process incoming stream records. For that
- * the operator creates an {@link AsyncCollector} which is passed to an {@link AsyncFunction}.
+ * the operator creates a {@link ResultFuture} which is passed to an {@link AsyncFunction}.
  * Within the async function, the user can complete the async collector arbitrarily. Once the async
  * collector has been completed, the result is emitted by the operator's emitter to downstream
  * operators.
@@ -209,7 +209,7 @@ public class AsyncWaitOperator<IN, OUT>
 				new ProcessingTimeCallback() {
 					@Override
 					public void onProcessingTime(long timestamp) throws Exception {
-						streamRecordBufferEntry.collect(
+						streamRecordBufferEntry.completeExceptionally(
 							new TimeoutException("Async function call has timed out."));
 					}
 				});

http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
index 2204109..0a1a2db 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
@@ -68,7 +68,7 @@ public class Emitter<OUT> implements Runnable {
 
 		this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "checkpointLock");
 		this.output = Preconditions.checkNotNull(output, "output");
-		this.streamElementQueue = Preconditions.checkNotNull(streamElementQueue, "asyncCollectorBuffer");
+		this.streamElementQueue = Preconditions.checkNotNull(streamElementQueue, "streamElementQueue");
 		this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");
 
 		this.timestampedCollector = new TimestampedCollector<>(this.output);

http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
index 2aca10e..796b44f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.functions.async.AsyncFunction;
-import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.Collection;
@@ -28,14 +28,14 @@ import java.util.concurrent.CompletableFuture;
 
 /**
  * {@link StreamElementQueueEntry} implementation for {@link StreamRecord}. This class also acts
- * as the {@link AsyncCollector} implementation which is given to the {@link AsyncFunction}. The
+ * as the {@link ResultFuture} implementation which is given to the {@link AsyncFunction}. The
  * async function completes this class with a collection of results.
  *
  * @param <OUT> Type of the asynchronous collection result
  */
 @Internal
 public class StreamRecordQueueEntry<OUT> extends StreamElementQueueEntry<Collection<OUT>>
-	implements AsyncCollectionResult<OUT>, AsyncCollector<OUT> {
+	implements AsyncCollectionResult<OUT>, ResultFuture<OUT> {
 
 	/** Timestamp information. */
 	private final boolean hasTimestamp;
@@ -74,12 +74,12 @@ public class StreamRecordQueueEntry<OUT> extends StreamElementQueueEntry<Collect
 	}
 
 	@Override
-	public void collect(Collection<OUT> result) {
+	public void complete(Collection<OUT> result) {
 		resultFuture.complete(result);
 	}
 
 	@Override
-	public void collect(Throwable error) {
+	public void completeExceptionally(Throwable error) {
 		resultFuture.completeExceptionally(error);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
index 224b376..aba37df 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
 
 import org.junit.Test;
 
@@ -55,7 +54,7 @@ public class RichAsyncFunctionTest {
 			private static final long serialVersionUID = -2023923961609455894L;
 
 			@Override
-			public void asyncInvoke(Integer input, AsyncCollector<Integer> collector) throws Exception {
+			public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) throws Exception {
 				// no op
 			}
 		};
@@ -94,7 +93,7 @@ public class RichAsyncFunctionTest {
 			private static final long serialVersionUID = 1707630162838967972L;
 
 			@Override
-			public void asyncInvoke(Integer input, AsyncCollector<Integer> collector) throws Exception {
+			public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) throws Exception {
 				// no op
 			}
 		};

http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 1dd99fe..a3cb630 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -44,8 +44,8 @@ import org.apache.flink.streaming.api.datastream.AsyncDataStream;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
-import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
@@ -166,11 +166,11 @@ public class AsyncWaitOperatorTest extends TestLogger {
 		}
 
 		@Override
-		public void asyncInvoke(final Integer input, final AsyncCollector<Integer> collector) throws Exception {
+		public void asyncInvoke(final Integer input, final ResultFuture<Integer> resultFuture) throws Exception {
 			executorService.submit(new Runnable() {
 				@Override
 				public void run() {
-					collector.collect(Collections.singletonList(input * 2));
+					resultFuture.complete(Collections.singletonList(input * 2));
 				}
 			});
 		}
@@ -178,7 +178,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 
 	/**
 	 * A special {@link AsyncFunction} without issuing
-	 * {@link AsyncCollector#collect} until the latch counts to zero.
+	 * {@link ResultFuture#complete} until the latch counts to zero.
 	 * This function is used in the testStateSnapshotAndRestore, ensuring
 	 * that {@link StreamElementQueueEntry} can stay
 	 * in the {@link StreamElementQueue} to be
@@ -194,7 +194,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 		}
 
 		@Override
-		public void asyncInvoke(final Integer input, final AsyncCollector<Integer> collector) throws Exception {
+		public void asyncInvoke(final Integer input, final ResultFuture<Integer> resultFuture) throws Exception {
 			this.executorService.submit(new Runnable() {
 				@Override
 				public void run() {
@@ -205,7 +205,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 						// do nothing
 					}
 
-					collector.collect(Collections.singletonList(input));
+					resultFuture.complete(Collections.singletonList(input));
 				}
 			});
 		}
@@ -854,8 +854,8 @@ public class AsyncWaitOperatorTest extends TestLogger {
 				private static final long serialVersionUID = -3718276118074877073L;
 
 				@Override
-				public void asyncInvoke(Integer input, AsyncCollector<Integer> collector) throws Exception {
-					collector.collect(Collections.singletonList(input));
+				public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) throws Exception {
+					resultFuture.complete(Collections.singletonList(input));
 				}
 			},
 			timeout,
@@ -949,8 +949,8 @@ public class AsyncWaitOperatorTest extends TestLogger {
 		private static final long serialVersionUID = 6326568632967110990L;
 
 		@Override
-		public void asyncInvoke(Integer input, AsyncCollector<Integer> collector) throws Exception {
-			collector.collect(new Exception("Test exception"));
+		public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) throws Exception {
+			resultFuture.completeExceptionally(new Exception("Test exception"));
 		}
 	}
 
@@ -1012,7 +1012,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 		private static final long serialVersionUID = -3060481953330480694L;
 
 		@Override
-		public void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception {
+		public void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception {
 			// no op
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java
index da2d795..7dedd14 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.api.operators.async;
 
-import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue;
 import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
@@ -115,9 +115,9 @@ public class EmitterTest extends TestLogger {
 			queue.put(watermark1);
 			queue.put(record3);
 
-			record2.collect(Arrays.asList(3, 4));
-			record1.collect(Arrays.asList(1, 2));
-			record3.collect(Arrays.asList(5, 6));
+			record2.complete(Arrays.asList(3, 4));
+			record1.complete(Arrays.asList(1, 2));
+			record3.complete(Arrays.asList(5, 6));
 
 			synchronized (lock) {
 				while (!queue.isEmpty()) {
@@ -133,7 +133,7 @@ public class EmitterTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that the emitter handles exceptions occurring in the {@link AsyncCollector} correctly.
+	 * Tests that the emitter handles exceptions occurring in the {@link ResultFuture} correctly.
 	 */
 	@Test
 	public void testEmitterWithExceptions() throws Exception {
@@ -167,8 +167,8 @@ public class EmitterTest extends TestLogger {
 			queue.put(record2);
 			queue.put(watermark1);
 
-			record2.collect(testException);
-			record1.collect(Arrays.asList(1));
+			record2.completeExceptionally(testException);
+			record1.complete(Arrays.asList(1));
 
 			synchronized (lock) {
 				while (!queue.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
index f3b68c4..c7b811a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
@@ -110,15 +110,15 @@ public class OrderedStreamElementQueueTest extends TestLogger {
 
 		Assert.assertFalse(pollOperation.isDone());
 
-		entry2.collect(Collections.<Integer>emptyList());
+		entry2.complete(Collections.<Integer>emptyList());
 
-		entry4.collect(Collections.<Integer>emptyList());
+		entry4.complete(Collections.<Integer>emptyList());
 
 		Thread.sleep(10L);
 
 		Assert.assertEquals(4, queue.size());
 
-		entry1.collect(Collections.<Integer>emptyList());
+		entry1.complete(Collections.<Integer>emptyList());
 
 		Assert.assertEquals(expected, pollOperation.get());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
index d396756..7315f65 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
@@ -150,7 +150,7 @@ public class StreamElementQueueTest extends TestLogger {
 		Assert.assertEquals(watermarkQueueEntry, queue.poll());
 		Assert.assertEquals(1, queue.size());
 
-		streamRecordQueueEntry.collect(Collections.<Integer>emptyList());
+		streamRecordQueueEntry.complete(Collections.<Integer>emptyList());
 
 		Assert.assertEquals(streamRecordQueueEntry, queue.poll());
 
@@ -191,7 +191,7 @@ public class StreamElementQueueTest extends TestLogger {
 		// but it shouldn't ;-)
 		Assert.assertFalse(putOperation.isDone());
 
-		streamRecordQueueEntry.collect(Collections.<Integer>emptyList());
+		streamRecordQueueEntry.complete(Collections.<Integer>emptyList());
 
 		// polling the completed head element frees the queue again
 		Assert.assertEquals(streamRecordQueueEntry, queue.poll());
@@ -259,7 +259,7 @@ public class StreamElementQueueTest extends TestLogger {
 
 		Assert.assertFalse(pollOperation.isDone());
 
-		streamRecordQueueEntry.collect(Collections.<Integer>emptyList());
+		streamRecordQueueEntry.complete(Collections.<Integer>emptyList());
 
 		Assert.assertEquals(streamRecordQueueEntry, pollOperation.get());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
index cc0bc30..acc6b8e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
@@ -110,13 +110,13 @@ public class UnorderedStreamElementQueueTest extends TestLogger {
 			executor);
 
 		// this should not fulfill the poll, because R3 is behind W1
-		record3.collect(Collections.<Integer>emptyList());
+		record3.complete(Collections.<Integer>emptyList());
 
 		Thread.sleep(10L);
 
 		Assert.assertFalse(firstPoll.isDone());
 
-		record2.collect(Collections.<Integer>emptyList());
+		record2.complete(Collections.<Integer>emptyList());
 
 		Assert.assertEquals(record2, firstPoll.get());
 
@@ -130,15 +130,15 @@ public class UnorderedStreamElementQueueTest extends TestLogger {
 			},
 			executor);
 
-		record6.collect(Collections.<Integer>emptyList());
-		record4.collect(Collections.<Integer>emptyList());
+		record6.complete(Collections.<Integer>emptyList());
+		record4.complete(Collections.<Integer>emptyList());
 
 		Thread.sleep(10L);
 
 		// The future should not be completed because R1 has not been completed yet
 		Assert.assertFalse(secondPoll.isDone());
 
-		record1.collect(Collections.<Integer>emptyList());
+		record1.complete(Collections.<Integer>emptyList());
 
 		Assert.assertEquals(record1, secondPoll.get());
 
@@ -180,7 +180,7 @@ public class UnorderedStreamElementQueueTest extends TestLogger {
 
 		Assert.assertFalse(thirdPoll.isDone());
 
-		record5.collect(Collections.<Integer>emptyList());
+		record5.complete(Collections.<Integer>emptyList());
 
 		Assert.assertEquals(record5, thirdPoll.get());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala
index 67af484..e91922a 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala
@@ -21,9 +21,9 @@ package org.apache.flink.streaming.api.scala
 import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.{AsyncDataStream => JavaAsyncDataStream}
-import org.apache.flink.streaming.api.functions.async.collector.{AsyncCollector => JavaAsyncCollector}
+import org.apache.flink.streaming.api.functions.async.{ResultFuture => JavaResultFuture}
 import org.apache.flink.streaming.api.functions.async.{AsyncFunction => JavaAsyncFunction}
-import org.apache.flink.streaming.api.scala.async.{AsyncCollector, AsyncFunction, JavaAsyncCollectorWrapper}
+import org.apache.flink.streaming.api.scala.async.{AsyncFunction, JavaResultFutureWrapper, ResultFuture}
 import org.apache.flink.util.Preconditions
 
 import scala.concurrent.duration.TimeUnit
@@ -34,7 +34,7 @@ import scala.concurrent.duration.TimeUnit
   * Example:
   * {{{
   *   val input: DataStream[String] = ...
-  *   val asyncFunction: (String, AsyncCollector[String]) => Unit = ...
+  *   val asyncFunction: (String, ResultFuture[String]) => Unit = ...
   *
   *   AsyncDataStream.orderedWait(input, asyncFunction, timeout, TimeUnit.MILLISECONDS, 100)
   * }}}
@@ -68,8 +68,8 @@ object AsyncDataStream {
     : DataStream[OUT] = {
 
     val javaAsyncFunction = new JavaAsyncFunction[IN, OUT] {
-      override def asyncInvoke(input: IN, collector: JavaAsyncCollector[OUT]): Unit = {
-        asyncFunction.asyncInvoke(input, new JavaAsyncCollectorWrapper(collector))
+      override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
+        asyncFunction.asyncInvoke(input, new JavaResultFutureWrapper(resultFuture))
       }
     }
 
@@ -126,7 +126,7 @@ object AsyncDataStream {
       timeout: Long,
       timeUnit: TimeUnit,
       capacity: Int) (
-      asyncFunction: (IN, AsyncCollector[OUT]) => Unit)
+      asyncFunction: (IN, ResultFuture[OUT]) => Unit)
     : DataStream[OUT] = {
 
     Preconditions.checkNotNull(asyncFunction)
@@ -134,9 +134,9 @@ object AsyncDataStream {
     val cleanAsyncFunction = input.executionEnvironment.scalaClean(asyncFunction)
 
     val func = new JavaAsyncFunction[IN, OUT] {
-      override def asyncInvoke(input: IN, collector: JavaAsyncCollector[OUT]): Unit = {
+      override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
 
-        cleanAsyncFunction(input, new JavaAsyncCollectorWrapper[OUT](collector))
+        cleanAsyncFunction(input, new JavaResultFutureWrapper[OUT](resultFuture))
       }
     }
 
@@ -167,7 +167,7 @@ object AsyncDataStream {
     input: DataStream[IN],
     timeout: Long,
     timeUnit: TimeUnit) (
-    asyncFunction: (IN, AsyncCollector[OUT]) => Unit)
+    asyncFunction: (IN, ResultFuture[OUT]) => Unit)
   : DataStream[OUT] = {
     unorderedWait(input, timeout, timeUnit, DEFAULT_QUEUE_CAPACITY)(asyncFunction)
   }
@@ -195,8 +195,8 @@ object AsyncDataStream {
     : DataStream[OUT] = {
 
     val javaAsyncFunction = new JavaAsyncFunction[IN, OUT] {
-      override def asyncInvoke(input: IN, collector: JavaAsyncCollector[OUT]): Unit = {
-        asyncFunction.asyncInvoke(input, new JavaAsyncCollectorWrapper[OUT](collector))
+      override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
+        asyncFunction.asyncInvoke(input, new JavaResultFutureWrapper[OUT](resultFuture))
       }
     }
 
@@ -251,7 +251,7 @@ object AsyncDataStream {
     timeout: Long,
     timeUnit: TimeUnit,
     capacity: Int) (
-    asyncFunction: (IN, AsyncCollector[OUT]) => Unit)
+    asyncFunction: (IN, ResultFuture[OUT]) => Unit)
   : DataStream[OUT] = {
 
     Preconditions.checkNotNull(asyncFunction)
@@ -259,8 +259,8 @@ object AsyncDataStream {
     val cleanAsyncFunction = input.executionEnvironment.scalaClean(asyncFunction)
 
     val func = new JavaAsyncFunction[IN, OUT] {
-      override def asyncInvoke(input: IN, collector: JavaAsyncCollector[OUT]): Unit = {
-        cleanAsyncFunction(input, new JavaAsyncCollectorWrapper[OUT](collector))
+      override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
+        cleanAsyncFunction(input, new JavaResultFutureWrapper[OUT](resultFuture))
       }
     }
 
@@ -290,7 +290,7 @@ object AsyncDataStream {
     input: DataStream[IN],
     timeout: Long,
     timeUnit: TimeUnit) (
-    asyncFunction: (IN, AsyncCollector[OUT]) => Unit)
+    asyncFunction: (IN, ResultFuture[OUT]) => Unit)
   : DataStream[OUT] = {
 
     orderedWait(input, timeout, timeUnit, DEFAULT_QUEUE_CAPACITY)(asyncFunction)

http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncCollector.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncCollector.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncCollector.scala
deleted file mode 100644
index a149c88..0000000
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncCollector.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala.async
-
-import org.apache.flink.annotation.PublicEvolving
-
-/**
-  * The async collector collects data/errors from the user code while processing
-  * asynchronous I/O operations.
-  *
-  * @tparam OUT type of the output element
-  */
-@PublicEvolving
-trait AsyncCollector[OUT] {
-
-  /**
-    * Complete the async collector with a set of result elements.
-    *
-    * Note that it should be called for exactly one time in the user code.
-    * Calling this function for multiple times will cause data lose.
-    *
-    * Put all results in a [[Iterable]] and then issue AsyncCollector.collect(Iterable).
-    *
-    * @param result to complete the async collector with
-    */
-  def collect(result: Iterable[OUT])
-
-  /**
-    * Complete this async collector with an error.
-    *
-    * @param throwable to complete the async collector with
-    */
-  def collect(throwable: Throwable)
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala
index 72e3702..aea6b57 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala
@@ -24,13 +24,13 @@ import org.apache.flink.annotation.PublicEvolving
   * A function to trigger async I/O operations.
   *
   * For each asyncInvoke an async io operation can be triggered, and once it has been done,
-  * the result can be collected by calling AsyncCollector.collect. For each async operation, its
+  * the result can be collected by calling ResultFuture.complete. For each async operation, its
   * context is stored in the operator immediately after invoking asyncInvoke, avoiding blocking for
   * each stream input as long as the internal buffer is not full.
   *
-  * [[AsyncCollector]] can be passed into callbacks or futures to collect the result data.
+  * [[ResultFuture]] can be passed into callbacks or futures to collect the result data.
   * An error can also be propagate to the async IO operator by
-  * [[AsyncCollector.collect(Throwable)]].
+  * [[ResultFuture.completeExceptionally(Throwable)]].
   *
   * @tparam IN The type of the input element
   * @tparam OUT The type of the output elements
@@ -42,7 +42,7 @@ trait AsyncFunction[IN, OUT] {
     * Trigger the async operation for each stream input
     *
     * @param input element coming from an upstream task
-    * @param collector to collect the result data
+    * @param resultFuture to be completed with the result data
     */
-  def asyncInvoke(input: IN, collector: AsyncCollector[OUT]): Unit
+  def asyncInvoke(input: IN, resultFuture: ResultFuture[OUT]): Unit
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/JavaAsyncCollectorWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/JavaAsyncCollectorWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/JavaAsyncCollectorWrapper.scala
deleted file mode 100644
index 3c5e95a..0000000
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/JavaAsyncCollectorWrapper.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala.async
-
-import org.apache.flink.annotation.Internal
-import org.apache.flink.streaming.api.functions.async.collector.{AsyncCollector => JavaAsyncCollector}
-
-import scala.collection.JavaConverters._
-
-/**
-  * Internal wrapper class to map a Flink's Java API [[JavaAsyncCollector]] to a Scala
-  * [[AsyncCollector]].
-  *
-  * @param javaAsyncCollector to forward the calls to
-  * @tparam OUT type of the output elements
-  */
-@Internal
-class JavaAsyncCollectorWrapper[OUT](val javaAsyncCollector: JavaAsyncCollector[OUT])
-  extends AsyncCollector[OUT] {
-  override def collect(result: Iterable[OUT]): Unit = {
-    javaAsyncCollector.collect(result.asJavaCollection)
-  }
-
-  override def collect(throwable: Throwable): Unit = {
-    javaAsyncCollector.collect(throwable)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/JavaResultFutureWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/JavaResultFutureWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/JavaResultFutureWrapper.scala
new file mode 100644
index 0000000..7680b89
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/JavaResultFutureWrapper.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.async
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.streaming.api.functions.async
+import org.apache.flink.streaming.api.functions.async.ResultFuture
+
+import scala.collection.JavaConverters._
+
+/**
+  * Internal wrapper class to map Flink's Java API [[async.ResultFuture]] to the Scala API
+  * [[ResultFuture]] defined in this package.
+  *
+  * @param javaResultFuture to forward the calls to
+  * @tparam OUT type of the output elements
+  */
+@Internal
+class JavaResultFutureWrapper[OUT](val javaResultFuture: async.ResultFuture[OUT])
+  extends ResultFuture[OUT] {
+  override def complete(result: Iterable[OUT]): Unit = {
+    javaResultFuture.complete(result.asJavaCollection)
+  }
+
+  override def completeExceptionally(throwable: Throwable): Unit = {
+    javaResultFuture.completeExceptionally(throwable)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/ResultFuture.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/ResultFuture.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/ResultFuture.scala
new file mode 100644
index 0000000..516e693
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/ResultFuture.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.async
+
+import org.apache.flink.annotation.PublicEvolving
+
+/**
+  * The result future collects data/errors from the user code while processing
+  * asynchronous I/O operations.
+  *
+  * @tparam OUT type of the output element
+  */
+@PublicEvolving
+trait ResultFuture[OUT] {
+
+  /**
+    * Complete the ResultFuture with a set of result elements.
+    *
+    * Note that it should be called exactly once in the user code.
+    * Calling this function multiple times will cause data loss.
+    *
+    * Put all results in an [[Iterable]] and then issue ResultFuture.complete(Iterable).
+    *
+    * @param result to complete the result future with
+    */
+  def complete(result: Iterable[OUT])
+
+  /**
+    * Complete this ResultFuture with an error.
+    *
+    * @param throwable to complete the result future with
+    */
+  def completeExceptionally(throwable: Throwable)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
index 32a04fa..8a910d9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
@@ -30,12 +30,11 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SplitStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
-import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MathUtils;
 
@@ -243,11 +242,11 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 
 			@Override
 			public void asyncInvoke(final Tuple2<Integer, NonSerializable> input,
-									final AsyncCollector<Integer> collector) throws Exception {
+									final ResultFuture<Integer> resultFuture) throws Exception {
 				executorService.submit(new Runnable() {
 					@Override
 					public void run() {
-						collector.collect(Collections.singletonList(input.f0 + input.f0));
+						resultFuture.complete(Collections.singletonList(input.f0 + input.f0));
 					}
 				});
 			}