You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by kenyore <wo...@gmail.com> on 2021/01/12 06:38:32 UTC
Flink SQL 可以使用异步IO特性吗
hi,all
我有一个场景是使用自定义的ScalaFunction实现所需要的look up功能(从数据库查询并将多行数据拼成一个数组)。
我正在试图尝试使用异步IO的方式以提高它的性能,但是似乎只有Stream API提供了该特性支持。
大家有什么建议吗?或者有其他优化思路吗?
谢谢!
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink SQL 可以使用异步IO特性吗
Posted by zelin jin <fr...@gmail.com>.
hello,kenyore.
我大致了解了你的意思,你可以通过继承AsyncTableFunction的方式实现数据库异步IO。
公共抽象类AsyncTableFunction <T> 扩展了UserDefinedFunction
<https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/UserDefinedFunction.html>
AsyncTableFunction
<https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/AsyncTableFunction.html>可以通过实现自定义评估方法来定义a的行为。评估方法必须公开声明,而不是静态声明,并命名为“
eval”。评估方法也可以通过实现多个名为“ eval”的方法来重载。
对于每个“ eval”,都可以触发一个异步io操作,一旦完成,就可以通过调用来收集结果CompletableFuture.complete(T)
<http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CompletableFuture.html?is-external=true#complete-T->。对于每个异步操作,调用“
eval”后,其上下文将立即存储在运算符中,从而避免在内部缓冲区未满的情况下阻塞输入的每个流。
代码示例:
public void eval(CompletableFuture<Collection<String>> result,
String rowkey) {
Get get = new Get(Bytes.toBytes(rowkey));
ListenableFuture<Result> future = hbase.asyncGet(get);
Futures.addCallback(future, new FutureCallback<Result>() {
public void onSuccess(Result result) {
List<String> ret = process(result);
result.complete(ret);
}
public void onFailure(Throwable thrown) {
result.completeExceptionally(thrown);
}
});
}
参考链接:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/AsyncTableFunction.html
kenyore <wo...@gmail.com> 于2021年1月12日周二 下午3:29写道:
> 感谢如此详尽的回复!
> 但是我的场景似乎无法直接使用维表join。
> 因为我需要把look up的结果(会是多行数据),拼成一个数组放入数据行。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink SQL 可以使用异步IO特性吗
Posted by zelin jin <fr...@gmail.com>.
hello,kenyore.
我大致了解了你的意思,你可以通过继承AsyncTableFunction的方式实现数据库异步IO。
公共抽象类AsyncTableFunction <T> 扩展了UserDefinedFunction
<https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/UserDefinedFunction.html>
AsyncTableFunction
<https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/AsyncTableFunction.html>可以通过实现自定义评估方法来定义a的行为。评估方法必须公开声明,而不是静态声明,并命名为“
eval”。评估方法也可以通过实现多个名为“ eval”的方法来重载。
对于每个“ eval”,都可以触发一个异步io操作,一旦完成,就可以通过调用来收集结果CompletableFuture.complete(T)
<http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CompletableFuture.html?is-external=true#complete-T->。对于每个异步操作,调用“
eval”后,其上下文将立即存储在运算符中,从而避免在内部缓冲区未满的情况下阻塞输入的每个流。
代码示例:
public void eval(CompletableFuture<Collection<String>> result,
String rowkey) {
Get get = new Get(Bytes.toBytes(rowkey));
ListenableFuture<Result> future = hbase.asyncGet(get);
Futures.addCallback(future, new FutureCallback<Result>() {
public void onSuccess(Result result) {
List<String> ret = process(result);
result.complete(ret);
}
public void onFailure(Throwable thrown) {
result.completeExceptionally(thrown);
}
});
}
参考链接:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/AsyncTableFunction.html
kenyore <wo...@gmail.com> 于2021年1月12日周二 下午3:29写道:
> 感谢如此详尽的回复!
> 但是我的场景似乎无法直接使用维表join。
> 因为我需要把look up的结果(会是多行数据),拼成一个数组放入数据行。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink SQL 可以使用异步IO特性吗
Posted by kenyore <wo...@gmail.com>.
感谢如此详尽的回复!
但是我的场景似乎无法直接使用维表join。
因为我需要把look up的结果(会是多行数据),拼成一个数组放入数据行。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink SQL 可以使用异步IO特性吗
Posted by 金则林 <fr...@gmail.com>.
Hello,kenyore.
在维表的DDL的WITH参数中添加async='true',Async 相关参数如下。
参数说明是否必填备注
async 是否开启异步请求 否 默认值为fasle。
asyncResultOrder 异步结果顺序 否 取值如下:
- unordered(默认值):无序。
- ordered:有序。
asyncTimeoutMs 异步请求的超时时间 否 单位毫秒,默认值为3分钟。
asyncCapacity 异步请求的队列容量 否 默认值为100。
asyncCallbackThreads 回调处理线程数 否
回调类中的onComplete和onError默认会在线程池中处理该线程池的大小,默认值为50。
asyncConnectionQueueMaxsize 最大请求发送数 否 当等待某个服务器返回结果的请求数量达到
asyncConnectionQueueMaxsize值时,异步请求调用也会被阻塞,以防止客户端自身OOM(OutOfMemory),默认值为100。
asyncCallbackQueueMaxsize 最大回调处理队列 否 当等待回调处理的请求达到asyncCallbackQueueMaxsize
值时,异步请求调用也会被阻塞,以防止客户端自身OOM,默认值为500。
CREATE TABLE test(
id VARCHAR,
PRIMARY KEY(id)
) WITH(
async='true',
asyncResultOrder = 'unordered'
);
kenyore <wo...@gmail.com> 于2021年1月12日周二 下午2:38写道:
> hi,all
> 我有一个场景是使用自定义的ScalaFunction实现所需要的look up功能(从数据库查询并将多行数据拼成一个数组)。
> 我正在试图尝试使用异步IO的方式以提高它的性能,但是似乎只有Stream API提供了该特性支持。
> 大家有什么建议吗?或者有其他优化思路吗?
> 谢谢!
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/