You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/11 09:04:14 UTC

[flink] 04/04: [FLINK-16495][elasticsearch][table] Improve default flush strategy for new Elasticsearch sink for better out-of-box

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3d764dc7b21dde54e0bcd61e51546daa77ce3f12
Author: Jark Wu <ja...@apache.org>
AuthorDate: Tue Jun 9 10:47:15 2020 +0800

    [FLINK-16495][elasticsearch][table] Improve default flush strategy for new Elasticsearch sink for better out-of-box
    
    The default flush strategy for old Elasticsearch sink is no flush interval and 5MB buffered size and 1000 rows.
    The new default flush strategy for new Elasticsearch sink is '1s' flush interval and '1000' buffered rows and '2mb' buffered size.
    
    This closes #12536
---
 .../table/ElasticsearchConfiguration.java          | 19 ++++++++------
 .../elasticsearch/table/ElasticsearchOptions.java  |  6 ++---
 .../elasticsearch/util/NoOpFailureHandler.java     |  9 +++++++
 .../table/Elasticsearch6DynamicSink.java           |  6 ++---
 .../table/Elasticsearch6DynamicSinkFactory.java    | 11 +++++---
 .../elasticsearch6/ElasticsearchSink.java          | 12 ---------
 .../Elasticsearch6DynamicSinkFactoryTest.java      |  6 ++---
 .../table/Elasticsearch6DynamicSinkTest.java       | 29 ++++++++++++++++++++++
 .../table/Elasticsearch7DynamicSink.java           |  6 ++---
 .../table/Elasticsearch7DynamicSinkFactory.java    | 11 +++++---
 .../Elasticsearch7DynamicSinkFactoryTest.java      |  6 ++---
 .../table/Elasticsearch7DynamicSinkTest.java       | 29 ++++++++++++++++++++++
 12 files changed, 108 insertions(+), 42 deletions(-)

diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java
index 48b848c..6de8892 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.connectors.elasticsearch.table;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
@@ -81,16 +80,22 @@ class ElasticsearchConfiguration {
 		return config.get(ElasticsearchOptions.DOCUMENT_TYPE_OPTION);
 	}
 
-	public Optional<Integer> getBulkFlushMaxActions() {
-		return config.getOptional(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION);
+	public int getBulkFlushMaxActions() {
+		int maxActions = config.get(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION);
+		// convert 0 to -1, because Elasticsearch client use -1 to disable this configuration.
+		return maxActions == 0 ? -1 : maxActions;
 	}
 
-	public Optional<Integer> getBulkFlushMaxSize() {
-		return config.getOptional(ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION).map(MemorySize::getMebiBytes);
+	public long getBulkFlushMaxByteSize() {
+		long maxSize = config.get(ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION).getBytes();
+		// convert 0 to -1, because Elasticsearch client use -1 to disable this configuration.
+		return maxSize == 0 ? -1 : maxSize;
 	}
 
-	public Optional<Long> getBulkFlushInterval() {
-		return config.getOptional(BULK_FLUSH_INTERVAL_OPTION).map(Duration::toMillis);
+	public long getBulkFlushInterval() {
+		long interval = config.get(BULK_FLUSH_INTERVAL_OPTION).toMillis();
+		// convert 0 to -1, because Elasticsearch client use -1 to disable this configuration.
+		return interval == 0 ? -1 : interval;
 	}
 
 	public boolean isBulkFlushBackoffEnabled() {
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java
index 176414d..355f455 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java
@@ -84,17 +84,17 @@ public class ElasticsearchOptions {
 	public static final ConfigOption<Integer> BULK_FLUSH_MAX_ACTIONS_OPTION =
 		ConfigOptions.key("sink.bulk-flush.max-actions")
 			.intType()
-			.noDefaultValue()
+			.defaultValue(1000)
 			.withDescription("Maximum number of actions to buffer for each bulk request.");
 	public static final ConfigOption<MemorySize> BULK_FLASH_MAX_SIZE_OPTION =
 		ConfigOptions.key("sink.bulk-flush.max-size")
 			.memoryType()
-			.noDefaultValue()
+			.defaultValue(MemorySize.parse("2mb"))
 			.withDescription("Maximum size of buffered actions per bulk request");
 	public static final ConfigOption<Duration> BULK_FLUSH_INTERVAL_OPTION =
 		ConfigOptions.key("sink.bulk-flush.interval")
 			.durationType()
-			.noDefaultValue()
+			.defaultValue(Duration.ofSeconds(1))
 			.withDescription("Bulk flush interval");
 	public static final ConfigOption<BackOffType> BULK_FLUSH_BACKOFF_TYPE_OPTION =
 		ConfigOptions.key("sink.bulk-flush.backoff.strategy")
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
index dfcb9ee..4726dc1 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
@@ -37,4 +37,13 @@ public class NoOpFailureHandler implements ActionRequestFailureHandler {
 		throw failure;
 	}
 
+	@Override
+	public boolean equals(Object o) {
+		return o instanceof NoOpFailureHandler;
+	}
+
+	@Override
+	public int hashCode() {
+		return NoOpFailureHandler.class.hashCode();
+	}
 }
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
index 01b5f47..4e9a8f85 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
@@ -128,9 +128,9 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
 				upsertFunction);
 
 			builder.setFailureHandler(config.getFailureHandler());
-			config.getBulkFlushMaxActions().ifPresent(builder::setBulkFlushMaxActions);
-			config.getBulkFlushMaxSize().ifPresent(builder::setBulkFlushMaxSizeMb);
-			config.getBulkFlushInterval().ifPresent(builder::setBulkFlushInterval);
+			builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
+			builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20));
+			builder.setBulkFlushInterval(config.getBulkFlushInterval());
 			builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
 			config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
 			config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
index c5d9c89..071bbb6 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
@@ -109,17 +109,20 @@ public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory
 		validate(
 			config.getIndex().length() >= 1,
 			() -> String.format("'%s' must not be empty", INDEX_OPTION.key()));
+		int maxActions = config.getBulkFlushMaxActions();
 		validate(
-			config.getBulkFlushMaxActions().map(maxActions -> maxActions >= 1).orElse(true),
+			maxActions == -1 || maxActions >= 1,
 			() -> String.format(
 				"'%s' must be at least 1 character. Got: %s",
 				BULK_FLUSH_MAX_ACTIONS_OPTION.key(),
-				config.getBulkFlushMaxActions().get())
+				maxActions)
 		);
