You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/09/21 11:11:36 UTC

[flink] branch release-1.11 updated: [FLINK-19280][jdbc] Fix option "sink.buffer-flush.max-rows" for JDBC can't be disabled by setting to zero

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 3768dca  [FLINK-19280][jdbc] Fix option "sink.buffer-flush.max-rows" for JDBC can't be disabled by setting to zero
3768dca is described below

commit 3768dcaf7c2937ec03589747e6cd14734ceddee0
Author: Jark Wu <ja...@apache.org>
AuthorDate: Mon Sep 21 19:10:56 2020 +0800

    [FLINK-19280][jdbc] Fix option "sink.buffer-flush.max-rows" for JDBC can't be disabled by setting to zero
    
    This closes #13433
    
    Co-authored-by: dalong01.liu <da...@vipshop.com>
---
 .../jdbc/internal/JdbcBatchingOutputFormat.java    |  2 +-
 .../jdbc/table/JdbcDynamicOutputFormatTest.java    | 46 ++++++++++++++++++++++
 2 files changed, 47 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java
index c7afc31..5a58154 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java
@@ -150,7 +150,7 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStat
 		try {
 			addToBatch(record, jdbcRecordExtractor.apply(record));
 			batchCount++;
-			if (batchCount >= executionOptions.getBatchSize()) {
+			if (executionOptions.getBatchSize() > 0 && batchCount >= executionOptions.getBatchSize()) {
 				flush();
 			}
 		} catch (Exception e) {
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicOutputFormatTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicOutputFormatTest.java
index 24cf8c6..9d00fd8 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicOutputFormatTest.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicOutputFormatTest.java
@@ -363,6 +363,52 @@ public class JdbcDynamicOutputFormatTest extends JdbcDataTestBase {
 	}
 
 	@Test
+	public void testFlushWithBatchSizeEqualsZero() throws SQLException, IOException {
+		JdbcOptions jdbcOptions = JdbcOptions.builder()
+			.setDriverName(DERBY_EBOOKSHOP_DB.getDriverClass())
+			.setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
+			.setTableName(OUTPUT_TABLE_2)
+			.build();
+		JdbcDmlOptions dmlOptions = JdbcDmlOptions.builder()
+			.withTableName(jdbcOptions.getTableName())
+			.withDialect(jdbcOptions.getDialect())
+			.withFieldNames(fieldNames)
+			.build();
+		JdbcExecutionOptions executionOptions = JdbcExecutionOptions.builder()
+			.withBatchSize(0)
+			.build();
+
+		outputFormat = new JdbcDynamicOutputFormatBuilder()
+			.setJdbcOptions(jdbcOptions)
+			.setFieldDataTypes(fieldDataTypes)
+			.setJdbcDmlOptions(dmlOptions)
+			.setJdbcExecutionOptions(executionOptions)
+			.setRowDataTypeInfo(rowDataTypeInfo)
+			.build();
+		setRuntimeContext(outputFormat, true);
+
+		try (
+			Connection dbConn = DriverManager.getConnection(DERBY_EBOOKSHOP_DB.getUrl());
+			PreparedStatement statement = dbConn.prepareStatement(SELECT_ALL_NEWBOOKS_2)
+		) {
+			outputFormat.open(0, 1);
+			for (int i = 0; i < 2; ++i) {
+				outputFormat.writeRecord(buildGenericData(
+					TEST_DATA[i].id,
+					TEST_DATA[i].title,
+					TEST_DATA[i].author,
+					TEST_DATA[i].price,
+					TEST_DATA[i].qty));
+			}
+			try (ResultSet resultSet = statement.executeQuery()) {
+				assertFalse(resultSet.next());
+			}
+		} finally {
+			outputFormat.close();
+		}
+	}
+
+	@Test
 	public void testInvalidConnectionInJdbcOutputFormat() throws IOException, SQLException {
 		JdbcOptions jdbcOptions = JdbcOptions.builder()
 			.setDriverName(DERBY_EBOOKSHOP_DB.getDriverClass())