You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by kenyore <wo...@gmail.com> on 2021/01/12 06:38:32 UTC

Flink SQL 可以使用异步IO特性吗

hi,all
  我有一个场景是使用自定义的ScalaFunction实现所需要的look up功能(从数据库查询并将多行数据拼成一个数组)。
  我正在试图尝试使用异步IO的方式以提高它的性能,但是似乎只有Stream API提供了该特性支持。
  大家有什么建议吗?或者有其他优化思路吗?
  谢谢!



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink SQL 可以使用异步IO特性吗

Posted by zelin jin <fr...@gmail.com>.
hello,kenyore.
我大致了解了你的意思,你可以通过继承AsyncTableFunction的方式实现数据库异步IO。

公共抽象类AsyncTableFunction <T> 扩展了UserDefinedFunction
<https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/UserDefinedFunction.html>

AsyncTableFunction
<https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/AsyncTableFunction.html>可以通过实现自定义评估方法来定义a的行为。评估方法必须公开声明,而不是静态声明,并命名为“
eval”。评估方法也可以通过实现多个名为“ eval”的方法来重载。

对于每个“ eval”,都可以触发一个异步io操作,一旦完成,就可以通过调用来收集结果CompletableFuture.complete(T)
<http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CompletableFuture.html?is-external=true#complete-T->。对于每个异步操作,调用“
eval”后,其上下文将立即存储在运算符中,从而避免在内部缓冲区未满的情况下阻塞输入的每个流。

代码示例:

   public void eval(CompletableFuture<Collection<String>> result,
String rowkey) {
            Get get = new Get(Bytes.toBytes(rowkey));
            ListenableFuture<Result> future = hbase.asyncGet(get);
            Futures.addCallback(future, new FutureCallback<Result>() {
                public void onSuccess(Result result) {
                    List<String> ret = process(result);
                    result.complete(ret);
                }

                public void onFailure(Throwable thrown) {
                    result.completeExceptionally(thrown);
                }
            });
        }

参考链接:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/AsyncTableFunction.html
kenyore <wo...@gmail.com> 于2021年1月12日周二 下午3:29写道:

> 感谢如此详尽的回复!
> 但是我的场景似乎无法直接使用维表join。
> 因为我需要把look up的结果(会是多行数据),拼成一个数组放入数据行。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink SQL 可以使用异步IO特性吗

Posted by zelin jin <fr...@gmail.com>.
hello,kenyore.
我大致了解了你的意思,你可以通过继承AsyncTableFunction的方式实现数据库异步IO。

公共抽象类AsyncTableFunction <T> 扩展了UserDefinedFunction
<https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/UserDefinedFunction.html>

AsyncTableFunction
<https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/AsyncTableFunction.html>可以通过实现自定义评估方法来定义a的行为。评估方法必须公开声明,而不是静态声明,并命名为“
eval”。评估方法也可以通过实现多个名为“ eval”的方法来重载。

对于每个“ eval”,都可以触发一个异步io操作,一旦完成,就可以通过调用来收集结果CompletableFuture.complete(T)
<http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CompletableFuture.html?is-external=true#complete-T->。对于每个异步操作,调用“
eval”后,其上下文将立即存储在运算符中,从而避免在内部缓冲区未满的情况下阻塞输入的每个流。

代码示例:

   public void eval(CompletableFuture<Collection<String>> result,
String rowkey) {
            Get get = new Get(Bytes.toBytes(rowkey));
            ListenableFuture<Result> future = hbase.asyncGet(get);
            Futures.addCallback(future, new FutureCallback<Result>() {
                public void onSuccess(Result result) {
                    List<String> ret = process(result);
                    result.complete(ret);
                }

                public void onFailure(Throwable thrown) {
                    result.completeExceptionally(thrown);
                }
            });
        }

参考链接:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/AsyncTableFunction.html

kenyore <wo...@gmail.com> 于2021年1月12日周二 下午3:29写道:

> 感谢如此详尽的回复!
> 但是我的场景似乎无法直接使用维表join。
> 因为我需要把look up的结果(会是多行数据),拼成一个数组放入数据行。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink SQL 可以使用异步IO特性吗

Posted by kenyore <wo...@gmail.com>.
感谢如此详尽的回复!
但是我的场景似乎无法直接使用维表join。
因为我需要把look up的结果(会是多行数据),拼成一个数组放入数据行。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink SQL 可以使用异步IO特性吗

Posted by 金则林 <fr...@gmail.com>.
Hello,kenyore.

在维表的DDL的WITH参数中添加async='true',Async 相关参数如下。
参数说明是否必填备注
async 是否开启异步请求 否 默认值为fasle。
asyncResultOrder 异步结果顺序 否 取值如下:

   - unordered(默认值):无序。
   - ordered:有序。

asyncTimeoutMs 异步请求的超时时间 否 单位毫秒,默认值为3分钟。
asyncCapacity 异步请求的队列容量 否 默认值为100。
asyncCallbackThreads 回调处理线程数 否
回调类中的onComplete和onError默认会在线程池中处理该线程池的大小,默认值为50。
asyncConnectionQueueMaxsize 最大请求发送数 否 当等待某个服务器返回结果的请求数量达到
asyncConnectionQueueMaxsize值时,异步请求调用也会被阻塞,以防止客户端自身OOM(OutOfMemory),默认值为100。
asyncCallbackQueueMaxsize 最大回调处理队列 否 当等待回调处理的请求达到asyncCallbackQueueMaxsize
值时,异步请求调用也会被阻塞,以防止客户端自身OOM,默认值为500。

CREATE TABLE test(
    id VARCHAR,
    PRIMARY KEY(id)
) WITH(
    async='true',
    asyncResultOrder = 'unordered'
);


kenyore <wo...@gmail.com> 于2021年1月12日周二 下午2:38写道:

> hi,all
>   我有一个场景是使用自定义的ScalaFunction实现所需要的look up功能(从数据库查询并将多行数据拼成一个数组)。
>   我正在试图尝试使用异步IO的方式以提高它的性能,但是似乎只有Stream API提供了该特性支持。
>   大家有什么建议吗?或者有其他优化思路吗?
>   谢谢!
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/