You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/12/20 05:05:43 UTC
[7/7] flink git commit: [FLINK-4391] Add Scala API for asynchronous
I/O operations
[FLINK-4391] Add Scala API for asynchronous I/O operations
This commit also adds a small example for asynchronous I/O with Scala.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bfdaa382
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bfdaa382
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bfdaa382
Branch: refs/heads/master
Commit: bfdaa3821c71f9fa3a3ff85f56154995d98b18b5
Parents: 6c5a871
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Dec 20 04:56:38 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Dec 20 05:29:03 2016 +0100
----------------------------------------------------------------------
.../examples/async/AsyncIOExample.java | 2 +-
.../scala/examples/async/AsyncIOExample.scala | 71 +++++
.../async/collector/AsyncCollector.java | 3 -
.../streaming/api/operators/async/Emitter.java | 6 +-
.../streaming/api/scala/AsyncDataStream.scala | 298 +++++++++++++++++++
.../api/scala/async/AsyncCollector.scala | 50 ++++
.../api/scala/async/AsyncFunction.scala | 48 +++
.../scala/async/JavaAsyncCollectorWrapper.scala | 43 +++
8 files changed, 515 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bfdaa382/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
index 2b05983..9b1f78f 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
@@ -93,7 +93,7 @@ public class AsyncIOExample {
start = 0;
}
}
- Thread.sleep(10);
+ Thread.sleep(10L);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bfdaa382/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala
new file mode 100644
index 0000000..69c4c0a
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.async
+
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.scala.async.AsyncCollector
+
+import scala.concurrent.{ExecutionContext, Future}
+
+object AsyncIOExample {
+
+  /**
+   * Runs a job that passes the integers produced by [[SimpleSource]] through an ordered
+   * asynchronous I/O operator and prints the results.
+   *
+   * @param args command line arguments (not used)
+   */
+  def main(args: Array[String]): Unit = {
+    // Time after which an asynchronous operation times out, in milliseconds.
+    val timeout = 10000L
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val input = env.addSource(new SimpleSource())
+
+    // Results are emitted in input order; at most 10 asynchronous operations are in flight.
+    val asyncMapped = AsyncDataStream.orderedWait(input, timeout, TimeUnit.MILLISECONDS, 10) {
+      (element, collector: AsyncCollector[Int]) =>
+        // ExecutionContext.global is looked up inside the function instead of being hoisted into
+        // a local of main, so the execution context is not captured by the closure that Flink
+        // cleans and serializes.
+        Future {
+          collector.collect(Seq(element))
+        } (ExecutionContext.global)
+        // The result is delivered through the collector; the Future itself is not needed.
+        ()
+    }
+
+    asyncMapped.print()
+
+    env.execute("Async I/O job")
+  }
+}
+
+/**
+ * Parallel source in which every instance emits an increasing sequence of integers starting at
+ * 0, sleeping 10 ms between elements, until it is cancelled.
+ */
+class SimpleSource extends ParallelSourceFunction[Int] {
+  // Flink calls cancel() from a different thread than the one executing run(); the flag must be
+  // volatile so the emitting loop is guaranteed to observe the update and terminate.
+  @volatile var running = true
+  var counter = 0
+
+  override def run(ctx: SourceContext[Int]): Unit = {
+    while (running) {
+      // Emit under the checkpoint lock so that emission does not interleave with a checkpoint.
+      ctx.getCheckpointLock.synchronized {
+        ctx.collect(counter)
+      }
+      counter += 1
+
+      Thread.sleep(10L)
+    }
+  }
+
+  override def cancel(): Unit = {
+    running = false
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bfdaa382/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java
index a072aca..25078ae 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java
@@ -37,9 +37,6 @@ public interface AsyncCollector<OUT> {
* <p>
* Put all results in a {@link Collection} and then issue
* {@link AsyncCollector#collect(Collection)}.
- * <p>
- * If the result is NULL, it will cause task fail. If collecting empty result set is allowable and
- * should not cause task fail-over, then try to collect an empty list collection.
*
* @param result A list of results.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/bfdaa382/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
index c122d23..a07abe1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
@@ -136,8 +136,10 @@ public class Emitter<OUT> implements Runnable {
try {
Collection<OUT> resultCollection = streamRecordResult.get();
- for (OUT result : resultCollection) {
- timestampedCollector.collect(result);
+ if (resultCollection != null) {
+ for (OUT result : resultCollection) {
+ timestampedCollector.collect(result);
+ }
}
} catch (Exception e) {
operatorActions.failOperator(
http://git-wip-us.apache.org/repos/asf/flink/blob/bfdaa382/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala
new file mode 100644
index 0000000..67af484
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.datastream.{AsyncDataStream => JavaAsyncDataStream}
+import org.apache.flink.streaming.api.functions.async.collector.{AsyncCollector => JavaAsyncCollector}
+import org.apache.flink.streaming.api.functions.async.{AsyncFunction => JavaAsyncFunction}
+import org.apache.flink.streaming.api.scala.async.{AsyncCollector, AsyncFunction, JavaAsyncCollectorWrapper}
+import org.apache.flink.util.Preconditions
+
+import scala.concurrent.duration.TimeUnit
+
+/**
+ * A helper class to apply [[AsyncFunction]] to a data stream.
+ *
+ * Every operation comes in an ordered flavour (`orderedWait`: results are emitted in input
+ * order) and an unordered one (`unorderedWait`: results may be re-ordered between watermarks).
+ * The asynchronous logic is given either as an [[AsyncFunction]] or as a Scala function
+ * `(IN, AsyncCollector[OUT]) => Unit` in a second parameter list.
+ *
+ * Example:
+ * {{{
+ * val input: DataStream[String] = ...
+ * val asyncFunction: (String, AsyncCollector[String]) => Unit = ...
+ *
+ * AsyncDataStream.orderedWait(input, timeout, TimeUnit.MILLISECONDS, 100)(asyncFunction)
+ * }}}
+ */
+@PublicEvolving
+object AsyncDataStream {
+
+  /** Capacity used by the overloads that do not take an explicit `capacity` argument. */
+  private val DEFAULT_QUEUE_CAPACITY = 100
+
+  /**
+   * Apply an asynchronous function on the input data stream. The output order is only maintained
+   * with respect to watermarks. Stream records which lie between the same two watermarks can be
+   * re-ordered.
+   *
+   * @param input to apply the async function on
+   * @param asyncFunction to use
+   * @param timeout for the asynchronous operation to complete
+   * @param timeUnit of the timeout
+   * @param capacity of the operator which is equivalent to the number of concurrent asynchronous
+   *                 operations
+   * @tparam IN Type of the input record
+   * @tparam OUT Type of the output record
+   * @return the resulting stream containing the asynchronous results
+   * @throws java.lang.NullPointerException if `asyncFunction` is null
+   */
+  def unorderedWait[IN, OUT: TypeInformation](
+      input: DataStream[IN],
+      asyncFunction: AsyncFunction[IN, OUT],
+      timeout: Long,
+      timeUnit: TimeUnit,
+      capacity: Int)
+    : DataStream[OUT] = {
+
+    // Fail fast at job construction instead of on the first record.
+    Preconditions.checkNotNull(asyncFunction)
+
+    val outType: TypeInformation[OUT] = implicitly[TypeInformation[OUT]]
+
+    asScalaStream(JavaAsyncDataStream.unorderedWait[IN, OUT](
+      input.javaStream,
+      asJavaAsyncFunction(asyncFunction),
+      timeout,
+      timeUnit,
+      capacity).returns(outType))
+  }
+
+  /**
+   * Apply an asynchronous function on the input data stream. The output order is only maintained
+   * with respect to watermarks. Stream records which lie between the same two watermarks can be
+   * re-ordered. At most 100 asynchronous operations are in flight at the same time.
+   *
+   * @param input to apply the async function on
+   * @param asyncFunction to use
+   * @param timeout for the asynchronous operation to complete
+   * @param timeUnit of the timeout
+   * @tparam IN Type of the input record
+   * @tparam OUT Type of the output record
+   * @return the resulting stream containing the asynchronous results
+   * @throws java.lang.NullPointerException if `asyncFunction` is null
+   */
+  def unorderedWait[IN, OUT: TypeInformation](
+      input: DataStream[IN],
+      asyncFunction: AsyncFunction[IN, OUT],
+      timeout: Long,
+      timeUnit: TimeUnit)
+    : DataStream[OUT] = {
+
+    unorderedWait(input, asyncFunction, timeout, timeUnit, DEFAULT_QUEUE_CAPACITY)
+  }
+
+  /**
+   * Apply an asynchronous function on the input data stream. The output order is only maintained
+   * with respect to watermarks. Stream records which lie between the same two watermarks can be
+   * re-ordered.
+   *
+   * @param input to apply the async function on
+   * @param timeout for the asynchronous operation to complete
+   * @param timeUnit of the timeout
+   * @param capacity of the operator which is equivalent to the number of concurrent asynchronous
+   *                 operations
+   * @param asyncFunction to use
+   * @tparam IN Type of the input record
+   * @tparam OUT Type of the output record
+   * @return the resulting stream containing the asynchronous results
+   * @throws java.lang.NullPointerException if `asyncFunction` is null
+   */
+  def unorderedWait[IN, OUT: TypeInformation](
+      input: DataStream[IN],
+      timeout: Long,
+      timeUnit: TimeUnit,
+      capacity: Int) (
+      asyncFunction: (IN, AsyncCollector[OUT]) => Unit)
+    : DataStream[OUT] = {
+
+    Preconditions.checkNotNull(asyncFunction)
+
+    // Clean the closure so that it does not drag unneeded outer references into the job.
+    val cleanAsyncFunction = input.executionEnvironment.scalaClean(asyncFunction)
+
+    unorderedWait[IN, OUT](
+      input,
+      asAsyncFunction(cleanAsyncFunction),
+      timeout,
+      timeUnit,
+      capacity)
+  }
+
+  /**
+   * Apply an asynchronous function on the input data stream. The output order is only maintained
+   * with respect to watermarks. Stream records which lie between the same two watermarks can be
+   * re-ordered. At most 100 asynchronous operations are in flight at the same time.
+   *
+   * @param input to apply the async function on
+   * @param timeout for the asynchronous operation to complete
+   * @param timeUnit of the timeout
+   * @param asyncFunction to use
+   * @tparam IN Type of the input record
+   * @tparam OUT Type of the output record
+   * @return the resulting stream containing the asynchronous results
+   * @throws java.lang.NullPointerException if `asyncFunction` is null
+   */
+  def unorderedWait[IN, OUT: TypeInformation](
+      input: DataStream[IN],
+      timeout: Long,
+      timeUnit: TimeUnit) (
+      asyncFunction: (IN, AsyncCollector[OUT]) => Unit)
+    : DataStream[OUT] = {
+
+    unorderedWait(input, timeout, timeUnit, DEFAULT_QUEUE_CAPACITY)(asyncFunction)
+  }
+
+  /**
+   * Apply an asynchronous function on the input data stream. The output order is the same as the
+   * input order of the elements.
+   *
+   * @param input to apply the async function on
+   * @param asyncFunction to use
+   * @param timeout for the asynchronous operation to complete
+   * @param timeUnit of the timeout
+   * @param capacity of the operator which is equivalent to the number of concurrent asynchronous
+   *                 operations
+   * @tparam IN Type of the input record
+   * @tparam OUT Type of the output record
+   * @return the resulting stream containing the asynchronous results
+   * @throws java.lang.NullPointerException if `asyncFunction` is null
+   */
+  def orderedWait[IN, OUT: TypeInformation](
+      input: DataStream[IN],
+      asyncFunction: AsyncFunction[IN, OUT],
+      timeout: Long,
+      timeUnit: TimeUnit,
+      capacity: Int)
+    : DataStream[OUT] = {
+
+    // Fail fast at job construction instead of on the first record.
+    Preconditions.checkNotNull(asyncFunction)
+
+    val outType: TypeInformation[OUT] = implicitly[TypeInformation[OUT]]
+
+    asScalaStream(JavaAsyncDataStream.orderedWait[IN, OUT](
+      input.javaStream,
+      asJavaAsyncFunction(asyncFunction),
+      timeout,
+      timeUnit,
+      capacity).returns(outType))
+  }
+
+  /**
+   * Apply an asynchronous function on the input data stream. The output order is the same as the
+   * input order of the elements. At most 100 asynchronous operations are in flight at the same
+   * time.
+   *
+   * @param input to apply the async function on
+   * @param asyncFunction to use
+   * @param timeout for the asynchronous operation to complete
+   * @param timeUnit of the timeout
+   * @tparam IN Type of the input record
+   * @tparam OUT Type of the output record
+   * @return the resulting stream containing the asynchronous results
+   * @throws java.lang.NullPointerException if `asyncFunction` is null
+   */
+  def orderedWait[IN, OUT: TypeInformation](
+      input: DataStream[IN],
+      asyncFunction: AsyncFunction[IN, OUT],
+      timeout: Long,
+      timeUnit: TimeUnit)
+    : DataStream[OUT] = {
+
+    orderedWait(input, asyncFunction, timeout, timeUnit, DEFAULT_QUEUE_CAPACITY)
+  }
+
+  /**
+   * Apply an asynchronous function on the input data stream. The output order is the same as the
+   * input order of the elements.
+   *
+   * @param input to apply the async function on
+   * @param timeout for the asynchronous operation to complete
+   * @param timeUnit of the timeout
+   * @param capacity of the operator which is equivalent to the number of concurrent asynchronous
+   *                 operations
+   * @param asyncFunction to use
+   * @tparam IN Type of the input record
+   * @tparam OUT Type of the output record
+   * @return the resulting stream containing the asynchronous results
+   * @throws java.lang.NullPointerException if `asyncFunction` is null
+   */
+  def orderedWait[IN, OUT: TypeInformation](
+      input: DataStream[IN],
+      timeout: Long,
+      timeUnit: TimeUnit,
+      capacity: Int) (
+      asyncFunction: (IN, AsyncCollector[OUT]) => Unit)
+    : DataStream[OUT] = {
+
+    Preconditions.checkNotNull(asyncFunction)
+
+    // Clean the closure so that it does not drag unneeded outer references into the job.
+    val cleanAsyncFunction = input.executionEnvironment.scalaClean(asyncFunction)
+
+    orderedWait[IN, OUT](
+      input,
+      asAsyncFunction(cleanAsyncFunction),
+      timeout,
+      timeUnit,
+      capacity)
+  }
+
+  /**
+   * Apply an asynchronous function on the input data stream. The output order is the same as the
+   * input order of the elements. At most 100 asynchronous operations are in flight at the same
+   * time.
+   *
+   * @param input to apply the async function on
+   * @param timeout for the asynchronous operation to complete
+   * @param timeUnit of the timeout
+   * @param asyncFunction to use
+   * @tparam IN Type of the input record
+   * @tparam OUT Type of the output record
+   * @return the resulting stream containing the asynchronous results
+   * @throws java.lang.NullPointerException if `asyncFunction` is null
+   */
+  def orderedWait[IN, OUT: TypeInformation](
+      input: DataStream[IN],
+      timeout: Long,
+      timeUnit: TimeUnit) (
+      asyncFunction: (IN, AsyncCollector[OUT]) => Unit)
+    : DataStream[OUT] = {
+
+    orderedWait(input, timeout, timeUnit, DEFAULT_QUEUE_CAPACITY)(asyncFunction)
+  }
+
+  /**
+   * Adapts a Scala [[AsyncFunction]] to Flink's Java async function interface. Every Java
+   * collector handed to the adapter is exposed to the Scala function as a
+   * [[JavaAsyncCollectorWrapper]].
+   */
+  private def asJavaAsyncFunction[IN, OUT](
+      asyncFunction: AsyncFunction[IN, OUT])
+    : JavaAsyncFunction[IN, OUT] = {
+
+    new JavaAsyncFunction[IN, OUT] {
+      override def asyncInvoke(input: IN, collector: JavaAsyncCollector[OUT]): Unit = {
+        asyncFunction.asyncInvoke(input, new JavaAsyncCollectorWrapper[OUT](collector))
+      }
+    }
+  }
+
+  /**
+   * Adapts an (already cleaned) Scala function to an [[AsyncFunction]]. The adapter is explicitly
+   * serializable because it is captured by the Java operator function built by
+   * `asJavaAsyncFunction`, which is serialized together with the job.
+   */
+  private def asAsyncFunction[IN, OUT](
+      asyncFunction: (IN, AsyncCollector[OUT]) => Unit)
+    : AsyncFunction[IN, OUT] = {
+
+    new AsyncFunction[IN, OUT] with java.io.Serializable {
+      override def asyncInvoke(input: IN, collector: AsyncCollector[OUT]): Unit = {
+        asyncFunction(input, collector)
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bfdaa382/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncCollector.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncCollector.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncCollector.scala
new file mode 100644
index 0000000..a149c88
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncCollector.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.async
+
+import org.apache.flink.annotation.PublicEvolving
+
+/**
+ * The async collector collects data/errors from the user code while processing
+ * asynchronous I/O operations.
+ *
+ * @tparam OUT type of the output element
+ */
+@PublicEvolving
+trait AsyncCollector[OUT] {
+
+  /**
+   * Completes the async collector with a set of result elements.
+   *
+   * Note that it should be called exactly once in the user code. Calling it multiple times
+   * will cause data loss.
+   *
+   * Put all results of one asynchronous operation into a single [[Iterable]] and pass it to
+   * this method.
+   *
+   * @param result to complete the async collector with
+   */
+  def collect(result: Iterable[OUT]): Unit
+
+  /**
+   * Completes this async collector with an error.
+   *
+   * @param throwable to complete the async collector with
+   */
+  def collect(throwable: Throwable): Unit
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bfdaa382/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala
new file mode 100644
index 0000000..72e3702
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.async
+
+import org.apache.flink.annotation.PublicEvolving
+
+/**
+ * A function to trigger async I/O operations.
+ *
+ * For each asyncInvoke an async I/O operation can be triggered, and once it has completed,
+ * the result can be collected by calling `AsyncCollector.collect`. For each async operation, its
+ * context is stored in the operator immediately after invoking asyncInvoke, avoiding blocking for
+ * each stream input as long as the internal buffer is not full.
+ *
+ * [[AsyncCollector]] can be passed into callbacks or futures to collect the result data.
+ * An error can also be propagated to the async I/O operator by calling
+ * `AsyncCollector.collect(throwable)`.
+ *
+ * Instances are captured by the operator function that Flink serializes and ships with the job,
+ * so implementations (and everything they reference) have to be serializable.
+ *
+ * @tparam IN The type of the input element
+ * @tparam OUT The type of the output elements
+ */
+@PublicEvolving
+trait AsyncFunction[IN, OUT] extends java.io.Serializable {
+
+  /**
+   * Trigger the async operation for each stream input.
+   *
+   * @param input element coming from an upstream task
+   * @param collector to collect the result data
+   */
+  def asyncInvoke(input: IN, collector: AsyncCollector[OUT]): Unit
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bfdaa382/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/JavaAsyncCollectorWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/JavaAsyncCollectorWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/JavaAsyncCollectorWrapper.scala
new file mode 100644
index 0000000..3c5e95a
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/JavaAsyncCollectorWrapper.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.async
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.streaming.api.functions.async.collector.{AsyncCollector => JavaAsyncCollector}
+
+import scala.collection.JavaConverters._
+
+/**
+ * Internal wrapper class to map Flink's Java API [[JavaAsyncCollector]] to a Scala
+ * [[AsyncCollector]].
+ *
+ * @param javaAsyncCollector to forward the calls to
+ * @tparam OUT type of the output elements
+ */
+@Internal
+class JavaAsyncCollectorWrapper[OUT](val javaAsyncCollector: JavaAsyncCollector[OUT])
+  extends AsyncCollector[OUT] {
+
+  /**
+   * Forwards the result to the wrapped Java collector.
+   *
+   * A `null` result is forwarded as an empty collection, i.e. "no results", matching how the
+   * Java side treats a `null` result collection. Converting `null` with `asJavaCollection` would
+   * instead yield a view that throws a NullPointerException as soon as it is iterated.
+   *
+   * @param result elements to complete the collector with; `null` is treated as empty
+   */
+  override def collect(result: Iterable[OUT]): Unit = {
+    val javaResult: java.util.Collection[OUT] =
+      if (result == null) java.util.Collections.emptyList[OUT]() else result.asJavaCollection
+    javaAsyncCollector.collect(javaResult)
+  }
+
+  /**
+   * Forwards the error to the wrapped Java collector.
+   *
+   * @param throwable error to complete the collector with
+   */
+  override def collect(throwable: Throwable): Unit = {
+    javaAsyncCollector.collect(throwable)
+  }
+}