Posted to user-zh@flink.apache.org by shangwen <58...@qq.com> on 2020/03/23 03:05:19 UTC

Flink 1.10: the retry mechanism of JDBCUpsertOutputFormat's flush method does not take effect

In our test environment we are testing the scenario of writing to PostgreSQL via JDBC, using tcpkill to simulate the connection being closed in order to check how exceptions are handled. We found that a log message like the following keeps being printed:


2020-03-20 21:16:21.247 [jdbc-upsert-output-format-thread-1] ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat  - JDBC executeBatch error, retry times = 1
org.postgresql.util.PSQLException: This connection has been closed.
	at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
	at org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
	at org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
	at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
	at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)



From the log it looks like a closed connection only triggers a single retry. Here is Flink's retry code:
// JDBCUpsertOutputFormat.java
public synchronized void flush() throws Exception {
   checkFlushException();

   for (int i = 1; i <= maxRetryTimes; i++) {
      try {
         jdbcWriter.executeBatch();
         batchCount = 0;
         break;
      } catch (SQLException e) {
         LOG.error("JDBC executeBatch error, retry times = {}", i, e);
         if (i >= maxRetryTimes) {
            throw e;
         }
         Thread.sleep(1000 * i);
      }
   }
}


Remote debugging shows that, on the first execution, by the time the call chain
JDBCUpsertOutputFormat.flush
  -> AppendOnlyWriter.executeBatch
     ...
     -> PgConnection.getAutoCommit
throws PSQLException: This connection has been closed, batchStatements has already been cleared:
// PgStatement.java
private BatchResultHandler internalExecuteBatch() throws SQLException {
  // Construct query/parameter arrays.
  transformQueriesAndParameters();
  // Empty arrays should be passed to toArray
  // see http://shipilev.net/blog/2016/arrays-wisdom-ancients/
  Query[] queries = batchStatements.toArray(new Query[0]);
  ParameterList[] parameterLists = batchParameters.toArray(new ParameterList[0]);
  batchStatements.clear(); // already cleared at this point
  batchParameters.clear();
  ...
  if (connection.getAutoCommit()) { // the exception is thrown here
    flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
  }
  ...
}


So when Flink retries and calls jdbcWriter.executeBatch a second time, batchStatements is already empty and the call returns immediately, so Flink treats the flush as successful; doesn't that mean the statements are in fact never executed?
// PgStatement.java
public int[] executeBatch() throws SQLException {
  checkClosed();
  closeForNextExecution();
  if (batchStatements == null || batchStatements.isEmpty()) { // returns right here
    return new int[0];
  }
  return internalExecuteBatch().getUpdateCount();
}


My current idea is: when the exception is caught, check whether the connection has been closed before retrying, and if it has, reopen it. I'd like to hear everyone's thoughts. This is the issue I filed:
https://issues.apache.org/jira/browse/FLINK-16708
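To make that idea concrete, here is a rough sketch of what flush() could look like. This is only a sketch, not the merged fix: reopenConnection() is a hypothetical helper, jdbcWriter.open() is assumed to re-prepare the statement on the new connection, and CONNECTION_CHECK_TIMEOUT_SECONDS is an assumed constant; isClosed()/isValid() are standard java.sql.Connection methods.

// Sketch of the proposed workaround: check connection liveness before each retry.
public synchronized void flush() throws Exception {
   checkFlushException();

   for (int i = 1; i <= maxRetryTimes; i++) {
      try {
         jdbcWriter.executeBatch();
         batchCount = 0;
         break;
      } catch (SQLException e) {
         LOG.error("JDBC executeBatch error, retry times = {}", i, e);
         if (i >= maxRetryTimes) {
            throw e;
         }
         // Before the next attempt, verify the connection is still alive; if it is
         // dead, reopen it and re-prepare the statement so the retry can succeed.
         if (connection == null || connection.isClosed()
               || !connection.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) {
            connection = reopenConnection();   // hypothetical helper returning a fresh JDBC connection
            jdbcWriter.open(connection);       // assumed: re-prepare statements on the new connection
         }
         Thread.sleep(1000 * i);
      }
   }
}

Note that reopening the connection alone is probably not enough: as shown above, the PostgreSQL driver has already dropped the contents of batchStatements, so the rows buffered for the failed batch also have to be re-added to the new PreparedStatement, otherwise the retry still writes nothing.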

Re: Flink 1.10: the retry mechanism of JDBCUpsertOutputFormat's flush method does not take effect

Posted by Leonard Xu <xb...@gmail.com>.
Hi, shangwen

This should be a bug in AppendOnlyWriter [1], which has already been fixed in 1.10.1 and 1.11-SNAPSHOT (master).
Using 1.10.1 or the master branch will solve it. 1.10.1 has not been released yet; as far as I know, the community is preparing the 1.10.1 release.
If you need the fix urgently, you can refer to the code on the 1.10.1 branch [2].
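
For reference, the general shape such a fix can take is roughly the following: keep the rows in the writer itself, rebuild the JDBC batch on every attempt, and clear the buffer only after executeBatch() succeeds. This is only a sketch of that idea, not the actual patch referenced in [1]; insertSql, the binding helper setRecordToStatement(), and the surrounding class (imports omitted) are assumptions.

// Sketch of a writer that re-adds its buffered rows on every attempt.
private final List<Row> buffer = new ArrayList<>();

public void addRecord(Row record) {
   buffer.add(record);
}

public void executeBatch(Connection connection) throws SQLException {
   if (buffer.isEmpty()) {
      return;
   }
   try (PreparedStatement statement = connection.prepareStatement(insertSql)) {
      for (Row row : buffer) {
         setRecordToStatement(statement, row); // assumed helper that binds the Row fields
         statement.addBatch();
      }
      statement.executeBatch();
      buffer.clear(); // forget the rows only after the batch has actually succeeded
   }
}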

Best,
Leonard

[1] https://issues.apache.org/jira/browse/FLINK-16281
[2] https://github.com/apache/flink/tree/release-1.10

> On Mar 23, 2020, at 11:05, shangwen <58...@qq.com> wrote: