You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/05/17 14:37:59 UTC

[flink] 02/02: [FLINK-17027] Introduce a new Elasticsearch 6 connector with new property keys

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ccd2d531d1cb577113d5021efd6277031eeef9d1
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri May 15 20:06:46 2020 +0200

    [FLINK-17027] Introduce a new Elasticsearch 6 connector with new property keys
    
    This closes #12184
---
 .../flink-connector-elasticsearch6/pom.xml         |  22 ++
 .../table/Elasticsearch6Configuration.java         |  80 +++++++
 .../table/Elasticsearch6DynamicSink.java           | 251 ++++++++++++++++++++
 .../table/Elasticsearch6DynamicSinkFactory.java    | 155 ++++++++++++
 .../org.apache.flink.table.factories.Factory       |  16 ++
 .../Elasticsearch6DynamicSinkFactoryTest.java      | 207 ++++++++++++++++
 .../table/Elasticsearch6DynamicSinkITCase.java     | 262 +++++++++++++++++++++
 .../table/Elasticsearch6DynamicSinkTest.java       | 195 +++++++++++++++
 8 files changed, 1188 insertions(+)

diff --git a/flink-connectors/flink-connector-elasticsearch6/pom.xml b/flink-connectors/flink-connector-elasticsearch6/pom.xml
index 25a9f5a..272f9af 100644
--- a/flink-connectors/flink-connector-elasticsearch6/pom.xml
+++ b/flink-connectors/flink-connector-elasticsearch6/pom.xml
@@ -145,6 +145,14 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<!-- Table API integration tests -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
 		<!-- Elasticsearch table sink factory testing -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
@@ -154,4 +162,18 @@ under the License.
 		</dependency>
 
 	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<configuration>
