You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/12/20 05:05:43 UTC

[7/7] flink git commit: [FLINK-4391] Add Scala API for asynchronous I/O operations

[FLINK-4391] Add Scala API for asynchronous I/O operations

This commit also adds a small example for asynchronous I/O with Scala.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bfdaa382
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bfdaa382
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bfdaa382

Branch: refs/heads/master
Commit: bfdaa3821c71f9fa3a3ff85f56154995d98b18b5
Parents: 6c5a871
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Dec 20 04:56:38 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Dec 20 05:29:03 2016 +0100

----------------------------------------------------------------------
 .../examples/async/AsyncIOExample.java          |   2 +-
 .../scala/examples/async/AsyncIOExample.scala   |  71 +++++
 .../async/collector/AsyncCollector.java         |   3 -
 .../streaming/api/operators/async/Emitter.java  |   6 +-
 .../streaming/api/scala/AsyncDataStream.scala   | 298 +++++++++++++++++++
 .../api/scala/async/AsyncCollector.scala        |  50 ++++
 .../api/scala/async/AsyncFunction.scala         |  48 +++
 .../scala/async/JavaAsyncCollectorWrapper.scala |  43 +++
 8 files changed, 515 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bfdaa382/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
index 2b05983..9b1f78f 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
@@ -93,7 +93,7 @@ public class AsyncIOExample {
 						start = 0;
 					}
 				}
