Posted to issues@flink.apache.org by jrthe42 <gi...@git.apache.org> on 2018/07/11 02:23:41 UTC

[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...

GitHub user jrthe42 opened a pull request:

    https://github.com/apache/flink/pull/6301

    [FLINK-9794] [jdbc] JDBCOutputFormat does not consider idle connection and multithreads synchronization

    ## What is the purpose of the change
    
    This pull request fixes bugs in the original implementation of `JDBCOutputFormat`, which does not account for idle connections or multi-threaded synchronization.
    
    - The connection is established when JDBCOutputFormat is opened and is then used for the lifetime of the format. If this connection lies idle for a long time, the database may forcibly close it, so later writes may fail.
    - The flush() method is called when batchCount exceeds the threshold, but it is also called while snapshotting state, so two threads may modify upload and batchCount without synchronization.
    
    ## Brief change log
    
      - Use a Timer to test the JDBC connection periodically and keep it alive
      - Add synchronization for batch operations
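
The timer idea in the change log can be sketched roughly as follows. This is a minimal illustration, not the code from this pull request: the class `ConnectionKeepAlive` and its method names are hypothetical, and it assumes that calling `Connection#isValid` periodically is enough for the driver and database to treat the connection as active, which depends on the driver.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration; not the code from this pull request.
class ConnectionKeepAlive implements AutoCloseable {
    private final Connection conn;
    private final int checkTimeoutSeconds;
    private final Timer timer = new Timer("jdbc-keep-alive", true); // daemon thread
    private final AtomicInteger failedChecks = new AtomicInteger();

    ConnectionKeepAlive(Connection conn, long checkIntervalMillis, int checkTimeoutSeconds) {
        this.conn = conn;
        this.checkTimeoutSeconds = checkTimeoutSeconds;
        // java.util.Timer requires a period > 0, so the interval must be positive.
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                checkOnce();
            }
        }, checkIntervalMillis, checkIntervalMillis);
    }

    // Validates the connection once; a timeout of 0 means no timeout is applied.
    void checkOnce() {
        try {
            if (!conn.isValid(checkTimeoutSeconds)) {
                failedChecks.incrementAndGet();
            }
        } catch (SQLException e) {
            failedChecks.incrementAndGet();
        }
    }

    // Number of checks that reported an invalid connection or threw.
    int failedChecks() {
        return failedChecks.get();
    }

    @Override
    public void close() {
        timer.cancel();
    }
}
```

In this sketch the timer thread and the writing thread share one connection, so a real implementation would also have to decide what to do when a check fails (for example, reconnect) and how to coordinate with in-flight batches.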
    
    ## Verifying this change
    This change is a trivial rework / code cleanup without any test coverage.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jrthe42/flink fix-jdbcoutputformat

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6301.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6301
    
----
commit d0c34811e3a30ebc97fd784ebf2fdd3f2085358b
Author: jrthe42 <jr...@...>
Date:   2018-07-11T01:50:29Z

    Using a Timer to test the jdbc connection periodically and keep it alive
    
    If the JDBC connection lies idle for a long time, the database will force-close the connection. Keep this connection valid using a timer.

commit d08938350bb3d78a324562e14d24a0892a052b5f
Author: jrthe42 <jr...@...>
Date:   2018-07-11T01:58:40Z

    [FLINK-9794] JDBCOutputFormat does not consider idle connection and multithreads synchronization

----


---

[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...

Posted by jrthe42 <gi...@git.apache.org>.
Github user jrthe42 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6301#discussion_r201702302
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
    @@ -111,110 +134,117 @@ public void writeRecord(Row row) throws IOException {
     		if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) {
     			LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
     		}
    -		try {
    +		synchronized (this) {
    --- End diff --
    
    We need synchronization because ```flush()``` may be called from two separate threads, as explained in **the purpose of the change**.


---

[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...

Posted by jrthe42 <gi...@git.apache.org>.
Github user jrthe42 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6301#discussion_r201555575
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java ---
    @@ -115,6 +119,16 @@ public JDBCAppendTableSinkBuilder setParameterTypes(int... types) {
     		return this;
     	}
     
    +	public JDBCAppendTableSinkBuilder setIdleConnectionCheckInterval(long idleConnectionCheckInterval) {
    --- End diff --
    
    OK, I will add java doc here.


---

[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...

Posted by jrthe42 <gi...@git.apache.org>.
Github user jrthe42 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6301#discussion_r201555608
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
    @@ -41,6 +43,8 @@
     public class JDBCOutputFormat extends RichOutputFormat<Row> {
     	private static final long serialVersionUID = 1L;
     	static final int DEFAULT_BATCH_INTERVAL = 5000;
    +	static final long DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL = 30 * 60 * 1000;
    +	static final int DEFAULT_IDLE_CONNECTION_CHECK_TIMEOUT = 0;
    --- End diff --
    
    DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL is the default schedule period for the timer; DEFAULT_IDLE_CONNECTION_CHECK_TIMEOUT is the default timeout for validating the connection. A value of 0 indicates that no timeout is applied to the database operation; see here:
    https://docs.oracle.com/javase/8/docs/api/java/sql/Connection.html#isValid-int-


---

[GitHub] flink issue #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not consider id...

Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on the issue:

    https://github.com/apache/flink/pull/6301
  
    @jrthe42 Hi, thanks for your PR. 
    From my side, I think using a connection pool is a better way to solve the connection problem. We don't need to keep the connections open all the time; that wastes connection resources if most threads have been idle for a long time. Also, a connection pool adds no extra cost when threads are busy writing data into the database, since the connections in the pool are reused.
    
    I googled just now and found the `MiniConnectionPoolManager` description [here](http://www.source-code.biz/miniconnectionpoolmanager/). Maybe we can use it.
    
    Best, Hequn



---

[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6301#discussion_r201552317
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
    @@ -41,6 +43,8 @@
     public class JDBCOutputFormat extends RichOutputFormat<Row> {
     	private static final long serialVersionUID = 1L;
     	static final int DEFAULT_BATCH_INTERVAL = 5000;
    +	static final long DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL = 30 * 60 * 1000;
    +	static final int DEFAULT_IDLE_CONNECTION_CHECK_TIMEOUT = 0;
    --- End diff --
    
    I'd like to change the constant to a value larger than "0". In JDK 1.8, `Timer#schedule` throws an `IllegalArgumentException` if its third parameter, `period`, is less than or equal to "0"; see here: https://docs.oracle.com/javase/8/docs/api/java/util/Timer.html#schedule-java.util.TimerTask-long-long-
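
The `Timer#schedule` behavior cited in this comment can be checked with a small sketch. The class and method names below are hypothetical and exist only for illustration; the check concerns the timer's `period` argument, not the timeout passed to `Connection#isValid`.

```java
import java.util.Timer;
import java.util.TimerTask;

// Hypothetical helper for illustration only.
class TimerPeriodCheck {
    // Returns true if java.util.Timer accepts the given period for a repeating task.
    static boolean acceptsPeriod(long periodMillis) {
        Timer timer = new Timer(true); // daemon thread, so it never blocks JVM exit
        try {
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    // no-op
                }
            }, 0L, periodMillis);
            return true;
        } catch (IllegalArgumentException e) {
            return false; // thrown when period <= 0
        } finally {
            timer.cancel();
        }
    }
}
```

With this helper, `acceptsPeriod(0)` returns `false`, while a positive value such as `30 * 60 * 1000` is accepted.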


---

[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...

Posted by jrthe42 <gi...@git.apache.org>.
Github user jrthe42 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6301#discussion_r201555590
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java ---
    @@ -115,6 +119,16 @@ public JDBCAppendTableSinkBuilder setParameterTypes(int... types) {
     		return this;
     	}
     
    +	public JDBCAppendTableSinkBuilder setIdleConnectionCheckInterval(long idleConnectionCheckInterval) {
    +		this.idleConnectionCheckInterval = idleConnectionCheckInterval;
    +		return this;
    +	}
    +
    +	public JDBCAppendTableSinkBuilder setIdleConnectionCheckTimeout(int idleConnectionCheckTimeout) {
    --- End diff --
    
    OK, I will add java doc here.


---

[GitHub] flink issue #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not consider id...

Posted by jrthe42 <gi...@git.apache.org>.
Github user jrthe42 commented on the issue:

    https://github.com/apache/flink/pull/6301
  
    Hi @hequn8128, I agree with you that a connection pool uses connection resources more efficiently. I didn't choose a connection pool because it would introduce new dependencies, and I'm not sure whether that tradeoff is acceptable. I will check ```MiniConnectionPoolManager``` to see if it's a better way.


---

[GitHub] flink issue #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not consider id...

Posted by jrthe42 <gi...@git.apache.org>.
Github user jrthe42 commented on the issue:

    https://github.com/apache/flink/pull/6301
  
    Hi @sihuazhou, I was not familiar with the checkpoint mechanism of Flink, so I checked the source code again.
    
    Although ```RichSinkFunction#invoke()``` and ```RichSinkFunction#snapshotState()``` are not executed in the same thread, there is already a synchronization mechanism in ```StreamTask```. ```StreamTask``` uses a **checkpoint lock object** to make sure they won't be called concurrently. Check ```StreamTask#performCheckpoint()``` and ```StreamInputProcessor#processInput()``` if you want to know more.
    
    Thanks for your comment. I removed the synchronization here and updated this PR. cc @yanghua
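
The locking pattern described in this comment can be illustrated with a simplified sketch. This is not Flink's `StreamTask` code: the class `LockedSinkTask` and its members are hypothetical and only show how a single shared lock keeps record processing and snapshotting from interleaving, even when they run on different threads.

```java
// Hypothetical, simplified model of one lock guarding both code paths.
class LockedSinkTask {
    private final Object checkpointLock = new Object();
    private int batchCount; // guarded by checkpointLock

    // Called by the record-processing thread for every incoming record.
    void invoke(Object record) {
        synchronized (checkpointLock) {
            batchCount++;
        }
    }

    // Called by the thread that triggers checkpoints; "flushes" the pending
    // batch and returns how many records it contained.
    int snapshotState() {
        synchronized (checkpointLock) {
            int flushed = batchCount;
            batchCount = 0;
            return flushed;
        }
    }
}
```

Because both methods take the same lock, the sink itself needs no additional `synchronized` block under this model, which matches the reason given above for removing it.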
    



---

[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6301#discussion_r201551369
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java ---
    @@ -115,6 +119,16 @@ public JDBCAppendTableSinkBuilder setParameterTypes(int... types) {
     		return this;
     	}
     
    +	public JDBCAppendTableSinkBuilder setIdleConnectionCheckInterval(long idleConnectionCheckInterval) {
    +		this.idleConnectionCheckInterval = idleConnectionCheckInterval;
    +		return this;
    +	}
    +
    +	public JDBCAppendTableSinkBuilder setIdleConnectionCheckTimeout(int idleConnectionCheckTimeout) {
    --- End diff --
    
    Please add Javadoc.


---

[GitHub] flink issue #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not consider id...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/6301
  
    @sihuazhou is right and reviewed, +1 from my side


---

[GitHub] flink issue #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not consider id...

Posted by jrthe42 <gi...@git.apache.org>.
Github user jrthe42 commented on the issue:

    https://github.com/apache/flink/pull/6301
  
    Hi @sihuazhou, snapshots are executed in a separate thread; you can check this: https://github.com/apache/flink/blob/53e6657658bc750b78c32e91fa7e2c02e8c54e33/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L1212


---

[GitHub] flink issue #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not consider id...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/6301
  
    Hi @jrthe42, in general a checkpoint consists of two parts:
    
    - part 1: take a snapshot of the state.
    - part 2: transfer the snapshot to the checkpoint destination (e.g. DFS).
    
    Part 1 needs to be synchronous, and part 2 can be asynchronous, if I'm not wrong.
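
The two parts described in this comment can be sketched as follows. This is a toy model under stated assumptions, not Flink's checkpointing code: the class `TwoPhaseSnapshot` is hypothetical, the state is a plain list, and the "transfer" step merely counts the copied elements on a background thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical toy model of a synchronous snapshot followed by an asynchronous transfer.
class TwoPhaseSnapshot {
    private final Object lock = new Object();
    private final List<String> state = new ArrayList<>(); // guarded by lock

    void add(String value) {
        synchronized (lock) {
            state.add(value);
        }
    }

    CompletableFuture<Integer> checkpoint() {
        final List<String> copy;
        // Part 1 (synchronous): copy the state while holding the lock.
        synchronized (lock) {
            copy = new ArrayList<>(state);
        }
        // Part 2 (asynchronous): "transfer" the copy without blocking the caller.
        // Here the transfer is simulated by returning the number of copied elements.
        return CompletableFuture.supplyAsync(() -> copy.size());
    }
}
```

Records added after `checkpoint()` returns do not appear in the transferred copy, because the copy was taken in the synchronous part.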
    



---

[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6301#discussion_r201551945
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
    @@ -54,6 +58,10 @@
     	private Connection dbConn;
     	private PreparedStatement upload;
     
    +	private long idleConnectionCheckInterval = DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL;
    +	private int idleConnectionCheckTimeOut = DEFAULT_IDLE_CONNECTION_CHECK_TIMEOUT;
    --- End diff --
    
    Renaming the variable to `idleConnectionCheckTimeout` looks better to me.


---

[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6301#discussion_r201551339
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java ---
    @@ -115,6 +119,16 @@ public JDBCAppendTableSinkBuilder setParameterTypes(int... types) {
     		return this;
     	}
     
    +	public JDBCAppendTableSinkBuilder setIdleConnectionCheckInterval(long idleConnectionCheckInterval) {
    --- End diff --
    
    Please add Javadoc.


---

[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6301#discussion_r201676322
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
    @@ -111,110 +134,117 @@ public void writeRecord(Row row) throws IOException {
     		if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) {
     			LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
     		}
    -		try {
    +		synchronized (this) {
    --- End diff --
    
    Why do we need `synchronized (this)` here?


---

[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6301#discussion_r201558827
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
    @@ -41,6 +43,8 @@
     public class JDBCOutputFormat extends RichOutputFormat<Row> {
     	private static final long serialVersionUID = 1L;
     	static final int DEFAULT_BATCH_INTERVAL = 5000;
    +	static final long DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL = 30 * 60 * 1000;
    +	static final int DEFAULT_IDLE_CONNECTION_CHECK_TIMEOUT = 0;
    --- End diff --
    
    OK, my mistake. I misunderstood and thought you were using this value as the third parameter, `period`, of the Timer#schedule method.


---

[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...

Posted by jrthe42 <gi...@git.apache.org>.
Github user jrthe42 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6301#discussion_r201555598
  
    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
    @@ -54,6 +58,10 @@
     	private Connection dbConn;
     	private PreparedStatement upload;
     
    +	private long idleConnectionCheckInterval = DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL;
    +	private int idleConnectionCheckTimeOut = DEFAULT_IDLE_CONNECTION_CHECK_TIMEOUT;
    --- End diff --
    
    Agree with that


---