You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by kisimple <gi...@git.apache.org> on 2018/05/29 07:38:06 UTC
[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...
GitHub user kisimple opened a pull request:
https://github.com/apache/flink/pull/6091
[FLINK-7789][DataStream API] Add handler for Async IO operator timeouts
## What is the purpose of the change
Currently Async IO operator does not provide a mechanism to handle timeouts. This PR fixs the problem by adding a `timeout` method to `AsyncFunction`
## Brief change log
- Add a `timeout` method to `AsyncFunction` for both Java and Scala API
- Update `AsyncWaitOperator` to invoke `AsyncFunction#timeout` for Java API
- Update `AsyncDataStream` to invoke `AsyncFunction#timeout` for Scala API
## Verifying this change
- Add tests to `AsyncWaitOperatorTest`
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (yes)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (docs / JavaDocs)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/kisimple/flink FLINK-7789
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6091.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6091
----
commit ccf1e55e09e94c3f7749b937594425a435f81184
Author: blueszheng <ki...@...>
Date: 2018-05-10T06:41:34Z
[FLINK-7789] Add handler for Async IO operator timeouts
----
---
[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...
Posted by kisimple <gi...@git.apache.org>.
Github user kisimple commented on a diff in the pull request:
https://github.com/apache/flink/pull/6091#discussion_r191834614
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ---
@@ -648,6 +662,52 @@ public void testAsyncTimeout() throws Exception {
ExceptionUtils.findThrowable(mockEnvironment.getActualExternalFailureCause().get(), TimeoutException.class);
}
+ @Test
+ public void testAsyncTimeoutAware() throws Exception {
--- End diff --
Good point :)
---
[GitHub] flink issue #6091: [FLINK-7789][DataStream API] Add handler for Async IO ope...
Posted by pranjal0811 <gi...@git.apache.org>.
Github user pranjal0811 commented on the issue:
https://github.com/apache/flink/pull/6091
Hi Team,
Would this feature be included in the Flink 1.5.1 version?
Cheers,
Pranjal
---
[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6091#discussion_r191760364
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ---
@@ -212,6 +212,20 @@ public static void countDown() {
}
}
+ private static class TimeoutAwareLazyAsyncFunction extends LazyAsyncFunction {
+ private static final long serialVersionUID = 1428714561365346128L;
+
+ @Override
+ public void timeout(Integer input, ResultFuture<Integer> resultFuture) throws Exception {
+ if (input != null && input % 2 == 0) {
+ resultFuture.complete(Collections.singletonList(input * 3));
+ } else {
+ // ignore odd input number when it timeouts
--- End diff --
Move this comment to the top of this static class?
---
[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...
Posted by kisimple <gi...@git.apache.org>.
Github user kisimple commented on a diff in the pull request:
https://github.com/apache/flink/pull/6091#discussion_r192001515
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ---
@@ -648,6 +662,52 @@ public void testAsyncTimeout() throws Exception {
ExceptionUtils.findThrowable(mockEnvironment.getActualExternalFailureCause().get(), TimeoutException.class);
}
+ @Test
+ public void testAsyncTimeoutAware() throws Exception {
--- End diff --
Updated as suggested, plz have a look :)
---
[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6091#discussion_r191757079
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ---
@@ -212,6 +212,20 @@ public static void countDown() {
}
}
+ private static class TimeoutAwareLazyAsyncFunction extends LazyAsyncFunction {
--- End diff --
rename to `IgnoreTimeoutLazyAsyncFunction` ?
---
[GitHub] flink issue #6091: [FLINK-7789][DataStream API] Add handler for Async IO ope...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:
https://github.com/apache/flink/pull/6091
Test failure is most likely unrelated and I have crated separate ticket for it: https://issues.apache.org/jira/browse/FLINK-9481
---
[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6091#discussion_r191768672
--- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala ---
@@ -46,4 +48,16 @@ trait AsyncFunction[IN, OUT] extends Function {
* @param resultFuture to be completed with the result data
*/
def asyncInvoke(input: IN, resultFuture: ResultFuture[OUT]): Unit
+
+ /**
+ * [[AsyncFunction.asyncInvoke]] timeout occurred.
+ * By default, the result future is exceptionally completed with timeout exception.
+ *
+ * @param input element coming from an upstream task
+ * @param resultFuture to be completed with the result data
+ */
+ def timeout(input: IN, resultFuture: ResultFuture[OUT]): Unit = {
+ resultFuture.completeExceptionally(new TimeoutException("Async function call has timed out."))
--- End diff --
same question about the tests.
---
[GitHub] flink issue #6091: [FLINK-7789][DataStream API] Add handler for Async IO ope...
Posted by kisimple <gi...@git.apache.org>.
Github user kisimple commented on the issue:
https://github.com/apache/flink/pull/6091
cc @pnowojski
---
[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6091#discussion_r192026724
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ---
@@ -648,6 +662,52 @@ public void testAsyncTimeout() throws Exception {
ExceptionUtils.findThrowable(mockEnvironment.getActualExternalFailureCause().get(), TimeoutException.class);
}
+ @Test
+ public void testAsyncTimeoutAware() throws Exception {
--- End diff --
Looks great :)
---
[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6091#discussion_r192319359
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ---
@@ -209,8 +208,7 @@ public void processElement(StreamRecord<IN> element) throws Exception {
new ProcessingTimeCallback() {
@Override
public void onProcessingTime(long timestamp) throws Exception {
- streamRecordBufferEntry.completeExceptionally(
- new TimeoutException("Async function call has timed out."));
+ userFunction.timeout(element.getValue(), streamRecordBufferEntry);
--- End diff --
I have one more (possibly major) worry regarding thread safety. Here we are passing the `element.getValue()` object to a different thread. I wonder if this could brake something.
`userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);` at least allows user to preprocess the element before handing it over to a different thread.
CC @aljoscha ?
---
[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...
Posted by kisimple <gi...@git.apache.org>.
Github user kisimple commented on a diff in the pull request:
https://github.com/apache/flink/pull/6091#discussion_r192171522
--- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala ---
@@ -71,6 +71,9 @@ object AsyncDataStream {
override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
asyncFunction.asyncInvoke(input, new JavaResultFutureWrapper(resultFuture))
}
+ override def timeout(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
--- End diff --
I have added a test for `AsyncDataStream.scala`, plz review and see if it is sufficient :)
---
[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6091#discussion_r191767070
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ---
@@ -648,6 +662,52 @@ public void testAsyncTimeout() throws Exception {
ExceptionUtils.findThrowable(mockEnvironment.getActualExternalFailureCause().get(), TimeoutException.class);
}
+ @Test
+ public void testAsyncTimeoutAware() throws Exception {
--- End diff --
Please deduplicate the code of this method with `testAsyncTimeout()` to sth like that:
```
@Test
public void testAsyncTimeoutFailure() throws Exception {
testAsyncTimeout(
new LazyAsyncFunction()
Optional.of(TimeoutException.class),
new StreamRecord<>(2, 5L));
}
public void testAsyncTimeoutIgnore() throws Exception {
testAsyncTimeout(
new IgnoreTimeoutLazyAsyncFunction()
Optional.of(TimeoutException.class),
new StreamRecord<>(6, 0L),
new StreamRecord<>(4, 5L));
}
private void testAsyncTimeout(
Optional<Class<?>> expectedException,
StreamRecord<Integer>... expectedRecords) throws Exception {
// your current testAsyncTimeoutAware method body adjusted to above parameters
}
```
or sth similar.
---
[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6091#discussion_r192028605
--- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala ---
@@ -71,6 +71,9 @@ object AsyncDataStream {
override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
asyncFunction.asyncInvoke(input, new JavaResultFutureWrapper(resultFuture))
}
+ override def timeout(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
--- End diff --
You are right, there are none :( They are definitely missing, however simple this code is. I know this is painful but still I would insist on adding at least some rudimentary test coverage for the new code. As it is now, if someone would change the below invocation to `asyncFunction.timeout(null, null)` we wouldn't have known.
Let's try to improve the situation here a little bit. The new test doesn't have to duplicate the test cases from `AsyncWaitOperatorTest.java`, but some most basic one that checks that expected arguments are being passed will suffice.
---
[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/6091
---
[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6091#discussion_r192316418
--- Diff: flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.scala.AsyncDataStreamITCase._
+import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
+import org.apache.flink.test.util.AbstractTestBase
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+
+object AsyncDataStreamITCase {
+ val timeout = 1000L
+ private var testResult: mutable.ArrayBuffer[Int] = _
+}
+
+class AsyncDataStreamITCase extends AbstractTestBase {
+
+ @Test
+ def testOrderedWait(): Unit = {
+ testAsyncWait(true)
+ }
+
+ @Test
+ def testUnorderedWait(): Unit = {
+ testAsyncWait(false)
+ }
+
+ private def testAsyncWait(ordered: Boolean): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setParallelism(1)
+
+ val source = env.addSource(new SourceFunction[Int]() {
+ override def run(ctx: SourceFunction.SourceContext[Int]) {
+ ctx.collect(1)
+ ctx.collect(2)
+ }
+ override def cancel() {}
+ })
+
+ val asyncMapped = if (ordered) {
+ AsyncDataStream.orderedWait(
+ source, new MyAsyncFunction(), timeout, TimeUnit.MILLISECONDS)
+ } else {
+ AsyncDataStream.unorderedWait(
+ source, new MyAsyncFunction(), timeout, TimeUnit.MILLISECONDS)
+ }
+
+ testResult = mutable.ArrayBuffer[Int]()
--- End diff --
can you extract the lines between 71 and 85 to a private function and deduplicate them with `testAsyncWaitUsingAnonymousFunction`?
---
[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6091#discussion_r192316460
--- Diff: flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.scala.AsyncDataStreamITCase._
+import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
+import org.apache.flink.test.util.AbstractTestBase
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+
+object AsyncDataStreamITCase {
+ val timeout = 1000L
+ private var testResult: mutable.ArrayBuffer[Int] = _
+}
+
+class AsyncDataStreamITCase extends AbstractTestBase {
+
+ @Test
+ def testOrderedWait(): Unit = {
+ testAsyncWait(true)
+ }
+
+ @Test
+ def testUnorderedWait(): Unit = {
+ testAsyncWait(false)
+ }
+
+ private def testAsyncWait(ordered: Boolean): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setParallelism(1)
+
+ val source = env.addSource(new SourceFunction[Int]() {
+ override def run(ctx: SourceFunction.SourceContext[Int]) {
+ ctx.collect(1)
+ ctx.collect(2)
+ }
+ override def cancel() {}
+ })
+
+ val asyncMapped = if (ordered) {
+ AsyncDataStream.orderedWait(
+ source, new MyAsyncFunction(), timeout, TimeUnit.MILLISECONDS)
+ } else {
+ AsyncDataStream.unorderedWait(
+ source, new MyAsyncFunction(), timeout, TimeUnit.MILLISECONDS)
+ }
+
+ testResult = mutable.ArrayBuffer[Int]()
+ asyncMapped.addSink(new SinkFunction[Int]() {
+ override def invoke(value: Int) {
+ testResult += value
+ }
+ })
+
+ env.execute("testAsyncWait")
+
+ val expectedResult = mutable.ArrayBuffer[Int](2, 6)
+ if (ordered) {
+ assertEquals(expectedResult, testResult)
+ } else {
+ assertEquals(expectedResult, testResult.sorted)
+ }
+ }
+
+ @Test
+ def testOrderedWaitUsingAnonymousFunction(): Unit = {
+ testAsyncWaitUsingAnonymousFunction(true)
+ }
+
+ @Test
+ def testUnorderedWaitUsingAnonymousFunction(): Unit = {
+ testAsyncWaitUsingAnonymousFunction(false)
+ }
+
+ private def testAsyncWaitUsingAnonymousFunction(ordered: Boolean): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setParallelism(1)
+
+ val source = env.addSource(new SourceFunction[Int]() {
--- End diff --
`env.fromElements(1, 2)`
---
[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...
Posted by kisimple <gi...@git.apache.org>.
Github user kisimple commented on a diff in the pull request:
https://github.com/apache/flink/pull/6091#discussion_r191835482
--- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala ---
@@ -71,6 +71,9 @@ object AsyncDataStream {
override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
asyncFunction.asyncInvoke(input, new JavaResultFutureWrapper(resultFuture))
}
+ override def timeout(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
--- End diff --
I haven't found any tests for `AsyncDataStream.scala` or `AsyncFunction.scala`, I am not sure whether it is missing or unnecessary. What do you think?
---
[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6091#discussion_r192316223
--- Diff: flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.scala.AsyncDataStreamITCase._
+import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
+import org.apache.flink.test.util.AbstractTestBase
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+
+object AsyncDataStreamITCase {
+ val timeout = 1000L
+ private var testResult: mutable.ArrayBuffer[Int] = _
+}
+
+class AsyncDataStreamITCase extends AbstractTestBase {
+
+ @Test
+ def testOrderedWait(): Unit = {
+ testAsyncWait(true)
+ }
+
+ @Test
+ def testUnorderedWait(): Unit = {
+ testAsyncWait(false)
+ }
+
+ private def testAsyncWait(ordered: Boolean): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setParallelism(1)
+
+ val source = env.addSource(new SourceFunction[Int]() {
--- End diff --
`env.fromElements(1, 2)`
---
[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...
Posted by kisimple <gi...@git.apache.org>.
Github user kisimple commented on a diff in the pull request:
https://github.com/apache/flink/pull/6091#discussion_r192329480
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ---
@@ -209,8 +208,7 @@ public void processElement(StreamRecord<IN> element) throws Exception {
new ProcessingTimeCallback() {
@Override
public void onProcessingTime(long timestamp) throws Exception {
- streamRecordBufferEntry.completeExceptionally(
- new TimeoutException("Async function call has timed out."));
+ userFunction.timeout(element.getValue(), streamRecordBufferEntry);
--- End diff --
Highly appreciate all the reviews :) I have pushed the fixup.
---
[GitHub] flink issue #6091: [FLINK-7789][DataStream API] Add handler for Async IO ope...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:
https://github.com/apache/flink/pull/6091
Looking at the git history it looks like it's there.
---
[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6091#discussion_r191768245
--- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala ---
@@ -71,6 +71,9 @@ object AsyncDataStream {
override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
asyncFunction.asyncInvoke(input, new JavaResultFutureWrapper(resultFuture))
}
+ override def timeout(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
--- End diff --
Are those changes in `AsyncDataStream.scala` somewhere covered by the tests? If you comment out their method bodies, does at least one test fails?
---