You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Xin Ma <ke...@gmail.com> on 2019/10/16 14:02:51 UTC

Should I use static database connection pool?

I have watched one of the recent Flink forward videos, Apache Flink Worst
Practices by Konstantin Knauf. The talk helps me a lot and mentions that we
should avoid using static variables to share state between tasks.

So should I also avoid static database connection? Because I am facing a
weird issue currently, the database connection will lose at some point and
bring the whole job down.

*I have created a database tool like this, *

public class Phoenix {

    private static ComboPooledDataSource dataSource = new
ComboPooledDataSource();
    static {
        try {

dataSource.setDriverClass(Environment.get("phoenix.jdbc.driverClassName",
"org.apache.phoenix.jdbc.PhoenixDriver"));
            dataSource.setJdbcUrl(Environment.get("phoenix.jdbc.url",
null));
            dataSource.setMaxPoolSize(200);
            dataSource.setMinPoolSize(10);
            Properties properties = new Properties();
            properties.setProperty("user", "---");
            properties.setProperty("password", "---");
            dataSource.setProperties(properties);
        } catch (PropertyVetoException e) {
            throw new RuntimeException("phoenix datasource conf error");
        }
    }

    private static Connection getConn() throws SQLException {
        return dataSource.getConnection();
    }

    public static < T > T executeQuery(String sql, Caller < T > caller)
throws SQLException {
        // .. execiton logic
    }

    public static int executeUpdateWithTx(List < String > sqlList) throws
SQLException {
        // ..update logic
    }

}

*Then I implemented my customized sink function like this,*

public class CustomizedSink extends RichSinkFunction < Record > {

    private static Logger LOG = LoggerFactory.getLogger("userFlinkLogger");
    private static final int batchInsertSize = 5000;
    private static final long flushInterval = 60 * 1000 L;
    private long lastFlushTime;
    private BatchCommit batchCommit;
    private ConcurrentLinkedQueue < Object > cacheQueue;
    private ExecutorService threadPool;

    @Override
    public void open(Configuration parameters) throws Exception {
        cacheQueue = new ConcurrentLinkedQueue < > ();
        threadPool = Executors.newFixedThreadPool(1);
        batchCommit = new BatchCommit();
        super.open(parameters);
    }

    @Override
    public void invoke(DriverLbs driverLbs) throws Exception {
        cacheQueue.add(driverLbs);
        if (cacheQueue.size() >= batchInsertSize ||
            System.currentTimeMillis() - lastFlushTime >= flushInterval) {
            lastFlushTime = System.currentTimeMillis();
            threadPool.execute(batchCommit);
        }
    }

    private class BatchCommit implements Runnable {
        @Override
        public void run() {
            try {
                int ct;
                synchronized(cacheQueue) {
                    List < String > sqlList = Lists.newArrayList();
                    for (ct = 0; ct < batchInsertSize; ct++) {
                        Object obj = cacheQueue.poll();
                        if (obj == null) {
                            break;
                        }
                        sqlList.add(generateBatchSql((Record) obj));
                    }
                    Phoenix.executeUpdateWithTx(sqlList);
                }
                LOG.info("Batch insert " + ct + " cache records");
            } catch (Exception e) {
                LOG.error("Batch insert error: ", e);
            }
        }

        private String generateBatchSql(Record record) {
            // business logic
        }
    }
}

*Is there any good idea to refactor the codes?*

Best,
Kevin

Re: Should I use static database connection pool?

Posted by John Smith <ja...@gmail.com>.
Also pool should be transient because it it holds connections which
shouldn't/cannot be serialized.

On Thu., Oct. 17, 2019, 9:39 a.m. John Smith, <ja...@gmail.com>
wrote:

> If by task you mean job then yes global static variables initialized im
> the main of the job do not get serialized/transfered to the nodes where
> that job may get assigned.
>
> The other thing is also since it is a sink, the sink will be serialized to
> that node and then initialized so that static variable will be local to
> that sink.
>
> Someone from flink should chime in :p
>
> On Thu., Oct. 17, 2019, 9:22 a.m. John Smith, <ja...@gmail.com>
> wrote:
>
>> Usually the database connection pool is thread safe. When you mean task
>> you mean a single deployed flink job?
>>
>> I still think a sink is only init once. You can prove it by putting
>> logging in the open and close.
>>
>> On Thu., Oct. 17, 2019, 1:39 a.m. Xin Ma, <ke...@gmail.com> wrote:
>>
>>> Thanks, John. If I don't static my pool, I think it will create one
>>> instance for each task. If the pool is static, each jvm can hold one
>>> instance. Depending on the deployment approach, it can create one to
>>> multiple instances. Is this correct?
>>> Konstantin's talk mentions static variables can lead to dead locks, etc,
>>> I don't know if the loss of jdbc connection is also related to this. Btw, I
>>> am using JDBC to write to HBase, maybe it also matters.
>>>
>>>
>>> On Thu, Oct 17, 2019 at 2:32 AM John Smith <ja...@gmail.com>
>>> wrote:
>>>
>>>> Xin. The open() close() cycle of a Sink function is only called once so
>>>> I don't think you event need to have it static your pool. Someone can
>>>> confirm this?
>>>>
>>>> Miki the JDBC Connector lacks some functionality for instance it only
>>>> flushes batches when the batch interval is reached. So if you set batch
>>>> interval to 5 and you get 6 records the 6 one will not be flushed to the DB
>>>> until you get another 4. You can see in the code above Xin has put a timer
>>>> based flush as well. Also JDBC connector does not have checkpointing if you
>>>> ever need that, which is a surprise because most JDBC databases have
>>>> transactions so it would be nice to have.
>>>>
>>>> On Wed, 16 Oct 2019 at 10:58, miki haiat <mi...@gmail.com> wrote:
>>>>
>>>>> If it's a sink that use jdbc, why not using the flink Jdbcsink
>>>>> connector?
>>>>>
>>>>>
>>>>> On Wed, Oct 16, 2019, 17:03 Xin Ma <ke...@gmail.com> wrote:
>>>>>
>>>>>> I have watched one of the recent Flink forward videos, Apache Flink
>>>>>> Worst Practices by Konstantin Knauf. The talk helps me a lot and mentions
>>>>>> that we should avoid using static variables to share state between tasks.
>>>>>>
>>>>>> So should I also avoid static database connection? Because I am
>>>>>> facing a weird issue currently, the database connection will lose at some
>>>>>> point and bring the whole job down.
>>>>>>
>>>>>> *I have created a database tool like this, *
>>>>>>
>>>>>> public class Phoenix {
>>>>>>
>>>>>>     private static ComboPooledDataSource dataSource = new
>>>>>> ComboPooledDataSource();
>>>>>>     static {
>>>>>>         try {
>>>>>>
>>>>>> dataSource.setDriverClass(Environment.get("phoenix.jdbc.driverClassName",
>>>>>> "org.apache.phoenix.jdbc.PhoenixDriver"));
>>>>>>             dataSource.setJdbcUrl(Environment.get("phoenix.jdbc.url",
>>>>>> null));
>>>>>>             dataSource.setMaxPoolSize(200);
>>>>>>             dataSource.setMinPoolSize(10);
>>>>>>             Properties properties = new Properties();
>>>>>>             properties.setProperty("user", "---");
>>>>>>             properties.setProperty("password", "---");
>>>>>>             dataSource.setProperties(properties);
>>>>>>         } catch (PropertyVetoException e) {
>>>>>>             throw new RuntimeException("phoenix datasource conf
>>>>>> error");
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     private static Connection getConn() throws SQLException {
>>>>>>         return dataSource.getConnection();
>>>>>>     }
>>>>>>
>>>>>>     public static < T > T executeQuery(String sql, Caller < T >
>>>>>> caller) throws SQLException {
>>>>>>         // .. execiton logic
>>>>>>     }
>>>>>>
>>>>>>     public static int executeUpdateWithTx(List < String > sqlList)
>>>>>> throws SQLException {
>>>>>>         // ..update logic
>>>>>>     }
>>>>>>
>>>>>> }
>>>>>>
>>>>>> *Then I implemented my customized sink function like this,*
>>>>>>
>>>>>> public class CustomizedSink extends RichSinkFunction < Record > {
>>>>>>
>>>>>>     private static Logger LOG =
>>>>>> LoggerFactory.getLogger("userFlinkLogger");
>>>>>>     private static final int batchInsertSize = 5000;
>>>>>>     private static final long flushInterval = 60 * 1000 L;
>>>>>>     private long lastFlushTime;
>>>>>>     private BatchCommit batchCommit;
>>>>>>     private ConcurrentLinkedQueue < Object > cacheQueue;
>>>>>>     private ExecutorService threadPool;
>>>>>>
>>>>>>     @Override
>>>>>>     public void open(Configuration parameters) throws Exception {
>>>>>>         cacheQueue = new ConcurrentLinkedQueue < > ();
>>>>>>         threadPool = Executors.newFixedThreadPool(1);
>>>>>>         batchCommit = new BatchCommit();
>>>>>>         super.open(parameters);
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public void invoke(DriverLbs driverLbs) throws Exception {
>>>>>>         cacheQueue.add(driverLbs);
>>>>>>         if (cacheQueue.size() >= batchInsertSize ||
>>>>>>             System.currentTimeMillis() - lastFlushTime >=
>>>>>> flushInterval) {
>>>>>>             lastFlushTime = System.currentTimeMillis();
>>>>>>             threadPool.execute(batchCommit);
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     private class BatchCommit implements Runnable {
>>>>>>         @Override
>>>>>>         public void run() {
>>>>>>             try {
>>>>>>                 int ct;
>>>>>>                 synchronized(cacheQueue) {
>>>>>>                     List < String > sqlList = Lists.newArrayList();
>>>>>>                     for (ct = 0; ct < batchInsertSize; ct++) {
>>>>>>                         Object obj = cacheQueue.poll();
>>>>>>                         if (obj == null) {
>>>>>>                             break;
>>>>>>                         }
>>>>>>                         sqlList.add(generateBatchSql((Record) obj));
>>>>>>                     }
>>>>>>                     Phoenix.executeUpdateWithTx(sqlList);
>>>>>>                 }
>>>>>>                 LOG.info("Batch insert " + ct + " cache records");
>>>>>>             } catch (Exception e) {
>>>>>>                 LOG.error("Batch insert error: ", e);
>>>>>>             }
>>>>>>         }
>>>>>>
>>>>>>         private String generateBatchSql(Record record) {
>>>>>>             // business logic
>>>>>>         }
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> *Is there any good idea to refactor the codes?*
>>>>>>
>>>>>> Best,
>>>>>> Kevin
>>>>>>
>>>>>>

Re: Should I use static database connection pool?

Posted by John Smith <ja...@gmail.com>.
If by task you mean job then yes global static variables initialized im the
main of the job do not get serialized/transfered to the nodes where that
job may get assigned.

The other thing is also since it is a sink, the sink will be serialized to
that node and then initialized so that static variable will be local to
that sink.

Someone from flink should chime in :p

On Thu., Oct. 17, 2019, 9:22 a.m. John Smith, <ja...@gmail.com>
wrote:

> Usually the database connection pool is thread safe. When you mean task
> you mean a single deployed flink job?
>
> I still think a sink is only init once. You can prove it by putting
> logging in the open and close.
>
> On Thu., Oct. 17, 2019, 1:39 a.m. Xin Ma, <ke...@gmail.com> wrote:
>
>> Thanks, John. If I don't static my pool, I think it will create one
>> instance for each task. If the pool is static, each jvm can hold one
>> instance. Depending on the deployment approach, it can create one to
>> multiple instances. Is this correct?
>> Konstantin's talk mentions static variables can lead to dead locks, etc,
>> I don't know if the loss of jdbc connection is also related to this. Btw, I
>> am using JDBC to write to HBase, maybe it also matters.
>>
>>
>> On Thu, Oct 17, 2019 at 2:32 AM John Smith <ja...@gmail.com>
>> wrote:
>>
>>> Xin. The open() close() cycle of a Sink function is only called once so
>>> I don't think you event need to have it static your pool. Someone can
>>> confirm this?
>>>
>>> Miki the JDBC Connector lacks some functionality for instance it only
>>> flushes batches when the batch interval is reached. So if you set batch
>>> interval to 5 and you get 6 records the 6 one will not be flushed to the DB
>>> until you get another 4. You can see in the code above Xin has put a timer
>>> based flush as well. Also JDBC connector does not have checkpointing if you
>>> ever need that, which is a surprise because most JDBC databases have
>>> transactions so it would be nice to have.
>>>
>>> On Wed, 16 Oct 2019 at 10:58, miki haiat <mi...@gmail.com> wrote:
>>>
>>>> If it's a sink that use jdbc, why not using the flink Jdbcsink
>>>> connector?
>>>>
>>>>
>>>> On Wed, Oct 16, 2019, 17:03 Xin Ma <ke...@gmail.com> wrote:
>>>>
>>>>> I have watched one of the recent Flink forward videos, Apache Flink
>>>>> Worst Practices by Konstantin Knauf. The talk helps me a lot and mentions
>>>>> that we should avoid using static variables to share state between tasks.
>>>>>
>>>>> So should I also avoid static database connection? Because I am facing
>>>>> a weird issue currently, the database connection will lose at some point
>>>>> and bring the whole job down.
>>>>>
>>>>> *I have created a database tool like this, *
>>>>>
>>>>> public class Phoenix {
>>>>>
>>>>>     private static ComboPooledDataSource dataSource = new
>>>>> ComboPooledDataSource();
>>>>>     static {
>>>>>         try {
>>>>>
>>>>> dataSource.setDriverClass(Environment.get("phoenix.jdbc.driverClassName",
>>>>> "org.apache.phoenix.jdbc.PhoenixDriver"));
>>>>>             dataSource.setJdbcUrl(Environment.get("phoenix.jdbc.url",
>>>>> null));
>>>>>             dataSource.setMaxPoolSize(200);
>>>>>             dataSource.setMinPoolSize(10);
>>>>>             Properties properties = new Properties();
>>>>>             properties.setProperty("user", "---");
>>>>>             properties.setProperty("password", "---");
>>>>>             dataSource.setProperties(properties);
>>>>>         } catch (PropertyVetoException e) {
>>>>>             throw new RuntimeException("phoenix datasource conf
>>>>> error");
>>>>>         }
>>>>>     }
>>>>>
>>>>>     private static Connection getConn() throws SQLException {
>>>>>         return dataSource.getConnection();
>>>>>     }
>>>>>
>>>>>     public static < T > T executeQuery(String sql, Caller < T >
>>>>> caller) throws SQLException {
>>>>>         // .. execiton logic
>>>>>     }
>>>>>
>>>>>     public static int executeUpdateWithTx(List < String > sqlList)
>>>>> throws SQLException {
>>>>>         // ..update logic
>>>>>     }
>>>>>
>>>>> }
>>>>>
>>>>> *Then I implemented my customized sink function like this,*
>>>>>
>>>>> public class CustomizedSink extends RichSinkFunction < Record > {
>>>>>
>>>>>     private static Logger LOG =
>>>>> LoggerFactory.getLogger("userFlinkLogger");
>>>>>     private static final int batchInsertSize = 5000;
>>>>>     private static final long flushInterval = 60 * 1000 L;
>>>>>     private long lastFlushTime;
>>>>>     private BatchCommit batchCommit;
>>>>>     private ConcurrentLinkedQueue < Object > cacheQueue;
>>>>>     private ExecutorService threadPool;
>>>>>
>>>>>     @Override
>>>>>     public void open(Configuration parameters) throws Exception {
>>>>>         cacheQueue = new ConcurrentLinkedQueue < > ();
>>>>>         threadPool = Executors.newFixedThreadPool(1);
>>>>>         batchCommit = new BatchCommit();
>>>>>         super.open(parameters);
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void invoke(DriverLbs driverLbs) throws Exception {
>>>>>         cacheQueue.add(driverLbs);
>>>>>         if (cacheQueue.size() >= batchInsertSize ||
>>>>>             System.currentTimeMillis() - lastFlushTime >=
>>>>> flushInterval) {
>>>>>             lastFlushTime = System.currentTimeMillis();
>>>>>             threadPool.execute(batchCommit);
>>>>>         }
>>>>>     }
>>>>>
>>>>>     private class BatchCommit implements Runnable {
>>>>>         @Override
>>>>>         public void run() {
>>>>>             try {
>>>>>                 int ct;
>>>>>                 synchronized(cacheQueue) {
>>>>>                     List < String > sqlList = Lists.newArrayList();
>>>>>                     for (ct = 0; ct < batchInsertSize; ct++) {
>>>>>                         Object obj = cacheQueue.poll();
>>>>>                         if (obj == null) {
>>>>>                             break;
>>>>>                         }
>>>>>                         sqlList.add(generateBatchSql((Record) obj));
>>>>>                     }
>>>>>                     Phoenix.executeUpdateWithTx(sqlList);
>>>>>                 }
>>>>>                 LOG.info("Batch insert " + ct + " cache records");
>>>>>             } catch (Exception e) {
>>>>>                 LOG.error("Batch insert error: ", e);
>>>>>             }
>>>>>         }
>>>>>
>>>>>         private String generateBatchSql(Record record) {
>>>>>             // business logic
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>> *Is there any good idea to refactor the codes?*
>>>>>
>>>>> Best,
>>>>> Kevin
>>>>>
>>>>>

Re: Should I use static database connection pool?

Posted by John Smith <ja...@gmail.com>.
Usually the database connection pool is thread safe. When you mean task you
mean a single deployed flink job?

I still think a sink is only init once. You can prove it by putting logging
in the open and close.

On Thu., Oct. 17, 2019, 1:39 a.m. Xin Ma, <ke...@gmail.com> wrote:

> Thanks, John. If I don't static my pool, I think it will create one
> instance for each task. If the pool is static, each jvm can hold one
> instance. Depending on the deployment approach, it can create one to
> multiple instances. Is this correct?
> Konstantin's talk mentions static variables can lead to dead locks, etc, I
> don't know if the loss of jdbc connection is also related to this. Btw, I
> am using JDBC to write to HBase, maybe it also matters.
>
>
> On Thu, Oct 17, 2019 at 2:32 AM John Smith <ja...@gmail.com> wrote:
>
>> Xin. The open() close() cycle of a Sink function is only called once so I
>> don't think you event need to have it static your pool. Someone can confirm
>> this?
>>
>> Miki the JDBC Connector lacks some functionality for instance it only
>> flushes batches when the batch interval is reached. So if you set batch
>> interval to 5 and you get 6 records the 6 one will not be flushed to the DB
>> until you get another 4. You can see in the code above Xin has put a timer
>> based flush as well. Also JDBC connector does not have checkpointing if you
>> ever need that, which is a surprise because most JDBC databases have
>> transactions so it would be nice to have.
>>
>> On Wed, 16 Oct 2019 at 10:58, miki haiat <mi...@gmail.com> wrote:
>>
>>> If it's a sink that use jdbc, why not using the flink Jdbcsink connector?
>>>
>>>
>>> On Wed, Oct 16, 2019, 17:03 Xin Ma <ke...@gmail.com> wrote:
>>>
>>>> I have watched one of the recent Flink forward videos, Apache Flink
>>>> Worst Practices by Konstantin Knauf. The talk helps me a lot and mentions
>>>> that we should avoid using static variables to share state between tasks.
>>>>
>>>> So should I also avoid static database connection? Because I am facing
>>>> a weird issue currently, the database connection will lose at some point
>>>> and bring the whole job down.
>>>>
>>>> *I have created a database tool like this, *
>>>>
>>>> public class Phoenix {
>>>>
>>>>     private static ComboPooledDataSource dataSource = new
>>>> ComboPooledDataSource();
>>>>     static {
>>>>         try {
>>>>
>>>> dataSource.setDriverClass(Environment.get("phoenix.jdbc.driverClassName",
>>>> "org.apache.phoenix.jdbc.PhoenixDriver"));
>>>>             dataSource.setJdbcUrl(Environment.get("phoenix.jdbc.url",
>>>> null));
>>>>             dataSource.setMaxPoolSize(200);
>>>>             dataSource.setMinPoolSize(10);
>>>>             Properties properties = new Properties();
>>>>             properties.setProperty("user", "---");
>>>>             properties.setProperty("password", "---");
>>>>             dataSource.setProperties(properties);
>>>>         } catch (PropertyVetoException e) {
>>>>             throw new RuntimeException("phoenix datasource conf error");
>>>>         }
>>>>     }
>>>>
>>>>     private static Connection getConn() throws SQLException {
>>>>         return dataSource.getConnection();
>>>>     }
>>>>
>>>>     public static < T > T executeQuery(String sql, Caller < T > caller)
>>>> throws SQLException {
>>>>         // .. execiton logic
>>>>     }
>>>>
>>>>     public static int executeUpdateWithTx(List < String > sqlList)
>>>> throws SQLException {
>>>>         // ..update logic
>>>>     }
>>>>
>>>> }
>>>>
>>>> *Then I implemented my customized sink function like this,*
>>>>
>>>> public class CustomizedSink extends RichSinkFunction < Record > {
>>>>
>>>>     private static Logger LOG =
>>>> LoggerFactory.getLogger("userFlinkLogger");
>>>>     private static final int batchInsertSize = 5000;
>>>>     private static final long flushInterval = 60 * 1000 L;
>>>>     private long lastFlushTime;
>>>>     private BatchCommit batchCommit;
>>>>     private ConcurrentLinkedQueue < Object > cacheQueue;
>>>>     private ExecutorService threadPool;
>>>>
>>>>     @Override
>>>>     public void open(Configuration parameters) throws Exception {
>>>>         cacheQueue = new ConcurrentLinkedQueue < > ();
>>>>         threadPool = Executors.newFixedThreadPool(1);
>>>>         batchCommit = new BatchCommit();
>>>>         super.open(parameters);
>>>>     }
>>>>
>>>>     @Override
>>>>     public void invoke(DriverLbs driverLbs) throws Exception {
>>>>         cacheQueue.add(driverLbs);
>>>>         if (cacheQueue.size() >= batchInsertSize ||
>>>>             System.currentTimeMillis() - lastFlushTime >=
>>>> flushInterval) {
>>>>             lastFlushTime = System.currentTimeMillis();
>>>>             threadPool.execute(batchCommit);
>>>>         }
>>>>     }
>>>>
>>>>     private class BatchCommit implements Runnable {
>>>>         @Override
>>>>         public void run() {
>>>>             try {
>>>>                 int ct;
>>>>                 synchronized(cacheQueue) {
>>>>                     List < String > sqlList = Lists.newArrayList();
>>>>                     for (ct = 0; ct < batchInsertSize; ct++) {
>>>>                         Object obj = cacheQueue.poll();
>>>>                         if (obj == null) {
>>>>                             break;
>>>>                         }
>>>>                         sqlList.add(generateBatchSql((Record) obj));
>>>>                     }
>>>>                     Phoenix.executeUpdateWithTx(sqlList);
>>>>                 }
>>>>                 LOG.info("Batch insert " + ct + " cache records");
>>>>             } catch (Exception e) {
>>>>                 LOG.error("Batch insert error: ", e);
>>>>             }
>>>>         }
>>>>
>>>>         private String generateBatchSql(Record record) {
>>>>             // business logic
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> *Is there any good idea to refactor the codes?*
>>>>
>>>> Best,
>>>> Kevin
>>>>
>>>>

Re: Should I use static database connection pool?

Posted by Xin Ma <ke...@gmail.com>.
Thanks, John. If I don't static my pool, I think it will create one
instance for each task. If the pool is static, each jvm can hold one
instance. Depending on the deployment approach, it can create one to
multiple instances. Is this correct?
Konstantin's talk mentions static variables can lead to dead locks, etc, I
don't know if the loss of jdbc connection is also related to this. Btw, I
am using JDBC to write to HBase, maybe it also matters.


On Thu, Oct 17, 2019 at 2:32 AM John Smith <ja...@gmail.com> wrote:

> Xin. The open() close() cycle of a Sink function is only called once so I
> don't think you event need to have it static your pool. Someone can confirm
> this?
>
> Miki the JDBC Connector lacks some functionality for instance it only
> flushes batches when the batch interval is reached. So if you set batch
> interval to 5 and you get 6 records the 6 one will not be flushed to the DB
> until you get another 4. You can see in the code above Xin has put a timer
> based flush as well. Also JDBC connector does not have checkpointing if you
> ever need that, which is a surprise because most JDBC databases have
> transactions so it would be nice to have.
>
> On Wed, 16 Oct 2019 at 10:58, miki haiat <mi...@gmail.com> wrote:
>
>> If it's a sink that use jdbc, why not using the flink Jdbcsink connector?
>>
>>
>> On Wed, Oct 16, 2019, 17:03 Xin Ma <ke...@gmail.com> wrote:
>>
>>> I have watched one of the recent Flink forward videos, Apache Flink
>>> Worst Practices by Konstantin Knauf. The talk helps me a lot and mentions
>>> that we should avoid using static variables to share state between tasks.
>>>
>>> So should I also avoid static database connection? Because I am facing a
>>> weird issue currently, the database connection will lose at some point and
>>> bring the whole job down.
>>>
>>> *I have created a database tool like this, *
>>>
>>> public class Phoenix {
>>>
>>>     private static ComboPooledDataSource dataSource = new
>>> ComboPooledDataSource();
>>>     static {
>>>         try {
>>>
>>> dataSource.setDriverClass(Environment.get("phoenix.jdbc.driverClassName",
>>> "org.apache.phoenix.jdbc.PhoenixDriver"));
>>>             dataSource.setJdbcUrl(Environment.get("phoenix.jdbc.url",
>>> null));
>>>             dataSource.setMaxPoolSize(200);
>>>             dataSource.setMinPoolSize(10);
>>>             Properties properties = new Properties();
>>>             properties.setProperty("user", "---");
>>>             properties.setProperty("password", "---");
>>>             dataSource.setProperties(properties);
>>>         } catch (PropertyVetoException e) {
>>>             throw new RuntimeException("phoenix datasource conf error");
>>>         }
>>>     }
>>>
>>>     private static Connection getConn() throws SQLException {
>>>         return dataSource.getConnection();
>>>     }
>>>
>>>     public static < T > T executeQuery(String sql, Caller < T > caller)
>>> throws SQLException {
>>>         // .. execiton logic
>>>     }
>>>
>>>     public static int executeUpdateWithTx(List < String > sqlList)
>>> throws SQLException {
>>>         // ..update logic
>>>     }
>>>
>>> }
>>>
>>> *Then I implemented my customized sink function like this,*
>>>
>>> public class CustomizedSink extends RichSinkFunction < Record > {
>>>
>>>     private static Logger LOG =
>>> LoggerFactory.getLogger("userFlinkLogger");
>>>     private static final int batchInsertSize = 5000;
>>>     private static final long flushInterval = 60 * 1000 L;
>>>     private long lastFlushTime;
>>>     private BatchCommit batchCommit;
>>>     private ConcurrentLinkedQueue < Object > cacheQueue;
>>>     private ExecutorService threadPool;
>>>
>>>     @Override
>>>     public void open(Configuration parameters) throws Exception {
>>>         cacheQueue = new ConcurrentLinkedQueue < > ();
>>>         threadPool = Executors.newFixedThreadPool(1);
>>>         batchCommit = new BatchCommit();
>>>         super.open(parameters);
>>>     }
>>>
>>>     @Override
>>>     public void invoke(DriverLbs driverLbs) throws Exception {
>>>         cacheQueue.add(driverLbs);
>>>         if (cacheQueue.size() >= batchInsertSize ||
>>>             System.currentTimeMillis() - lastFlushTime >= flushInterval)
>>> {
>>>             lastFlushTime = System.currentTimeMillis();
>>>             threadPool.execute(batchCommit);
>>>         }
>>>     }
>>>
>>>     private class BatchCommit implements Runnable {
>>>         @Override
>>>         public void run() {
>>>             try {
>>>                 int ct;
>>>                 synchronized(cacheQueue) {
>>>                     List < String > sqlList = Lists.newArrayList();
>>>                     for (ct = 0; ct < batchInsertSize; ct++) {
>>>                         Object obj = cacheQueue.poll();
>>>                         if (obj == null) {
>>>                             break;
>>>                         }
>>>                         sqlList.add(generateBatchSql((Record) obj));
>>>                     }
>>>                     Phoenix.executeUpdateWithTx(sqlList);
>>>                 }
>>>                 LOG.info("Batch insert " + ct + " cache records");
>>>             } catch (Exception e) {
>>>                 LOG.error("Batch insert error: ", e);
>>>             }
>>>         }
>>>
>>>         private String generateBatchSql(Record record) {
>>>             // business logic
>>>         }
>>>     }
>>> }
>>>
>>> *Is there any good idea to refactor the codes?*
>>>
>>> Best,
>>> Kevin
>>>
>>>

Re: Should I use static database connection pool?

Posted by John Smith <ja...@gmail.com>.
Xin. The open() close() cycle of a Sink function is only called once so I
don't think you event need to have it static your pool. Someone can confirm
this?

Miki the JDBC Connector lacks some functionality for instance it only
flushes batches when the batch interval is reached. So if you set batch
interval to 5 and you get 6 records the 6 one will not be flushed to the DB
until you get another 4. You can see in the code above Xin has put a timer
based flush as well. Also JDBC connector does not have checkpointing if you
ever need that, which is a surprise because most JDBC databases have
transactions so it would be nice to have.

On Wed, 16 Oct 2019 at 10:58, miki haiat <mi...@gmail.com> wrote:

> If it's a sink that use jdbc, why not using the flink Jdbcsink connector?
>
>
> On Wed, Oct 16, 2019, 17:03 Xin Ma <ke...@gmail.com> wrote:
>
>> I have watched one of the recent Flink forward videos, Apache Flink Worst
>> Practices by Konstantin Knauf. The talk helps me a lot and mentions that we
>> should avoid using static variables to share state between tasks.
>>
>> So should I also avoid static database connection? Because I am facing a
>> weird issue currently, the database connection will lose at some point and
>> bring the whole job down.
>>
>> *I have created a database tool like this, *
>>
>> public class Phoenix {
>>
>>     private static ComboPooledDataSource dataSource = new
>> ComboPooledDataSource();
>>     static {
>>         try {
>>
>> dataSource.setDriverClass(Environment.get("phoenix.jdbc.driverClassName",
>> "org.apache.phoenix.jdbc.PhoenixDriver"));
>>             dataSource.setJdbcUrl(Environment.get("phoenix.jdbc.url",
>> null));
>>             dataSource.setMaxPoolSize(200);
>>             dataSource.setMinPoolSize(10);
>>             Properties properties = new Properties();
>>             properties.setProperty("user", "---");
>>             properties.setProperty("password", "---");
>>             dataSource.setProperties(properties);
>>         } catch (PropertyVetoException e) {
>>             throw new RuntimeException("phoenix datasource conf error");
>>         }
>>     }
>>
>>     private static Connection getConn() throws SQLException {
>>         return dataSource.getConnection();
>>     }
>>
>>     public static < T > T executeQuery(String sql, Caller < T > caller)
>> throws SQLException {
>>         // .. execiton logic
>>     }
>>
>>     public static int executeUpdateWithTx(List < String > sqlList) throws
>> SQLException {
>>         // ..update logic
>>     }
>>
>> }
>>
>> *Then I implemented my customized sink function like this,*
>>
>> public class CustomizedSink extends RichSinkFunction < Record > {
>>
>>     private static Logger LOG =
>> LoggerFactory.getLogger("userFlinkLogger");
>>     private static final int batchInsertSize = 5000;
>>     private static final long flushInterval = 60 * 1000 L;
>>     private long lastFlushTime;
>>     private BatchCommit batchCommit;
>>     private ConcurrentLinkedQueue < Object > cacheQueue;
>>     private ExecutorService threadPool;
>>
>>     @Override
>>     public void open(Configuration parameters) throws Exception {
>>         cacheQueue = new ConcurrentLinkedQueue < > ();
>>         threadPool = Executors.newFixedThreadPool(1);
>>         batchCommit = new BatchCommit();
>>         super.open(parameters);
>>     }
>>
>>     @Override
>>     public void invoke(DriverLbs driverLbs) throws Exception {
>>         cacheQueue.add(driverLbs);
>>         if (cacheQueue.size() >= batchInsertSize ||
>>             System.currentTimeMillis() - lastFlushTime >= flushInterval) {
>>             lastFlushTime = System.currentTimeMillis();
>>             threadPool.execute(batchCommit);
>>         }
>>     }
>>
>>     private class BatchCommit implements Runnable {
>>         @Override
>>         public void run() {
>>             try {
>>                 int ct;
>>                 synchronized(cacheQueue) {
>>                     List < String > sqlList = Lists.newArrayList();
>>                     for (ct = 0; ct < batchInsertSize; ct++) {
>>                         Object obj = cacheQueue.poll();
>>                         if (obj == null) {
>>                             break;
>>                         }
>>                         sqlList.add(generateBatchSql((Record) obj));
>>                     }
>>                     Phoenix.executeUpdateWithTx(sqlList);
>>                 }
>>                 LOG.info("Batch insert " + ct + " cache records");
>>             } catch (Exception e) {
>>                 LOG.error("Batch insert error: ", e);
>>             }
>>         }
>>
>>         private String generateBatchSql(Record record) {
>>             // business logic
>>         }
>>     }
>> }
>>
>> *Is there any good idea to refactor the codes?*
>>
>> Best,
>> Kevin
>>
>>

Re: Should I use static database connection pool?

Posted by miki haiat <mi...@gmail.com>.
If it's a sink that use jdbc, why not using the flink Jdbcsink connector?


On Wed, Oct 16, 2019, 17:03 Xin Ma <ke...@gmail.com> wrote:

> I have watched one of the recent Flink forward videos, Apache Flink Worst
> Practices by Konstantin Knauf. The talk helps me a lot and mentions that we
> should avoid using static variables to share state between tasks.
>
> So should I also avoid static database connection? Because I am facing a
> weird issue currently, the database connection will lose at some point and
> bring the whole job down.
>
> *I have created a database tool like this, *
>
> public class Phoenix {
>
>     private static ComboPooledDataSource dataSource = new
> ComboPooledDataSource();
>     static {
>         try {
>
> dataSource.setDriverClass(Environment.get("phoenix.jdbc.driverClassName",
> "org.apache.phoenix.jdbc.PhoenixDriver"));
>             dataSource.setJdbcUrl(Environment.get("phoenix.jdbc.url",
> null));
>             dataSource.setMaxPoolSize(200);
>             dataSource.setMinPoolSize(10);
>             Properties properties = new Properties();
>             properties.setProperty("user", "---");
>             properties.setProperty("password", "---");
>             dataSource.setProperties(properties);
>         } catch (PropertyVetoException e) {
>             throw new RuntimeException("phoenix datasource conf error");
>         }
>     }
>
>     private static Connection getConn() throws SQLException {
>         return dataSource.getConnection();
>     }
>
>     public static < T > T executeQuery(String sql, Caller < T > caller)
> throws SQLException {
>         // .. execiton logic
>     }
>
>     public static int executeUpdateWithTx(List < String > sqlList) throws
> SQLException {
>         // ..update logic
>     }
>
> }
>
> *Then I implemented my customized sink function like this,*
>
> public class CustomizedSink extends RichSinkFunction < Record > {
>
>     private static Logger LOG = LoggerFactory.getLogger("userFlinkLogger");
>     private static final int batchInsertSize = 5000;
>     private static final long flushInterval = 60 * 1000 L;
>     private long lastFlushTime;
>     private BatchCommit batchCommit;
>     private ConcurrentLinkedQueue < Object > cacheQueue;
>     private ExecutorService threadPool;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         cacheQueue = new ConcurrentLinkedQueue < > ();
>         threadPool = Executors.newFixedThreadPool(1);
>         batchCommit = new BatchCommit();
>         super.open(parameters);
>     }
>
>     @Override
>     public void invoke(DriverLbs driverLbs) throws Exception {
>         cacheQueue.add(driverLbs);
>         if (cacheQueue.size() >= batchInsertSize ||
>             System.currentTimeMillis() - lastFlushTime >= flushInterval) {
>             lastFlushTime = System.currentTimeMillis();
>             threadPool.execute(batchCommit);
>         }
>     }
>
>     private class BatchCommit implements Runnable {
>         @Override
>         public void run() {
>             try {
>                 int ct;
>                 synchronized(cacheQueue) {
>                     List < String > sqlList = Lists.newArrayList();
>                     for (ct = 0; ct < batchInsertSize; ct++) {
>                         Object obj = cacheQueue.poll();
>                         if (obj == null) {
>                             break;
>                         }
>                         sqlList.add(generateBatchSql((Record) obj));
>                     }
>                     Phoenix.executeUpdateWithTx(sqlList);
>                 }
>                 LOG.info("Batch insert " + ct + " cache records");
>             } catch (Exception e) {
>                 LOG.error("Batch insert error: ", e);
>             }
>         }
>
>         private String generateBatchSql(Record record) {
>             // business logic
>         }
>     }
> }
>
> *Is there any good idea to refactor the codes?*
>
> Best,
> Kevin
>
>