Posted to issues@flink.apache.org by jrthe42 <gi...@git.apache.org> on 2018/07/11 02:23:41 UTC
[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...
GitHub user jrthe42 opened a pull request:
https://github.com/apache/flink/pull/6301
[FLINK-9794] [jdbc] JDBCOutputFormat does not consider idle connection and multithreads synchronization
## What is the purpose of the change
This pull request fixes two bugs in the original implementation of `JDBCOutputFormat`, which does not account for idle connections or multithreaded synchronization.
- The connection is established when `JDBCOutputFormat` is opened and is then used for the lifetime of the format. If the connection lies idle for a long time, the database may forcibly close it, and subsequent writes may fail.
- The `flush()` method is called when `batchCount` exceeds the threshold, but it is also called while snapshotting state. Two threads may therefore modify `upload` and `batchCount` without synchronization.
## Brief change log
- Use a Timer to test the JDBC connection periodically and keep it alive
- Add synchronization for batch operations
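
The keep-alive idea in the change log can be illustrated with a minimal, self-contained sketch. This is not the code from the pull request: the class name `KeepAliveSketch`, the method names, and the stub connection built with `java.lang.reflect.Proxy` are all assumptions made for illustration. It only shows a daemon `java.util.Timer` calling `Connection#isValid` at a fixed period, where a timeout of 0 means no timeout is applied.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not the PR's implementation.
final class KeepAliveSketch {

    /** Starts a daemon timer that calls isValid(timeoutSeconds) every intervalMillis. */
    static Timer startKeepAlive(Connection conn, long intervalMillis, int timeoutSeconds) {
        Timer timer = new Timer("jdbc-keep-alive", true);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    if (!conn.isValid(timeoutSeconds)) {
                        System.err.println("connection is no longer valid");
                    }
                } catch (SQLException e) {
                    System.err.println("validation failed: " + e.getMessage());
                }
            }
        }, intervalMillis, intervalMillis);
        return timer;
    }

    /** Runs the timer against a stub connection and returns the number of isValid calls seen. */
    static int demo() {
        AtomicInteger checks = new AtomicInteger();
        CountDownLatch twoChecks = new CountDownLatch(2);
        // Stub connection: only isValid is supported, and it always reports "valid".
        Connection stub = (Connection) Proxy.newProxyInstance(
            KeepAliveSketch.class.getClassLoader(),
            new Class<?>[] {Connection.class},
            (proxy, method, args) -> {
                if (method.getName().equals("isValid")) {
                    checks.incrementAndGet();
                    twoChecks.countDown();
                    return true;
                }
                throw new UnsupportedOperationException(method.getName());
            });
        Timer timer = startKeepAlive(stub, 20L, 0);
        try {
            // Wait until the timer has validated the connection at least twice.
            twoChecks.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timer.cancel();
        }
        return checks.get();
    }

    public static void main(String[] args) {
        System.out.println("isValid calls: " + demo());
    }
}
```

With a real driver the timer thread and the writing thread would share one connection, so whether concurrent use is safe depends on the driver; the sketch does not address that.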
## Verifying this change
This change is a trivial rework / code cleanup without any test coverage.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/jrthe42/flink fix-jdbcoutputformat
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6301.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6301
----
commit d0c34811e3a30ebc97fd784ebf2fdd3f2085358b
Author: jrthe42 <jr...@...>
Date: 2018-07-11T01:50:29Z
Using a Timer to test the jdbc connection periodically and keep it alive
If the JDBC connection lies idle for a long time, the database will force-close the connection. Keep the connection valid using a timer.
commit d08938350bb3d78a324562e14d24a0892a052b5f
Author: jrthe42 <jr...@...>
Date: 2018-07-11T01:58:40Z
[FLINK-9794] JDBCOutputFormat does not consider idle connection and multithreads synchronization
----
---
[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...
Posted by jrthe42 <gi...@git.apache.org>.
Github user jrthe42 commented on a diff in the pull request:
https://github.com/apache/flink/pull/6301#discussion_r201702302
--- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
@@ -111,110 +134,117 @@ public void writeRecord(Row row) throws IOException {
if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) {
LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
}
- try {
+ synchronized (this) {
--- End diff --
We need synchronization because ```flush()``` may be called by two separate threads, as explained in **the purpose of the change**.
---
[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...
Posted by jrthe42 <gi...@git.apache.org>.
Github user jrthe42 commented on a diff in the pull request:
https://github.com/apache/flink/pull/6301#discussion_r201555575
--- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java ---
@@ -115,6 +119,16 @@ public JDBCAppendTableSinkBuilder setParameterTypes(int... types) {
return this;
}
+ public JDBCAppendTableSinkBuilder setIdleConnectionCheckInterval(long idleConnectionCheckInterval) {
--- End diff --
OK, I will add Javadoc here.
---
[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...
Posted by jrthe42 <gi...@git.apache.org>.
Github user jrthe42 commented on a diff in the pull request:
https://github.com/apache/flink/pull/6301#discussion_r201555608
--- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
@@ -41,6 +43,8 @@
public class JDBCOutputFormat extends RichOutputFormat<Row> {
private static final long serialVersionUID = 1L;
static final int DEFAULT_BATCH_INTERVAL = 5000;
+ static final long DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL = 30 * 60 * 1000;
+ static final int DEFAULT_IDLE_CONNECTION_CHECK_TIMEOUT = 0;
--- End diff --
DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL is the default schedule period for the timer; DEFAULT_IDLE_CONNECTION_CHECK_TIMEOUT is the default timeout for validating the connection. A value of 0 indicates that no timeout is applied to the database operation; see here:
https://docs.oracle.com/javase/8/docs/api/java/sql/Connection.html#isValid-int-
---
[GitHub] flink issue #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not consider id...
Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on the issue:
https://github.com/apache/flink/pull/6301
@jrthe42 Hi, thanks for your PR.
From my side, I think using a connection pool is a better way to solve the connection problem. We don't need to keep the connections alive all the time; that wastes connection resources if most threads have been idle for a long time. A connection pool also adds no extra cost when threads are busy writing data into the database, since the connections in the pool are reused.
I googled just now and found the `MiniConnectionPoolManager` description [here](http://www.source-code.biz/miniconnectionpoolmanager/). Maybe we can use it.
Best, Hequn
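
To make the pool suggestion concrete, here is a rough sketch of the validate-on-borrow idea. It does not use `MiniConnectionPoolManager` or reproduce its API; `TinyPoolSketch`, `borrow`, `release`, and the proxy-based stub connection are hypothetical names invented for this illustration. An idle connection is reused only if `Connection#isValid` still reports it as usable; otherwise it is dropped and a new one is created.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical sketch of validate-on-borrow pooling; not MiniConnectionPoolManager.
final class TinyPoolSketch {
    private final Supplier<Connection> factory;
    private final Deque<Connection> idle = new ArrayDeque<>();
    private int created = 0;

    TinyPoolSketch(Supplier<Connection> factory) {
        this.factory = factory;
    }

    /** Returns a valid idle connection if one exists, otherwise creates a new one. */
    synchronized Connection borrow() throws SQLException {
        Connection c;
        while ((c = idle.poll()) != null) {
            if (c.isValid(0)) {
                return c;
            }
            c.close(); // stale connection: discard it and keep looking
        }
        created++;
        return factory.get();
    }

    /** Hands a connection back so a later borrow() can reuse it. */
    synchronized void release(Connection c) {
        idle.push(c);
    }

    synchronized int created() {
        return created;
    }

    /** Stub connection supporting only isValid and close, for the demo below. */
    static Connection stubConnection() {
        AtomicBoolean closed = new AtomicBoolean(false);
        return (Connection) Proxy.newProxyInstance(
            TinyPoolSketch.class.getClassLoader(),
            new Class<?>[] {Connection.class},
            (proxy, method, args) -> {
                switch (method.getName()) {
                    case "isValid": return !closed.get();
                    case "close": closed.set(true); return null;
                    default: throw new UnsupportedOperationException(method.getName());
                }
            });
    }

    /** Returns how many physical connections the scenario needed. */
    static int demo() {
        TinyPoolSketch pool = new TinyPoolSketch(TinyPoolSketch::stubConnection);
        try {
            Connection first = pool.borrow();  // creates connection 1
            pool.release(first);
            Connection reused = pool.borrow(); // reuses connection 1
            pool.release(reused);
            reused.close();                    // simulate the server dropping the idle connection
            pool.borrow();                     // stale one is discarded, connection 2 is created
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
        return pool.created();
    }

    public static void main(String[] args) {
        System.out.println("connections created: " + demo());
    }
}
```

A production pool would also bound the number of connections and handle eviction and shutdown, which is where an existing library earns its dependency cost.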
---
[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...
Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:
https://github.com/apache/flink/pull/6301#discussion_r201552317
--- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
@@ -41,6 +43,8 @@
public class JDBCOutputFormat extends RichOutputFormat<Row> {
private static final long serialVersionUID = 1L;
static final int DEFAULT_BATCH_INTERVAL = 5000;
+ static final long DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL = 30 * 60 * 1000;
+ static final int DEFAULT_IDLE_CONNECTION_CHECK_TIMEOUT = 0;
--- End diff --
I'd like to change the constant to a value larger than "0". In JDK 1.8, `Timer#schedule` throws an `IllegalArgumentException` if its third parameter, `period`, is less than or equal to "0"; see here: https://docs.oracle.com/javase/8/docs/api/java/util/Timer.html#schedule-java.util.TimerTask-long-long-
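
The `Timer#schedule` behavior cited here is easy to check in isolation. The sketch below is only a demonstration of that JDK rule; the class name `TimerPeriodSketch` and the helper `accepts` are made up for illustration. The restriction applies to whichever value is passed as `period`, not to a timeout handed to `Connection#isValid`.

```java
import java.util.Timer;
import java.util.TimerTask;

// Hypothetical helper that probes which periods Timer#schedule accepts.
final class TimerPeriodSketch {

    /** Returns true if Timer#schedule accepts the given period, false if it rejects it. */
    static boolean accepts(long periodMillis) {
        Timer timer = new Timer(true);
        try {
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    // no-op: only the argument check matters here
                }
            }, 0L, periodMillis);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        } finally {
            timer.cancel();
        }
    }

    public static void main(String[] args) {
        System.out.println("period 0 accepted: " + accepts(0L));                      // false
        System.out.println("period 30 min accepted: " + accepts(30L * 60 * 1000));    // true
    }
}
```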
---
[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...
Posted by jrthe42 <gi...@git.apache.org>.
Github user jrthe42 commented on a diff in the pull request:
https://github.com/apache/flink/pull/6301#discussion_r201555590
--- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java ---
@@ -115,6 +119,16 @@ public JDBCAppendTableSinkBuilder setParameterTypes(int... types) {
return this;
}
+ public JDBCAppendTableSinkBuilder setIdleConnectionCheckInterval(long idleConnectionCheckInterval) {
+ this.idleConnectionCheckInterval = idleConnectionCheckInterval;
+ return this;
+ }
+
+ public JDBCAppendTableSinkBuilder setIdleConnectionCheckTimeout(int idleConnectionCheckTimeout) {
--- End diff --
OK, I will add Javadoc here.
---
[GitHub] flink issue #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not consider id...
Posted by jrthe42 <gi...@git.apache.org>.
Github user jrthe42 commented on the issue:
https://github.com/apache/flink/pull/6301
Hi @hequn8128, I agree that a connection pool uses connection resources more efficiently. I didn't choose a connection pool because it would introduce new dependencies, and I'm not sure whether that tradeoff is acceptable. I will check ```MiniConnectionPoolManager``` to see if it's a better way.
---
[GitHub] flink issue #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not consider id...
Posted by jrthe42 <gi...@git.apache.org>.
Github user jrthe42 commented on the issue:
https://github.com/apache/flink/pull/6301
Hi @sihuazhou, I was not familiar with Flink's checkpoint mechanism, so I checked the source code again.
Although ```RichSinkFunction#invoke()``` and ```RichSinkFunction#snapshotState()``` are not executed in the same thread, there is already a synchronization mechanism in ```StreamTask```. ```StreamTask``` uses a **checkpoint lock object** to make sure they are not called concurrently. See ```StreamTask#performCheckpoint()``` and ```StreamInputProcessor#processInput()``` if you want to know more.
Thanks for your comment. I have removed the synchronization here and updated this PR. cc @yanghua
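
The locking pattern described in this comment can be sketched without any Flink classes. The code below is not Flink's `StreamTask`; `CheckpointLockSketch`, `processRecord`, and `snapshot` are hypothetical stand-ins. It only shows why an operator needs no lock of its own when the caller already runs record processing and snapshotting under one shared lock object.

```java
// Hypothetical sketch of a shared "checkpoint lock"; not Flink's actual code.
final class CheckpointLockSketch {
    private final Object checkpointLock = new Object();
    private int batchCount = 0; // records buffered since the last flush
    private int flushed = 0;    // records flushed so far

    /** Stand-in for the task thread handling one record. */
    void processRecord() {
        synchronized (checkpointLock) {
            batchCount++;
        }
    }

    /** Stand-in for the checkpoint thread flushing buffered records. */
    void snapshot() {
        synchronized (checkpointLock) {
            flushed += batchCount;
            batchCount = 0;
        }
    }

    /** Runs a writer and a checkpointer concurrently and returns the total flushed count. */
    static int demo(int records) {
        CheckpointLockSketch task = new CheckpointLockSketch();
        Thread writer = new Thread(() -> {
            for (int i = 0; i < records; i++) {
                task.processRecord();
            }
        });
        Thread checkpointer = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                task.snapshot();
            }
        });
        writer.start();
        checkpointer.start();
        try {
            writer.join();
            checkpointer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        task.snapshot(); // final flush of whatever is still buffered
        return task.flushed;
    }

    public static void main(String[] args) {
        System.out.println("flushed: " + demo(100_000));
    }
}
```

Because both methods take the same lock, no record is lost or counted twice regardless of how the two threads interleave, so the flushed total equals the number of records processed.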
---
[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...
Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:
https://github.com/apache/flink/pull/6301#discussion_r201551369
--- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java ---
@@ -115,6 +119,16 @@ public JDBCAppendTableSinkBuilder setParameterTypes(int... types) {
return this;
}
+ public JDBCAppendTableSinkBuilder setIdleConnectionCheckInterval(long idleConnectionCheckInterval) {
+ this.idleConnectionCheckInterval = idleConnectionCheckInterval;
+ return this;
+ }
+
+ public JDBCAppendTableSinkBuilder setIdleConnectionCheckTimeout(int idleConnectionCheckTimeout) {
--- End diff --
Please add Javadoc.
---
[GitHub] flink issue #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not consider id...
Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:
https://github.com/apache/flink/pull/6301
@sihuazhou is right and reviewed, +1 from my side
---
[GitHub] flink issue #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not consider id...
Posted by jrthe42 <gi...@git.apache.org>.
Github user jrthe42 commented on the issue:
https://github.com/apache/flink/pull/6301
Hi @sihuazhou, snapshots are executed in a separate thread; you can check this: https://github.com/apache/flink/blob/53e6657658bc750b78c32e91fa7e2c02e8c54e33/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L1212
---
[GitHub] flink issue #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not consider id...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/6301
Hi @jrthe42, in general a checkpoint consists of two parts.
- part1: take a snapshot of the state.
- part2: transfer the snapshot to the checkpoint destination (e.g. DFS)
Part1 needs to be synchronous, and part2 can be asynchronous, if I'm not wrong.
---
[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...
Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:
https://github.com/apache/flink/pull/6301#discussion_r201551945
--- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
@@ -54,6 +58,10 @@
private Connection dbConn;
private PreparedStatement upload;
+ private long idleConnectionCheckInterval = DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL;
+ private int idleConnectionCheckTimeOut = DEFAULT_IDLE_CONNECTION_CHECK_TIMEOUT;
--- End diff --
Renaming the variable to `idleConnectionCheckTimeout` looks better to me.
---
[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...
Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:
https://github.com/apache/flink/pull/6301#discussion_r201551339
--- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java ---
@@ -115,6 +119,16 @@ public JDBCAppendTableSinkBuilder setParameterTypes(int... types) {
return this;
}
+ public JDBCAppendTableSinkBuilder setIdleConnectionCheckInterval(long idleConnectionCheckInterval) {
--- End diff --
Please add Javadoc.
---
[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/6301#discussion_r201676322
--- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
@@ -111,110 +134,117 @@ public void writeRecord(Row row) throws IOException {
if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) {
LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
}
- try {
+ synchronized (this) {
--- End diff --
Why do we need to synchronize on `this`?
---
[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...
Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:
https://github.com/apache/flink/pull/6301#discussion_r201558827
--- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
@@ -41,6 +43,8 @@
public class JDBCOutputFormat extends RichOutputFormat<Row> {
private static final long serialVersionUID = 1L;
static final int DEFAULT_BATCH_INTERVAL = 5000;
+ static final long DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL = 30 * 60 * 1000;
+ static final int DEFAULT_IDLE_CONNECTION_CHECK_TIMEOUT = 0;
--- End diff --
OK, my mistake. I misunderstood and thought you were passing this value as the third parameter (`period`) of `Timer#schedule`.
---
[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...
Posted by jrthe42 <gi...@git.apache.org>.
Github user jrthe42 commented on a diff in the pull request:
https://github.com/apache/flink/pull/6301#discussion_r201555598
--- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
@@ -54,6 +58,10 @@
private Connection dbConn;
private PreparedStatement upload;
+ private long idleConnectionCheckInterval = DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL;
+ private int idleConnectionCheckTimeOut = DEFAULT_IDLE_CONNECTION_CHECK_TIMEOUT;
--- End diff --
Agree with that
---