You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/22 08:18:35 UTC
[flink] branch release-1.11 updated: [FLINK-17544][jdbc] Fix NPE and resource leak problem in JdbcOutputFormat
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 4bf1ed2 [FLINK-17544][jdbc] Fix NPE and resource leak problem in JdbcOutputFormat
4bf1ed2 is described below
commit 4bf1ed2ac4006ee5c4af54c028fe4eede07ffcc6
Author: Shengkai <33...@users.noreply.github.com>
AuthorDate: Mon Jun 22 14:15:32 2020 +0800
[FLINK-17544][jdbc] Fix NPE and resource leak problem in JdbcOutputFormat
This closes #12712
---
.../jdbc/internal/JdbcBatchingOutputFormat.java | 6 +--
.../jdbc/internal/TableJdbcUpsertOutputFormat.java | 4 +-
.../connector/jdbc/internal/JdbcFullTest.java | 47 ++++++++++++++++++++++
.../jdbc/internal/JdbcTableOutputFormatTest.java | 14 +++++++
4 files changed, 66 insertions(+), 5 deletions(-)
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java
index af41ed1..c7afc31 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java
@@ -209,8 +209,6 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStat
if (!closed) {
closed = true;
- checkFlushException();
-
if (this.scheduledFuture != null) {
scheduledFuture.cancel(false);
this.scheduler.shutdown();
@@ -220,7 +218,7 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStat
try {
flush();
} catch (Exception e) {
- throw new RuntimeException("Writing records to JDBC failed.", e);
+ LOG.warn("Writing records to JDBC failed.", e);
}
}
@@ -233,6 +231,7 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStat
}
}
super.close();
+ checkFlushException();
}
public static Builder builder() {
@@ -348,5 +347,4 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStat
static JdbcStatementBuilder<Row> createRowJdbcStatementBuilder(int[] types) {
return (st, record) -> setRecordToStatement(st, types, record);
}
-
}
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/TableJdbcUpsertOutputFormat.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/TableJdbcUpsertOutputFormat.java
index 1865ebc..4afea82 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/TableJdbcUpsertOutputFormat.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/TableJdbcUpsertOutputFormat.java
@@ -83,7 +83,9 @@ class TableJdbcUpsertOutputFormat extends JdbcBatchingOutputFormat<Tuple2<Boolea
super.close();
} finally {
try {
- deleteExecutor.closeStatements();
+ if (deleteExecutor != null){
+ deleteExecutor.closeStatements();
+ }
} catch (SQLException e) {
LOG.warn("unable to close delete statement runner", e);
}
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
index f2aec19..15e6425 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.connector.jdbc.JdbcDataTestBase;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.JdbcTestFixture;
import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
@@ -43,6 +44,7 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
+import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.function.Function;
@@ -57,8 +59,10 @@ import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
import static org.apache.flink.connector.jdbc.utils.JdbcUtils.setRecordToStatement;
import static org.apache.flink.util.ExceptionUtils.findThrowable;
import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
/**
* Tests using both {@link JdbcInputFormat} and {@link JdbcBatchingOutputFormat}.
@@ -104,6 +108,49 @@ public class JdbcFullTest extends JdbcDataTestBase {
}
}
+ @Test
+ public void testJdbcBatchingOutputFormatCloseDuringRuntime() throws Exception{
+ JdbcOptions options = JdbcOptions.builder()
+ .setDBUrl(getDbMetadata().getUrl())
+ .setTableName(OUTPUT_TABLE)
+ .build();
+ // use scheduledThreadPool
+ JdbcExecutionOptions jdbcExecutionOptions = JdbcExecutionOptions.builder()
+ .withBatchIntervalMs(1000_000L)
+ .withBatchSize(2)
+ .withMaxRetries(1)
+ .build();
+ ExecutionConfig executionConfig = new ExecutionConfig();
+
+ RuntimeContext context = Mockito.mock(RuntimeContext.class);
+ JdbcBatchStatementExecutor executor = Mockito.mock(JdbcBatchStatementExecutor.class);
+
+ doReturn(executionConfig).when(context).getExecutionConfig();
+ // always throw Exception to trigger close() method
+ doThrow(SQLException.class).when(executor).executeBatch();
+
+ JdbcBatchingOutputFormat<Tuple2<Boolean, Row>, Row, JdbcBatchStatementExecutor<Row>> format =
+ new JdbcBatchingOutputFormat<>(
+ new SimpleJdbcConnectionProvider(options),
+ jdbcExecutionOptions,
+ (ctx) -> executor,
+ (tuple2) -> tuple2.f1);
+
+ format.setRuntimeContext(context);
+ format.open(0, 1);
+
+ try {
+ for (JdbcTestFixture.TestEntry entry : TEST_DATA) {
+ format.writeRecord(Tuple2.of(true, toRow(entry)));
+ }
+ } catch (Exception e) {
+ // artifact failure
+ format.close();
+ } finally {
+ assertNull(format.getConnection());
+ }
+ }
+
private void runTest(boolean exploitParallelism) throws Exception {
ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
JdbcInputFormat.JdbcInputFormatBuilder inputBuilder = JdbcInputFormat.buildJdbcInputFormat()
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
index 2d67a4d..677db91 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
@@ -65,6 +65,20 @@ public class JdbcTableOutputFormatTest extends JdbcDataTestBase {
}
@Test
+ public void testUpsertFormatCloseBeforeOpen() throws Exception{
+ JdbcOptions options = JdbcOptions.builder()
+ .setDBUrl(getDbMetadata().getUrl())
+ .setTableName(OUTPUT_TABLE)
+ .build();
+ JdbcDmlOptions dmlOptions = JdbcDmlOptions.builder()
+ .withTableName(options.getTableName()).withDialect(options.getDialect())
+ .withFieldNames(fieldNames).withKeyFields(keyFields).build();
+ format = new TableJdbcUpsertOutputFormat(new SimpleJdbcConnectionProvider(options), dmlOptions, JdbcExecutionOptions.defaults());
+ // FLINK-17544: There should be no NPE thrown from this method
+ format.close();
+ }
+
+ @Test
public void testJdbcOutputFormat() throws Exception {
JdbcOptions options = JdbcOptions.builder()
.setDBUrl(getDbMetadata().getUrl())