You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 王敏超 <mi...@163.com> on 2020/09/29 09:43:18 UTC

使用异步IO时,数据写入到capacity数后,卡住不再消费source端数据了。

 AsyncDataStream
      //顺序异步IO
      .orderedWait(input, new AsyncDatabaseRequest(), 5000,
TimeUnit.MILLISECONDS, 1000)

  当我没重写timeout方法的时候,会执行这个报错信息 
resultFuture.completeExceptionally(new TimeoutException("Async function call
has timed out."))


  当我重写了timeout方法,如下,程序就卡住了,求大佬解答。
  override def timeout(input: String, resultFuture: ResultFuture[Int]): Unit
= {
    println("time out ... ")
  }




--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 使用异步IO时,数据写入到capacity数后,卡住不再消费source端数据了。

Posted by 王敏超 <mi...@163.com>.
嗯嗯,是的。安装大佬的方法,的确成功了。再次感谢大佬。。。。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 使用异步IO时,数据写入到capacity数后,卡住不再消费source端数据了。

Posted by Benchao Li <li...@apache.org>.
你的timeout方法应该要正确的处理ResultFuture,
比如ResultFuture.complete或者completeExceptionally,如果你什么都没做,那么这个异步请求就还没有真的结束。

王敏超 <mi...@163.com> 于2020年9月29日周二 下午5:43写道:

>  AsyncDataStream
>       //顺序异步IO
>       .orderedWait(input, new AsyncDatabaseRequest(), 5000,
> TimeUnit.MILLISECONDS, 1000)
>
>   当我没重写timeout方法的时候,会执行这个报错信息
> resultFuture.completeExceptionally(new TimeoutException("Async function
> call
> has timed out."))
>
>
>   当我重写了timeout方法,如下,程序就卡住了,求大佬解答。
>   override def timeout(input: String, resultFuture: ResultFuture[Int]):
> Unit
> = {
>     println("time out ... ")
>   }
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li