You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "shizhengchao (Jira)" <ji...@apache.org> on 2020/10/20 03:26:00 UTC

[jira] [Created] (FLINK-19723) The retry mechanism of jdbc connector has the risk of data duplication

shizhengchao created FLINK-19723:
------------------------------------

             Summary: The retry mechanism of jdbc connector has the risk of data duplication
                 Key: FLINK-19723
                 URL: https://issues.apache.org/jira/browse/FLINK-19723
             Project: Flink
          Issue Type: Bug
          Components: Connectors / JDBC
    Affects Versions: 1.11.1
            Reporter: shizhengchao


for example,  if statement.executeBatch() failur for some reason, but the "statement" was not closed in the retry, there is a risk of data duplication:
{code:java}
for (int i = 1; i <= executionOptions.getMaxRetries(); i++) {
	try {
		attemptFlush();
		batchCount = 0;
		break;
	} catch (SQLException e) {
		LOG.error("JDBC executeBatch error, retry times = {}", i, e);
		if (i >= executionOptions.getMaxRetries()) {
			throw new IOException(e);
		}
		try {
			if (!connection.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) {
				connection = connectionProvider.reestablishConnection();
                                jdbcStatementExecutor.closeStatements();
			        jdbcStatementExecutor.prepareStatements(connection);
			}
		} catch (Exception excpetion) {
			LOG.error("JDBC connection is not valid, and reestablish connection failed.", excpetion);
			throw new IOException("Reestablish JDBC connection failed", excpetion);
		}
		try {
			Thread.sleep(1000 * i);
		} catch (InterruptedException ex) {
			Thread.currentThread().interrupt();
			throw new IOException("unable to flush; interrupted while doing another attempt", e);
		}
	}
}
{code}

the correct code should be:
{code:java}
try {
			if (!connection.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) {
				connection = connectionProvider.reestablishConnection();
			}
                        jdbcStatementExecutor.closeStatements();
			jdbcStatementExecutor.prepareStatements(connection);
		} catch (Exception excpetion) {
			LOG.error("JDBC connection is not valid, and reestablish connection failed.", excpetion);
			throw new IOException("Reestablish JDBC connection failed", excpetion);
		}
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)