You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by sunfulin <su...@163.com> on 2020/07/02 07:56:46 UTC

flink asynctablefunction调用异常

hi,
我在使用flink 1.10.1 blink planner,通过扩展tablesourcesinkfactory和asynctablefunction扩展了一个维表,维表会初始化并调用rpc服务。
遇到一个比较诡异的问题,作业在执行如下join的sql时,没有任何输出,等一段时间后,抛出了异常:Caused by : java.lang.Exception: Could not complete the stream element: org.apache.flink.table.dataformat.BinaryRow....  caused by : java.util.concurrent.TimeoutException: Async function call has timed out.


我开了debug日志,debug时在我的lookupfunction.eval里,可以正常调用rpc接口服务并future.complete,但是并不输出任何结果。不确定可能是啥原因。望指导。谢谢。

Re: flink asynctablefunction调用异常

Posted by Jark Wu <im...@gmail.com>.
可以分享下你的 AsyncTableFunction 的实现吗?

Best,
Jark

> 2020年7月2日 15:56,sunfulin <su...@163.com> 写道:
> 
> hi,
> 我在使用flink 1.10.1 blink planner,通过扩展tablesourcesinkfactory和asynctablefunction扩展了一个维表,维表会初始化并调用rpc服务。
> 遇到一个比较诡异的问题,作业在执行如下join的sql时,没有任何输出,等一段时间后,抛出了异常:Caused by : java.lang.Exception: Could not complete the stream element: org.apache.flink.table.dataformat.BinaryRow....  caused by : java.util.concurrent.TimeoutException: Async function call has timed out.
> 
> 
> 我开了debug日志,debug时在我的lookupfunction.eval里,可以正常调用rpc接口服务并future.complete,但是并不输出任何结果。不确定可能是啥原因。望指导。谢谢。


Re:Re:flink asynctablefunction调用异常

Posted by sunfulin <su...@163.com>.


hi
抱歉忘记回复了。经过进一步调试发现,是因为定义的schema的column类型,与实际获取到的字段类型不一致导致。主要是在调试的过程中,ComplettedFuture.complete会吃掉这种类型不一致的异常,也不下发数据。看源码发现只会在timeout的时候才调用future.completeException。记录下。














在 2020-07-03 17:01:19,"forideal" <fs...@163.com> 写道:
>Hi sunfulin:
>     
>      我这么实现是可以的。
>public void eval(CompletableFuture<Collection<Row>> result, String key) {
>    executorService.submit(() -> {
>try {
>Row row = fetchdata(key);
>            if (row != null) {
>result.complete(Collections.singletonList(row));
>} else {
>result.complete(Collections.singletonList(new Row(this.fieldNames.length)));
>}
>        } catch (Exception e) {
>result.complete(Collections.singletonList(new Row(this.fieldNames.length)));
>}
>    });
>}
>
>
>
>
>Best forideal.
>
>
>
>
>
>在 2020-07-02 15:56:46,"sunfulin" <su...@163.com> 写道:
>>hi,
>>我在使用flink 1.10.1 blink planner,通过扩展tablesourcesinkfactory和asynctablefunction扩展了一个维表,维表会初始化并调用rpc服务。
>>遇到一个比较诡异的问题,作业在执行如下join的sql时,没有任何输出,等一段时间后,抛出了异常:Caused by : java.lang.Exception: Could not complete the stream element: org.apache.flink.table.dataformat.BinaryRow....  caused by : java.util.concurrent.TimeoutException: Async function call has timed out.
>>
>>
>>我开了debug日志,debug时在我的lookupfunction.eval里,可以正常调用rpc接口服务并future.complete,但是并不输出任何结果。不确定可能是啥原因。望指导。谢谢。

Re:flink asynctablefunction调用异常

Posted by forideal <fs...@163.com>.
Hi sunfulin:
     
      我这么实现是可以的。
public void eval(CompletableFuture<Collection<Row>> result, String key) {
    executorService.submit(() -> {
try {
Row row = fetchdata(key);
            if (row != null) {
result.complete(Collections.singletonList(row));
} else {
result.complete(Collections.singletonList(new Row(this.fieldNames.length)));
}
        } catch (Exception e) {
result.complete(Collections.singletonList(new Row(this.fieldNames.length)));
}
    });
}




Best forideal.





在 2020-07-02 15:56:46,"sunfulin" <su...@163.com> 写道:
>hi,
>我在使用flink 1.10.1 blink planner,通过扩展tablesourcesinkfactory和asynctablefunction扩展了一个维表,维表会初始化并调用rpc服务。
>遇到一个比较诡异的问题,作业在执行如下join的sql时,没有任何输出,等一段时间后,抛出了异常:Caused by : java.lang.Exception: Could not complete the stream element: org.apache.flink.table.dataformat.BinaryRow....  caused by : java.util.concurrent.TimeoutException: Async function call has timed out.
>
>
>我开了debug日志,debug时在我的lookupfunction.eval里,可以正常调用rpc接口服务并future.complete,但是并不输出任何结果。不确定可能是啥原因。望指导。谢谢。