You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by kisimple <gi...@git.apache.org> on 2018/05/29 07:38:06 UTC

[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...

GitHub user kisimple opened a pull request:

    https://github.com/apache/flink/pull/6091

    [FLINK-7789][DataStream API] Add handler for Async IO operator timeouts

    ## What is the purpose of the change
    
    Currently Async IO operator does not provide a mechanism to handle timeouts. This PR fixs the problem by adding a `timeout` method to `AsyncFunction`
    
    ## Brief change log
    
      - Add a `timeout` method to `AsyncFunction` for both Java and Scala API
      - Update `AsyncWaitOperator` to invoke `AsyncFunction#timeout` for Java API
      - Update `AsyncDataStream` to invoke `AsyncFunction#timeout` for Scala API
    
    ## Verifying this change
    
      - Add tests to `AsyncWaitOperatorTest`
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (yes)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes)
      - If yes, how is the feature documented? (docs / JavaDocs)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kisimple/flink FLINK-7789

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6091.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6091
    
----
commit ccf1e55e09e94c3f7749b937594425a435f81184
Author: blueszheng <ki...@...>
Date:   2018-05-10T06:41:34Z

    [FLINK-7789] Add handler for Async IO operator timeouts

----


---

[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...

Posted by kisimple <gi...@git.apache.org>.
Github user kisimple commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6091#discussion_r191834614
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ---
    @@ -648,6 +662,52 @@ public void testAsyncTimeout() throws Exception {
     		ExceptionUtils.findThrowable(mockEnvironment.getActualExternalFailureCause().get(), TimeoutException.class);
     	}
     
    +	@Test
    +	public void testAsyncTimeoutAware() throws Exception {
    --- End diff --
    
    Good point :)


---

[GitHub] flink issue #6091: [FLINK-7789][DataStream API] Add handler for Async IO ope...

Posted by pranjal0811 <gi...@git.apache.org>.
Github user pranjal0811 commented on the issue:

    https://github.com/apache/flink/pull/6091
  
    Hi Team,
    
    Would this feature be included in the Flink 1.5.1 version?
    
    Cheers,
    Pranjal


---

[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6091#discussion_r191760364
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ---
    @@ -212,6 +212,20 @@ public static void countDown() {
     		}
     	}
     
    +	private static class TimeoutAwareLazyAsyncFunction extends LazyAsyncFunction {
    +		private static final long serialVersionUID = 1428714561365346128L;
    +
    +		@Override
    +		public void timeout(Integer input, ResultFuture<Integer> resultFuture) throws Exception {
    +			if (input != null && input % 2 == 0) {
    +				resultFuture.complete(Collections.singletonList(input * 3));
    +			} else {
    +				// ignore odd input number when it timeouts
    --- End diff --
    
    Move this comment to the top of this static class?


---

[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...

Posted by kisimple <gi...@git.apache.org>.
Github user kisimple commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6091#discussion_r192001515
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ---
    @@ -648,6 +662,52 @@ public void testAsyncTimeout() throws Exception {
     		ExceptionUtils.findThrowable(mockEnvironment.getActualExternalFailureCause().get(), TimeoutException.class);
     	}
     
    +	@Test
    +	public void testAsyncTimeoutAware() throws Exception {
    --- End diff --
    
    Updated as suggested, plz have a look :)


---

[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6091#discussion_r191757079
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ---
    @@ -212,6 +212,20 @@ public static void countDown() {
     		}
     	}
     
    +	private static class TimeoutAwareLazyAsyncFunction extends LazyAsyncFunction {
    --- End diff --
    
    rename to `IgnoreTimeoutLazyAsyncFunction` ?


---

[GitHub] flink issue #6091: [FLINK-7789][DataStream API] Add handler for Async IO ope...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/6091
  
    Test failure is most likely unrelated and I have crated separate ticket for it: https://issues.apache.org/jira/browse/FLINK-9481


---

[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6091#discussion_r191768672
  
    --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala ---
    @@ -46,4 +48,16 @@ trait AsyncFunction[IN, OUT] extends Function {
         * @param resultFuture to be completed with the result data
         */
       def asyncInvoke(input: IN, resultFuture: ResultFuture[OUT]): Unit
    +
    +  /**
    +    * [[AsyncFunction.asyncInvoke]] timeout occurred.
    +    * By default, the result future is exceptionally completed with timeout exception.
    +    *
    +    * @param input element coming from an upstream task
    +    * @param resultFuture to be completed with the result data
    +    */
    +  def timeout(input: IN, resultFuture: ResultFuture[OUT]): Unit = {
    +    resultFuture.completeExceptionally(new TimeoutException("Async function call has timed out."))
    --- End diff --
    
    same question about the tests.


---

[GitHub] flink issue #6091: [FLINK-7789][DataStream API] Add handler for Async IO ope...

Posted by kisimple <gi...@git.apache.org>.
Github user kisimple commented on the issue:

    https://github.com/apache/flink/pull/6091
  
    cc @pnowojski 


---

[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6091#discussion_r192026724
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ---
    @@ -648,6 +662,52 @@ public void testAsyncTimeout() throws Exception {
     		ExceptionUtils.findThrowable(mockEnvironment.getActualExternalFailureCause().get(), TimeoutException.class);
     	}
     
    +	@Test
    +	public void testAsyncTimeoutAware() throws Exception {
    --- End diff --
    
    Looks great :)


---

[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6091#discussion_r192319359
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ---
    @@ -209,8 +208,7 @@ public void processElement(StreamRecord<IN> element) throws Exception {
     				new ProcessingTimeCallback() {
     					@Override
     					public void onProcessingTime(long timestamp) throws Exception {
    -						streamRecordBufferEntry.completeExceptionally(
    -							new TimeoutException("Async function call has timed out."));
    +						userFunction.timeout(element.getValue(), streamRecordBufferEntry);
    --- End diff --
    
    I have one more (possibly major) worry regarding thread safety. Here we are passing the `element.getValue()` object to a different thread. I wonder if this could brake something.
    
    `userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);` at least allows user to preprocess the element before handing it over to a different thread.
    
    CC @aljoscha ?


---

[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...

Posted by kisimple <gi...@git.apache.org>.
Github user kisimple commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6091#discussion_r192171522
  
    --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala ---
    @@ -71,6 +71,9 @@ object AsyncDataStream {
           override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
             asyncFunction.asyncInvoke(input, new JavaResultFutureWrapper(resultFuture))
           }
    +      override def timeout(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
    --- End diff --
    
    I have added a test for `AsyncDataStream.scala`, plz review and see if it is sufficient :)


---

[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6091#discussion_r191767070
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ---
    @@ -648,6 +662,52 @@ public void testAsyncTimeout() throws Exception {
     		ExceptionUtils.findThrowable(mockEnvironment.getActualExternalFailureCause().get(), TimeoutException.class);
     	}
     
    +	@Test
    +	public void testAsyncTimeoutAware() throws Exception {
    --- End diff --
    
    Please deduplicate the code of this method with `testAsyncTimeout()` to sth like that:
    
    ```
    @Test
    public void testAsyncTimeoutFailure() throws Exception {
    	testAsyncTimeout(
    		new LazyAsyncFunction()
    		Optional.of(TimeoutException.class),
    		new StreamRecord<>(2, 5L));
    }
    
    public void testAsyncTimeoutIgnore() throws Exception {
    	testAsyncTimeout(
    		new IgnoreTimeoutLazyAsyncFunction()
    		Optional.of(TimeoutException.class),
    		new StreamRecord<>(6, 0L),
    		new StreamRecord<>(4, 5L));
    }
    
    private void testAsyncTimeout(
    		Optional<Class<?>> expectedException,
    		StreamRecord<Integer>... expectedRecords) throws Exception {
    	// your current testAsyncTimeoutAware method body adjusted to above parameters
    }
    ```
    
    or sth similar.


---

[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6091#discussion_r192028605
  
    --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala ---
    @@ -71,6 +71,9 @@ object AsyncDataStream {
           override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
             asyncFunction.asyncInvoke(input, new JavaResultFutureWrapper(resultFuture))
           }
    +      override def timeout(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
    --- End diff --
    
    You are right, there are none :( They are definitely missing, however simple this code is. I know this is painful but still I would insist on adding at least some rudimentary test coverage for the new code. As it is now, if someone would change the below invocation to `asyncFunction.timeout(null, null)` we wouldn't have known. 
    
    Let's try to improve the situation here a little bit. The new test doesn't have to duplicate the test cases from `AsyncWaitOperatorTest.java`, but some most basic one that checks that expected arguments are being passed will suffice.


---

[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6091


---

[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6091#discussion_r192316418
  
    --- Diff: flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala ---
    @@ -0,0 +1,157 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.scala
    +
    +import java.util.concurrent.TimeUnit
    +
    +import org.apache.flink.streaming.api.functions.sink.SinkFunction
    +import org.apache.flink.streaming.api.functions.source.SourceFunction
    +import org.apache.flink.streaming.api.scala.AsyncDataStreamITCase._
    +import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
    +import org.apache.flink.test.util.AbstractTestBase
    +import org.junit.Assert._
    +import org.junit.Test
    +
    +import scala.collection.mutable
    +import scala.concurrent.{ExecutionContext, Future}
    +
    +object AsyncDataStreamITCase {
    +  val timeout = 1000L
    +  private var testResult: mutable.ArrayBuffer[Int] = _
    +}
    +
    +class AsyncDataStreamITCase extends AbstractTestBase {
    +
    +  @Test
    +  def testOrderedWait(): Unit = {
    +    testAsyncWait(true)
    +  }
    +
    +  @Test
    +  def testUnorderedWait(): Unit = {
    +    testAsyncWait(false)
    +  }
    +
    +  private def testAsyncWait(ordered: Boolean): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setParallelism(1)
    +
    +    val source = env.addSource(new SourceFunction[Int]() {
    +      override def run(ctx: SourceFunction.SourceContext[Int]) {
    +        ctx.collect(1)
    +        ctx.collect(2)
    +      }
    +      override def cancel() {}
    +    })
    +
    +    val asyncMapped = if (ordered) {
    +      AsyncDataStream.orderedWait(
    +        source, new MyAsyncFunction(), timeout, TimeUnit.MILLISECONDS)
    +    } else {
    +      AsyncDataStream.unorderedWait(
    +        source, new MyAsyncFunction(), timeout, TimeUnit.MILLISECONDS)
    +    }
    +
    +    testResult = mutable.ArrayBuffer[Int]()
    --- End diff --
    
    can you extract the lines between 71 and 85 to a private function and deduplicate them with `testAsyncWaitUsingAnonymousFunction`?


---

[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6091#discussion_r192316460
  
    --- Diff: flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala ---
    @@ -0,0 +1,157 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.scala
    +
    +import java.util.concurrent.TimeUnit
    +
    +import org.apache.flink.streaming.api.functions.sink.SinkFunction
    +import org.apache.flink.streaming.api.functions.source.SourceFunction
    +import org.apache.flink.streaming.api.scala.AsyncDataStreamITCase._
    +import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
    +import org.apache.flink.test.util.AbstractTestBase
    +import org.junit.Assert._
    +import org.junit.Test
    +
    +import scala.collection.mutable
    +import scala.concurrent.{ExecutionContext, Future}
    +
    +object AsyncDataStreamITCase {
    +  val timeout = 1000L
    +  private var testResult: mutable.ArrayBuffer[Int] = _
    +}
    +
    +class AsyncDataStreamITCase extends AbstractTestBase {
    +
    +  @Test
    +  def testOrderedWait(): Unit = {
    +    testAsyncWait(true)
    +  }
    +
    +  @Test
    +  def testUnorderedWait(): Unit = {
    +    testAsyncWait(false)
    +  }
    +
    +  private def testAsyncWait(ordered: Boolean): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setParallelism(1)
    +
    +    val source = env.addSource(new SourceFunction[Int]() {
    +      override def run(ctx: SourceFunction.SourceContext[Int]) {
    +        ctx.collect(1)
    +        ctx.collect(2)
    +      }
    +      override def cancel() {}
    +    })
    +
    +    val asyncMapped = if (ordered) {
    +      AsyncDataStream.orderedWait(
    +        source, new MyAsyncFunction(), timeout, TimeUnit.MILLISECONDS)
    +    } else {
    +      AsyncDataStream.unorderedWait(
    +        source, new MyAsyncFunction(), timeout, TimeUnit.MILLISECONDS)
    +    }
    +
    +    testResult = mutable.ArrayBuffer[Int]()
    +    asyncMapped.addSink(new SinkFunction[Int]() {
    +      override def invoke(value: Int) {
    +        testResult += value
    +      }
    +    })
    +
    +    env.execute("testAsyncWait")
    +
    +    val expectedResult = mutable.ArrayBuffer[Int](2, 6)
    +    if (ordered) {
    +      assertEquals(expectedResult, testResult)
    +    } else {
    +      assertEquals(expectedResult, testResult.sorted)
    +    }
    +  }
    +
    +  @Test
    +  def testOrderedWaitUsingAnonymousFunction(): Unit = {
    +    testAsyncWaitUsingAnonymousFunction(true)
    +  }
    +
    +  @Test
    +  def testUnorderedWaitUsingAnonymousFunction(): Unit = {
    +    testAsyncWaitUsingAnonymousFunction(false)
    +  }
    +
    +  private def testAsyncWaitUsingAnonymousFunction(ordered: Boolean): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setParallelism(1)
    +
    +    val source = env.addSource(new SourceFunction[Int]() {
    --- End diff --
    
    `env.fromElements(1, 2)`


---

[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...

Posted by kisimple <gi...@git.apache.org>.
Github user kisimple commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6091#discussion_r191835482
  
    --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala ---
    @@ -71,6 +71,9 @@ object AsyncDataStream {
           override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
             asyncFunction.asyncInvoke(input, new JavaResultFutureWrapper(resultFuture))
           }
    +      override def timeout(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
    --- End diff --
    
    I haven't found any tests for `AsyncDataStream.scala` or `AsyncFunction.scala`, I am not sure whether it is missing or unnecessary. What do you think?


---

[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6091#discussion_r192316223
  
    --- Diff: flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala ---
    @@ -0,0 +1,157 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.scala
    +
    +import java.util.concurrent.TimeUnit
    +
    +import org.apache.flink.streaming.api.functions.sink.SinkFunction
    +import org.apache.flink.streaming.api.functions.source.SourceFunction
    +import org.apache.flink.streaming.api.scala.AsyncDataStreamITCase._
    +import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
    +import org.apache.flink.test.util.AbstractTestBase
    +import org.junit.Assert._
    +import org.junit.Test
    +
    +import scala.collection.mutable
    +import scala.concurrent.{ExecutionContext, Future}
    +
    +object AsyncDataStreamITCase {
    +  val timeout = 1000L
    +  private var testResult: mutable.ArrayBuffer[Int] = _
    +}
    +
    +class AsyncDataStreamITCase extends AbstractTestBase {
    +
    +  @Test
    +  def testOrderedWait(): Unit = {
    +    testAsyncWait(true)
    +  }
    +
    +  @Test
    +  def testUnorderedWait(): Unit = {
    +    testAsyncWait(false)
    +  }
    +
    +  private def testAsyncWait(ordered: Boolean): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setParallelism(1)
    +
    +    val source = env.addSource(new SourceFunction[Int]() {
    --- End diff --
    
    `env.fromElements(1, 2)`


---

[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...

Posted by kisimple <gi...@git.apache.org>.
Github user kisimple commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6091#discussion_r192329480
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ---
    @@ -209,8 +208,7 @@ public void processElement(StreamRecord<IN> element) throws Exception {
     				new ProcessingTimeCallback() {
     					@Override
     					public void onProcessingTime(long timestamp) throws Exception {
    -						streamRecordBufferEntry.completeExceptionally(
    -							new TimeoutException("Async function call has timed out."));
    +						userFunction.timeout(element.getValue(), streamRecordBufferEntry);
    --- End diff --
    
    Highly appreciate all the reviews :) I have pushed the fixup.


---

[GitHub] flink issue #6091: [FLINK-7789][DataStream API] Add handler for Async IO ope...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/6091
  
    Looking at the git history it looks like it's there.


---

[GitHub] flink pull request #6091: [FLINK-7789][DataStream API] Add handler for Async...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6091#discussion_r191768245
  
    --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala ---
    @@ -71,6 +71,9 @@ object AsyncDataStream {
           override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
             asyncFunction.asyncInvoke(input, new JavaResultFutureWrapper(resultFuture))
           }
    +      override def timeout(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
    --- End diff --
    
    Are those changes in `AsyncDataStream.scala` somewhere covered by the tests? If you comment out their method bodies, does at least one test fails?


---