You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/09/21 11:11:36 UTC
[flink] branch release-1.11 updated: [FLINK-19280][jdbc] Fix option
"sink.buffer-flush.max-rows" for JDBC can't be disabled by setting to zero
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 3768dca [FLINK-19280][jdbc] Fix option "sink.buffer-flush.max-rows" for JDBC can't be disabled by setting to zero
3768dca is described below
commit 3768dcaf7c2937ec03589747e6cd14734ceddee0
Author: Jark Wu <ja...@apache.org>
AuthorDate: Mon Sep 21 19:10:56 2020 +0800
[FLINK-19280][jdbc] Fix option "sink.buffer-flush.max-rows" for JDBC can't be disabled by setting to zero
This closes #13433
Co-authored-by: dalong01.liu <da...@vipshop.com>
---
.../jdbc/internal/JdbcBatchingOutputFormat.java | 2 +-
.../jdbc/table/JdbcDynamicOutputFormatTest.java | 46 ++++++++++++++++++++++
2 files changed, 47 insertions(+), 1 deletion(-)
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java
index c7afc31..5a58154 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java
@@ -150,7 +150,7 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStat
try {
addToBatch(record, jdbcRecordExtractor.apply(record));
batchCount++;
- if (batchCount >= executionOptions.getBatchSize()) {
+ if (executionOptions.getBatchSize() > 0 && batchCount >= executionOptions.getBatchSize()) {
flush();
}
} catch (Exception e) {
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicOutputFormatTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicOutputFormatTest.java
index 24cf8c6..9d00fd8 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicOutputFormatTest.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicOutputFormatTest.java
@@ -363,6 +363,52 @@ public class JdbcDynamicOutputFormatTest extends JdbcDataTestBase {
}
@Test
+ public void testFlushWithBatchSizeEqualsZero() throws SQLException, IOException {
+ JdbcOptions jdbcOptions = JdbcOptions.builder()
+ .setDriverName(DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
+ .setTableName(OUTPUT_TABLE_2)
+ .build();
+ JdbcDmlOptions dmlOptions = JdbcDmlOptions.builder()
+ .withTableName(jdbcOptions.getTableName())
+ .withDialect(jdbcOptions.getDialect())
+ .withFieldNames(fieldNames)
+ .build();
+ JdbcExecutionOptions executionOptions = JdbcExecutionOptions.builder()
+ .withBatchSize(0)
+ .build();
+
+ outputFormat = new JdbcDynamicOutputFormatBuilder()
+ .setJdbcOptions(jdbcOptions)
+ .setFieldDataTypes(fieldDataTypes)
+ .setJdbcDmlOptions(dmlOptions)
+ .setJdbcExecutionOptions(executionOptions)
+ .setRowDataTypeInfo(rowDataTypeInfo)
+ .build();
+ setRuntimeContext(outputFormat, true);
+
+ try (
+ Connection dbConn = DriverManager.getConnection(DERBY_EBOOKSHOP_DB.getUrl());
+ PreparedStatement statement = dbConn.prepareStatement(SELECT_ALL_NEWBOOKS_2)
+ ) {
+ outputFormat.open(0, 1);
+ for (int i = 0; i < 2; ++i) {
+ outputFormat.writeRecord(buildGenericData(
+ TEST_DATA[i].id,
+ TEST_DATA[i].title,
+ TEST_DATA[i].author,
+ TEST_DATA[i].price,
+ TEST_DATA[i].qty));
+ }
+ try (ResultSet resultSet = statement.executeQuery()) {
+ assertFalse(resultSet.next());
+ }
+ } finally {
+ outputFormat.close();
+ }
+ }
+
+ @Test
public void testInvalidConnectionInJdbcOutputFormat() throws IOException, SQLException {
JdbcOptions jdbcOptions = JdbcOptions.builder()
.setDriverName(DERBY_EBOOKSHOP_DB.getDriverClass())