You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 111 <xi...@163.com> on 2020/03/24 10:32:59 UTC
JDBC Sink参数connector.write.max-retries 在Oracle中的bug
Hi,
在使用jdbc sink时,底层使用oracle驱动会出现bug。
出现的现象:当max-retries参数设置为1时,任务能正常报错;当max-retries参数大于1时,虽然程序内部报错,但是任务总是正常结束。
在JDBCUpsertOutputFormat.java中的flush()方法中,设计了重试机制:
public synchronized void flush() throws Exception {
checkFlushException();
for (int i = 1; i <= maxRetryTimes; i++) {
try {
jdbcWriter.executeBatch();
batchCount = 0;
break;
} catch (SQLException e) {
LOG.error("JDBC executeBatch error, retry times = {}", i, e);
if (i >= maxRetryTimes) {
//throw e;
System.exit(-1);
}
Thread.sleep(1000 * i);
}
}
}
但是当executeBatch出现异常时,会进入异常捕获,并清空rank信息(ojdbc14的5339行):
} catch (SQLException var17) {
this.clearBatch();
this.needToParse = true;
if (this.sqlKind != 1 && this.sqlKind != 4) {
for(var3 = 0; var3 < var4.length; ++var3) {
var4[var3] = -3;
}
}
DatabaseError.throwBatchUpdateException(var17, this.sqlKind != 1 && this.sqlKind != 4 ? var4.length : var3, var4);
} finally {
下一次执行executeBatch时,由于rank为0,会直接跳过插入操作,返回成功。
public int[] executeBatch() throws SQLException {
synchronized(this.connection) {
int[] var10000;
synchronized(this) {
int var3 = 0;
this.setJdbcBatchStyle();
int[] var4 = new int[this.currentRank];
if (this.currentRank > 0) {
this.ensureOpen();
从而导致第二次重试的时候直接跳过插入操作,成功返回。
Re: JDBC Sink参数connector.write.max-retries 在Oracle中的bug
Posted by Leonard Xu <xb...@gmail.com>.
Hi, xinghalo
这是jdbc sink 的AppenOnlyWriter的一个已知bug,在1.10.1里已经修复[1],社区近期在准备1.10.1的发布,
建议等1.10.1发布后升级即可。
Best,
Leonard
[1]https://issues.apache.org/jira/browse/FLINK-16281 <https://issues.apache.org/jira/browse/FLINK-16281>
> 在 2020年3月24日,18:32,111 <xi...@163.com> 写道:
>
> Hi,
> 在使用jdbc sink时,底层使用oracle驱动会出现bug。
> 出现的现象:当max-retries参数设置为1时,任务能正常报错;当max-retries参数大于1时,虽然程序内部报错,但是任务总是正常结束。
>
>
> 在JDBCUpsertOutputFormat.java中的flush()方法中,设计了重试机制:
> public synchronized void flush() throws Exception {
> checkFlushException();
>
> for (int i = 1; i <= maxRetryTimes; i++) {
> try {
> jdbcWriter.executeBatch();
> batchCount = 0;
> break;
> } catch (SQLException e) {
> LOG.error("JDBC executeBatch error, retry times = {}", i, e);
> if (i >= maxRetryTimes) {
> //throw e;
> System.exit(-1);
> }
> Thread.sleep(1000 * i);
> }
> }
> }
> 但是当executeBatch出现异常时,会进入异常捕获,并清空rank信息(ojdbc14的5339行):
> } catch (SQLException var17) {
> this.clearBatch();
> this.needToParse = true;
> if (this.sqlKind != 1 && this.sqlKind != 4) {
> for(var3 = 0; var3 < var4.length; ++var3) {
> var4[var3] = -3;
> }
> }
>
> DatabaseError.throwBatchUpdateException(var17, this.sqlKind != 1 && this.sqlKind != 4 ? var4.length : var3, var4);
> } finally {
> 下一次执行executeBatch时,由于rank为0,会直接跳过插入操作,返回成功。
> public int[] executeBatch() throws SQLException {
> synchronized(this.connection) {
> int[] var10000;
> synchronized(this) {
> int var3 = 0;
> this.setJdbcBatchStyle();
> int[] var4 = new int[this.currentRank];
> if (this.currentRank > 0) {
> this.ensureOpen();
> 从而导致第二次重试的时候直接跳过插入操作,成功返回。
>
>
>
>
>