+		long maxSize = config.getBulkFlushMaxByteSize();
+		long mb1 = 1024 * 1024;
 		validate(
-			config.getBulkFlushMaxSize().map(maxSize -> maxSize >= 1024 * 1024).orElse(true),
+			maxSize == -1 || (maxSize >= mb1 && maxSize % mb1 == 0),
 			() -> String.format(
-				"'%s' must be at least 1mb character. Got: %s",
+				"'%s' must be in MB granularity. Got: %s",
 				BULK_FLASH_MAX_SIZE_OPTION.key(),
 				originalConfiguration.get(BULK_FLASH_MAX_SIZE_OPTION).toHumanReadableString())
 		);
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
index 484e6f6..b847613 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
@@ -104,10 +104,6 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
 		 * @param numMaxActions the maxinum number of actions to buffer per bulk request.
 		 */
 		public void setBulkFlushMaxActions(int numMaxActions) {
-			Preconditions.checkArgument(
-				numMaxActions > 0,
-				"Max number of buffered actions must be larger than 0.");
-
 			this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, String.valueOf(numMaxActions));
 		}
 
@@ -117,10 +113,6 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
 		 * @param maxSizeMb the maximum size of buffered actions, in mb.
 		 */
 		public void setBulkFlushMaxSizeMb(int maxSizeMb) {
-			Preconditions.checkArgument(
-				maxSizeMb > 0,
-				"Max size of buffered actions must be larger than 0.");
-
 			this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, String.valueOf(maxSizeMb));
 		}
 
