Posted to user@flink.apache.org by Marco Villalobos <mv...@kineteque.com> on 2020/08/12 16:31:28 UTC

What async database library does the asyncio code example use?

I would like to enrich my stream with database calls as documented at:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/asyncio.html

What async database library does the asyncio code example use? It references a class called "DatabaseClient".


Additionally, does anybody know any async database libraries that work with PostgreSQL that they could recommend?





Re: What async database library does the asyncio code example use?

Posted by Marco Villalobos <mv...@kineteque.com>.
Thank you!

This was very helpful.

Sincerely, 

Marco A. Villalobos

> On Aug 13, 2020, at 1:24 PM, Arvid Heise <ar...@ververica.com> wrote:
> [...]


Re: What async database library does the asyncio code example use?

Posted by Arvid Heise <ar...@ververica.com>.
Hi Marco,

you don't need to use an async library; you could simply write your code in
async fashion.

I'm trying to sketch the basic idea using any JDBC driver in the following
(it's been a while since I used JDBC, so don't take it too literally).

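// Imports needed by the enclosing file:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

private static class SampleAsyncFunction extends RichAsyncFunction<Integer, String> {
   // Placeholder query: fill in your own table and key column, keeping one "?" parameter.
   private static final String QUERY = "SELECT * FROM ... WHERE ... = ?";

   private final String jdbcUrl; // connection info (hypothetical name); must be serializable
   private transient ExecutorService executorService;

   SampleAsyncFunction(String jdbcUrl) {
      this.jdbcUrl = jdbcUrl;
   }

   @Override
   public void open(Configuration parameters) throws Exception {
      super.open(parameters);
      executorService = Executors.newFixedThreadPool(30);
   }

   @Override
   public void close() throws Exception {
      super.close();
      if (executorService != null) {
         executorService.shutdownNow();
      }
   }

   @Override
   public void asyncInvoke(final Integer input, final ResultFuture<String> resultFuture) {
      executorService.submit(() -> {
         // A JDBC connection or statement must not be shared by concurrent tasks, so each
         // task opens its own here (a connection pool would avoid the reconnect cost).
         try (Connection dbConn = DriverManager.getConnection(jdbcUrl);
               PreparedStatement statement = dbConn.prepareStatement(QUERY)) {
            statement.setInt(1, input); // JDBC parameter indices start at 1
            try (ResultSet resultSet = statement.executeQuery()) {
               // Move to the first row before reading; emit nothing if there is no match.
               resultFuture.complete(resultSet.next()
                     ? Collections.singletonList(resultSet.getString(1))
                     : Collections.<String>emptyList());
            }
         } catch (Exception e) {
            // Always complete the future, otherwise the record only ends with a timeout.
            resultFuture.completeExceptionally(e);
         }
      });
   }
}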

That's basically what all async libraries do behind the scenes anyway:
spawn a thread pool and call the callbacks when a submitted task finishes.

To decide on the size of the thread pool, you should measure, without
Flink, how many queries you can execute in parallel. Also keep in mind that
if your async I/O runs in parallel on the same task manager, your threads
will multiply (you can also use a static, shared executor, but it's a bit
tricky to shut down).
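
One possible way to deal with the shutdown of a static, shared executor is
reference counting: each function instance acquires the pool in open() and
releases it in close(), and the pool is shut down only when the last user
releases it. The following is a minimal sketch under that assumption, not a
tested production pattern; SharedExecutor, acquire and release are
hypothetical names and are not part of Flink.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical helper: one thread pool shared by all function instances in a JVM. */
final class SharedExecutor {
    private static ExecutorService pool; // guarded by the class lock
    private static int users;            // instances that acquired but did not release yet

    private SharedExecutor() {}

    /**
     * Call from open(). The pool is created on first use; the thread count of
     * later callers is ignored because the pool already exists.
     */
    static synchronized ExecutorService acquire(int threads) {
        if (users++ == 0) {
            pool = Executors.newFixedThreadPool(threads);
        }
        return pool;
    }

    /** Call from close(). The pool is shut down when the last user releases it. */
    static synchronized void release() {
        if (users == 0) {
            return; // unbalanced release; nothing to shut down
        }
        if (--users == 0) {
            pool.shutdownNow();
            pool = null;
        }
    }
}
```

With such a helper, open() would call SharedExecutor.acquire(30) instead of
creating its own pool, and close() would call SharedExecutor.release(), so
the number of threads per task manager stays fixed regardless of the
operator's parallelism.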

On Wed, Aug 12, 2020 at 8:16 PM KristoffSC <kr...@gmail.com>
wrote:

> [...]


-- 

Arvid Heise | Senior Java Developer


Re: What async database library does the asyncio code example use?

Posted by KristoffSC <kr...@gmail.com>.
Hi,
I believe the example in [1], where you see DatabaseClient, is just a hint
that whatever library you use (database, REST-based, or anything else)
should be asynchronous, or at least should not block. The library itself
does not have to be non-blocking, as long as it runs on its own thread pool
and returns a future, or something similar, on which you can register
resultFuture.complete(...).

I actually wrote my own semi-library that calls
resultFuture.complete(...) from each library thread.
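
The idea of registering resultFuture.complete(...) on a future can be
sketched with plain java.util.concurrent classes. This is only an
illustration: ResultCallback is a stand-in declared here so the sketch
compiles without Flink (in a real job it would be Flink's ResultFuture), and
FutureBridge and lookupAsync are hypothetical names.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;

/** Stand-in for Flink's ResultFuture, declared only to keep the sketch self-contained. */
interface ResultCallback<OUT> {
    void complete(Collection<OUT> result);
    void completeExceptionally(Throwable error);
}

final class FutureBridge {
    private FutureBridge() {}

    /**
     * Runs a blocking lookup on the given pool and forwards its outcome to the
     * callback. The returned future completes after the callback has been invoked.
     */
    static <IN, OUT> CompletableFuture<OUT> lookupAsync(
            IN key,
            Function<IN, OUT> blockingLookup,
            ExecutorService pool,
            ResultCallback<OUT> callback) {
        return CompletableFuture
                .supplyAsync(() -> blockingLookup.apply(key), pool)
                .whenComplete((value, error) -> {
                    if (error == null) {
                        callback.complete(Collections.singleton(value));
                        return;
                    }
                    // Failures may arrive wrapped in CompletionException; unwrap them.
                    Throwable cause = (error instanceof CompletionException
                            && error.getCause() != null) ? error.getCause() : error;
                    callback.completeExceptionally(cause);
                });
    }
}
```

The calling thread returns immediately after submitting the lookup; the
callback is invoked later from one of the pool's threads, which is the
behaviour described above.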

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/asyncio.html




Re: What async database library does the asyncio code example use?

Posted by Marco Villalobos <mv...@kineteque.com>.
So, I searched for an async DatabaseClient class, and I found r2dbc.

Is that it?

https://docs.spring.io/spring-data/r2dbc/docs/1.1.3.RELEASE/reference/html

On Wed, Aug 12, 2020 at 9:31 AM Marco Villalobos <mv...@kineteque.com>
wrote:

> [...]