Posted to user@flink.apache.org by venn <wx...@163.com> on 2019/07/09 11:40:01 UTC

Flink Async io problem

Hi Flink experts,

I'm working on a Flink async I/O program that joins a stream against an
external database (MySQL), but the lookups still appear to run synchronously.
Please give some advice, or point me to an async demo. Thanks.

 

The asyncInvoke method is as follows:


@Override
public void asyncInvoke(AsyncUser asyncUser, ResultFuture<AsyncUser> resultFuture) throws Exception {
    // Query by the asyncUser id
    ps.setString(1, asyncUser.getId());
    ResultSet rs = ps.executeQuery();

    CompletableFuture.supplyAsync(new Supplier<AsyncUser>() {
        @Override
        public AsyncUser get() {
            try {
                if (!rs.isClosed() && rs.next()) {
                    asyncUser.setPhone(rs.getString(1));
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
            return asyncUser;
        }
    }).thenAccept((AsyncUser tmp) -> {
        List<AsyncUser> list = new ArrayList<>();
        list.add(tmp);
        resultFuture.complete(list);
    });
}

 

 

 

Best, Venn


Re: Flink Async io problem

Posted by Yun Gao <yu...@aliyun.com>.
Hi Venn,

     I think `AsyncFunction#asyncInvoke` should only submit an asynchronous task for each input record rather than execute the task itself. In your code fragment, however, the query is executed directly (and therefore synchronously) inside the asyncInvoke method.

    You can find more information on the documentation page [1]. One point worth noting is that in the example there, the call to `client#query` returns a Future, so it is an asynchronous action rather than a direct execution of the query.

   [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
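
   As a rough illustration of the point above, here is a minimal sketch that hands the blocking lookup to a thread pool so that asyncInvoke returns immediately. It is written without Flink on the classpath: the nested `ResultFuture` interface is only a stand-in for the two Flink `ResultFuture` methods it uses, and `AsyncLookupSketch`, `blockingLookup`, and the pool size are hypothetical names and values chosen for the example. In a real job the lookup would be the JDBC query, and it would need its own connection or statement per thread.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class AsyncLookupSketch {

    /** Stand-in for Flink's ResultFuture, reduced to the two methods used here. */
    interface ResultFuture<T> {
        void complete(Collection<T> result);
        void completeExceptionally(Throwable error);
    }

    private final ExecutorService executor;
    // Hypothetical blocking call, e.g. a JDBC query that returns the phone for a user id.
    private final Function<String, String> blockingLookup;

    AsyncLookupSketch(ExecutorService executor, Function<String, String> blockingLookup) {
        this.executor = executor;
        this.blockingLookup = blockingLookup;
    }

    /** Only submits the work; the blocking call itself runs on the executor's threads. */
    void asyncInvoke(String userId, ResultFuture<String> resultFuture) {
        CompletableFuture
            .supplyAsync(() -> blockingLookup.apply(userId), executor)
            .whenComplete((phone, error) -> {
                if (error != null) {
                    // Report the failure instead of swallowing it.
                    resultFuture.completeExceptionally(error);
                } else {
                    resultFuture.complete(Collections.singletonList(phone));
                }
            });
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AsyncLookupSketch sketch = new AsyncLookupSketch(pool, id -> "phone-of-" + id);

        // Bridge the callback to a future so main can wait for the result.
        CompletableFuture<Collection<String>> done = new CompletableFuture<>();
        sketch.asyncInvoke("42", new ResultFuture<String>() {
            @Override public void complete(Collection<String> result) { done.complete(result); }
            @Override public void completeExceptionally(Throwable error) { done.completeExceptionally(error); }
        });

        System.out.println(done.join());
        pool.shutdown();
    }
}
```

   The important difference from the original fragment is that nothing blocking happens before `supplyAsync`; both the query and the reading of its result run inside the submitted task.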
Best,
Yun



