Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/11 09:05:26 UTC

[flink] branch release-1.11 updated (671649e -> ea77c3f)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 671649e  [FLINK-18132][docs] Add documentation for the new CSV format
     new 18aa528  [hotfix][table-common] Fix TableSchemaUtils#getPhysicalSchema should keep the original constraint name
     new e0de5c1  [FLINK-16497][jdbc][table] Improve default flush strategy for new JDBC sink for better out-of-box
     new 7ecf7de  [FLINK-16496][hbase][table] Improve default flush strategy for new HBase sink for better out-of-box
     new ea77c3f  [FLINK-16495][elasticsearch][table] Improve default flush strategy for new Elasticsearch sink for better out-of-box

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/dev/table/connectors/hbase.md                 |  13 +-
 docs/dev/table/connectors/hbase.zh.md              |  13 +-
 .../table/ElasticsearchConfiguration.java          |  19 +-
 .../elasticsearch/table/ElasticsearchOptions.java  |   6 +-
 .../elasticsearch/util/NoOpFailureHandler.java     |   9 +
 .../table/Elasticsearch6DynamicSink.java           |   6 +-
 .../table/Elasticsearch6DynamicSinkFactory.java    |  11 +-
 .../elasticsearch6/ElasticsearchSink.java          |  12 -
 .../Elasticsearch6DynamicSinkFactoryTest.java      |   6 +-
 .../table/Elasticsearch6DynamicSinkTest.java       |  29 ++
 .../table/Elasticsearch7DynamicSink.java           |   6 +-
 .../table/Elasticsearch7DynamicSinkFactory.java    |  11 +-
 .../Elasticsearch7DynamicSinkFactoryTest.java      |   6 +-
 .../table/Elasticsearch7DynamicSinkTest.java       |  29 ++
 .../connector/hbase/HBaseDynamicTableFactory.java  |  39 ++-
 .../connector/hbase/options/HBaseWriteOptions.java |  16 +-
 .../connector/hbase/sink/HBaseSinkFunction.java    |   6 +-
 .../hbase/HBaseDynamicTableFactoryTest.java        |  49 +++-
 .../flink/connector/jdbc/JdbcExecutionOptions.java |  20 ++
 .../jdbc/catalog/AbstractJdbcCatalog.java          |   4 +-
 .../connector/jdbc/catalog/PostgresCatalog.java    |  10 +-
 .../jdbc/internal/options/JdbcDmlOptions.java      |  25 ++
 ...nkFactory.java => JdbcDynamicTableFactory.java} |  39 +--
 .../org.apache.flink.table.factories.Factory       |   2 +-
 .../jdbc/table/JdbcDynamicTableFactoryTest.java    | 310 +++++++++++++++++++++
 .../jdbc/table/JdbcDynamicTableSinkITCase.java     |   2 +-
 .../apache/flink/table/utils/TableSchemaUtils.java |   4 +-
 27 files changed, 580 insertions(+), 122 deletions(-)
 rename flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/{JdbcDynamicTableSourceSinkFactory.java => JdbcDynamicTableFactory.java} (91%)
 create mode 100644 flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java


[flink] 04/04: [FLINK-16495][elasticsearch][table] Improve default flush strategy for new Elasticsearch sink for better out-of-box

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ea77c3fec0f28ed939f32e3fdfa23d9570847b92
Author: Jark Wu <ja...@apache.org>
AuthorDate: Tue Jun 9 10:47:15 2020 +0800

    [FLINK-16495][elasticsearch][table] Improve default flush strategy for new Elasticsearch sink for better out-of-box
    
    The old Elasticsearch sink's default flush strategy was no flush interval, a 5MB buffer size, and 1000 buffered rows.
    The new Elasticsearch sink defaults to a '1s' flush interval, '1000' buffered rows, and a '2mb' buffer size.
    
    This closes #12536
---
 .../table/ElasticsearchConfiguration.java          | 19 ++++++++------
 .../elasticsearch/table/ElasticsearchOptions.java  |  6 ++---
 .../elasticsearch/util/NoOpFailureHandler.java     |  9 +++++++
 .../table/Elasticsearch6DynamicSink.java           |  6 ++---
 .../table/Elasticsearch6DynamicSinkFactory.java    | 11 +++++---
 .../elasticsearch6/ElasticsearchSink.java          | 12 ---------
 .../Elasticsearch6DynamicSinkFactoryTest.java      |  6 ++---
 .../table/Elasticsearch6DynamicSinkTest.java       | 29 ++++++++++++++++++++++
 .../table/Elasticsearch7DynamicSink.java           |  6 ++---
 .../table/Elasticsearch7DynamicSinkFactory.java    | 11 +++++---
 .../Elasticsearch7DynamicSinkFactoryTest.java      |  6 ++---
 .../table/Elasticsearch7DynamicSinkTest.java       | 29 ++++++++++++++++++++++
 12 files changed, 108 insertions(+), 42 deletions(-)
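
The change works in two steps: the three bulk-flush options now carry defaults
('1000' actions, '2mb', '1s') instead of being optional, and a configured value
of '0' is normalized to -1 before it reaches the Elasticsearch client, because
the client uses -1 to disable a flush trigger (see the comments added to
ElasticsearchConfiguration below). A minimal standalone sketch of that
normalization, plain Java and illustrative only:

    // Not the actual ElasticsearchConfiguration class -- just the conversion idea.
    public final class FlushOptionNormalizer {

        // '0' is the user-facing "disabled" value; the Elasticsearch client expects -1.
        static int normalizeMaxActions(int configuredMaxActions) {
            return configuredMaxActions == 0 ? -1 : configuredMaxActions;
        }

        public static void main(String[] args) {
            System.out.println(normalizeMaxActions(1000)); // 1000 (the new default, passed through)
            System.out.println(normalizeMaxActions(0));    // -1 (disables the max-actions trigger)
        }
    }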

diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java
index 48b848c..6de8892 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.connectors.elasticsearch.table;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
@@ -81,16 +80,22 @@ class ElasticsearchConfiguration {
 		return config.get(ElasticsearchOptions.DOCUMENT_TYPE_OPTION);
 	}
 
-	public Optional<Integer> getBulkFlushMaxActions() {
-		return config.getOptional(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION);
+	public int getBulkFlushMaxActions() {
+		int maxActions = config.get(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION);
+		// convert 0 to -1, because the Elasticsearch client uses -1 to disable this configuration.
+		return maxActions == 0 ? -1 : maxActions;
 	}
 
-	public Optional<Integer> getBulkFlushMaxSize() {
-		return config.getOptional(ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION).map(MemorySize::getMebiBytes);
+	public long getBulkFlushMaxByteSize() {
+		long maxSize = config.get(ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION).getBytes();
+		// convert 0 to -1, because the Elasticsearch client uses -1 to disable this configuration.
+		return maxSize == 0 ? -1 : maxSize;
 	}
 
-	public Optional<Long> getBulkFlushInterval() {
-		return config.getOptional(BULK_FLUSH_INTERVAL_OPTION).map(Duration::toMillis);
+	public long getBulkFlushInterval() {
+		long interval = config.get(BULK_FLUSH_INTERVAL_OPTION).toMillis();
+		// convert 0 to -1, because the Elasticsearch client uses -1 to disable this configuration.
+		return interval == 0 ? -1 : interval;
 	}
 
 	public boolean isBulkFlushBackoffEnabled() {
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java
index 176414d..355f455 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java
@@ -84,17 +84,17 @@ public class ElasticsearchOptions {
 	public static final ConfigOption<Integer> BULK_FLUSH_MAX_ACTIONS_OPTION =
 		ConfigOptions.key("sink.bulk-flush.max-actions")
 			.intType()
-			.noDefaultValue()
+			.defaultValue(1000)
 			.withDescription("Maximum number of actions to buffer for each bulk request.");
 	public static final ConfigOption<MemorySize> BULK_FLASH_MAX_SIZE_OPTION =
 		ConfigOptions.key("sink.bulk-flush.max-size")
 			.memoryType()
-			.noDefaultValue()
+			.defaultValue(MemorySize.parse("2mb"))
 			.withDescription("Maximum size of buffered actions per bulk request");
 	public static final ConfigOption<Duration> BULK_FLUSH_INTERVAL_OPTION =
 		ConfigOptions.key("sink.bulk-flush.interval")
 			.durationType()
-			.noDefaultValue()
+			.defaultValue(Duration.ofSeconds(1))
 			.withDescription("Bulk flush interval");
 	public static final ConfigOption<BackOffType> BULK_FLUSH_BACKOFF_TYPE_OPTION =
 		ConfigOptions.key("sink.bulk-flush.backoff.strategy")
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
index dfcb9ee..4726dc1 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
@@ -37,4 +37,13 @@ public class NoOpFailureHandler implements ActionRequestFailureHandler {
 		throw failure;
 	}
 
+	@Override
+	public boolean equals(Object o) {
+		return o instanceof NoOpFailureHandler;
+	}
+
+	@Override
+	public int hashCode() {
+		return NoOpFailureHandler.class.hashCode();
+	}
 }
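
The equals()/hashCode() overrides added above exist because the new
testDefaultConfig tests (further down) verify builder interactions against a
freshly constructed handler, e.g. verify(builderSpy).setFailureHandler(new
NoOpFailureHandler()), and Mockito matches recorded arguments via equals(). A
small sketch of that matching behaviour, assuming only Mockito on the classpath
and hypothetical Handler/Builder types:

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.verify;

    // Hypothetical types for illustration; not part of the commit.
    class Handler {
        @Override
        public boolean equals(Object o) {
            return o instanceof Handler; // stateless, so any instance is equal
        }

        @Override
        public int hashCode() {
            return Handler.class.hashCode();
        }
    }

    interface Builder {
        void setFailureHandler(Handler handler);
    }

    class MatchingSketch {
        public static void main(String[] args) {
            Builder builder = mock(Builder.class);
            builder.setFailureHandler(new Handler());
            // Passes only because Handler implements value equality:
            verify(builder).setFailureHandler(new Handler());
        }
    }
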
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
index 01b5f47..4e9a8f85 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
@@ -128,9 +128,9 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
 				upsertFunction);
 
 			builder.setFailureHandler(config.getFailureHandler());
-			config.getBulkFlushMaxActions().ifPresent(builder::setBulkFlushMaxActions);
-			config.getBulkFlushMaxSize().ifPresent(builder::setBulkFlushMaxSizeMb);
-			config.getBulkFlushInterval().ifPresent(builder::setBulkFlushInterval);
+			builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
+			builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20));
+			builder.setBulkFlushInterval(config.getBulkFlushInterval());
 			builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
 			config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
 			config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
index c5d9c89..071bbb6 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
@@ -109,17 +109,20 @@ public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory
 		validate(
 			config.getIndex().length() >= 1,
 			() -> String.format("'%s' must not be empty", INDEX_OPTION.key()));
+		int maxActions = config.getBulkFlushMaxActions();
 		validate(
-			config.getBulkFlushMaxActions().map(maxActions -> maxActions >= 1).orElse(true),
+			maxActions == -1 || maxActions >= 1,
 			() -> String.format(
 				"'%s' must be at least 1 character. Got: %s",
 				BULK_FLUSH_MAX_ACTIONS_OPTION.key(),
-				config.getBulkFlushMaxActions().get())
+				maxActions)
 		);
+		long maxSize = config.getBulkFlushMaxByteSize();
+		long mb1 = 1024 * 1024;
 		validate(
-			config.getBulkFlushMaxSize().map(maxSize -> maxSize >= 1024 * 1024).orElse(true),
+			maxSize == -1 || (maxSize >= mb1 && maxSize % mb1 == 0),
 			() -> String.format(
-				"'%s' must be at least 1mb character. Got: %s",
+				"'%s' must be in MB granularity. Got: %s",
 				BULK_FLASH_MAX_SIZE_OPTION.key(),
 				originalConfiguration.get(BULK_FLASH_MAX_SIZE_OPTION).toHumanReadableString())
 		);
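
The "MB granularity" requirement above follows from the builder API:
setBulkFlushMaxSizeMb() takes whole megabytes, so the configured byte size is
converted with a 20-bit right shift and any sub-megabyte remainder would be
silently dropped. A quick sketch of the arithmetic (plain Java, illustrative
only):

    public final class MbGranularitySketch {
        public static void main(String[] args) {
            long oneMb = 1L << 20;                   // 1,048,576 bytes
            long configured = 2L * oneMb;            // the new '2mb' default
            System.out.println(configured >> 20);    // 2 -> value passed to setBulkFlushMaxSizeMb
            System.out.println(configured % oneMb);  // 0 -> accepted by the validation

            long misaligned = 1024L;                 // e.g. '1kb'
            System.out.println(misaligned % oneMb);  // 1024 != 0 -> rejected with the new error message
        }
    }
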
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
index 484e6f6..b847613 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
@@ -104,10 +104,6 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
 		 * @param numMaxActions the maxinum number of actions to buffer per bulk request.
 		 */
 		public void setBulkFlushMaxActions(int numMaxActions) {
-			Preconditions.checkArgument(
-				numMaxActions > 0,
-				"Max number of buffered actions must be larger than 0.");
-
 			this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, String.valueOf(numMaxActions));
 		}
 
@@ -117,10 +113,6 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
 		 * @param maxSizeMb the maximum size of buffered actions, in mb.
 		 */
 		public void setBulkFlushMaxSizeMb(int maxSizeMb) {
-			Preconditions.checkArgument(
-				maxSizeMb > 0,
-				"Max size of buffered actions must be larger than 0.");
-
 			this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, String.valueOf(maxSizeMb));
 		}
 
@@ -130,10 +122,6 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
 		 * @param intervalMillis the bulk flush interval, in milliseconds.
 		 */
 		public void setBulkFlushInterval(long intervalMillis) {
-			Preconditions.checkArgument(
-				intervalMillis >= 0,
-				"Interval (in milliseconds) between each flush must be larger than or equal to 0.");
-
 			this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, String.valueOf(intervalMillis));
 		}
 
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java
index 6d0878f..143b712 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java
@@ -102,7 +102,7 @@ public class Elasticsearch6DynamicSinkFactoryTest {
 
 		thrown.expect(ValidationException.class);
 		thrown.expectMessage(
-			"'sink.bulk-flush.max-size' must be at least 1mb character. Got: 1024 bytes");
+			"'sink.bulk-flush.max-size' must be in MB granularity. Got: 1024 bytes");
 		sinkFactory.createDynamicTableSink(
 			context()
 				.withSchema(TableSchema.builder()
@@ -142,7 +142,7 @@ public class Elasticsearch6DynamicSinkFactoryTest {
 
 		thrown.expect(ValidationException.class);
 		thrown.expectMessage(
-			"'sink.bulk-flush.max-actions' must be at least 1 character. Got: 0");
+			"'sink.bulk-flush.max-actions' must be at least 1 character. Got: -2");
 		sinkFactory.createDynamicTableSink(
 			context()
 				.withSchema(TableSchema.builder()
@@ -151,7 +151,7 @@ public class Elasticsearch6DynamicSinkFactoryTest {
 				.withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex")
 				.withOption(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), "MyType")
 				.withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234")
-				.withOption(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION.key(), "0")
+				.withOption(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION.key(), "-2")
 				.build()
 		);
 	}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java
index 1708efc..36d8f6d 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
@@ -41,6 +42,7 @@ import org.mockito.Mockito;
 import java.util.List;
 
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 
 /**
@@ -85,6 +87,33 @@ public class Elasticsearch6DynamicSinkTest {
 		verify(provider.sinkSpy).disableFlushOnCheckpoint();
 	}
 
+	@Test
+	public void testDefaultConfig() {
+		final TableSchema schema = createTestSchema();
+		Configuration configuration = new Configuration();
+		configuration.setString(ElasticsearchOptions.INDEX_OPTION.key(), INDEX);
+		configuration.setString(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), DOC_TYPE);
+		configuration.setString(ElasticsearchOptions.HOSTS_OPTION.key(), SCHEMA + "://" + HOSTNAME + ":" + PORT);
+
+		BuilderProvider provider = new BuilderProvider();
+		final Elasticsearch6DynamicSink testSink = new Elasticsearch6DynamicSink(
+			new DummyEncodingFormat(),
+			new Elasticsearch6Configuration(configuration, this.getClass().getClassLoader()),
+			schema,
+			provider
+		);
+
+		testSink.getSinkRuntimeProvider(new MockSinkContext()).createSinkFunction();
+
+		verify(provider.builderSpy).setFailureHandler(new NoOpFailureHandler());
+		verify(provider.builderSpy).setBulkFlushBackoff(false);
+		verify(provider.builderSpy).setBulkFlushInterval(1000);
+		verify(provider.builderSpy).setBulkFlushMaxActions(1000);
+		verify(provider.builderSpy).setBulkFlushMaxSizeMb(2);
+		verify(provider.builderSpy).setRestClientFactory(new Elasticsearch6DynamicSink.DefaultRestClientFactory(null));
+		verify(provider.sinkSpy, never()).disableFlushOnCheckpoint();
+	}
+
 	private Configuration getConfig() {
 		Configuration configuration = new Configuration();
 		configuration.setString(ElasticsearchOptions.INDEX_OPTION.key(), INDEX);
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
index 7aa52ea..2213ce8 100644
--- a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
+++ b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
@@ -128,9 +128,9 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
 				upsertFunction);
 
 			builder.setFailureHandler(config.getFailureHandler());
-			config.getBulkFlushMaxActions().ifPresent(builder::setBulkFlushMaxActions);
-			config.getBulkFlushMaxSize().ifPresent(builder::setBulkFlushMaxSizeMb);
-			config.getBulkFlushInterval().ifPresent(builder::setBulkFlushInterval);
+			builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
+			builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20));
+			builder.setBulkFlushInterval(config.getBulkFlushInterval());
 			builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
 			config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
 			config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java
index 320c894..7ee2cfb 100644
--- a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java
+++ b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java
@@ -108,17 +108,20 @@ public class Elasticsearch7DynamicSinkFactory implements DynamicTableSinkFactory
 		validate(
 			config.getIndex().length() >= 1,
 			() -> String.format("'%s' must not be empty", INDEX_OPTION.key()));
+		int maxActions = config.getBulkFlushMaxActions();
 		validate(
-			config.getBulkFlushMaxActions().map(maxActions -> maxActions >= 1).orElse(true),
+			maxActions == -1 || maxActions >= 1,
 			() -> String.format(
 				"'%s' must be at least 1 character. Got: %s",
 				BULK_FLUSH_MAX_ACTIONS_OPTION.key(),
-				config.getBulkFlushMaxActions().get())
+				maxActions)
 		);
+		long maxSize = config.getBulkFlushMaxByteSize();
+		long mb1 = 1024 * 1024;
 		validate(
-			config.getBulkFlushMaxSize().map(maxSize -> maxSize >= 1024 * 1024).orElse(true),
+			maxSize == -1 || (maxSize >= mb1 && maxSize % mb1 == 0),
 			() -> String.format(
-				"'%s' must be at least 1mb character. Got: %s",
+				"'%s' must be in MB granularity. Got: %s",
 				BULK_FLASH_MAX_SIZE_OPTION.key(),
 				originalConfiguration.get(BULK_FLASH_MAX_SIZE_OPTION).toHumanReadableString())
 		);
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactoryTest.java b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactoryTest.java
index 4fe3214..7189163 100644
--- a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactoryTest.java
+++ b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactoryTest.java
@@ -99,7 +99,7 @@ public class Elasticsearch7DynamicSinkFactoryTest {
 
 		thrown.expect(ValidationException.class);
 		thrown.expectMessage(
-			"'sink.bulk-flush.max-size' must be at least 1mb character. Got: 1024 bytes");
+			"'sink.bulk-flush.max-size' must be in MB granularity. Got: 1024 bytes");
 		sinkFactory.createDynamicTableSink(
 			context()
 				.withSchema(TableSchema.builder()
@@ -137,7 +137,7 @@ public class Elasticsearch7DynamicSinkFactoryTest {
 
 		thrown.expect(ValidationException.class);
 		thrown.expectMessage(
-			"'sink.bulk-flush.max-actions' must be at least 1 character. Got: 0");
+			"'sink.bulk-flush.max-actions' must be at least 1 character. Got: -2");
 		sinkFactory.createDynamicTableSink(
 			context()
 				.withSchema(TableSchema.builder()
@@ -145,7 +145,7 @@ public class Elasticsearch7DynamicSinkFactoryTest {
 					.build())
 				.withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex")
 				.withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234")
-				.withOption(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION.key(), "0")
+				.withOption(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION.key(), "-2")
 				.build()
 		);
 	}
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java
index c972cee..36c3135 100644
--- a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java
+++ b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
@@ -41,6 +42,7 @@ import org.mockito.Mockito;
 import java.util.List;
 
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 
 /**
@@ -85,6 +87,33 @@ public class Elasticsearch7DynamicSinkTest {
 		verify(provider.sinkSpy).disableFlushOnCheckpoint();
 	}
 
+	@Test
+	public void testDefaultConfig() {
+		final TableSchema schema = createTestSchema();
+		Configuration configuration = new Configuration();
+		configuration.setString(ElasticsearchOptions.INDEX_OPTION.key(), INDEX);
+		configuration.setString(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), DOC_TYPE);
+		configuration.setString(ElasticsearchOptions.HOSTS_OPTION.key(), SCHEMA + "://" + HOSTNAME + ":" + PORT);
+
+		BuilderProvider provider = new BuilderProvider();
+		final Elasticsearch7DynamicSink testSink = new Elasticsearch7DynamicSink(
+			new DummyEncodingFormat(),
+			new Elasticsearch7Configuration(configuration, this.getClass().getClassLoader()),
+			schema,
+			provider
+		);
+
+		testSink.getSinkRuntimeProvider(new MockSinkContext()).createSinkFunction();
+
+		verify(provider.builderSpy).setFailureHandler(new NoOpFailureHandler());
+		verify(provider.builderSpy).setBulkFlushBackoff(false);
+		verify(provider.builderSpy).setBulkFlushInterval(1000);
+		verify(provider.builderSpy).setBulkFlushMaxActions(1000);
+		verify(provider.builderSpy).setBulkFlushMaxSizeMb(2);
+		verify(provider.builderSpy).setRestClientFactory(new Elasticsearch7DynamicSink.DefaultRestClientFactory(null));
+		verify(provider.sinkSpy, never()).disableFlushOnCheckpoint();
+	}
+
 	private Configuration getConfig() {
 		Configuration configuration = new Configuration();
 		configuration.setString(ElasticsearchOptions.INDEX_OPTION.key(), INDEX);


[flink] 02/04: [FLINK-16497][jdbc][table] Improve default flush strategy for new JDBC sink for better out-of-box

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e0de5c14d5ad14dfbf8679d400686296ff16d799
Author: Jark Wu <ja...@apache.org>
AuthorDate: Mon Jun 8 18:48:37 2020 +0800

    [FLINK-16497][jdbc][table] Improve default flush strategy for new JDBC sink for better out-of-box
    
    The old JDBC sink's default flush strategy was no flush interval and 5000 buffered rows.
    The new JDBC sink defaults to a '1s' flush interval and '100' buffered rows.
    
    This closes #12536
---
 .../flink/connector/jdbc/JdbcExecutionOptions.java |  20 ++
 .../jdbc/catalog/AbstractJdbcCatalog.java          |   4 +-
 .../connector/jdbc/catalog/PostgresCatalog.java    |  10 +-
 .../jdbc/internal/options/JdbcDmlOptions.java      |  25 ++
 ...nkFactory.java => JdbcDynamicTableFactory.java} |  39 +--
 .../org.apache.flink.table.factories.Factory       |   2 +-
 .../jdbc/table/JdbcDynamicTableFactoryTest.java    | 310 +++++++++++++++++++++
 .../jdbc/table/JdbcDynamicTableSinkITCase.java     |   2 +-
 8 files changed, 384 insertions(+), 28 deletions(-)
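
With these defaults, a JDBC sink declared in DDL flushes every second or every
100 buffered rows without any flush option being set, and
'sink.buffer-flush.interval' is now a duration, so values such as '300ms' or
'2min' are accepted where a bare millisecond count was required before. A usage
sketch, assuming Flink 1.11's TableEnvironment and the Derby URL used in the
tests below:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public final class JdbcSinkDdlSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

            // The flush options are optional; they are set here only to show how the
            // new defaults (max-rows = 100, interval = 1s) can be overridden.
            tEnv.executeSql(
                "CREATE TABLE jdbc_sink (" +
                "  id INT," +
                "  name STRING" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:derby:memory:mydb'," +
                "  'table-name' = 'mytable'," +
                "  'sink.buffer-flush.max-rows' = '1000'," +
                "  'sink.buffer-flush.interval' = '300ms'" +
                ")");
        }
    }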

diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcExecutionOptions.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcExecutionOptions.java
index d12706d..43b31f8 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcExecutionOptions.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcExecutionOptions.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 /**
  * JDBC sink batch options.
@@ -54,6 +55,25 @@ public class JdbcExecutionOptions implements Serializable {
 		return maxRetries;
 	}
 
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		JdbcExecutionOptions that = (JdbcExecutionOptions) o;
+		return batchIntervalMs == that.batchIntervalMs &&
+			batchSize == that.batchSize &&
+			maxRetries == that.maxRetries;
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(batchIntervalMs, batchSize, maxRetries);
+	}
+
 	public static Builder builder() {
 		return new Builder();
 	}
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
index 8e2f253..a00470d 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.connector.jdbc.catalog;
 
-import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceSinkFactory;
+import org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.constraints.UniqueConstraint;
 import org.apache.flink.table.catalog.AbstractCatalog;
@@ -153,7 +153,7 @@ public abstract class AbstractJdbcCatalog extends AbstractCatalog {
 
 	@Override
 	public Optional<Factory> getFactory() {
-		return Optional.of(new JdbcDynamicTableSourceSinkFactory());
+		return Optional.of(new JdbcDynamicTableFactory());
 	}
 
 	// ------ databases ------
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java
index 0410d82..88263f3 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java
@@ -52,11 +52,11 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
-import static org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceSinkFactory.IDENTIFIER;
-import static org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceSinkFactory.PASSWORD;
-import static org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceSinkFactory.TABLE_NAME;
-import static org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceSinkFactory.URL;
-import static org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceSinkFactory.USERNAME;
+import static org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.IDENTIFIER;
+import static org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.PASSWORD;
+import static org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.TABLE_NAME;
+import static org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.URL;
+import static org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.USERNAME;
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
 
 /**
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcDmlOptions.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcDmlOptions.java
index 367b3df..f8281b2 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcDmlOptions.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcDmlOptions.java
@@ -22,6 +22,8 @@ import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
+import java.util.Arrays;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Stream;
 
@@ -66,6 +68,29 @@ public class JdbcDmlOptions extends JdbcTypedQueryOptions {
 		return Optional.ofNullable(keyFields);
 	}
 
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		JdbcDmlOptions that = (JdbcDmlOptions) o;
+		return Arrays.equals(fieldNames, that.fieldNames) &&
+			Arrays.equals(keyFields, that.keyFields) &&
+			Objects.equals(tableName, that.tableName) &&
+			Objects.equals(dialect, that.dialect);
+	}
+
+	@Override
+	public int hashCode() {
+		int result = Objects.hash(tableName, dialect);
+		result = 31 * result + Arrays.hashCode(fieldNames);
+		result = 31 * result + Arrays.hashCode(keyFields);
+		return result;
+	}
+
 	/**
 	 * Builder for {@link JdbcDmlOptions}.
 	 */
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
similarity index 91%
rename from flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java
rename to flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
index 930a1b0..f5fd9a7 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
@@ -51,7 +51,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * and {@link JdbcDynamicTableSink}.
  */
 @Internal
-public class JdbcDynamicTableSourceSinkFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
 
 	public static final String IDENTIFIER = "jdbc";
 	public static final ConfigOption<String> URL = ConfigOptions
@@ -133,15 +133,15 @@ public class JdbcDynamicTableSourceSinkFactory implements DynamicTableSourceFact
 	private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = ConfigOptions
 		.key("sink.buffer-flush.max-rows")
 		.intType()
-		.defaultValue(5000)
+		.defaultValue(100)
 		.withDescription("the flush max size (includes all append, upsert and delete records), over this number" +
-			" of records, will flush data. The default value is 5000.");
-	private static final ConfigOption<Long> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions
+			" of records, will flush data. The default value is 100.");
+	private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions
 		.key("sink.buffer-flush.interval")
-		.longType()
-		.defaultValue(0L)
+		.durationType()
+		.defaultValue(Duration.ofSeconds(1))
 		.withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The " +
-			"default value is 0, which means no asynchronous flush thread will be scheduled.");
+			"default value is 1s.");
 	private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions
 		.key("sink.max-retries")
 		.intType()
@@ -216,7 +216,7 @@ public class JdbcDynamicTableSourceSinkFactory implements DynamicTableSourceFact
 	private JdbcExecutionOptions getJdbcExecutionOptions(ReadableConfig config) {
 		final JdbcExecutionOptions.Builder builder = new JdbcExecutionOptions.Builder();
 		builder.withBatchSize(config.get(SINK_BUFFER_FLUSH_MAX_ROWS));
-		builder.withBatchIntervalMs(config.get(SINK_BUFFER_FLUSH_INTERVAL));
+		builder.withBatchIntervalMs(config.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
 		builder.withMaxRetries(config.get(SINK_MAX_RETRIES));
 		return builder.build();
 	}
@@ -268,19 +268,14 @@ public class JdbcDynamicTableSourceSinkFactory implements DynamicTableSourceFact
 	}
 
 	private void validateConfigOptions(ReadableConfig config) {
-		config.getOptional(URL).orElseThrow(() -> new IllegalArgumentException(
-			String.format("Could not find required option: %s", URL.key())));
-		config.getOptional(TABLE_NAME).orElseThrow(() -> new IllegalArgumentException(
-			String.format("Could not find required option: %s", TABLE_NAME.key())));
-
 		String jdbcUrl = config.get(URL);
 		final Optional<JdbcDialect> dialect = JdbcDialects.get(jdbcUrl);
 		checkState(dialect.isPresent(), "Cannot handle such jdbc url: " + jdbcUrl);
 
-		if (config.getOptional(USERNAME).isPresent()) {
-			checkState(config.getOptional(PASSWORD).isPresent(),
-				"Database username must be provided when database password is provided");
-		}
+		checkAllOrNone(config, new ConfigOption[]{
+			USERNAME,
+			PASSWORD
+		});
 
 		checkAllOrNone(config, new ConfigOption[]{
 			SCAN_PARTITION_COLUMN,
@@ -291,8 +286,14 @@ public class JdbcDynamicTableSourceSinkFactory implements DynamicTableSourceFact
 
 		if (config.getOptional(SCAN_PARTITION_LOWER_BOUND).isPresent() &&
 			config.getOptional(SCAN_PARTITION_UPPER_BOUND).isPresent()) {
-			checkState(config.get(SCAN_PARTITION_LOWER_BOUND) <= config.get(SCAN_PARTITION_UPPER_BOUND),
-				String.format("%s must not be larger than %s", SCAN_PARTITION_LOWER_BOUND.key(), SCAN_PARTITION_UPPER_BOUND.key()));
+			long lowerBound = config.get(SCAN_PARTITION_LOWER_BOUND);
+			long upperBound = config.get(SCAN_PARTITION_UPPER_BOUND);
+			if (lowerBound > upperBound) {
+				throw new IllegalArgumentException(String.format(
+					"'%s'='%s' must not be larger than '%s'='%s'.",
+					SCAN_PARTITION_LOWER_BOUND.key(), lowerBound,
+					SCAN_PARTITION_UPPER_BOUND.key(), upperBound));
+			}
 		}
 
 		checkAllOrNone(config, new ConfigOption[]{
diff --git a/flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 9fb9fe0..86713a6 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceSinkFactory
+org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java
new file mode 100644
index 0000000..93c90f7
--- /dev/null
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Test for {@link JdbcDynamicTableSource} and {@link JdbcDynamicTableSink} created
+ * by {@link JdbcDynamicTableFactory}.
+ */
+public class JdbcDynamicTableFactoryTest {
+
+	private static final TableSchema schema = TableSchema.builder()
+		.field("aaa", DataTypes.INT().notNull())
+		.field("bbb", DataTypes.STRING().notNull())
+		.field("ccc", DataTypes.DOUBLE())
+		.field("ddd", DataTypes.DECIMAL(31, 18))
+		.field("eee", DataTypes.TIMESTAMP(3))
+		.primaryKey("bbb", "aaa")
+		.build();
+
+	@Test
+	public void testJdbcCommonProperties() {
+		Map<String, String> properties = getAllOptions();
+		properties.put("driver", "org.apache.derby.jdbc.EmbeddedDriver");
+		properties.put("username", "user");
+		properties.put("password", "pass");
+
+		// validation for source
+		DynamicTableSource actualSource = createTableSource(properties);
+		JdbcOptions options = JdbcOptions.builder()
+			.setDBUrl("jdbc:derby:memory:mydb")
+			.setTableName("mytable")
+			.setDriverName("org.apache.derby.jdbc.EmbeddedDriver")
+			.setUsername("user")
+			.setPassword("pass")
+			.build();
+		JdbcLookupOptions lookupOptions = JdbcLookupOptions.builder()
+			.setCacheMaxSize(-1)
+			.setCacheExpireMs(10_000)
+			.setMaxRetryTimes(3)
+			.build();
+		JdbcDynamicTableSource expectedSource = new JdbcDynamicTableSource(
+			options,
+			JdbcReadOptions.builder().build(),
+			lookupOptions,
+			schema);
+		assertEquals(expectedSource, actualSource);
+
+		// validation for sink
+		DynamicTableSink actualSink = createTableSink(properties);
+		// default flush configurations
+		JdbcExecutionOptions executionOptions = JdbcExecutionOptions.builder()
+			.withBatchSize(100)
+			.withBatchIntervalMs(1000)
+			.withMaxRetries(3)
+			.build();
+		JdbcDmlOptions dmlOptions = JdbcDmlOptions.builder()
+			.withTableName(options.getTableName())
+			.withDialect(options.getDialect())
+			.withFieldNames(schema.getFieldNames())
+			.withKeyFields("bbb", "aaa")
+			.build();
+		JdbcDynamicTableSink expectedSink = new JdbcDynamicTableSink(
+			options,
+			executionOptions,
+			dmlOptions,
+			schema);
+		assertEquals(expectedSink, actualSink);
+	}
+
+	@Test
+	public void testJdbcReadProperties() {
+		Map<String, String> properties = getAllOptions();
+		properties.put("scan.partition.column", "aaa");
+		properties.put("scan.partition.lower-bound", "-10");
+		properties.put("scan.partition.upper-bound", "100");
+		properties.put("scan.partition.num", "10");
+		properties.put("scan.fetch-size", "20");
+
+		DynamicTableSource actual = createTableSource(properties);
+
+		JdbcOptions options = JdbcOptions.builder()
+			.setDBUrl("jdbc:derby:memory:mydb")
+			.setTableName("mytable")
+			.build();
+		JdbcReadOptions readOptions = JdbcReadOptions.builder()
+			.setPartitionColumnName("aaa")
+			.setPartitionLowerBound(-10)
+			.setPartitionUpperBound(100)
+			.setNumPartitions(10)
+			.setFetchSize(20)
+			.build();
+		JdbcLookupOptions lookupOptions = JdbcLookupOptions.builder()
+			.setCacheMaxSize(-1)
+			.setCacheExpireMs(10_000)
+			.setMaxRetryTimes(3)
+			.build();
+		JdbcDynamicTableSource expected = new JdbcDynamicTableSource(
+			options,
+			readOptions,
+			lookupOptions,
+			schema);
+
+		assertEquals(expected, actual);
+	}
+
+	@Test
+	public void testJdbcLookupProperties() {
+		Map<String, String> properties = getAllOptions();
+		properties.put("lookup.cache.max-rows", "1000");
+		properties.put("lookup.cache.ttl", "10s");
+		properties.put("lookup.max-retries", "10");
+
+		DynamicTableSource actual = createTableSource(properties);
+
+		JdbcOptions options = JdbcOptions.builder()
+			.setDBUrl("jdbc:derby:memory:mydb")
+			.setTableName("mytable")
+			.build();
+		JdbcLookupOptions lookupOptions = JdbcLookupOptions.builder()
+			.setCacheMaxSize(1000)
+			.setCacheExpireMs(10_000)
+			.setMaxRetryTimes(10)
+			.build();
+		JdbcDynamicTableSource expected = new JdbcDynamicTableSource(
+			options,
+			JdbcReadOptions.builder().build(),
+			lookupOptions,
+			schema);
+
+		assertEquals(expected, actual);
+	}
+
+	@Test
+	public void testJdbcSinkProperties() {
+		Map<String, String> properties = getAllOptions();
+		properties.put("sink.buffer-flush.max-rows", "1000");
+		properties.put("sink.buffer-flush.interval", "2min");
+		properties.put("sink.max-retries", "5");
+
+		DynamicTableSink actual = createTableSink(properties);
+
+		JdbcOptions options = JdbcOptions.builder()
+			.setDBUrl("jdbc:derby:memory:mydb")
+			.setTableName("mytable")
+			.build();
+		JdbcExecutionOptions executionOptions = JdbcExecutionOptions.builder()
+			.withBatchSize(1000)
+			.withBatchIntervalMs(120_000)
+			.withMaxRetries(5)
+			.build();
+		JdbcDmlOptions dmlOptions = JdbcDmlOptions.builder()
+			.withTableName(options.getTableName())
+			.withDialect(options.getDialect())
+			.withFieldNames(schema.getFieldNames())
+			.withKeyFields("bbb", "aaa")
+			.build();
+
+		JdbcDynamicTableSink expected = new JdbcDynamicTableSink(
+			options,
+			executionOptions,
+			dmlOptions,
+			schema);
+
+		assertEquals(expected, actual);
+	}
+
+	@Test
+	public void testJdbcValidation() {
+		// only password, no username
+		try {
+			Map<String, String> properties = getAllOptions();
+			properties.put("password", "pass");
+
+			createTableSource(properties);
+			fail("exception expected");
+		} catch (Throwable t) {
+			assertTrue(ExceptionUtils.findThrowableWithMessage(t,
+				"Either all or none of the following options should be provided:\n" +
+				"username\npassword").isPresent());
+		}
+
+		// read partition properties not complete
+		try {
+			Map<String, String> properties = getAllOptions();
+			properties.put("scan.partition.column", "aaa");
+			properties.put("scan.partition.lower-bound", "-10");
+			properties.put("scan.partition.upper-bound", "100");
+
+			createTableSource(properties);
+			fail("exception expected");
+		} catch (Throwable t) {
+			assertTrue(ExceptionUtils.findThrowableWithMessage(t,
+				"Either all or none of the following options should be provided:\n" +
+					"scan.partition.column\n" +
+					"scan.partition.num\n" +
+					"scan.partition.lower-bound\n" +
+					"scan.partition.upper-bound").isPresent());
+		}
+
+		// read partition lower-bound > upper-bound
+		try {
+			Map<String, String> properties = getAllOptions();
+			properties.put("scan.partition.column", "aaa");
+			properties.put("scan.partition.lower-bound", "100");
+			properties.put("scan.partition.upper-bound", "-10");
+			properties.put("scan.partition.num", "10");
+
+			createTableSource(properties);
+			fail("exception expected");
+		} catch (Throwable t) {
+			assertTrue(ExceptionUtils.findThrowableWithMessage(t,
+				"'scan.partition.lower-bound'='100' must not be larger than " +
+					"'scan.partition.upper-bound'='-10'.").isPresent());
+		}
+
+		// lookup cache properties not complete
+		try {
+			Map<String, String> properties = getAllOptions();
+			properties.put("lookup.cache.max-rows", "10");
+
+			createTableSource(properties);
+			fail("exception expected");
+		} catch (Throwable t) {
+			assertTrue(ExceptionUtils.findThrowableWithMessage(t,
+				"Either all or none of the following options should be provided:\n" +
+					"lookup.cache.max-rows\n" +
+					"lookup.cache.ttl").isPresent());
+		}
+
+		// lookup cache properties not complete
+		try {
+			Map<String, String> properties = getAllOptions();
+			properties.put("lookup.cache.ttl", "1s");
+
+			createTableSource(properties);
+			fail("exception expected");
+		} catch (Throwable t) {
+			assertTrue(ExceptionUtils.findThrowableWithMessage(t,
+				"Either all or none of the following options should be provided:\n" +
+					"lookup.cache.max-rows\n" +
+					"lookup.cache.ttl").isPresent());
+		}
+	}
+
+	private Map<String, String> getAllOptions() {
+		Map<String, String> options = new HashMap<>();
+		options.put("connector", "jdbc");
+		options.put("url", "jdbc:derby:memory:mydb");
+		options.put("table-name", "mytable");
+		return options;
+	}
+
+	private static DynamicTableSource createTableSource(Map<String, String> options) {
+		return FactoryUtil.createTableSource(
+			null,
+			ObjectIdentifier.of("default", "default", "t1"),
+			new CatalogTableImpl(JdbcDynamicTableFactoryTest.schema, options, "mock source"),
+			new Configuration(),
+			JdbcDynamicTableFactoryTest.class.getClassLoader());
+	}
+
+	private static DynamicTableSink createTableSink(Map<String, String> options) {
+		return FactoryUtil.createTableSink(
+			null,
+			ObjectIdentifier.of("default", "default", "t1"),
+			new CatalogTableImpl(JdbcDynamicTableFactoryTest.schema, options, "mock sink"),
+			new Configuration(),
+			JdbcDynamicTableFactoryTest.class.getClassLoader());
+	}
+}
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java
index 7288a01..76d3cf6 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java
@@ -253,7 +253,7 @@ public class JdbcDynamicTableSinkITCase extends AbstractTestBase {
 				"'url'='" + DB_URL + "'," +
 				"'table-name' = '" + OUTPUT_TABLE3 + "'," +
 				"'sink.buffer-flush.max-rows' = '2'," +
-				"'sink.buffer-flush.interval' = '3'," +
+				"'sink.buffer-flush.interval' = '300ms'," +
 				"'sink.max-retries' = '4'" +
 				")");
 


[flink] 01/04: [hotfix][table-common] Fix TableSchemaUtils#getPhysicalSchema should keep the original constraint name

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 18aa52882ef75fbd08d693d1ebf5a97e31376244
Author: Jark Wu <ja...@apache.org>
AuthorDate: Mon Jun 8 18:46:01 2020 +0800

    [hotfix][table-common] Fix TableSchemaUtils#getPhysicalSchema should keep the original constraint name
---
 .../src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java  | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
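
The fix threads the declared constraint name through to the rebuilt schema;
previously the primary key was re-added by its columns only, so a named
constraint came back with a generated name. A small sketch of the intended
behaviour, using an example constraint name 'pk_id' (illustrative only):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.utils.TableSchemaUtils;

    public final class PhysicalSchemaSketch {
        public static void main(String[] args) {
            TableSchema schema = TableSchema.builder()
                .field("id", DataTypes.INT().notNull())
                .field("name", DataTypes.STRING())
                .primaryKey("pk_id", new String[]{"id"})  // explicitly named constraint
                .build();

            TableSchema physical = TableSchemaUtils.getPhysicalSchema(schema);
            // With this hotfix the copy keeps the original constraint name:
            physical.getPrimaryKey().ifPresent(pk -> System.out.println(pk.getName()));  // pk_id
        }
    }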

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
index 67ee125..9efc1f3 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
@@ -58,7 +58,9 @@ public class TableSchemaUtils {
 				}
 			});
 		tableSchema.getPrimaryKey().ifPresent(
-			uniqueConstraint -> builder.primaryKey(uniqueConstraint.getColumns().toArray(new String[0]))
+			uniqueConstraint -> builder.primaryKey(
+				uniqueConstraint.getName(),
+				uniqueConstraint.getColumns().toArray(new String[0]))
 		);
 		return builder.build();
 	}


[flink] 03/04: [FLINK-16496][hbase][table] Improve default flush strategy for new HBase sink for better out-of-box

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7ecf7de43c8bfde5baf0a18aa5a0e65f0a3060a3
Author: Jark Wu <ja...@apache.org>
AuthorDate: Mon Jun 8 20:43:43 2020 +0800

    [FLINK-16496][hbase][table] Improve default flush strategy for new HBase sink for better out-of-box
    
    The old HBase sink's default flush strategy was no flush interval and a 2MB buffer size.
    The new HBase sink defaults to a '1s' flush interval, '1000' buffered rows, and a '2mb' buffer size.
    
    This closes #12536
---
 docs/dev/table/connectors/hbase.md                 | 13 +++---
 docs/dev/table/connectors/hbase.zh.md              | 13 +++---
 .../connector/hbase/HBaseDynamicTableFactory.java  | 39 +++++++++--------
 .../connector/hbase/options/HBaseWriteOptions.java | 16 +------
 .../connector/hbase/sink/HBaseSinkFunction.java    |  6 ++-
 .../hbase/HBaseDynamicTableFactoryTest.java        | 49 +++++++++++++++++++---
 6 files changed, 85 insertions(+), 51 deletions(-)
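
As the documentation change below notes, the HBase sink now flushes every
second by default, and each trigger can be disabled individually by setting it
to '0'; with both the size and row triggers disabled, flushing is driven purely
by the interval. A usage sketch, assuming the 'hbase-1.4' connector identifier
of Flink 1.11 and a local ZooKeeper quorum:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public final class HBaseSinkDdlSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

            // Size and row triggers disabled ('0'); flushing is driven by the interval alone.
            tEnv.executeSql(
                "CREATE TABLE hbase_sink (" +
                "  rowkey STRING," +
                "  cf ROW<q1 INT>" +
                ") WITH (" +
                "  'connector' = 'hbase-1.4'," +
                "  'table-name' = 'mytable'," +
                "  'zookeeper.quorum' = 'localhost:2181'," +
                "  'sink.buffer-flush.max-size' = '0'," +
                "  'sink.buffer-flush.max-rows' = '0'," +
                "  'sink.buffer-flush.interval' = '500ms'" +
                ")");
        }
    }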

diff --git a/docs/dev/table/connectors/hbase.md b/docs/dev/table/connectors/hbase.md
index 503e11e..60f6eb8 100644
--- a/docs/dev/table/connectors/hbase.md
+++ b/docs/dev/table/connectors/hbase.md
@@ -110,7 +110,7 @@ Connector Options
       <td>optional</td>
       <td style="word-wrap: break-word;">/hbase</td>
       <td>String</td>
-      <td>The root dir in Zookeeper for HBase cluster</td>
+      <td>The root dir in Zookeeper for HBase cluster.</td>
     </tr>
     <tr>
       <td><h5>null-string-literal</h5></td>
@@ -126,6 +126,7 @@ Connector Options
       <td>MemorySize</td>
       <td>Writing option, maximum size in memory of buffered rows for each writing request.
       This can improve performance for writing data to HBase database, but may increase the latency.
+      Can be set to '0' to disable it.
       </td>
     </tr>
     <tr>
@@ -135,16 +136,18 @@ Connector Options
       <td>Integer</td>
       <td>Writing option, maximum number of rows to buffer for each writing request.
       This can improve performance for writing data to HBase database, but may increase the latency.
-      No default value, which means the default flushing is not depends on the number of buffered rows
+      Can be set to '0' to disable it.
       </td>
     </tr>
     <tr>
       <td><h5>sink.buffer-flush.interval</h5></td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">(none)</td>
+      <td style="word-wrap: break-word;">1s</td>
       <td>Duration</td>
-      <td>Writing option, the interval to flush buffered rows.
-      No default value, which means no asynchronous flush thread will be scheduled. Examples: '1s', '5 s'.
+      <td>Writing option, the interval to flush any buffered rows.
+      This can improve performance for writing data to HBase database, but may increase the latency.
+      Can be set to '0' to disable it. Note, both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows'
+      can be set to '0' with the flush interval set allowing for complete async processing of buffered actions.
       </td>
     </tr>
     </tbody>
diff --git a/docs/dev/table/connectors/hbase.zh.md b/docs/dev/table/connectors/hbase.zh.md
index 503e11e..60f6eb8 100644
--- a/docs/dev/table/connectors/hbase.zh.md
+++ b/docs/dev/table/connectors/hbase.zh.md
@@ -110,7 +110,7 @@ Connector Options
       <td>optional</td>
       <td style="word-wrap: break-word;">/hbase</td>
       <td>String</td>
-      <td>The root dir in Zookeeper for HBase cluster</td>
+      <td>The root dir in Zookeeper for HBase cluster.</td>
     </tr>
     <tr>
       <td><h5>null-string-literal</h5></td>
@@ -126,6 +126,7 @@ Connector Options
       <td>MemorySize</td>
       <td>Writing option, maximum size in memory of buffered rows for each writing request.
       This can improve performance for writing data to HBase database, but may increase the latency.
+      Can be set to '0' to disable it.
       </td>
     </tr>
     <tr>
@@ -135,16 +136,18 @@ Connector Options
       <td>Integer</td>
       <td>Writing option, maximum number of rows to buffer for each writing request.
       This can improve performance for writing data to HBase database, but may increase the latency.
-      No default value, which means the default flushing is not depends on the number of buffered rows
+      Can be set to '0' to disable it.
       </td>
     </tr>
     <tr>
       <td><h5>sink.buffer-flush.interval</h5></td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">(none)</td>
+      <td style="word-wrap: break-word;">1s</td>
       <td>Duration</td>
-      <td>Writing option, the interval to flush buffered rows.
-      No default value, which means no asynchronous flush thread will be scheduled. Examples: '1s', '5 s'.
+      <td>Writing option, the interval to flush any buffered rows.
+      This can improve performance for writing data to HBase database, but may increase the latency.
+      Can be set to '0' to disable it. Note, both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows'
+      can be set to '0' with the flush interval set allowing for complete async processing of buffered actions.
       </td>
     </tr>
     </tbody>
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
index b0e6dee..5db9968 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
@@ -54,50 +54,51 @@ public class HBaseDynamicTableFactory implements DynamicTableSourceFactory, Dyna
 		.key("table-name")
 		.stringType()
 		.noDefaultValue()
-		.withDescription("Required. It defines the HBase table name.");
+		.withDescription("The name of HBase table to connect.");
 
 	private static final ConfigOption<String> ZOOKEEPER_QUORUM = ConfigOptions
 		.key("zookeeper.quorum")
 		.stringType()
 		.noDefaultValue()
-		.withDescription("Required. It defines HBase Zookeeper quorum.");
+		.withDescription("The HBase Zookeeper quorum.");
 
 	private static final ConfigOption<String> ZOOKEEPER_ZNODE_PARENT = ConfigOptions
 		.key("zookeeper.znode.parent")
 		.stringType()
 		.defaultValue("/hbase")
-		.withDescription("Optional. The root dir in Zookeeper for HBase cluster, default value is '/hbase'");
+		.withDescription("The root dir in Zookeeper for HBase cluster.");
 
 	private static final ConfigOption<String> NULL_STRING_LITERAL = ConfigOptions
 		.key("null-string-literal")
 		.stringType()
 		.defaultValue("null")
-		.withDescription("Optional. Representation for null values for string fields. (\"null\" by default). " +
-			"HBase connector encode/decode empty bytes as null values except string types.");
+		.withDescription("Representation for null values for string fields. HBase source and " +
+			"sink encodes/decodes empty bytes as null values for all types except string type.");
 
 	private static final ConfigOption<MemorySize> SINK_BUFFER_FLUSH_MAX_SIZE = ConfigOptions
 		.key("sink.buffer-flush.max-size")
 		.memoryType()
 		.defaultValue(MemorySize.parse("2mb"))
-		.withDescription("Optional. Writing option, determines how many size in memory of " +
-			"buffered rows to insert per round trip. This can help performance on writing " +
-			"to JDBC database. The default value is '2mb'.");
+		.withDescription("Writing option, maximum size in memory of buffered rows for each " +
+			"writing request. This can improve performance for writing data to HBase database, " +
+			"but may increase the latency. Can be set to '0' to disable it. ");
 
 	private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = ConfigOptions
 		.key("sink.buffer-flush.max-rows")
 		.intType()
-		.noDefaultValue()
-		.withDescription("Optional. Writing option, determines how many rows to insert " +
-			"per round trip. This can help performance on writing to JDBC database. " +
-			"No default value, i.e. the default flushing is not depends on the number of buffered rows.");
+		.defaultValue(1000)
+		.withDescription("Writing option, maximum number of rows to buffer for each writing request. " +
+			"This can improve performance for writing data to HBase database, but may increase the latency. " +
+			"Can be set to '0' to disable it.");
 
 	private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions
 		.key("sink.buffer-flush.interval")
 		.durationType()
-		.noDefaultValue()
-		.withDescription("Optional. Writing option, sets a flush interval flushing " +
-			"buffered requesting if the interval passes, in milliseconds. Default value is '0s', " +
-			"which means no asynchronous flush thread will be scheduled.");
+		.defaultValue(Duration.ofSeconds(1))
+		.withDescription("Writing option, the interval to flush any buffered rows. " +
+			"This can improve performance for writing data to HBase database, but may increase the latency. " +
+			"Can be set to '0' to disable it. Note, both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' " +
+			"can be set to '0' with the flush interval set allowing for complete async processing of buffered actions.");
 
 	@Override
 	public DynamicTableSource createDynamicTableSource(Context context) {
@@ -135,10 +136,8 @@ public class HBaseDynamicTableFactory implements DynamicTableSourceFactory, Dyna
 
 		HBaseWriteOptions.Builder writeBuilder = HBaseWriteOptions.builder();
 		writeBuilder.setBufferFlushMaxSizeInBytes(helper.getOptions().get(SINK_BUFFER_FLUSH_MAX_SIZE).getBytes());
-		helper.getOptions().getOptional(SINK_BUFFER_FLUSH_INTERVAL)
-			.ifPresent(v -> writeBuilder.setBufferFlushIntervalMillis(v.toMillis()));
-		helper.getOptions().getOptional(SINK_BUFFER_FLUSH_MAX_ROWS)
-			.ifPresent(writeBuilder::setBufferFlushMaxRows);
+		writeBuilder.setBufferFlushIntervalMillis(helper.getOptions().get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
+		writeBuilder.setBufferFlushMaxRows(helper.getOptions().get(SINK_BUFFER_FLUSH_MAX_ROWS));
 		String nullStringLiteral = helper.getOptions().get(NULL_STRING_LITERAL);
 		HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);
 
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/options/HBaseWriteOptions.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/options/HBaseWriteOptions.java
index f4143d4..3f7c0a9 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/options/HBaseWriteOptions.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/options/HBaseWriteOptions.java
@@ -25,8 +25,6 @@ import org.apache.hadoop.hbase.client.ConnectionConfiguration;
 import java.io.Serializable;
 import java.util.Objects;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
-
 /**
  * Options for HBase writing.
  */
@@ -100,19 +98,15 @@ public class HBaseWriteOptions implements Serializable {
 	 */
 	public static class Builder {
 
-		// default is 2mb which is defined in hbase
 		private long bufferFlushMaxSizeInBytes = ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
-		private long bufferFlushMaxRows = -1;
-		private long bufferFlushIntervalMillis = -1;
+		private long bufferFlushMaxRows = 0;
+		private long bufferFlushIntervalMillis = 0;
 
 		/**
 		 * Optional. Sets when to flush a buffered request based on the memory size of rows currently added.
 		 * Default to <code>2mb</code>.
 		 */
 		public Builder setBufferFlushMaxSizeInBytes(long bufferFlushMaxSizeInBytes) {
-			checkArgument(
-				bufferFlushMaxSizeInBytes > 0,
-				"Max byte size of buffered rows must be larger than 0.");
 			this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes;
 			return this;
 		}
@@ -122,9 +116,6 @@ public class HBaseWriteOptions implements Serializable {
 		 * Defaults to not set, i.e. won't flush based on the number of buffered rows.
 		 */
 		public Builder setBufferFlushMaxRows(long bufferFlushMaxRows) {
-			checkArgument(
-				bufferFlushMaxRows > 0,
-				"Max number of buffered rows must be larger than 0.");
 			this.bufferFlushMaxRows = bufferFlushMaxRows;
 			return this;
 		}
@@ -134,9 +125,6 @@ public class HBaseWriteOptions implements Serializable {
 		 * Defaults to not set, i.e. won't flush based on flush interval.
 		 */
 		public Builder setBufferFlushIntervalMillis(long bufferFlushIntervalMillis) {
-			checkArgument(
-				bufferFlushIntervalMillis > 0,
-				"Interval (in milliseconds) between each flush must be larger than 0.");
 			this.bufferFlushIntervalMillis = bufferFlushIntervalMillis;
 			return this;
 		}
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
index 3d96e5d..3ba533a 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
@@ -118,8 +118,10 @@ public class HBaseSinkFunction<T>
 			}
 			// create a parameter instance, set the table name and custom listener reference.
 			BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(hTableName))
-				.listener(this)
-				.writeBufferSize(bufferFlushMaxSizeInBytes);
+				.listener(this);
+			if (bufferFlushMaxSizeInBytes > 0) {
+				params.writeBufferSize(bufferFlushMaxSizeInBytes);
+			}
 			this.mutator = connection.getBufferedMutator(params);
 
 			if (bufferFlushIntervalMillis > 0) {
diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java
index 32e104a..d51f11b 100644
--- a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java
+++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java
@@ -182,14 +182,56 @@ public class HBaseDynamicTableFactoryTest {
 
 		HBaseWriteOptions expectedWriteOptions = HBaseWriteOptions.builder()
 			.setBufferFlushMaxRows(1000)
-			.setBufferFlushIntervalMillis(10 * 1000)
-			.setBufferFlushMaxSizeInBytes(10 * 1024 * 1024)
+			.setBufferFlushIntervalMillis(1000)
+			.setBufferFlushMaxSizeInBytes(2 * 1024 * 1024)
 			.build();
 		HBaseWriteOptions actualWriteOptions = hbaseSink.getWriteOptions();
 		assertEquals(expectedWriteOptions, actualWriteOptions);
 	}
 
 	@Test
+	public void testBufferFlushOptions() {
+		Map<String, String> options = getAllOptions();
+		options.put("sink.buffer-flush.max-size", "10mb");
+		options.put("sink.buffer-flush.max-rows", "100");
+		options.put("sink.buffer-flush.interval", "10s");
+
+		TableSchema schema = TableSchema.builder()
+			.field(ROWKEY, STRING())
+			.build();
+
+		DynamicTableSink sink = createTableSink(schema, options);
+		HBaseWriteOptions expected = HBaseWriteOptions.builder()
+			.setBufferFlushMaxRows(100)
+			.setBufferFlushIntervalMillis(10 * 1000)
+			.setBufferFlushMaxSizeInBytes(10 * 1024 * 1024)
+			.build();
+		HBaseWriteOptions actual = ((HBaseDynamicTableSink) sink).getWriteOptions();
+		assertEquals(expected, actual);
+	}
+
+	@Test
+	public void testDisabledBufferFlushOptions() {
+		Map<String, String> options = getAllOptions();
+		options.put("sink.buffer-flush.max-size", "0");
+		options.put("sink.buffer-flush.max-rows", "0");
+		options.put("sink.buffer-flush.interval", "0");
+
+		TableSchema schema = TableSchema.builder()
+			.field(ROWKEY, STRING())
+			.build();
+
+		DynamicTableSink sink = createTableSink(schema, options);
+		HBaseWriteOptions expected = HBaseWriteOptions.builder()
+			.setBufferFlushMaxRows(0)
+			.setBufferFlushIntervalMillis(0)
+			.setBufferFlushMaxSizeInBytes(0)
+			.build();
+		HBaseWriteOptions actual = ((HBaseDynamicTableSink) sink).getWriteOptions();
+		assertEquals(expected, actual);
+	}
+
+	@Test
 	public void testUnknownOption() {
 		Map<String, String> options = getAllOptions();
 		options.put("sink.unknown.key", "unknown-value");
@@ -225,9 +267,6 @@ public class HBaseDynamicTableFactoryTest {
 		options.put("table-name", "testHBastTable");
 		options.put("zookeeper.quorum", "localhost:2181");
 		options.put("zookeeper.znode.parent", "/flink");
-		options.put("sink.buffer-flush.max-size", "10mb");
-		options.put("sink.buffer-flush.max-rows", "1000");
-		options.put("sink.buffer-flush.interval", "10s");
 		return options;
 	}
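
One last editorial sketch (not part of the patch), following from the removed checkArgument calls in HBaseWriteOptions.Builder and the new documentation note: '0' is now an accepted value for each threshold, so flushing can be driven purely by the interval timer. The class name is invented; the builder calls mirror the patch.

import org.apache.flink.connector.hbase.options.HBaseWriteOptions;

class AsyncOnlyFlushSketch {
	static HBaseWriteOptions intervalOnlyFlush() {
		// Size and row thresholds disabled ('0'); only the asynchronous flush
		// thread scheduled for the 1s interval triggers flushes, matching the
		// "complete async processing" note added to the connector docs.
		return HBaseWriteOptions.builder()
			.setBufferFlushMaxSizeInBytes(0)
			.setBufferFlushMaxRows(0)
			.setBufferFlushIntervalMillis(1000)
			.build();
	}
}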