-				Thread.sleep(10);
+				Thread.sleep(10L);
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bfdaa382/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala
new file mode 100644
index 0000000..69c4c0a
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.async
+
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.scala.async.AsyncCollector
+
+import scala.concurrent.{ExecutionContext, Future}
+
+object AsyncIOExample {
+  // Entry point: sends every source element through an ordered async operator and prints it.
+  def main(args: Array[String]): Unit = {
+    val timeout = 10000L
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val input = env.addSource(new SimpleSource())
+    // orderedWait arguments: input stream, timeout, time unit of the timeout, capacity (10).
+    val asyncMapped = AsyncDataStream.orderedWait(input, timeout, TimeUnit.MILLISECONDS, 10) {
+      (element, collector: AsyncCollector[Int]) =>
+        Future {
+          collector.collect(Seq(element))
+        } (ExecutionContext.global)
+    }
+
+    asyncMapped.print()
+
+    env.execute("Async I/O job")
+  }
+}
+
+class SimpleSource extends ParallelSourceFunction[Int] {
+  @volatile var running = true // cleared by cancel(), polled by the loop in run()
+  var counter = 0
+  // Emits the current counter value, increments it, then sleeps 10 ms, until cancelled.
+  override def run(ctx: SourceContext[Int]): Unit = {
+    while (running) {
+      ctx.getCheckpointLock.synchronized {
+        ctx.collect(counter)
+      }
+      counter += 1
+
+      Thread.sleep(10L)
+    }
+  }
+  // Requests termination of run(); the loop exits on its next check of the flag.
+  override def cancel(): Unit = {
+    running = false
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bfdaa382/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java
index a072aca..25078ae 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java
@@ -37,9 +37,6 @@ public interface AsyncCollector<OUT> {
 	 * <p>
 	 * Put all results in a {@link Collection} and then issue
 	 * {@link AsyncCollector#collect(Collection)}.
-	 * <p>
-	 * If the result is NULL, it will cause task fail. If collecting empty result set is allowable and
-	 * should not cause task fail-over, then try to collect an empty list collection.
 	 *
 	 * @param result A list of results.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/bfdaa382/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
index c122d23..a07abe1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
@@ -136,8 +136,10 @@ public class Emitter<OUT> implements Runnable {
 				try {
 					Collection<OUT> resultCollection = streamRecordResult.get();
 
-					for (OUT result : resultCollection) {
-						timestampedCollector.collect(result);
+					if (resultCollection != null) {
+						for (OUT result : resultCollection) {
+							timestampedCollector.collect(result);
+						}
 					}
 				} catch (Exception e) {
 					operatorActions.failOperator(

http://git-wip-us.apache.org/repos/asf/flink/blob/bfdaa382/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala
new file mode 100644
index 0000000..67af484
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.datastream.{AsyncDataStream => JavaAsyncDataStream}
+import org.apache.flink.streaming.api.functions.async.collector.{AsyncCollector => JavaAsyncCollector}
+import org.apache.flink.streaming.api.functions.async.{AsyncFunction => JavaAsyncFunction}
+import org.apache.flink.streaming.api.scala.async.{AsyncCollector, AsyncFunction, JavaAsyncCollectorWrapper}
+import org.apache.flink.util.Preconditions
+
+import scala.concurrent.duration.TimeUnit
+
+/**
+  * A helper class to apply [[AsyncFunction]] to a data stream.
+  *
+  * Example:
+  * {{{
+  *   val input: DataStream[String] = ...
+  *   val asyncFunction: (String, AsyncCollector[String]) => Unit = ...
+  *
+  *   AsyncDataStream.orderedWait(input, timeout, TimeUnit.MILLISECONDS, 100)(asyncFunction)
+  * }}}
+  */
+@PublicEvolving
+object AsyncDataStream {
+
+  private val DEFAULT_QUEUE_CAPACITY = 100
+
+  /**
+    * Apply an asynchronous function on the input data stream. The output order is only maintained
+    * with respect to watermarks. Stream records which lie between the same two watermarks, can be
+    * re-ordered.
+    *
+    * @param input to apply the async function on
+    * @param asyncFunction to use
+    * @param timeout for the asynchronous operation to complete
+    * @param timeUnit of the timeout
+    * @param capacity of the operator which is equivalent to the number of concurrent asynchronous
+    *                 operations
+    * @tparam IN Type of the input record
+    * @tparam OUT Type of the output record
+    * @return the resulting stream containing the asynchronous results
+    */
+  def unorderedWait[IN, OUT: TypeInformation](
+      input: DataStream[IN],
+      asyncFunction: AsyncFunction[IN, OUT],
+      timeout: Long,
+      timeUnit: TimeUnit,
+      capacity: Int)
+    : DataStream[OUT] = {
+
+    val javaAsyncFunction = new JavaAsyncFunction[IN, OUT] {
+      override def asyncInvoke(input: IN, collector: JavaAsyncCollector[OUT]): Unit = {
+        asyncFunction.asyncInvoke(input, new JavaAsyncCollectorWrapper(collector))
+      }
+    }
+
+    val outType : TypeInformation[OUT] = implicitly[TypeInformation[OUT]]
+
+    asScalaStream(JavaAsyncDataStream.unorderedWait[IN, OUT](
+      input.javaStream,
+      javaAsyncFunction,
+      timeout,
+      timeUnit,
+      capacity).returns(outType))
+  }
+
+  /**
+    * Apply an asynchronous function on the input data stream. The output order is only maintained
+    * with respect to watermarks. Stream records which lie between the same two watermarks, can be
+    * re-ordered.
+    *
+    * @param input to apply the async function on
+    * @param asyncFunction to use
+    * @param timeout for the asynchronous operation to complete
+    * @param timeUnit of the timeout
+    * @tparam IN Type of the input record
+    * @tparam OUT Type of the output record
+    * @return the resulting stream containing the asynchronous results
+    */
+  def unorderedWait[IN, OUT: TypeInformation](
+    input: DataStream[IN],
+    asyncFunction: AsyncFunction[IN, OUT],
+    timeout: Long,
+    timeUnit: TimeUnit)
+  : DataStream[OUT] = {
+
+    unorderedWait(input, asyncFunction, timeout, timeUnit, DEFAULT_QUEUE_CAPACITY)
+  }
+
+  /**
+    * Apply an asynchronous function on the input data stream. The output order is only maintained
+    * with respect to watermarks. Stream records which lie between the same two watermarks, can be
+    * re-ordered.
+    *
+    * @param input to apply the async function on
+    * @param timeout for the asynchronous operation to complete
+    * @param timeUnit of the timeout
+    * @param capacity of the operator which is equivalent to the number of concurrent asynchronous
+    *                 operations
+    * @param asyncFunction to use
+    * @tparam IN Type of the input record
+    * @tparam OUT Type of the output record
+    * @return the resulting stream containing the asynchronous results
+    */
+  def unorderedWait[IN, OUT: TypeInformation](
+      input: DataStream[IN],
+      timeout: Long,
+      timeUnit: TimeUnit,
+      capacity: Int) (
+      asyncFunction: (IN, AsyncCollector[OUT]) => Unit)
+    : DataStream[OUT] = {
+
+    Preconditions.checkNotNull(asyncFunction)
+
+    val cleanAsyncFunction = input.executionEnvironment.scalaClean(asyncFunction)
+
+    val func = new JavaAsyncFunction[IN, OUT] {
+      override def asyncInvoke(input: IN, collector: JavaAsyncCollector[OUT]): Unit = {
+
+        cleanAsyncFunction(input, new JavaAsyncCollectorWrapper[OUT](collector))
+      }
+    }
+
+    val outType : TypeInformation[OUT] = implicitly[TypeInformation[OUT]]
+
+    asScalaStream(JavaAsyncDataStream.unorderedWait[IN, OUT](
+      input.javaStream,
+      func,
+      timeout,
+      timeUnit,
+      capacity).returns(outType))
+  }
+
+  /**
+    * Apply an asynchronous function on the input data stream. The output order is only maintained
+    * with respect to watermarks. Stream records which lie between the same two watermarks, can be
+    * re-ordered.
+    *
+    * @param input to apply the async function on
+    * @param timeout for the asynchronous operation to complete
+    * @param timeUnit of the timeout
+    * @param asyncFunction to use
+    * @tparam IN Type of the input record
+    * @tparam OUT Type of the output record
+    * @return the resulting stream containing the asynchronous results
+    */
+  def unorderedWait[IN, OUT: TypeInformation](
+    input: DataStream[IN],
+    timeout: Long,
+    timeUnit: TimeUnit) (
+    asyncFunction: (IN, AsyncCollector[OUT]) => Unit)
+  : DataStream[OUT] = {
+    unorderedWait(input, timeout, timeUnit, DEFAULT_QUEUE_CAPACITY)(asyncFunction)
+  }
+
+  /**
+    * Apply an asynchronous function on the input data stream. The output order is the same as the
+    * input order of the elements.
+    *
+    * @param input to apply the async function on
+    * @param asyncFunction to use
+    * @param timeout for the asynchronous operation to complete
+    * @param timeUnit of the timeout
+    * @param capacity of the operator which is equivalent to the number of concurrent asynchronous
+    *                 operations
+    * @tparam IN Type of the input record
+    * @tparam OUT Type of the output record
+    * @return the resulting stream containing the asynchronous results
+    */
+  def orderedWait[IN, OUT: TypeInformation](
+      input: DataStream[IN],
+      asyncFunction: AsyncFunction[IN, OUT],
+      timeout: Long,
+      timeUnit: TimeUnit,
+      capacity: Int)
+    : DataStream[OUT] = {
+
+    val javaAsyncFunction = new JavaAsyncFunction[IN, OUT] {
+      override def asyncInvoke(input: IN, collector: JavaAsyncCollector[OUT]): Unit = {
+        asyncFunction.asyncInvoke(input, new JavaAsyncCollectorWrapper[OUT](collector))
+      }
+    }
+
+    val outType : TypeInformation[OUT] = implicitly[TypeInformation[OUT]]
+
+    asScalaStream(JavaAsyncDataStream.orderedWait[IN, OUT](
+      input.javaStream,
+      javaAsyncFunction,
+      timeout,
+      timeUnit,
+      capacity).returns(outType))
+  }
+
+  /**
+    * Apply an asynchronous function on the input data stream. The output order is the same as the
+    * input order of the elements.
+    *
+    * @param input to apply the async function on
+    * @param asyncFunction to use
+    * @param timeout for the asynchronous operation to complete
+    * @param timeUnit of the timeout
+    * @tparam IN Type of the input record
+    * @tparam OUT Type of the output record
+    * @return the resulting stream containing the asynchronous results
+    */
+  def orderedWait[IN, OUT: TypeInformation](
+    input: DataStream[IN],
+    asyncFunction: AsyncFunction[IN, OUT],
+    timeout: Long,
+    timeUnit: TimeUnit)
+  : DataStream[OUT] = {
+
+    orderedWait(input, asyncFunction, timeout, timeUnit, DEFAULT_QUEUE_CAPACITY)
+  }
+
+  /**
+    * Apply an asynchronous function on the input data stream. The output order is the same as the
+    * input order of the elements.
+    *
+    * @param input to apply the async function on
+    * @param timeout for the asynchronous operation to complete
+    * @param timeUnit of the timeout
+    * @param capacity of the operator which is equivalent to the number of concurrent asynchronous
+    *                 operations
+    * @param asyncFunction to use
+    * @tparam IN Type of the input record
+    * @tparam OUT Type of the output record
+    * @return the resulting stream containing the asynchronous results
+    */
+  def orderedWait[IN, OUT: TypeInformation](
+    input: DataStream[IN],
+    timeout: Long,
+    timeUnit: TimeUnit,
+    capacity: Int) (
+    asyncFunction: (IN, AsyncCollector[OUT]) => Unit)
+  : DataStream[OUT] = {
+
+    Preconditions.checkNotNull(asyncFunction)
+
+    val cleanAsyncFunction = input.executionEnvironment.scalaClean(asyncFunction)
+
+    val func = new JavaAsyncFunction[IN, OUT] {
+      override def asyncInvoke(input: IN, collector: JavaAsyncCollector[OUT]): Unit = {
+        cleanAsyncFunction(input, new JavaAsyncCollectorWrapper[OUT](collector))
+      }
+    }
+
+    val outType : TypeInformation[OUT] = implicitly[TypeInformation[OUT]]
+
+    asScalaStream(JavaAsyncDataStream.orderedWait[IN, OUT](
+      input.javaStream,
+      func,
+      timeout,
+      timeUnit,
+      capacity).returns(outType))
+  }
+
+  /**
+    * Apply an asynchronous function on the input data stream. The output order is the same as the
+    * input order of the elements.
+    *
+    * @param input to apply the async function on
+    * @param timeout for the asynchronous operation to complete
+    * @param timeUnit of the timeout
+    * @param asyncFunction to use
+    * @tparam IN Type of the input record
+    * @tparam OUT Type of the output record
+    * @return the resulting stream containing the asynchronous results
+    */
+  def orderedWait[IN, OUT: TypeInformation](
+    input: DataStream[IN],
+    timeout: Long,
+    timeUnit: TimeUnit) (
+    asyncFunction: (IN, AsyncCollector[OUT]) => Unit)
+  : DataStream[OUT] = {
+
+    orderedWait(input, timeout, timeUnit, DEFAULT_QUEUE_CAPACITY)(asyncFunction)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bfdaa382/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncCollector.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncCollector.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncCollector.scala
new file mode 100644
index 0000000..a149c88
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncCollector.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.async
+
+import org.apache.flink.annotation.PublicEvolving
+
+/**
+  * The async collector collects data/errors from the user code while processing
+  * asynchronous I/O operations.
+  *
+  * @tparam OUT type of the output element
+  */
+@PublicEvolving
+trait AsyncCollector[OUT] {
+
+  /**
+    * Complete the async collector with a set of result elements.
+    *
+    * Note that it should be called exactly once in the user code.
+    * Calling this function multiple times will cause data loss.
+    *
+    * Put all results in an [[Iterable]] and then issue AsyncCollector.collect(Iterable).
+    *
+    * @param result to complete the async collector with
+    */
+  def collect(result: Iterable[OUT]): Unit
+
+  /**
+    * Complete this async collector with an error.
+    *
+    * @param throwable to complete the async collector with
+    */
+  def collect(throwable: Throwable): Unit
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bfdaa382/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala
new file mode 100644
index 0000000..72e3702
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.async
+
+import org.apache.flink.annotation.PublicEvolving
+
+/**
+  * A function to trigger async I/O operations.
+  *
+  * For each asyncInvoke an async io operation can be triggered, and once it has been done,
+  * the result can be collected by calling AsyncCollector.collect. For each async operation, its
+  * context is stored in the operator immediately after invoking asyncInvoke, avoiding blocking for
+  * each stream input as long as the internal buffer is not full.
+  *
+  * [[AsyncCollector]] can be passed into callbacks or futures to collect the result data.
+  * An error can also be propagated to the async IO operator by
+  * [[AsyncCollector.collect(Throwable)]].
+  *
+  * @tparam IN The type of the input element
+  * @tparam OUT The type of the output elements
+  */
+@PublicEvolving
+trait AsyncFunction[IN, OUT] {
+
+  /**
+    * Trigger the async operation for each stream input
+    *
+    * @param input element coming from an upstream task
+    * @param collector to collect the result data
+    */
+  def asyncInvoke(input: IN, collector: AsyncCollector[OUT]): Unit
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bfdaa382/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/JavaAsyncCollectorWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/JavaAsyncCollectorWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/JavaAsyncCollectorWrapper.scala
new file mode 100644
index 0000000..3c5e95a
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/JavaAsyncCollectorWrapper.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.async
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.streaming.api.functions.async.collector.{AsyncCollector => JavaAsyncCollector}
+
+import scala.collection.JavaConverters._
+
+/**
+  * Internal wrapper class to map a Flink's Java API [[JavaAsyncCollector]] to a Scala
+  * [[AsyncCollector]].
+  *
+  * @param javaAsyncCollector to forward the calls to
+  * @tparam OUT type of the output elements
+  */
+@Internal
+class JavaAsyncCollectorWrapper[OUT](val javaAsyncCollector: JavaAsyncCollector[OUT])
+  extends AsyncCollector[OUT] {
+  override def collect(result: Iterable[OUT]): Unit = {
+    javaAsyncCollector.collect(result.asJavaCollection)
+  }
+
+  override def collect(throwable: Throwable): Unit = {
+    javaAsyncCollector.collect(throwable)
+  }
+}