You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/22 08:18:35 UTC

[flink] branch release-1.11 updated: [FLINK-17544][jdbc] Fix NPE and resource leak problem in JdbcOutputFormat

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 4bf1ed2  [FLINK-17544][jdbc] Fix NPE and resource leak problem in JdbcOutputFormat
4bf1ed2 is described below

commit 4bf1ed2ac4006ee5c4af54c028fe4eede07ffcc6
Author: Shengkai <33...@users.noreply.github.com>
AuthorDate: Mon Jun 22 14:15:32 2020 +0800

    [FLINK-17544][jdbc] Fix NPE and resource leak problem in JdbcOutputFormat
    
    This closes #12712
---
 .../jdbc/internal/JdbcBatchingOutputFormat.java    |  6 +--
 .../jdbc/internal/TableJdbcUpsertOutputFormat.java |  4 +-
 .../connector/jdbc/internal/JdbcFullTest.java      | 47 ++++++++++++++++++++++
 .../jdbc/internal/JdbcTableOutputFormatTest.java   | 14 +++++++
 4 files changed, 66 insertions(+), 5 deletions(-)

diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java
index af41ed1..c7afc31 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java
@@ -209,8 +209,6 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStat
 		if (!closed) {
 			closed = true;
 
-			checkFlushException();
-
 			if (this.scheduledFuture != null) {
 				scheduledFuture.cancel(false);
 				this.scheduler.shutdown();
@@ -220,7 +218,7 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStat
 				try {
 					flush();
 				} catch (Exception e) {
-					throw new RuntimeException("Writing records to JDBC failed.", e);
+					LOG.warn("Writing records to JDBC failed.", e);
 				}
 			}
 
@@ -233,6 +231,7 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStat
 			}
 		}
 		super.close();
+		checkFlushException();
 	}
 
 	public static Builder builder() {
@@ -348,5 +347,4 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStat
 	static JdbcStatementBuilder<Row> createRowJdbcStatementBuilder(int[] types) {
 		return (st, record) -> setRecordToStatement(st, types, record);
 	}
-
 }
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/TableJdbcUpsertOutputFormat.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/TableJdbcUpsertOutputFormat.java
index 1865ebc..4afea82 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/TableJdbcUpsertOutputFormat.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/TableJdbcUpsertOutputFormat.java
@@ -83,7 +83,9 @@ class TableJdbcUpsertOutputFormat extends JdbcBatchingOutputFormat<Tuple2<Boolea
 			super.close();
 		} finally {
 			try {
-				deleteExecutor.closeStatements();
+				if (deleteExecutor != null){
+					deleteExecutor.closeStatements();
+				}
 			} catch (SQLException e) {
 				LOG.warn("unable to close delete statement runner", e);
 			}
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
index f2aec19..15e6425 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.connector.jdbc.JdbcDataTestBase;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
 import org.apache.flink.connector.jdbc.JdbcInputFormat;
 import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.JdbcTestFixture;
 import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
 import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
 import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
@@ -43,6 +44,7 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
 import java.util.function.Function;
@@ -57,8 +59,10 @@ import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
 import static org.apache.flink.connector.jdbc.utils.JdbcUtils.setRecordToStatement;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 
 /**
  * Tests using both {@link JdbcInputFormat} and {@link JdbcBatchingOutputFormat}.
@@ -104,6 +108,49 @@ public class JdbcFullTest extends JdbcDataTestBase {
 		}
 	}
 
+	@Test
+	public void testJdbcBatchingOutputFormatCloseDuringRuntime() throws Exception{
+		JdbcOptions options = JdbcOptions.builder()
+			.setDBUrl(getDbMetadata().getUrl())
+			.setTableName(OUTPUT_TABLE)
+			.build();
+		// use scheduledThreadPool
+		JdbcExecutionOptions jdbcExecutionOptions = JdbcExecutionOptions.builder()
+			.withBatchIntervalMs(1000_000L)
+			.withBatchSize(2)
+			.withMaxRetries(1)
+			.build();
+		ExecutionConfig executionConfig = new ExecutionConfig();
+
+		RuntimeContext context = Mockito.mock(RuntimeContext.class);
+		JdbcBatchStatementExecutor executor = Mockito.mock(JdbcBatchStatementExecutor.class);
+
+		doReturn(executionConfig).when(context).getExecutionConfig();
+		// always throw Exception to trigger close() method
+		doThrow(SQLException.class).when(executor).executeBatch();
+
+		JdbcBatchingOutputFormat<Tuple2<Boolean, Row>, Row, JdbcBatchStatementExecutor<Row>> format =
+			new JdbcBatchingOutputFormat<>(
+				new SimpleJdbcConnectionProvider(options),
+				jdbcExecutionOptions,
+				(ctx) -> executor,
+				(tuple2) -> tuple2.f1);
+
+		format.setRuntimeContext(context);
+		format.open(0, 1);
+
+		try {
+			for (JdbcTestFixture.TestEntry entry : TEST_DATA) {
+				format.writeRecord(Tuple2.of(true, toRow(entry)));
+			}
+		} catch (Exception e) {
+			// artificial failure: the mocked executor always throws from executeBatch()
+			format.close();
+		} finally {
+			assertNull(format.getConnection());
+		}
+	}
+
 	private void runTest(boolean exploitParallelism) throws Exception {
 		ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
 		JdbcInputFormat.JdbcInputFormatBuilder inputBuilder = JdbcInputFormat.buildJdbcInputFormat()
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
index 2d67a4d..677db91 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
@@ -65,6 +65,20 @@ public class JdbcTableOutputFormatTest extends JdbcDataTestBase {
 	}
 
 	@Test
+	public void testUpsertFormatCloseBeforeOpen() throws Exception{
+		JdbcOptions options = JdbcOptions.builder()
+			.setDBUrl(getDbMetadata().getUrl())
+			.setTableName(OUTPUT_TABLE)
+			.build();
+		JdbcDmlOptions dmlOptions = JdbcDmlOptions.builder()
+			.withTableName(options.getTableName()).withDialect(options.getDialect())
+			.withFieldNames(fieldNames).withKeyFields(keyFields).build();
+		format = new TableJdbcUpsertOutputFormat(new SimpleJdbcConnectionProvider(options), dmlOptions, JdbcExecutionOptions.defaults());
+		// FLINK-17544: There should be no NPE thrown from this method
+		format.close();
+	}
+
+	@Test
 	public void testJdbcOutputFormat() throws Exception {
 		JdbcOptions options = JdbcOptions.builder()
 				.setDBUrl(getDbMetadata().getUrl())