You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Stephen Connolly <st...@gmail.com> on 2019/07/24 07:18:18 UTC

How to handle JDBC connections in a topology

Hi,

So we have a number of nodes in our topology that need to do things like
checking a database, e.g.

* We need a filter step to drop events on the floor from systems we are no
longer interested in
* We need a step that outputs on a side-channel if the event is for an
object where the parent is not currently known to the database.

Right now we are grabbing a JDBC connection for each node in the topology
that needs to talk to the database and storing the connection in a
transient field (to exclude it from the serialized state)

What I'd really like to do is have a JDBC connection pool shared across the
entire topology as that way we could have the pool check for stale
connections, etc.

Does anyone have any tips for doing this kind of thing?

(My current idea is to maintain a `static final
WeakHashMap<ClassLoader,ConnectionPool>` in the main class... but that
feels very much like a hack)

What I'm really looking for is some form of Node Transient State... are
there any examples of this type of think.

Flink 1.8.x

Thanks,

-Stephen

Re: How to handle JDBC connections in a topology

Posted by Chesnay Schepler <ch...@apache.org>.
Note that in order for the static class approach to work you have to 
ensure that the class is loaded by the parent classloader, either by 
placing the class in /lib or configuring 
`classloader.parent-first-patterns-additional` to pick up this 
particular class.

On 24/07/2019 10:24, Haibo Sun wrote:
> Hi Stephen,
>
> I don't think it's possible to use the same connection pool for the 
> entire topology, because the nodes on the topology may run in 
> different JVMs and on different machines.
>
> If you want all operators running in the same JVM to use the same 
> connection pool, I think you can implement a static class that 
> contains the connection pool, and then the operators get the 
> connection from it.
>
> Best,
> Haibo
>
> At 2019-07-24 15:20:31, "Stephen Connolly" 
> <st...@gmail.com> wrote:
>
>     Oh and I'd also need some way to clean up the per-node transient
>     state if the topology stops running on a specific node.
>
>     On Wed, 24 Jul 2019 at 08:18, Stephen Connolly
>     <stephen.alan.connolly@gmail.com
>     <ma...@gmail.com>> wrote:
>
>         Hi,
>
>         So we have a number of nodes in our topology that need to do
>         things like checking a database, e.g.
>
>         * We need a filter step to drop events on the floor from
>         systems we are no longer interested in
>         * We need a step that outputs on a side-channel if the event
>         is for an object where the parent is not currently known to
>         the database.
>
>         Right now we are grabbing a JDBC connection for each node in
>         the topology that needs to talk to the database and storing
>         the connection in a transient field (to exclude it from the
>         serialized state)
>
>         What I'd really like to do is have a JDBC connection pool
>         shared across the entire topology as that way we could have
>         the pool check for stale connections, etc.
>
>         Does anyone have any tips for doing this kind of thing?
>
>         (My current idea is to maintain a `static final
>         WeakHashMap<ClassLoader,ConnectionPool>` in the main class...
>         but that feels very much like a hack)
>
>         What I'm really looking for is some form of Node Transient
>         State... are there any examples of this type of think.
>
>         Flink 1.8.x
>
>         Thanks,
>
>         -Stephen
>


Re:Re: How to handle JDBC connections in a topology

Posted by Haibo Sun <su...@163.com>.
Hi Stephen,


I don't think it's possible to use the same connection pool for the entire topology, because the nodes on the topology may run in different JVMs and on different machines.


If you want all operators running in the same JVM to use the same connection pool, I think you can implement a static class that contains the connection pool, and then the operators get the  connection from it.


Best,
Haibo

At 2019-07-24 15:20:31, "Stephen Connolly" <st...@gmail.com> wrote:

Oh and I'd also need some way to clean up the per-node transient state if the topology stops running on a specific node.


On Wed, 24 Jul 2019 at 08:18, Stephen Connolly <st...@gmail.com> wrote:

Hi,


So we have a number of nodes in our topology that need to do things like checking a database, e.g.


* We need a filter step to drop events on the floor from systems we are no longer interested in
* We need a step that outputs on a side-channel if the event is for an object where the parent is not currently known to the database.


Right now we are grabbing a JDBC connection for each node in the topology that needs to talk to the database and storing the connection in a transient field (to exclude it from the serialized state)


What I'd really like to do is have a JDBC connection pool shared across the entire topology as that way we could have the pool check for stale connections, etc.


Does anyone have any tips for doing this kind of thing?


(My current idea is to maintain a `static final WeakHashMap<ClassLoader,ConnectionPool>` in the main class... but that feels very much like a hack)


What I'm really looking for is some form of Node Transient State... are there any examples of this type of think.


Flink 1.8.x


Thanks,


-Stephen

Re: How to handle JDBC connections in a topology

Posted by Stephen Connolly <st...@gmail.com>.
Oh and I'd also need some way to clean up the per-node transient state if
the topology stops running on a specific node.

On Wed, 24 Jul 2019 at 08:18, Stephen Connolly <
stephen.alan.connolly@gmail.com> wrote:

> Hi,
>
> So we have a number of nodes in our topology that need to do things like
> checking a database, e.g.
>
> * We need a filter step to drop events on the floor from systems we are no
> longer interested in
> * We need a step that outputs on a side-channel if the event is for an
> object where the parent is not currently known to the database.
>
> Right now we are grabbing a JDBC connection for each node in the topology
> that needs to talk to the database and storing the connection in a
> transient field (to exclude it from the serialized state)
>
> What I'd really like to do is have a JDBC connection pool shared across
> the entire topology as that way we could have the pool check for stale
> connections, etc.
>
> Does anyone have any tips for doing this kind of thing?
>
> (My current idea is to maintain a `static final
> WeakHashMap<ClassLoader,ConnectionPool>` in the main class... but that
> feels very much like a hack)
>
> What I'm really looking for is some form of Node Transient State... are
> there any examples of this type of think.
>
> Flink 1.8.x
>
> Thanks,
>
> -Stephen
>