@@ -130,10 +122,6 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
 		 * @param intervalMillis the bulk flush interval, in milliseconds.
 		 */
 		public void setBulkFlushInterval(long intervalMillis) {
-			Preconditions.checkArgument(
-				intervalMillis >= 0,
-				"Interval (in milliseconds) between each flush must be larger than or equal to 0.");
-
 			this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, String.valueOf(intervalMillis));
 		}
 
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java
index 6d0878f..143b712 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java
@@ -102,7 +102,7 @@ public class Elasticsearch6DynamicSinkFactoryTest {
 
 		thrown.expect(ValidationException.class);
 		thrown.expectMessage(
-			"'sink.bulk-flush.max-size' must be at least 1mb character. Got: 1024 bytes");
+			"'sink.bulk-flush.max-size' must be in MB granularity. Got: 1024 bytes");
 		sinkFactory.createDynamicTableSink(
 			context()
 				.withSchema(TableSchema.builder()
@@ -142,7 +142,7 @@ public class Elasticsearch6DynamicSinkFactoryTest {
 
 		thrown.expect(ValidationException.class);
 		thrown.expectMessage(
-			"'sink.bulk-flush.max-actions' must be at least 1 character. Got: 0");
+			"'sink.bulk-flush.max-actions' must be at least 1 character. Got: -2");
 		sinkFactory.createDynamicTableSink(
 			context()
 				.withSchema(TableSchema.builder()
@@ -151,7 +151,7 @@ public class Elasticsearch6DynamicSinkFactoryTest {
 				.withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex")
 				.withOption(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), "MyType")
 				.withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234")
-				.withOption(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION.key(), "0")
+				.withOption(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION.key(), "-2")
 				.build()
 		);
 	}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java
index 1708efc..36d8f6d 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
@@ -41,6 +42,7 @@ import org.mockito.Mockito;
 import java.util.List;
 
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 
 /**
@@ -85,6 +87,33 @@ public class Elasticsearch6DynamicSinkTest {
 		verify(provider.sinkSpy).disableFlushOnCheckpoint();
 	}
 
+	@Test
+	public void testDefaultConfig() {
+		final TableSchema schema = createTestSchema();
+		Configuration configuration = new Configuration();
+		configuration.setString(ElasticsearchOptions.INDEX_OPTION.key(), INDEX);
+		configuration.setString(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), DOC_TYPE);
+		configuration.setString(ElasticsearchOptions.HOSTS_OPTION.key(), SCHEMA + "://" + HOSTNAME + ":" + PORT);
+
+		BuilderProvider provider = new BuilderProvider();
+		final Elasticsearch6DynamicSink testSink = new Elasticsearch6DynamicSink(
+			new DummyEncodingFormat(),
+			new Elasticsearch6Configuration(configuration, this.getClass().getClassLoader()),
+			schema,
+			provider
+		);
+
+		testSink.getSinkRuntimeProvider(new MockSinkContext()).createSinkFunction();
+
+		verify(provider.builderSpy).setFailureHandler(new NoOpFailureHandler());
+		verify(provider.builderSpy).setBulkFlushBackoff(false);
+		verify(provider.builderSpy).setBulkFlushInterval(1000);
+		verify(provider.builderSpy).setBulkFlushMaxActions(1000);
+		verify(provider.builderSpy).setBulkFlushMaxSizeMb(2);
+		verify(provider.builderSpy).setRestClientFactory(new Elasticsearch6DynamicSink.DefaultRestClientFactory(null));
+		verify(provider.sinkSpy, never()).disableFlushOnCheckpoint();
+	}
+
 	private Configuration getConfig() {
 		Configuration configuration = new Configuration();
 		configuration.setString(ElasticsearchOptions.INDEX_OPTION.key(), INDEX);
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
index 7aa52ea..2213ce8 100644
--- a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
+++ b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
@@ -128,9 +128,9 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
 				upsertFunction);
 
 			builder.setFailureHandler(config.getFailureHandler());
-			config.getBulkFlushMaxActions().ifPresent(builder::setBulkFlushMaxActions);
-			config.getBulkFlushMaxSize().ifPresent(builder::setBulkFlushMaxSizeMb);
-			config.getBulkFlushInterval().ifPresent(builder::setBulkFlushInterval);
+			builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
+			builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20));
+			builder.setBulkFlushInterval(config.getBulkFlushInterval());
 			builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
 			config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
 			config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java
index 320c894..7ee2cfb 100644
--- a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java
+++ b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java
@@ -108,17 +108,20 @@ public class Elasticsearch7DynamicSinkFactory implements DynamicTableSinkFactory
 		validate(
 			config.getIndex().length() >= 1,
 			() -> String.format("'%s' must not be empty", INDEX_OPTION.key()));
+		int maxActions = config.getBulkFlushMaxActions();
 		validate(
-			config.getBulkFlushMaxActions().map(maxActions -> maxActions >= 1).orElse(true),
+			maxActions == -1 || maxActions >= 1,
 			() -> String.format(
 				"'%s' must be at least 1 character. Got: %s",
 				BULK_FLUSH_MAX_ACTIONS_OPTION.key(),
-				config.getBulkFlushMaxActions().get())
+				maxActions)
 		);
+		long maxSize = config.getBulkFlushMaxByteSize();
+		long mb1 = 1024 * 1024;
 		validate(
-			config.getBulkFlushMaxSize().map(maxSize -> maxSize >= 1024 * 1024).orElse(true),
+			maxSize == -1 || (maxSize >= mb1 && maxSize % mb1 == 0),
 			() -> String.format(
-				"'%s' must be at least 1mb character. Got: %s",
+				"'%s' must be in MB granularity. Got: %s",
 				BULK_FLASH_MAX_SIZE_OPTION.key(),
 				originalConfiguration.get(BULK_FLASH_MAX_SIZE_OPTION).toHumanReadableString())
 		);
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactoryTest.java b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactoryTest.java
index 4fe3214..7189163 100644
--- a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactoryTest.java
+++ b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactoryTest.java
@@ -99,7 +99,7 @@ public class Elasticsearch7DynamicSinkFactoryTest {
 
 		thrown.expect(ValidationException.class);
 		thrown.expectMessage(
-			"'sink.bulk-flush.max-size' must be at least 1mb character. Got: 1024 bytes");
+			"'sink.bulk-flush.max-size' must be in MB granularity. Got: 1024 bytes");
 		sinkFactory.createDynamicTableSink(
 			context()
 				.withSchema(TableSchema.builder()
@@ -137,7 +137,7 @@ public class Elasticsearch7DynamicSinkFactoryTest {
 
 		thrown.expect(ValidationException.class);
 		thrown.expectMessage(
-			"'sink.bulk-flush.max-actions' must be at least 1 character. Got: 0");
+			"'sink.bulk-flush.max-actions' must be at least 1 character. Got: -2");
 		sinkFactory.createDynamicTableSink(
 			context()
 				.withSchema(TableSchema.builder()
@@ -145,7 +145,7 @@ public class Elasticsearch7DynamicSinkFactoryTest {
 					.build())
 				.withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex")
 				.withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234")
-				.withOption(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION.key(), "0")
+				.withOption(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION.key(), "-2")
 				.build()
 		);
 	}
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java
index c972cee..36c3135 100644
--- a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java
+++ b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
@@ -41,6 +42,7 @@ import org.mockito.Mockito;
 import java.util.List;
 
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 
 /**
@@ -85,6 +87,33 @@ public class Elasticsearch7DynamicSinkTest {
 		verify(provider.sinkSpy).disableFlushOnCheckpoint();
 	}
 
+	@Test
+	public void testDefaultConfig() {
+		final TableSchema schema = createTestSchema();
+		Configuration configuration = new Configuration();
+		configuration.setString(ElasticsearchOptions.INDEX_OPTION.key(), INDEX);
+		configuration.setString(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), DOC_TYPE);
+		configuration.setString(ElasticsearchOptions.HOSTS_OPTION.key(), SCHEMA + "://" + HOSTNAME + ":" + PORT);
+
+		BuilderProvider provider = new BuilderProvider();
+		final Elasticsearch7DynamicSink testSink = new Elasticsearch7DynamicSink(
+			new DummyEncodingFormat(),
+			new Elasticsearch7Configuration(configuration, this.getClass().getClassLoader()),
+			schema,
+			provider
+		);
+
+		testSink.getSinkRuntimeProvider(new MockSinkContext()).createSinkFunction();
+
+		verify(provider.builderSpy).setFailureHandler(new NoOpFailureHandler());
+		verify(provider.builderSpy).setBulkFlushBackoff(false);
+		verify(provider.builderSpy).setBulkFlushInterval(1000);
+		verify(provider.builderSpy).setBulkFlushMaxActions(1000);
+		verify(provider.builderSpy).setBulkFlushMaxSizeMb(2);
+		verify(provider.builderSpy).setRestClientFactory(new Elasticsearch7DynamicSink.DefaultRestClientFactory(null));
+		verify(provider.sinkSpy, never()).disableFlushOnCheckpoint();
+	}
+
 	private Configuration getConfig() {
 		Configuration configuration = new Configuration();
 		configuration.setString(ElasticsearchOptions.INDEX_OPTION.key(), INDEX);