+					<!-- Enforce single fork execution because of spawning
+					 Elasticsearch cluster multiple times -->
+					<forkCount>1</forkCount>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
 </project>
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6Configuration.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6Configuration.java
new file mode 100644
index 0000000..c06898e
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6Configuration.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+
+import org.apache.http.HttpHost;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
+
+/**
+ * Elasticsearch 7 specific configuration.
+ */
+@Internal
+final class Elasticsearch6Configuration extends ElasticsearchConfiguration {
+	Elasticsearch6Configuration(ReadableConfig config, ClassLoader classLoader) {
+		super(config, classLoader);
+	}
+
+	public List<HttpHost> getHosts() {
+		return config.get(HOSTS_OPTION).stream()
+			.map(Elasticsearch6Configuration::validateAndParseHostsString)
+			.collect(Collectors.toList());
+	}
+
+	/**
+	 * Parse Hosts String to list.
+	 *
+	 * <p>Hosts String format was given as following:
+	 *
+	 * <pre>
+	 *     connector.hosts = http://host_name:9092;http://host_name:9093
+	 * </pre>
+	 */
+	private static HttpHost validateAndParseHostsString(String host) {
+		try {
+			HttpHost httpHost = HttpHost.create(host);
+			if (httpHost.getPort() < 0) {
+				throw new ValidationException(String.format(
+					"Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing port.",
+					host,
+					HOSTS_OPTION.key()));
+			}
+
+			if (httpHost.getSchemeName() == null) {
+				throw new ValidationException(String.format(
+					"Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing scheme.",
+					host,
+					HOSTS_OPTION.key()));
+			}
+			return httpHost;
+		} catch (Exception e) {
+			throw new ValidationException(String.format(
+				"Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'.",
+				host,
+				HOSTS_OPTION.key()), e);
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
new file mode 100644
index 0000000..eadf659
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * A {@link DynamicTableSink} that describes how to create a {@link ElasticsearchSink} from a logical
+ * description.
+ */
+@PublicEvolving
+final class Elasticsearch6DynamicSink implements DynamicTableSink {
+	@VisibleForTesting
+	static final Elasticsearch7RequestFactory REQUEST_FACTORY = new Elasticsearch7RequestFactory();
+
+	private final SinkFormat<SerializationSchema<RowData>> format;
+	private final TableSchema schema;
+	private final Elasticsearch6Configuration config;
+
+	public Elasticsearch6DynamicSink(
+			SinkFormat<SerializationSchema<RowData>> format,
+			Elasticsearch6Configuration config,
+			TableSchema schema) {
+		this(format, config, schema, (ElasticsearchSink.Builder::new));
+	}
+
+	//--------------------------------------------------------------
+	// Hack to make configuration testing possible.
+	//
+	// The code in this block should never be used outside of tests.
+	// Having a way to inject a builder we can assert the builder in
+	// the test. We can not assert everything though, e.g. it is not
+	// possible to assert flushing on checkpoint, as it is configured
+	// on the sink itself.
+	//--------------------------------------------------------------
+
+	private final ElasticSearchBuilderProvider builderProvider;
+
+	@FunctionalInterface
+	interface ElasticSearchBuilderProvider {
+		ElasticsearchSink.Builder<RowData> createBuilder(
+			List<HttpHost> httpHosts,
+			RowElasticsearchSinkFunction upsertSinkFunction);
+	}
+
+	Elasticsearch6DynamicSink(
+			SinkFormat<SerializationSchema<RowData>> format,
+			Elasticsearch6Configuration config,
+			TableSchema schema,
+			ElasticSearchBuilderProvider builderProvider) {
+		this.format = format;
+		this.schema = schema;
+		this.config = config;
+		this.builderProvider = builderProvider;
+	}
+
+	//--------------------------------------------------------------
+	// End of hack to make configuration testing possible
+	//--------------------------------------------------------------
+
+	@Override
+	public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+		ChangelogMode.Builder builder = ChangelogMode.newBuilder();
+		for (RowKind kind : requestedMode.getContainedKinds()) {
+			if (kind != RowKind.UPDATE_BEFORE) {
+				builder.addContainedKind(kind);
+			}
+		}
+		return builder.build();
+	}
+
+	@Override
+	public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
+		return () -> {
+			SerializationSchema<RowData> format = this.format.createSinkFormat(context, schema.toRowDataType());
+
+			final RowElasticsearchSinkFunction upsertFunction =
+				new RowElasticsearchSinkFunction(
+					IndexGeneratorFactory.createIndexGenerator(config.getIndex(), schema),
+					config.getDocumentType(),
+					format,
+					XContentType.JSON,
+					REQUEST_FACTORY,
+					KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter())
+				);
+
+			final ElasticsearchSink.Builder<RowData> builder = builderProvider.createBuilder(
+				config.getHosts(),
+				upsertFunction);
+
+			builder.setFailureHandler(config.getFailureHandler());
+			config.getBulkFlushMaxActions().ifPresent(builder::setBulkFlushMaxActions);
+			config.getBulkFlushMaxSize().ifPresent(builder::setBulkFlushMaxSizeMb);
+			config.getBulkFlushInterval().ifPresent(builder::setBulkFlushInterval);
+			builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
+			config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
+			config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
+			config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
+
+			config.getPathPrefix()
+				.ifPresent(pathPrefix -> builder.setRestClientFactory(new DefaultRestClientFactory(pathPrefix)));
+
+			final ElasticsearchSink<RowData> sink = builder.build();
+
+			if (config.isDisableFlushOnCheckpoint()) {
+				sink.disableFlushOnCheckpoint();
+			}
+
+			return sink;
+		};
+	}
+
+	@Override
+	public DynamicTableSink copy() {
+		return this;
+	}
+
+	@Override
+	public String asSummaryString() {
+		return "Elasticsearch7";
+	}
+
+	/**
+	 * Serializable {@link RestClientFactory} used by the sink.
+	 */
+	@VisibleForTesting
+	static class DefaultRestClientFactory implements RestClientFactory {
+
+		private final String pathPrefix;
+
+		public DefaultRestClientFactory(@Nullable String pathPrefix) {
+			this.pathPrefix = pathPrefix;
+		}
+
+		@Override
+		public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
+			if (pathPrefix != null) {
+				restClientBuilder.setPathPrefix(pathPrefix);
+			}
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			DefaultRestClientFactory that = (DefaultRestClientFactory) o;
+			return Objects.equals(pathPrefix, that.pathPrefix);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(pathPrefix);
+		}
+	}
+
+	/**
+	 * Version-specific creation of {@link org.elasticsearch.action.ActionRequest}s used by the sink.
+	 */
+	private static class Elasticsearch7RequestFactory implements RequestFactory {
+		@Override
+		public UpdateRequest createUpdateRequest(
+			String index,
+			String docType,
+			String key,
+			XContentType contentType,
+			byte[] document) {
+			return new UpdateRequest(index, docType, key)
+				.doc(document, contentType)
+				.upsert(document, contentType);
+		}
+
+		@Override
+		public IndexRequest createIndexRequest(
+				String index,
+				String docType,
+				String key,
+				XContentType contentType,
+				byte[] document) {
+			return new IndexRequest(index, docType, index)
+				.source(document, contentType);
+		}
+
+		@Override
+		public DeleteRequest createDeleteRequest(String index, String docType, String key) {
+			return new DeleteRequest(index, docType, key);
+		}
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		Elasticsearch6DynamicSink that = (Elasticsearch6DynamicSink) o;
+		return Objects.equals(format, that.format) &&
+			Objects.equals(schema, that.schema) &&
+			Objects.equals(config, that.config) &&
+			Objects.equals(builderProvider, that.builderProvider);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(format, schema, config, builderProvider);
+	}
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
new file mode 100644
index 0000000..65c90b5
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_MAX_RETRY_TIMEOUT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_PATH_PREFIX;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.DOCUMENT_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FAILURE_HANDLER_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FORMAT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.INDEX_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.KEY_DELIMITER_OPTION;
+
+/**
+ * A {@link DynamicTableSinkFactory} for discovering {@link Elasticsearch6DynamicSink}.
+ */
+@Internal
+public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory {
+	private static final Set<ConfigOption<?>> requiredOptions = Stream.of(
+		HOSTS_OPTION,
+		INDEX_OPTION,
+		DOCUMENT_TYPE_OPTION
+	).collect(Collectors.toSet());
+	private static final Set<ConfigOption<?>> optionalOptions = Stream.of(
+		KEY_DELIMITER_OPTION,
+		FAILURE_HANDLER_OPTION,
+		FLUSH_ON_CHECKPOINT_OPTION,
+		BULK_FLASH_MAX_SIZE_OPTION,
+		BULK_FLUSH_MAX_ACTIONS_OPTION,
+		BULK_FLUSH_INTERVAL_OPTION,
+		BULK_FLUSH_BACKOFF_TYPE_OPTION,
+		BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION,
+		BULK_FLUSH_BACKOFF_DELAY_OPTION,
+		CONNECTION_MAX_RETRY_TIMEOUT_OPTION,
+		CONNECTION_PATH_PREFIX,
+		FORMAT_OPTION
+	).collect(Collectors.toSet());
+
+	@Override
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		TableSchema tableSchema = context.getCatalogTable().getSchema();
+		ElasticsearchValidationUtils.validatePrimaryKey(tableSchema);
+		final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		final SinkFormat<SerializationSchema<RowData>> format = helper.discoverSinkFormat(
+			SerializationFormatFactory.class,
+			FORMAT_OPTION);
+
+		helper.validate();
+		Configuration configuration = new Configuration();
+		context.getCatalogTable()
+			.getOptions()
+			.forEach(configuration::setString);
+		Elasticsearch6Configuration config = new Elasticsearch6Configuration(configuration, context.getClassLoader());
+
+		validate(config, configuration);
+
+		return new Elasticsearch6DynamicSink(
+			format,
+			config,
+			TableSchemaUtils.getPhysicalSchema(tableSchema));
+	}
+
+	private void validate(Elasticsearch6Configuration config, Configuration originalConfiguration) {
+		config.getFailureHandler(); // checks if we can instantiate the custom failure handler
+		config.getHosts(); // validate hosts
+		validate(
+			config.getIndex().length() >= 1,
+			() -> String.format("'%s' must not be empty", INDEX_OPTION.key()));
+		validate(
+			config.getBulkFlushMaxActions().map(maxActions -> maxActions >= 1).orElse(true),
+			() -> String.format(
+				"'%s' must be at least 1 character. Got: %s",
+				BULK_FLUSH_MAX_ACTIONS_OPTION.key(),
+				config.getBulkFlushMaxActions().get())
+		);
+		validate(
+			config.getBulkFlushMaxSize().map(maxSize -> maxSize >= 1024 * 1024).orElse(true),
+			() -> String.format(
+				"'%s' must be at least 1mb character. Got: %s",
+				BULK_FLASH_MAX_SIZE_OPTION.key(),
+				originalConfiguration.get(BULK_FLASH_MAX_SIZE_OPTION).toHumanReadableString())
+		);
+		validate(
+			config.getBulkFlushBackoffRetries().map(retries -> retries >= 1).orElse(true),
+			() -> String.format(
+				"'%s' must be at least 1. Got: %s",
+				BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(),
+				config.getBulkFlushBackoffRetries().get())
+		);
+	}
+
+	private static void validate(boolean condition, Supplier<String> message) {
+		if (!condition) {
+			throw new ValidationException(message.get());
+		}
+	}
+
+	@Override
+	public String factoryIdentifier() {
+		return "elasticsearch-6";
+	}
+
+	@Override
+	public Set<ConfigOption<?>> requiredOptions() {
+		return requiredOptions;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		return optionalOptions;
+	}
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connectors/flink-connector-elasticsearch6/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000..29a8593
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch6DynamicSinkFactory
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java
new file mode 100644
index 0000000..f1be1b2
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.TestContext.context;
+
+/**
+ * Tests for validation in {@link Elasticsearch6DynamicSinkFactory}.
+ */
+public class Elasticsearch6DynamicSinkFactoryTest {
+	@Rule
+	public ExpectedException thrown = ExpectedException.none();
+
+	@Test
+	public void validateEmptyConfiguration() {
+		Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+		thrown.expect(ValidationException.class);
+		thrown.expectMessage(
+			"One or more required options are missing.\n" +
+				"\n" +
+				"Missing required options are:\n" +
+				"\n" +
+				"document-type\n" +
+				"hosts\n" +
+				"index");
+		sinkFactory.createDynamicTableSink(
+			context()
+				.withSchema(TableSchema.builder()
+					.field("a", DataTypes.TIME())
+					.build())
+				.build()
+		);
+	}
+
+	@Test
+	public void validateWrongIndex() {
+		Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+		thrown.expect(ValidationException.class);
+		thrown.expectMessage(
+			"'index' must not be empty");
+		sinkFactory.createDynamicTableSink(
+			context()
+				.withSchema(TableSchema.builder()
+					.field("a", DataTypes.TIME())
+					.build())
+				.withOption("index", "")
+				.withOption("document-type", "MyType")
+				.withOption("hosts", "http://localhost:12345")
+				.build()
+		);
+	}
+
+	@Test
+	public void validateWrongHosts() {
+		Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+		thrown.expect(ValidationException.class);
+		thrown.expectMessage(
+			"Could not parse host 'wrong-host' in option 'hosts'. It should follow the format 'http://host_name:port'.");
+		sinkFactory.createDynamicTableSink(
+			context()
+				.withSchema(TableSchema.builder()
+					.field("a", DataTypes.TIME())
+					.build())
+				.withOption("index", "MyIndex")
+				.withOption("document-type", "MyType")
+				.withOption("hosts", "wrong-host")
+				.build()
+		);
+	}
+
+	@Test
+	public void validateWrongFlushSize() {
+		Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+		thrown.expect(ValidationException.class);
+		thrown.expectMessage(
+			"'sink.bulk-flush.max-size' must be at least 1mb character. Got: 1024 bytes");
+		sinkFactory.createDynamicTableSink(
+			context()
+				.withSchema(TableSchema.builder()
+					.field("a", DataTypes.TIME())
+					.build())
+				.withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex")
+				.withOption(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), "MyType")
+				.withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234")
+				.withOption(ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION.key(), "1kb")
+				.build()
+		);
+	}
+
+	@Test
+	public void validateWrongRetries() {
+		Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+		thrown.expect(ValidationException.class);
+		thrown.expectMessage(
+			"'sink.bulk-flush.back-off.max-retries' must be at least 1. Got: 0");
+		sinkFactory.createDynamicTableSink(
+			context()
+				.withSchema(TableSchema.builder()
+					.field("a", DataTypes.TIME())
+					.build())
+				.withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex")
+				.withOption(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), "MyType")
+				.withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234")
+				.withOption(ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(), "0")
+				.build()
+		);
+	}
+
+	@Test
+	public void validateWrongMaxActions() {
+		Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+		thrown.expect(ValidationException.class);
+		thrown.expectMessage(
+			"'sink.bulk-flush.max-actions' must be at least 1 character. Got: 0");
+		sinkFactory.createDynamicTableSink(
+			context()
+				.withSchema(TableSchema.builder()
+					.field("a", DataTypes.TIME())
+					.build())
+				.withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex")
+				.withOption(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), "MyType")
+				.withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234")
+				.withOption(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION.key(), "0")
+				.build()
+		);
+	}
+
+	@Test
+	public void validateWrongBackoffDelay() {
+		Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+		thrown.expect(ValidationException.class);
+		thrown.expectMessage(
+			"Invalid value for option 'sink.bulk-flush.back-off.delay'.");
+		sinkFactory.createDynamicTableSink(
+			context()
+				.withSchema(TableSchema.builder()
+					.field("a", DataTypes.TIME())
+					.build())
+				.withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex")
+				.withOption(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), "MyType")
+				.withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234")
+				.withOption(ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION.key(), "-1s")
+				.build()
+		);
+	}
+
+	@Test
+	public void validatePrimaryKeyOnIllegalColumn() {
+		Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+		thrown.expect(ValidationException.class);
+		thrown.expectMessage(
+			"The table has a primary key on columns of illegal types: " +
+				"[ARRAY, MAP, MULTISET, ROW, RAW, VARBINARY].\n" +
+				" Elasticsearch sink does not support primary keys on columns of types: " +
+				"[ARRAY, MAP, MULTISET, STRUCTURED_TYPE, ROW, RAW, BINARY, VARBINARY].");
+		sinkFactory.createDynamicTableSink(
+			context()
+				.withSchema(TableSchema.builder()
+					.field("a", DataTypes.BIGINT().notNull())
+					.field("b", DataTypes.ARRAY(DataTypes.BIGINT().notNull()).notNull())
+					.field("c", DataTypes.MAP(DataTypes.BIGINT(), DataTypes.STRING()).notNull())
+					.field("d", DataTypes.MULTISET(DataTypes.BIGINT().notNull()).notNull())
+					.field("e", DataTypes.ROW(DataTypes.FIELD("a", DataTypes.BIGINT())).notNull())
+					.field("f", DataTypes.RAW(Types.BIG_INT).notNull())
+					.field("g", DataTypes.BYTES().notNull())
+					.primaryKey("a", "b", "c", "d", "e", "f", "g")
+					.build())
+				.withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex")
+				.withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234")
+				.withOption(ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION.key(), "1s")
+				.build()
+		);
+	}
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
new file mode 100644
index 0000000..3c09653
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.testutils.ElasticsearchResource;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.search.SearchHits;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.TestContext.context;
+import static org.apache.flink.table.api.Expressions.row;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * IT tests for {@link Elasticsearch6DynamicSink}.
+ */
+public class Elasticsearch6DynamicSinkITCase {
+
+	@ClassRule
+	public static ElasticsearchResource elasticsearchResource = new ElasticsearchResource("es-6-dynamic-sink-tests");
+
+	@Test
+	public void testWritingDocuments() throws Exception {
+		TableSchema schema = TableSchema.builder()
+			.field("a", DataTypes.BIGINT().notNull())
+			.field("b", DataTypes.TIME())
+			.field("c", DataTypes.STRING().notNull())
+			.field("d", DataTypes.FLOAT())
+			.field("e", DataTypes.TINYINT().notNull())
+			.field("f", DataTypes.DATE())
+			.field("g", DataTypes.TIMESTAMP().notNull())
+			.primaryKey("a", "g")
+			.build();
+		GenericRowData rowData = GenericRowData.of(
+			1L,
+			12345,
+			StringData.fromString("ABCDE"),
+			12.12f,
+			(byte) 2,
+			12345,
+			TimestampData.fromLocalDateTime(LocalDateTime.parse("2012-12-12T12:12:12")));
+
+		String index = "writing-documents";
+		String myType = "MyType";
+		Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+		SinkFunctionProvider sinkRuntimeProvider = (SinkFunctionProvider) sinkFactory.createDynamicTableSink(
+			context()
+				.withSchema(schema)
+				.withOption(ElasticsearchOptions.INDEX_OPTION.key(), index)
+				.withOption(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), myType)
+				.withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://127.0.0.1:9200")
+				.withOption(ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false")
+				.build()
+		).getSinkRuntimeProvider(new MockContext());
+
+		SinkFunction<RowData> sinkFunction = sinkRuntimeProvider.createSinkFunction();
+		StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+		rowData.setRowKind(RowKind.UPDATE_AFTER);
+		environment.<RowData>fromElements(rowData).addSink(sinkFunction);
+		environment.execute();
+
+		Client client = elasticsearchResource.getClient();
+		Map<String, Object> response = client.get(new GetRequest(index, myType, "1_2012-12-12T12:12:12")).actionGet().getSource();
+		Map<Object, Object> expectedMap = new HashMap<>();
+		expectedMap.put("a", 1);
+		expectedMap.put("b", "00:00:12Z");
+		expectedMap.put("c", "ABCDE");
+		expectedMap.put("d", 12.12d);
+		expectedMap.put("e", 2);
+		expectedMap.put("f", "2003-10-20");
+		expectedMap.put("g", "2012-12-12T12:12:12Z");
+		assertThat(response, equalTo(expectedMap));
+	}
+
+	@Test
+	public void testWritingDocumentsFromTableApi() throws Exception {
+		TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance()
+			.useBlinkPlanner()
+			.inStreamingMode()
+			.build());
+
+		String index = "table-api";
+		String myType = "MyType";
+		tableEnvironment.executeSql("CREATE TABLE esTable (" +
+			"a BIGINT NOT NULL,\n" +
+			"b TIME,\n" +
+			"c STRING NOT NULL,\n" +
+			"d FLOAT,\n" +
+			"e TINYINT NOT NULL,\n" +
+			"f DATE,\n" +
+			"g TIMESTAMP NOT NULL,\n" +
+			"h as a + 2,\n" +
+			"PRIMARY KEY (a, g) NOT ENFORCED\n" +
+			")\n" +
+			"WITH (\n" +
+			String.format("'%s'='%s',\n", "connector", "elasticsearch-6") +
+			String.format("'%s'='%s',\n", ElasticsearchOptions.INDEX_OPTION.key(), index) +
+			String.format("'%s'='%s',\n", ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), myType) +
+			String.format("'%s'='%s',\n", ElasticsearchOptions.HOSTS_OPTION.key(), "http://127.0.0.1:9200") +
+			String.format("'%s'='%s'\n", ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false") +
+			")");
+
+		tableEnvironment.fromValues(
+			row(
+				1L,
+				LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+				"ABCDE",
+				12.12f,
+				(byte) 2,
+				LocalDate.ofEpochDay(12345),
+				LocalDateTime.parse("2012-12-12T12:12:12"))
+		).executeInsert("esTable")
+			.getJobClient()
+			.get()
+			.getJobExecutionResult(this.getClass().getClassLoader())
+			.get();
+
+		Client client = elasticsearchResource.getClient();
+		Map<String, Object> response = client.get(new GetRequest(index, myType, "1_2012-12-12T12:12:12"))
+			.actionGet()
+			.getSource();
+		Map<Object, Object> expectedMap = new HashMap<>();
+		expectedMap.put("a", 1);
+		expectedMap.put("b", "00:00:12Z");
+		expectedMap.put("c", "ABCDE");
+		expectedMap.put("d", 12.12d);
+		expectedMap.put("e", 2);
+		expectedMap.put("f", "2003-10-20");
+		expectedMap.put("g", "2012-12-12T12:12:12Z");
+		assertThat(response, equalTo(expectedMap));
+	}
+
+	@Test
+	public void testWritingDocumentsNoPrimaryKey() throws Exception {
+		TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance()
+			.useBlinkPlanner()
+			.inStreamingMode()
+			.build());
+
+		String index = "no-primary-key";
+		String myType = "MyType";
+		tableEnvironment.executeSql("CREATE TABLE esTable (" +
+			"a BIGINT NOT NULL,\n" +
+			"b TIME,\n" +
+			"c STRING NOT NULL,\n" +
+			"d FLOAT,\n" +
+			"e TINYINT NOT NULL,\n" +
+			"f DATE,\n" +
+			"g TIMESTAMP NOT NULL\n" +
+			")\n" +
+			"WITH (\n" +
+			String.format("'%s'='%s',\n", "connector", "elasticsearch-6") +
+			String.format("'%s'='%s',\n", ElasticsearchOptions.INDEX_OPTION.key(), index) +
+			String.format("'%s'='%s',\n", ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), myType) +
+			String.format("'%s'='%s',\n", ElasticsearchOptions.HOSTS_OPTION.key(), "http://127.0.0.1:9200") +
+			String.format("'%s'='%s'\n", ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false") +
+			")");
+
+		tableEnvironment.fromValues(
+			row(
+				1L,
+				LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+				"ABCDE",
+				12.12f,
+				(byte) 2,
+				LocalDate.ofEpochDay(12345),
+				LocalDateTime.parse("2012-12-12T12:12:12"))
+		).executeInsert("esTable")
+			.getJobClient()
+			.get()
+			.getJobExecutionResult(this.getClass().getClassLoader())
+			.get();
+
+		Client client = elasticsearchResource.getClient();
+
+		// search API does not return documents that were not indexed, we might need to query
+		// the index a few times
+		Deadline deadline = Deadline.fromNow(Duration.ofSeconds(1));
+		SearchHits hits;
+		do {
+			hits = client.prepareSearch(index)
+				.execute()
+				.actionGet()
+				.getHits();
+			if (hits.getTotalHits() == 0) {
+				Thread.sleep(100);
+			}
+		} while (hits.getTotalHits() == 0 && deadline.hasTimeLeft());
+
+		Map<String, Object> result = hits.getAt(0).getSourceAsMap();
+		Map<Object, Object> expectedMap = new HashMap<>();
+		expectedMap.put("a", 1);
+		expectedMap.put("b", "00:00:12Z");
+		expectedMap.put("c", "ABCDE");
+		expectedMap.put("d", 12.12d);
+		expectedMap.put("e", 2);
+		expectedMap.put("f", "2003-10-20");
+		expectedMap.put("g", "2012-12-12T12:12:12Z");
+		assertThat(result, equalTo(expectedMap));
+	}
+
+	private static class MockContext implements DynamicTableSink.Context {
+		@Override
+		public boolean isBounded() {
+			return false;
+		}
+
+		@Override
+		public TypeInformation<?> createTypeInformation(DataType consumedDataType) {
+			return null;
+		}
+
+		@Override
+		public DynamicTableSink.DataStructureConverter createDataStructureConverter(DataType consumedDataType) {
+			return null;
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java
new file mode 100644
index 0000000..df54147
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionRequest;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.List;
+
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for {@link Elasticsearch6DynamicSink} parameters.
+ */
+public class Elasticsearch6DynamicSinkTest {
+
+	private static final String FIELD_KEY = "key";
+	private static final String FIELD_FRUIT_NAME = "fruit_name";
+	private static final String FIELD_COUNT = "count";
+	private static final String FIELD_TS = "ts";
+
+	private static final String HOSTNAME = "host1";
+	private static final int PORT = 1234;
+	private static final String SCHEMA = "https";
+	private static final String INDEX = "MyIndex";
+	private static final String DOC_TYPE = "MyType";
+
+	@Test
+	public void testBuilder() {
+		final TableSchema schema = createTestSchema();
+
+		BuilderProvider provider = new BuilderProvider();
+		final Elasticsearch6DynamicSink testSink = new Elasticsearch6DynamicSink(
+			new DummySinkFormat(),
+			new Elasticsearch6Configuration(getConfig(), this.getClass().getClassLoader()),
+			schema,
+			provider
+		);
+
+		testSink.getSinkRuntimeProvider(new MockSinkContext()).createSinkFunction();
+
+		verify(provider.builderSpy).setFailureHandler(new DummyFailureHandler());
+		verify(provider.builderSpy).setBulkFlushBackoff(true);
+		verify(provider.builderSpy).setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
+		verify(provider.builderSpy).setBulkFlushBackoffDelay(123);
+		verify(provider.builderSpy).setBulkFlushBackoffRetries(3);
+		verify(provider.builderSpy).setBulkFlushInterval(100);
+		verify(provider.builderSpy).setBulkFlushMaxActions(1000);
+		verify(provider.builderSpy).setBulkFlushMaxSizeMb(1);
+		verify(provider.builderSpy).setRestClientFactory(new Elasticsearch6DynamicSink.DefaultRestClientFactory("/myapp"));
+		verify(provider.sinkSpy).disableFlushOnCheckpoint();
+	}
+
+	private Configuration getConfig() {
+		Configuration configuration = new Configuration();
+		configuration.setString(ElasticsearchOptions.INDEX_OPTION.key(), INDEX);
+		configuration.setString(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), DOC_TYPE);
+		configuration.setString(ElasticsearchOptions.HOSTS_OPTION.key(), SCHEMA + "://" + HOSTNAME + ":" + PORT);
+		configuration.setString(ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION.key(), "exponential");
+		configuration.setString(ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION.key(), "123");
+		configuration.setString(ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(), "3");
+		configuration.setString(ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION.key(), "100");
+		configuration.setString(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION.key(), "1000");
+		configuration.setString(ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION.key(), "1mb");
+		configuration.setString(ElasticsearchOptions.CONNECTION_PATH_PREFIX.key(), "/myapp");
+		configuration.setString(ElasticsearchOptions.FAILURE_HANDLER_OPTION.key(), DummyFailureHandler.class.getName());
+		configuration.setString(ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false");
+		return configuration;
+	}
+
+	private static class BuilderProvider implements Elasticsearch6DynamicSink.ElasticSearchBuilderProvider {
+		public ElasticsearchSink.Builder<RowData> builderSpy;
+		public ElasticsearchSink<RowData> sinkSpy;
+
+		@Override
+		public ElasticsearchSink.Builder<RowData> createBuilder(
+				List<HttpHost> httpHosts,
+				RowElasticsearchSinkFunction upsertSinkFunction) {
+			builderSpy = Mockito.spy(new ElasticsearchSink.Builder<>(httpHosts, upsertSinkFunction));
+			doAnswer(
+				invocation -> {
+					sinkSpy = Mockito.spy((ElasticsearchSink<RowData>) invocation.callRealMethod());
+					return sinkSpy;
+				}
+			).when(builderSpy).build();
+
+			return builderSpy;
+		}
+	}
+
+	private TableSchema createTestSchema() {
+		return TableSchema.builder()
+			.field(FIELD_KEY, DataTypes.BIGINT())
+			.field(FIELD_FRUIT_NAME, DataTypes.STRING())
+			.field(FIELD_COUNT, DataTypes.DECIMAL(10, 4))
+			.field(FIELD_TS, DataTypes.TIMESTAMP(3))
+			.build();
+	}
+
+	private static class DummySerializationSchema implements SerializationSchema<RowData> {
+
+		private static final DummySerializationSchema INSTANCE = new DummySerializationSchema();
+
+		@Override
+		public byte[] serialize(RowData element) {
+			return new byte[0];
+		}
+	}
+
+	private static class DummySinkFormat implements SinkFormat<SerializationSchema<RowData>> {
+		@Override
+		public SerializationSchema<RowData> createSinkFormat(
+				DynamicTableSink.Context context,
+				DataType consumedDataType) {
+			return DummySerializationSchema.INSTANCE;
+		}
+
+		@Override
+		public ChangelogMode getChangelogMode() {
+			return null;
+		}
+	}
+
+	private static class MockSinkContext implements DynamicTableSink.Context {
+		@Override
+		public boolean isBounded() {
+			return false;
+		}
+
+		@Override
+		public TypeInformation<?> createTypeInformation(DataType consumedDataType) {
+			return null;
+		}
+
+		@Override
+		public DynamicTableSink.DataStructureConverter createDataStructureConverter(DataType consumedDataType) {
+			return null;
+		}
+	}
+
+	/**
+	 * Custom failure handler for testing.
+	 */
+	public static class DummyFailureHandler implements ActionRequestFailureHandler {
+
+		@Override
+		public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) {
+			// do nothing
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			return o instanceof DummyFailureHandler;
+		}
+
+		@Override
+		public int hashCode() {
+			return DummyFailureHandler.class.hashCode();
+		}
+	}
+}