You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/22 13:05:40 UTC
flink git commit: [FLINK-5851] [streaming API] Rename AsyncCollector
into ResultFuture
Repository: flink
Updated Branches:
refs/heads/master 9077e5169 -> 40cec17f4
[FLINK-5851] [streaming API] Rename AsyncCollector into ResultFuture
Complete renaming AsyncCollector -> ResultFuture
This closes #4243.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40cec17f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40cec17f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40cec17f
Branch: refs/heads/master
Commit: 40cec17f4303b43bbf65d8be542f0646eada57e8
Parents: 9077e51
Author: zhangminglei <zm...@163.com>
Authored: Sun Jul 2 23:14:21 2017 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Aug 22 15:05:13 2017 +0200
----------------------------------------------------------------------
docs/dev/stream/operators/asyncio.md | 20 ++++----
.../examples/async/AsyncIOExample.java | 10 ++--
.../scala/examples/async/AsyncIOExample.scala | 6 +--
.../api/functions/async/AsyncFunction.java | 21 ++++----
.../api/functions/async/ResultFuture.java | 50 ++++++++++++++++++++
.../api/functions/async/RichAsyncFunction.java | 3 +-
.../async/collector/AsyncCollector.java | 50 --------------------
.../api/operators/async/AsyncWaitOperator.java | 6 +--
.../streaming/api/operators/async/Emitter.java | 2 +-
.../async/queue/StreamRecordQueueEntry.java | 10 ++--
.../functions/async/RichAsyncFunctionTest.java | 5 +-
.../operators/async/AsyncWaitOperatorTest.java | 22 ++++-----
.../api/operators/async/EmitterTest.java | 14 +++---
.../queue/OrderedStreamElementQueueTest.java | 6 +--
.../async/queue/StreamElementQueueTest.java | 6 +--
.../queue/UnorderedStreamElementQueueTest.java | 12 ++---
.../streaming/api/scala/AsyncDataStream.scala | 30 ++++++------
.../api/scala/async/AsyncCollector.scala | 50 --------------------
.../api/scala/async/AsyncFunction.scala | 10 ++--
.../scala/async/JavaAsyncCollectorWrapper.scala | 43 -----------------
.../scala/async/JavaResultFutureWrapper.scala | 44 +++++++++++++++++
.../api/scala/async/ResultFuture.scala | 50 ++++++++++++++++++++
.../streaming/api/StreamingOperatorsITCase.java | 7 ++-
23 files changed, 237 insertions(+), 240 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/docs/dev/stream/operators/asyncio.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/asyncio.md b/docs/dev/stream/operators/asyncio.md
index 1ea0792..c5bafa1 100644
--- a/docs/dev/stream/operators/asyncio.md
+++ b/docs/dev/stream/operators/asyncio.md
@@ -74,7 +74,7 @@ Assuming one has an asynchronous client for the target database, three parts are
with asynchronous I/O against the database:
- An implementation of `AsyncFunction` that dispatches the requests
- - A *callback* that takes the result of the operation and hands it to the `AsyncCollector`
+ - A *callback* that takes the result of the operation and hands it to the `ResultFuture`
- Applying the async I/O operation on a DataStream as a transformation
The following code example illustrates the basic pattern:
@@ -104,16 +104,16 @@ class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, Stri
}
@Override
- public void asyncInvoke(final String str, final AsyncCollector<Tuple2<String, String>> asyncCollector) throws Exception {
+ public void asyncInvoke(final String str, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
// issue the asynchronous request, receive a future for result
Future<String> resultFuture = client.query(str);
// set the callback to be executed once the request by the client is complete
- // the callback simply forwards the result to the collector
+ // the callback simply forwards the result to the result future
resultFuture.thenAccept( (String result) -> {
- asyncCollector.collect(Collections.singleton(new Tuple2<>(str, result)));
+ resultFuture.complete(Collections.singleton(new Tuple2<>(str, result)));
});
}
@@ -142,15 +142,15 @@ class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {
implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())
- override def asyncInvoke(str: String, asyncCollector: AsyncCollector[(String, String)]): Unit = {
+ override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {
// issue the asynchronous request, receive a future for the result
val resultFuture: Future[String] = client.query(str)
// set the callback to be executed once the request by the client is complete
- // the callback simply forwards the result to the collector
+ // the callback simply forwards the result to the result future
resultFuture.onSuccess {
- case result: String => asyncCollector.collect(Iterable((str, result)));
+ case result: String => resultFuture.complete(Iterable((str, result)));
}
}
}
@@ -166,8 +166,8 @@ val resultStream: DataStream[(String, String)] =
</div>
</div>
-**Important note**: The `AsyncCollector` is completed with the first call of `AsyncCollector.collect`.
-All subsequent `collect` calls will be ignored.
+**Important note**: The `ResultFuture` is completed with the first call of `ResultFuture.complete`.
+All subsequent `complete` calls will be ignored.
The following two parameters control the asynchronous operations:
@@ -229,7 +229,7 @@ asynchronous requests in checkpoints and restores/re-triggers the requests when
For implementations with *Futures* that have an *Executor* (or *ExecutionContext* in Scala) for callbacks, we suggest using a `DirectExecutor`, because the
callback typically does minimal work, and a `DirectExecutor` avoids an additional thread-to-thread handover overhead. The callback typically only hands
-the result to the `AsyncCollector`, which adds it to the output buffer. From there, the heavy logic that includes record emission and interaction
+the result to the `ResultFuture`, which adds it to the output buffer. From there, the heavy logic that includes record emission and interaction
with the checkpoint bookkeeping happens in a dedicated thread-pool anyway.
A `DirectExecutor` can be obtained via `org.apache.flink.runtime.concurrent.Executors.directExecutor()` or
http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
index 748cb82..95379e3 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
@@ -29,8 +29,8 @@ import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
-import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
@@ -178,7 +178,7 @@ public class AsyncIOExample {
}
@Override
- public void asyncInvoke(final Integer input, final AsyncCollector<String> collector) throws Exception {
+ public void asyncInvoke(final Integer input, final ResultFuture<String> resultFuture) throws Exception {
this.executorService.submit(new Runnable() {
@Override
public void run() {
@@ -188,13 +188,13 @@ public class AsyncIOExample {
Thread.sleep(sleep);
if (random.nextFloat() < failRatio) {
- collector.collect(new Exception("wahahahaha..."));
+ resultFuture.completeExceptionally(new Exception("wahahahaha..."));
} else {
- collector.collect(
+ resultFuture.complete(
Collections.singletonList("key-" + (input % 10)));
}
} catch (InterruptedException e) {
- collector.collect(new ArrayList<String>(0));
+ resultFuture.complete(new ArrayList<String>(0));
}
}
});
http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala
index 69c4c0a..5808aaa 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.scala.async.AsyncCollector
+import org.apache.flink.streaming.api.scala.async.ResultFuture
import scala.concurrent.{ExecutionContext, Future}
@@ -38,9 +38,9 @@ object AsyncIOExample {
val input = env.addSource(new SimpleSource())
val asyncMapped = AsyncDataStream.orderedWait(input, timeout, TimeUnit.MILLISECONDS, 10) {
- (input, collector: AsyncCollector[Int]) =>
+ (input, collector: ResultFuture[Int]) =>
Future {
- collector.collect(Seq(input))
+ collector.complete(Seq(input))
} (ExecutionContext.global)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
index 5bb4459..2ac218d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.functions.async;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import java.io.Serializable;
@@ -28,21 +27,21 @@ import java.io.Serializable;
* A function to trigger Async I/O operation.
*
* <p>For each #asyncInvoke, an async io operation can be triggered, and once it has been done,
- * the result can be collected by calling {@link AsyncCollector#collect}. For each async
+ * the result can be collected by calling {@link ResultFuture#complete}. For each async
* operation, its context is stored in the operator immediately after invoking
* #asyncInvoke, avoiding blocking for each stream input as long as the internal buffer is not full.
*
- * <p>{@link AsyncCollector} can be passed into callbacks or futures to collect the result data.
+ * <p>{@link ResultFuture} can be passed into callbacks or futures to collect the result data.
* An error can also be propagated to the async IO operator by
- * {@link AsyncCollector#collect(Throwable)}.
+ * {@link ResultFuture#completeExceptionally(Throwable)}.
*
* <p>Callback example usage:
*
* <pre>{@code
* public class HBaseAsyncFunc implements AsyncFunction<String, String> {
*
- * public void asyncInvoke(String row, AsyncCollector<String> collector) throws Exception {
- * HBaseCallback cb = new HBaseCallback(collector);
+ * public void asyncInvoke(String row, ResultFuture<String> result) throws Exception {
+ * HBaseCallback cb = new HBaseCallback(result);
* Get get = new Get(Bytes.toBytes(row));
* hbase.asyncGet(get, cb);
* }
@@ -54,16 +53,16 @@ import java.io.Serializable;
* <pre>{@code
* public class HBaseAsyncFunc implements AsyncFunction<String, String> {
*
- * public void asyncInvoke(String row, final AsyncCollector<String> collector) throws Exception {
+ * public void asyncInvoke(String row, final ResultFuture<String> result) throws Exception {
* Get get = new Get(Bytes.toBytes(row));
* ListenableFuture<Result> future = hbase.asyncGet(get);
* Futures.addCallback(future, new FutureCallback<Result>() {
* public void onSuccess(Result result) {
* List<String> ret = process(result);
- * collector.collect(ret);
+ * result.complete(ret);
* }
* public void onFailure(Throwable thrown) {
- * collector.collect(thrown);
+ * result.completeExceptionally(thrown);
* }
* });
* }
@@ -80,9 +79,9 @@ public interface AsyncFunction<IN, OUT> extends Function, Serializable {
* Trigger async operation for each stream input.
*
* @param input element coming from an upstream task
- * @param collector to collect the result data
+ * @param resultFuture to be completed with the result data
* @exception Exception in case of a user code error. An exception will make the task fail and
* trigger fail-over process.
*/
- void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception;
+ void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/ResultFuture.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/ResultFuture.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/ResultFuture.java
new file mode 100644
index 0000000..934341e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/ResultFuture.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Collection;
+
+/**
+ * {@link ResultFuture} collects data / error in user code while processing async i/o.
+ *
+ * @param <OUT> Output type
+ */
+@PublicEvolving
+public interface ResultFuture<OUT> {
+ /**
+ * Completes the result future with a collection of result objects.
+ *
+ * <p>Note that it should be called exactly once in the user code.
+ * Calling this function multiple times will cause data loss.
+ *
+ * <p>Put all results in a {@link Collection} and then emit output.
+ *
+ * @param result A list of results.
+ */
+ void complete(Collection<OUT> result);
+
+ /**
+ * Completes the result future exceptionally with an exception.
+ *
+ * @param error A Throwable object.
+ */
+ void completeExceptionally(Throwable error);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
index 84f9e53..b6ce862 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
@@ -43,7 +43,6 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.types.Value;
import org.apache.flink.util.Preconditions;
@@ -85,7 +84,7 @@ public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction im
}
@Override
- public abstract void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception;
+ public abstract void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;
// -----------------------------------------------------------------------------------------
// Wrapper classes
http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java
deleted file mode 100644
index 964c13a..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.async.collector;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-import java.util.Collection;
-
-/**
- * {@link AsyncCollector} collects data / error in user codes while processing async i/o.
- *
- * @param <OUT> Output type
- */
-@PublicEvolving
-public interface AsyncCollector<OUT> {
- /**
- * Set result.
- *
- * <p>Note that it should be called for exactly one time in the user code.
- * Calling this function for multiple times will cause data lose.
- *
- * <p>Put all results in a {@link Collection} and then emit output.
- *
- * @param result A list of results.
- */
- void collect(Collection<OUT> result);
-
- /**
- * Set error.
- *
- * @param error A Throwable object.
- */
- void collect(Throwable error);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index a0f626e..3dfa8aa 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.AsyncDataStream.OutputMode;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
-import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -57,7 +57,7 @@ import java.util.concurrent.TimeoutException;
/**
* The {@link AsyncWaitOperator} allows to asynchronously process incoming stream records. For that
- * the operator creates an {@link AsyncCollector} which is passed to an {@link AsyncFunction}.
+ * the operator creates a {@link ResultFuture} which is passed to an {@link AsyncFunction}.
* Within the async function, the user can complete the result future arbitrarily. Once the result
* future has been completed, the result is emitted by the operator's emitter to downstream
* operators.
@@ -209,7 +209,7 @@ public class AsyncWaitOperator<IN, OUT>
new ProcessingTimeCallback() {
@Override
public void onProcessingTime(long timestamp) throws Exception {
- streamRecordBufferEntry.collect(
+ streamRecordBufferEntry.completeExceptionally(
new TimeoutException("Async function call has timed out."));
}
});
http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
index 2204109..0a1a2db 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
@@ -68,7 +68,7 @@ public class Emitter<OUT> implements Runnable {
this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "checkpointLock");
this.output = Preconditions.checkNotNull(output, "output");
- this.streamElementQueue = Preconditions.checkNotNull(streamElementQueue, "asyncCollectorBuffer");
+ this.streamElementQueue = Preconditions.checkNotNull(streamElementQueue, "streamElementQueue");
this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");
this.timestampedCollector = new TimestampedCollector<>(this.output);
http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
index 2aca10e..796b44f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.operators.async.queue;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
-import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import java.util.Collection;
@@ -28,14 +28,14 @@ import java.util.concurrent.CompletableFuture;
/**
* {@link StreamElementQueueEntry} implementation for {@link StreamRecord}. This class also acts
- * as the {@link AsyncCollector} implementation which is given to the {@link AsyncFunction}. The
+ * as the {@link ResultFuture} implementation which is given to the {@link AsyncFunction}. The
* async function completes this class with a collection of results.
*
* @param <OUT> Type of the asynchronous collection result
*/
@Internal
public class StreamRecordQueueEntry<OUT> extends StreamElementQueueEntry<Collection<OUT>>
- implements AsyncCollectionResult<OUT>, AsyncCollector<OUT> {
+ implements AsyncCollectionResult<OUT>, ResultFuture<OUT> {
/** Timestamp information. */
private final boolean hasTimestamp;
@@ -74,12 +74,12 @@ public class StreamRecordQueueEntry<OUT> extends StreamElementQueueEntry<Collect
}
@Override
- public void collect(Collection<OUT> result) {
+ public void complete(Collection<OUT> result) {
resultFuture.complete(result);
}
@Override
- public void collect(Throwable error) {
+ public void completeExceptionally(Throwable error) {
resultFuture.completeExceptionally(error);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
index 224b376..aba37df 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.junit.Test;
@@ -55,7 +54,7 @@ public class RichAsyncFunctionTest {
private static final long serialVersionUID = -2023923961609455894L;
@Override
- public void asyncInvoke(Integer input, AsyncCollector<Integer> collector) throws Exception {
+ public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) throws Exception {
// no op
}
};
@@ -94,7 +93,7 @@ public class RichAsyncFunctionTest {
private static final long serialVersionUID = 1707630162838967972L;
@Override
- public void asyncInvoke(Integer input, AsyncCollector<Integer> collector) throws Exception {
+ public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) throws Exception {
// no op
}
};
http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 1dd99fe..a3cb630 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -44,8 +44,8 @@ import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
-import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
@@ -166,11 +166,11 @@ public class AsyncWaitOperatorTest extends TestLogger {
}
@Override
- public void asyncInvoke(final Integer input, final AsyncCollector<Integer> collector) throws Exception {
+ public void asyncInvoke(final Integer input, final ResultFuture<Integer> resultFuture) throws Exception {
executorService.submit(new Runnable() {
@Override
public void run() {
- collector.collect(Collections.singletonList(input * 2));
+ resultFuture.complete(Collections.singletonList(input * 2));
}
});
}
@@ -178,7 +178,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
/**
* A special {@link AsyncFunction} without issuing
- * {@link AsyncCollector#collect} until the latch counts to zero.
+ * {@link ResultFuture#complete} until the latch counts to zero.
* This function is used in the testStateSnapshotAndRestore, ensuring
* that {@link StreamElementQueueEntry} can stay
* in the {@link StreamElementQueue} to be
@@ -194,7 +194,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
}
@Override
- public void asyncInvoke(final Integer input, final AsyncCollector<Integer> collector) throws Exception {
+ public void asyncInvoke(final Integer input, final ResultFuture<Integer> resultFuture) throws Exception {
this.executorService.submit(new Runnable() {
@Override
public void run() {
@@ -205,7 +205,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
// do nothing
}
- collector.collect(Collections.singletonList(input));
+ resultFuture.complete(Collections.singletonList(input));
}
});
}
@@ -854,8 +854,8 @@ public class AsyncWaitOperatorTest extends TestLogger {
private static final long serialVersionUID = -3718276118074877073L;
@Override
- public void asyncInvoke(Integer input, AsyncCollector<Integer> collector) throws Exception {
- collector.collect(Collections.singletonList(input));
+ public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) throws Exception {
+ resultFuture.complete(Collections.singletonList(input));
}
},
timeout,
@@ -949,8 +949,8 @@ public class AsyncWaitOperatorTest extends TestLogger {
private static final long serialVersionUID = 6326568632967110990L;
@Override
- public void asyncInvoke(Integer input, AsyncCollector<Integer> collector) throws Exception {
- collector.collect(new Exception("Test exception"));
+ public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) throws Exception {
+ resultFuture.completeExceptionally(new Exception("Test exception"));
}
}
@@ -1012,7 +1012,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
private static final long serialVersionUID = -3060481953330480694L;
@Override
- public void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception {
+ public void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception {
// no op
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java
index da2d795..7dedd14 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java
@@ -18,7 +18,7 @@
package org.apache.flink.streaming.api.operators.async;
-import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue;
import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
@@ -115,9 +115,9 @@ public class EmitterTest extends TestLogger {
queue.put(watermark1);
queue.put(record3);
- record2.collect(Arrays.asList(3, 4));
- record1.collect(Arrays.asList(1, 2));
- record3.collect(Arrays.asList(5, 6));
+ record2.complete(Arrays.asList(3, 4));
+ record1.complete(Arrays.asList(1, 2));
+ record3.complete(Arrays.asList(5, 6));
synchronized (lock) {
while (!queue.isEmpty()) {
@@ -133,7 +133,7 @@ public class EmitterTest extends TestLogger {
}
/**
- * Tests that the emitter handles exceptions occurring in the {@link AsyncCollector} correctly.
+ * Tests that the emitter handles exceptions occurring in the {@link ResultFuture} correctly.
*/
@Test
public void testEmitterWithExceptions() throws Exception {
@@ -167,8 +167,8 @@ public class EmitterTest extends TestLogger {
queue.put(record2);
queue.put(watermark1);
- record2.collect(testException);
- record1.collect(Arrays.asList(1));
+ record2.completeExceptionally(testException);
+ record1.complete(Arrays.asList(1));
synchronized (lock) {
while (!queue.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
index f3b68c4..c7b811a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
@@ -110,15 +110,15 @@ public class OrderedStreamElementQueueTest extends TestLogger {
Assert.assertFalse(pollOperation.isDone());
- entry2.collect(Collections.<Integer>emptyList());
+ entry2.complete(Collections.<Integer>emptyList());
- entry4.collect(Collections.<Integer>emptyList());
+ entry4.complete(Collections.<Integer>emptyList());
Thread.sleep(10L);
Assert.assertEquals(4, queue.size());
- entry1.collect(Collections.<Integer>emptyList());
+ entry1.complete(Collections.<Integer>emptyList());
Assert.assertEquals(expected, pollOperation.get());
http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
index d396756..7315f65 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
@@ -150,7 +150,7 @@ public class StreamElementQueueTest extends TestLogger {
Assert.assertEquals(watermarkQueueEntry, queue.poll());
Assert.assertEquals(1, queue.size());
- streamRecordQueueEntry.collect(Collections.<Integer>emptyList());
+ streamRecordQueueEntry.complete(Collections.<Integer>emptyList());
Assert.assertEquals(streamRecordQueueEntry, queue.poll());
@@ -191,7 +191,7 @@ public class StreamElementQueueTest extends TestLogger {
// but it shouldn't ;-)
Assert.assertFalse(putOperation.isDone());
- streamRecordQueueEntry.collect(Collections.<Integer>emptyList());
+ streamRecordQueueEntry.complete(Collections.<Integer>emptyList());
// polling the completed head element frees the queue again
Assert.assertEquals(streamRecordQueueEntry, queue.poll());
@@ -259,7 +259,7 @@ public class StreamElementQueueTest extends TestLogger {
Assert.assertFalse(pollOperation.isDone());
- streamRecordQueueEntry.collect(Collections.<Integer>emptyList());
+ streamRecordQueueEntry.complete(Collections.<Integer>emptyList());
Assert.assertEquals(streamRecordQueueEntry, pollOperation.get());
http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
index cc0bc30..acc6b8e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
@@ -110,13 +110,13 @@ public class UnorderedStreamElementQueueTest extends TestLogger {
executor);
// this should not fulfill the poll, because R3 is behind W1
- record3.collect(Collections.<Integer>emptyList());
+ record3.complete(Collections.<Integer>emptyList());
Thread.sleep(10L);
Assert.assertFalse(firstPoll.isDone());
- record2.collect(Collections.<Integer>emptyList());
+ record2.complete(Collections.<Integer>emptyList());
Assert.assertEquals(record2, firstPoll.get());
@@ -130,15 +130,15 @@ public class UnorderedStreamElementQueueTest extends TestLogger {
},
executor);
- record6.collect(Collections.<Integer>emptyList());
- record4.collect(Collections.<Integer>emptyList());
+ record6.complete(Collections.<Integer>emptyList());
+ record4.complete(Collections.<Integer>emptyList());
Thread.sleep(10L);
// The future should not be completed because R1 has not been completed yet
Assert.assertFalse(secondPoll.isDone());
- record1.collect(Collections.<Integer>emptyList());
+ record1.complete(Collections.<Integer>emptyList());
Assert.assertEquals(record1, secondPoll.get());
@@ -180,7 +180,7 @@ public class UnorderedStreamElementQueueTest extends TestLogger {
Assert.assertFalse(thirdPoll.isDone());
- record5.collect(Collections.<Integer>emptyList());
+ record5.complete(Collections.<Integer>emptyList());
Assert.assertEquals(record5, thirdPoll.get());
http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala
index 67af484..e91922a 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala
@@ -21,9 +21,9 @@ package org.apache.flink.streaming.api.scala
import org.apache.flink.annotation.PublicEvolving
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.{AsyncDataStream => JavaAsyncDataStream}
-import org.apache.flink.streaming.api.functions.async.collector.{AsyncCollector => JavaAsyncCollector}
+import org.apache.flink.streaming.api.functions.async.{ResultFuture => JavaResultFuture}
import org.apache.flink.streaming.api.functions.async.{AsyncFunction => JavaAsyncFunction}
-import org.apache.flink.streaming.api.scala.async.{AsyncCollector, AsyncFunction, JavaAsyncCollectorWrapper}
+import org.apache.flink.streaming.api.scala.async.{AsyncFunction, JavaResultFutureWrapper, ResultFuture}
import org.apache.flink.util.Preconditions
import scala.concurrent.duration.TimeUnit
@@ -34,7 +34,7 @@ import scala.concurrent.duration.TimeUnit
* Example:
* {{{
* val input: DataStream[String] = ...
- * val asyncFunction: (String, AsyncCollector[String]) => Unit = ...
+ * val asyncFunction: (String, ResultFuture[String]) => Unit = ...
*
* AsyncDataStream.orderedWait(input, asyncFunction, timeout, TimeUnit.MILLISECONDS, 100)
* }}}
@@ -68,8 +68,8 @@ object AsyncDataStream {
: DataStream[OUT] = {
val javaAsyncFunction = new JavaAsyncFunction[IN, OUT] {
- override def asyncInvoke(input: IN, collector: JavaAsyncCollector[OUT]): Unit = {
- asyncFunction.asyncInvoke(input, new JavaAsyncCollectorWrapper(collector))
+ override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
+ asyncFunction.asyncInvoke(input, new JavaResultFutureWrapper(resultFuture))
}
}
@@ -126,7 +126,7 @@ object AsyncDataStream {
timeout: Long,
timeUnit: TimeUnit,
capacity: Int) (
- asyncFunction: (IN, AsyncCollector[OUT]) => Unit)
+ asyncFunction: (IN, ResultFuture[OUT]) => Unit)
: DataStream[OUT] = {
Preconditions.checkNotNull(asyncFunction)
@@ -134,9 +134,9 @@ object AsyncDataStream {
val cleanAsyncFunction = input.executionEnvironment.scalaClean(asyncFunction)
val func = new JavaAsyncFunction[IN, OUT] {
- override def asyncInvoke(input: IN, collector: JavaAsyncCollector[OUT]): Unit = {
+ override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
- cleanAsyncFunction(input, new JavaAsyncCollectorWrapper[OUT](collector))
+ cleanAsyncFunction(input, new JavaResultFutureWrapper[OUT](resultFuture))
}
}
@@ -167,7 +167,7 @@ object AsyncDataStream {
input: DataStream[IN],
timeout: Long,
timeUnit: TimeUnit) (
- asyncFunction: (IN, AsyncCollector[OUT]) => Unit)
+ asyncFunction: (IN, ResultFuture[OUT]) => Unit)
: DataStream[OUT] = {
unorderedWait(input, timeout, timeUnit, DEFAULT_QUEUE_CAPACITY)(asyncFunction)
}
@@ -195,8 +195,8 @@ object AsyncDataStream {
: DataStream[OUT] = {
val javaAsyncFunction = new JavaAsyncFunction[IN, OUT] {
- override def asyncInvoke(input: IN, collector: JavaAsyncCollector[OUT]): Unit = {
- asyncFunction.asyncInvoke(input, new JavaAsyncCollectorWrapper[OUT](collector))
+ override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
+ asyncFunction.asyncInvoke(input, new JavaResultFutureWrapper[OUT](resultFuture))
}
}
@@ -251,7 +251,7 @@ object AsyncDataStream {
timeout: Long,
timeUnit: TimeUnit,
capacity: Int) (
- asyncFunction: (IN, AsyncCollector[OUT]) => Unit)
+ asyncFunction: (IN, ResultFuture[OUT]) => Unit)
: DataStream[OUT] = {
Preconditions.checkNotNull(asyncFunction)
@@ -259,8 +259,8 @@ object AsyncDataStream {
val cleanAsyncFunction = input.executionEnvironment.scalaClean(asyncFunction)
val func = new JavaAsyncFunction[IN, OUT] {
- override def asyncInvoke(input: IN, collector: JavaAsyncCollector[OUT]): Unit = {
- cleanAsyncFunction(input, new JavaAsyncCollectorWrapper[OUT](collector))
+ override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
+ cleanAsyncFunction(input, new JavaResultFutureWrapper[OUT](resultFuture))
}
}
@@ -290,7 +290,7 @@ object AsyncDataStream {
input: DataStream[IN],
timeout: Long,
timeUnit: TimeUnit) (
- asyncFunction: (IN, AsyncCollector[OUT]) => Unit)
+ asyncFunction: (IN, ResultFuture[OUT]) => Unit)
: DataStream[OUT] = {
orderedWait(input, timeout, timeUnit, DEFAULT_QUEUE_CAPACITY)(asyncFunction)
http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncCollector.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncCollector.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncCollector.scala
deleted file mode 100644
index a149c88..0000000
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncCollector.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala.async
-
-import org.apache.flink.annotation.PublicEvolving
-
-/**
- * The async collector collects data/errors from the user code while processing
- * asynchronous I/O operations.
- *
- * @tparam OUT type of the output element
- */
-@PublicEvolving
-trait AsyncCollector[OUT] {
-
- /**
- * Complete the async collector with a set of result elements.
- *
- * Note that it should be called for exactly one time in the user code.
- * Calling this function for multiple times will cause data lose.
- *
- * Put all results in a [[Iterable]] and then issue AsyncCollector.collect(Iterable).
- *
- * @param result to complete the async collector with
- */
- def collect(result: Iterable[OUT])
-
- /**
- * Complete this async collector with an error.
- *
- * @param throwable to complete the async collector with
- */
- def collect(throwable: Throwable)
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala
index 72e3702..aea6b57 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala
@@ -24,13 +24,13 @@ import org.apache.flink.annotation.PublicEvolving
* A function to trigger async I/O operations.
*
* For each asyncInvoke an async io operation can be triggered, and once it has been done,
- * the result can be collected by calling AsyncCollector.collect. For each async operation, its
+ * the result can be collected by calling ResultFuture.complete. For each async operation, its
* context is stored in the operator immediately after invoking asyncInvoke, avoiding blocking for
* each stream input as long as the internal buffer is not full.
*
- * [[AsyncCollector]] can be passed into callbacks or futures to collect the result data.
+ * [[ResultFuture]] can be passed into callbacks or futures to collect the result data.
* An error can also be propagate to the async IO operator by
- * [[AsyncCollector.collect(Throwable)]].
+ * [[ResultFuture.completeExceptionally(Throwable)]].
*
* @tparam IN The type of the input element
* @tparam OUT The type of the output elements
@@ -42,7 +42,7 @@ trait AsyncFunction[IN, OUT] {
* Trigger the async operation for each stream input
*
* @param input element coming from an upstream task
- * @param collector to collect the result data
+ * @param resultFuture to be completed with the result data
*/
- def asyncInvoke(input: IN, collector: AsyncCollector[OUT]): Unit
+ def asyncInvoke(input: IN, resultFuture: ResultFuture[OUT]): Unit
}
http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/JavaAsyncCollectorWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/JavaAsyncCollectorWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/JavaAsyncCollectorWrapper.scala
deleted file mode 100644
index 3c5e95a..0000000
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/JavaAsyncCollectorWrapper.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala.async
-
-import org.apache.flink.annotation.Internal
-import org.apache.flink.streaming.api.functions.async.collector.{AsyncCollector => JavaAsyncCollector}
-
-import scala.collection.JavaConverters._
-
-/**
- * Internal wrapper class to map a Flink's Java API [[JavaAsyncCollector]] to a Scala
- * [[AsyncCollector]].
- *
- * @param javaAsyncCollector to forward the calls to
- * @tparam OUT type of the output elements
- */
-@Internal
-class JavaAsyncCollectorWrapper[OUT](val javaAsyncCollector: JavaAsyncCollector[OUT])
- extends AsyncCollector[OUT] {
- override def collect(result: Iterable[OUT]): Unit = {
- javaAsyncCollector.collect(result.asJavaCollection)
- }
-
- override def collect(throwable: Throwable): Unit = {
- javaAsyncCollector.collect(throwable)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/JavaResultFutureWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/JavaResultFutureWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/JavaResultFutureWrapper.scala
new file mode 100644
index 0000000..7680b89
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/JavaResultFutureWrapper.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.async
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.streaming.api.functions.async
+import org.apache.flink.streaming.api.functions.async.ResultFuture
+
+import scala.collection.JavaConverters._
+
+/**
+ * Internal wrapper class to map Flink's Java API [[async.ResultFuture]] to a Scala
+ * [[ResultFuture]].
+ *
+ * @param javaResultFuture to forward the calls to
+ * @tparam OUT type of the output elements
+ */
+@Internal
+class JavaResultFutureWrapper[OUT](val javaResultFuture: async.ResultFuture[OUT])
+ extends ResultFuture[OUT] {
+ override def complete(result: Iterable[OUT]): Unit = {
+ javaResultFuture.complete(result.asJavaCollection)
+ }
+
+ override def completeExceptionally(throwable: Throwable): Unit = {
+ javaResultFuture.completeExceptionally(throwable)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/ResultFuture.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/ResultFuture.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/ResultFuture.scala
new file mode 100644
index 0000000..516e693
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/ResultFuture.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.async
+
+import org.apache.flink.annotation.PublicEvolving
+
+/**
+ * The result future collects data/errors from the user code while processing
+ * asynchronous I/O operations.
+ *
+ * @tparam OUT type of the output element
+ */
+@PublicEvolving
+trait ResultFuture[OUT] {
+
+ /**
+ * Complete the ResultFuture with a set of result elements.
+ *
+ * Note that it should be called exactly once in the user code.
+ * Calling this function multiple times will cause data loss.
+ *
+ * Put all results in an [[Iterable]] and then issue ResultFuture.complete(Iterable).
+ *
+ * @param result to complete the ResultFuture with
+ */
+ def complete(result: Iterable[OUT])
+
+ /**
+ * Complete this ResultFuture with an error.
+ *
+ * @param throwable to complete the async collector with
+ */
+ def completeExceptionally(throwable: Throwable)
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/40cec17f/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
index 32a04fa..8a910d9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
@@ -30,12 +30,11 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
-import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-
import org.apache.flink.util.Collector;
import org.apache.flink.util.MathUtils;
@@ -243,11 +242,11 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
@Override
public void asyncInvoke(final Tuple2<Integer, NonSerializable> input,
- final AsyncCollector<Integer> collector) throws Exception {
+ final ResultFuture<Integer> resultFuture) throws Exception {
executorService.submit(new Runnable() {
@Override
public void run() {
- collector.collect(Collections.singletonList(input.f0 + input.f0));
+ resultFuture.complete(Collections.singletonList(input.f0 + input.f0));
}
});
}