Posted to commits@flink.apache.org by fp...@apache.org on 2022/03/22 08:35:16 UTC

[flink] branch release-1.15 updated (cbdf95e -> e7b19d8)

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a change to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from cbdf95e  [FLINK-26779][rest] OperationKey implements Serializable
     new 317c39f  [FLINK-26638][connectors/elasticsearch] Revert Table-API implementation to SinkFunction-based one
     new e7b19d8  [FLINK-26638][connectors/elasticsearch] Update docs for Table-API implementation to SinkFunction-based one

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../content/docs/connectors/table/elasticsearch.md |  35 +-
 .../table/AbstractTimeIndexGenerator.java          |  41 +++
 .../table/ElasticsearchConfiguration.java          | 169 +++++++++
 .../table/ElasticsearchConnectorOptions.java       | 171 +++++++++
 .../table/ElasticsearchValidationUtils.java        |  94 +++++
 .../elasticsearch/table/IndexGenerator.java        |  39 ++
 .../elasticsearch/table/IndexGeneratorBase.java    |  52 +++
 .../elasticsearch/table/IndexGeneratorFactory.java | 312 ++++++++++++++++
 .../elasticsearch/table/KeyExtractor.java          | 130 +++++++
 .../elasticsearch/table/RequestFactory.java        |  54 +++
 .../table/RowElasticsearchSinkFunction.java        | 140 ++++++++
 .../elasticsearch/table/StaticIndexGenerator.java  |  35 ++
 .../table/IndexGeneratorFactoryTest.java           | 282 +++++++++++++++
 .../elasticsearch/table/KeyExtractorTest.java      | 135 +++++++
 .../elasticsearch/table/TestContext.java           |  72 ++++
 .../table/Elasticsearch6Configuration.java         |  79 ++++
 .../table/Elasticsearch6DynamicSink.java           | 335 +++++++++++++++++
 .../table/Elasticsearch6DynamicSinkFactory.java    | 186 ++++++++++
 .../org.apache.flink.table.factories.Factory       |   2 +-
 .../Elasticsearch6DynamicSinkFactoryTest.java      | 250 +++++++++++++
 .../table/Elasticsearch6DynamicSinkITCase.java     | 399 +++++++++++++++++++++
 .../table/Elasticsearch6DynamicSinkTest.java       | 298 +++++++++++++++
 .../table/Elasticsearch7Configuration.java         |  70 ++++
 .../table/Elasticsearch7DynamicSink.java           | 335 +++++++++++++++++
 .../table/Elasticsearch7DynamicSinkFactory.java    | 186 ++++++++++
 .../org.apache.flink.table.factories.Factory       |   2 +-
 .../Elasticsearch7DynamicSinkFactoryTest.java      | 234 ++++++++++++
 .../table/Elasticsearch7DynamicSinkITCase.java     | 376 +++++++++++++++++++
 .../table/Elasticsearch7DynamicSinkTest.java       | 298 +++++++++++++++
 29 files changed, 4800 insertions(+), 11 deletions(-)
 create mode 100644 flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/AbstractTimeIndexGenerator.java
 create mode 100644 flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java
 create mode 100644 flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java
 create mode 100644 flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchValidationUtils.java
 create mode 100644 flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGenerator.java
 create mode 100644 flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorBase.java
 create mode 100644 flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java
 create mode 100644 flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractor.java
 create mode 100644 flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RequestFactory.java
 create mode 100644 flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java
 create mode 100644 flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/StaticIndexGenerator.java
 create mode 100644 flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java
 create mode 100644 flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractorTest.java
 create mode 100644 flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/TestContext.java
 create mode 100644 flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6Configuration.java
 create mode 100644 flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
 create mode 100644 flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
 create mode 100644 flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java
 create mode 100644 flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
 create mode 100644 flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java
 create mode 100644 flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7Configuration.java
 create mode 100644 flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
 create mode 100644 flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java
 create mode 100644 flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactoryTest.java
 create mode 100644 flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
 create mode 100644 flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java

[flink] 02/02: [FLINK-26638][connectors/elasticsearch] Update docs for Table-API implementation to SinkFunction-based one

Posted by fp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e7b19d8bfac83b20ebe49ff52afb5d1616d104c7
Author: Alexander Preuß <11...@users.noreply.github.com>
AuthorDate: Mon Mar 21 11:44:37 2022 +0100

    [FLINK-26638][connectors/elasticsearch] Update docs for Table-API implementation to SinkFunction-based one
---
 .../content/docs/connectors/table/elasticsearch.md | 35 ++++++++++++++++------
 1 file changed, 26 insertions(+), 9 deletions(-)

diff --git a/docs/content/docs/connectors/table/elasticsearch.md b/docs/content/docs/connectors/table/elasticsearch.md
index 4d92903..a28d67f 100644
--- a/docs/content/docs/connectors/table/elasticsearch.md
+++ b/docs/content/docs/connectors/table/elasticsearch.md
@@ -140,12 +140,29 @@ Connector Options
       <td>Password used to connect to Elasticsearch instance. If <code>username</code> is configured, this option must be configured with non-empty string as well.</td>
     </tr>
     <tr>
-      <td><h5>sink.delivery-guarantee</h5></td>
+      <td><h5>failure-handler</h5></td>
       <td>optional</td>
-      <td>no</td>
-      <td style="word-wrap: break-word;">AT_LEAST_ONCE</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">fail</td>
       <td>String</td>
-      <td>Optional delivery guarantee when committing. Valid values are <code>NONE</code> or <code>AT_LEAST_ONCE</code>.</td>
+      <td>Failure handling strategy in case a request to Elasticsearch fails. Valid strategies are:
+      <ul>
+        <li><code>fail</code>: throws an exception if a request fails and thus causes a job failure.</li>
+        <li><code>ignore</code>: ignores failures and drops the request.</li>
+        <li><code>retry-rejected</code>: re-adds requests that have failed due to queue capacity saturation.</li>
+        <li>custom class name: for failure handling with an ActionRequestFailureHandler subclass.</li>
+      </ul>
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.flush-on-checkpoint</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">true</td>
+      <td>Boolean</td>
+      <td>Flush on checkpoint or not. When disabled, a sink will not wait for all pending action requests
+       to be acknowledged by Elasticsearch on checkpoints. Thus, a sink does NOT provide any strong
+       guarantees for at-least-once delivery of action requests.
+      </td>
     </tr>
     <tr>
       <td><h5>sink.bulk-flush.max-actions</h5></td>
@@ -182,11 +199,11 @@ Connector Options
       <td><h5>sink.bulk-flush.backoff.strategy</h5></td>
       <td>optional</td>
       <td>yes</td>
-      <td style="word-wrap: break-word;">NONE</td>
+      <td style="word-wrap: break-word;">DISABLED</td>
       <td>String</td>
       <td>Specify how to perform retries if any flush actions failed due to a temporary request error. Valid strategies are:
       <ul>
-        <li><code>NONE</code>: no retry performed, i.e. fail after the first request error.</li>
+        <li><code>DISABLED</code>: no retry performed, i.e. fail after the first request error.</li>
         <li><code>CONSTANT</code>: wait for backoff delay between retries.</li>
         <li><code>EXPONENTIAL</code>: initially wait for backoff delay and increase exponentially between retries.</li>
       </ul>
@@ -209,12 +226,12 @@ Connector Options
       <td>Delay between each backoff attempt. For <code>CONSTANT</code> backoff, this is simply the delay between each retry. For <code>EXPONENTIAL</code> backoff, this is the initial base delay.</td>
     </tr>
     <tr>
-      <td><h5>sink.parallelism</h5></td>
+      <td><h5>connection.max-retry-timeout</h5></td>
       <td>optional</td>
       <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
-      <td>Integer</td>
-      <td>Defines the parallelism of the Elasticsearch sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.</td>
+      <td>Duration</td>
+      <td>Maximum timeout between retries.</td>
     </tr>
     <tr>
       <td><h5>connection.path-prefix</h5></td>
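
For reference, a minimal Table API sketch wiring up the options documented above (the table
name, host, and data are hypothetical; assumes a reachable Elasticsearch 7 cluster):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class ElasticsearchTableSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Option keys match the connector documentation above.
            tEnv.executeSql(
                    "CREATE TABLE users_sink (\n"
                            + "  user_id STRING,\n"
                            + "  user_name STRING,\n"
                            + "  PRIMARY KEY (user_id) NOT ENFORCED\n"
                            + ") WITH (\n"
                            + "  'connector' = 'elasticsearch-7',\n"
                            + "  'hosts' = 'http://localhost:9200',\n"
                            + "  'index' = 'users',\n"
                            + "  'failure-handler' = 'retry-rejected',\n"
                            + "  'sink.flush-on-checkpoint' = 'true',\n"
                            + "  'sink.bulk-flush.backoff.strategy' = 'CONSTANT',\n"
                            + "  'sink.bulk-flush.backoff.max-retries' = '3',\n"
                            + "  'sink.bulk-flush.backoff.delay' = '1s'\n"
                            + ")");

            tEnv.executeSql("INSERT INTO users_sink VALUES ('1', 'alice')");
        }
    }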

[flink] 01/02: [FLINK-26638][connectors/elasticsearch] Revert Table-API implementation to SinkFunction-based one

Posted by fp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 317c39fd84be6ee5f391a3b06c43f6b143c82f1f
Author: Alexander Preuß <11...@users.noreply.github.com>
AuthorDate: Thu Mar 17 11:51:22 2022 +0100

    [FLINK-26638][connectors/elasticsearch] Revert Table-API implementation to SinkFunction-based one
---
 .../table/AbstractTimeIndexGenerator.java          |  41 +++
 .../table/ElasticsearchConfiguration.java          | 169 +++++++++
 .../table/ElasticsearchConnectorOptions.java       | 171 +++++++++
 .../table/ElasticsearchValidationUtils.java        |  94 +++++
 .../elasticsearch/table/IndexGenerator.java        |  39 ++
 .../elasticsearch/table/IndexGeneratorBase.java    |  52 +++
 .../elasticsearch/table/IndexGeneratorFactory.java | 312 ++++++++++++++++
 .../elasticsearch/table/KeyExtractor.java          | 130 +++++++
 .../elasticsearch/table/RequestFactory.java        |  54 +++
 .../table/RowElasticsearchSinkFunction.java        | 140 ++++++++
 .../elasticsearch/table/StaticIndexGenerator.java  |  35 ++
 .../table/IndexGeneratorFactoryTest.java           | 282 +++++++++++++++
 .../elasticsearch/table/KeyExtractorTest.java      | 135 +++++++
 .../elasticsearch/table/TestContext.java           |  72 ++++
 .../table/Elasticsearch6Configuration.java         |  79 ++++
 .../table/Elasticsearch6DynamicSink.java           | 335 +++++++++++++++++
 .../table/Elasticsearch6DynamicSinkFactory.java    | 186 ++++++++++
 .../org.apache.flink.table.factories.Factory       |   2 +-
 .../Elasticsearch6DynamicSinkFactoryTest.java      | 250 +++++++++++++
 .../table/Elasticsearch6DynamicSinkITCase.java     | 399 +++++++++++++++++++++
 .../table/Elasticsearch6DynamicSinkTest.java       | 298 +++++++++++++++
 .../table/Elasticsearch7Configuration.java         |  70 ++++
 .../table/Elasticsearch7DynamicSink.java           | 335 +++++++++++++++++
 .../table/Elasticsearch7DynamicSinkFactory.java    | 186 ++++++++++
 .../org.apache.flink.table.factories.Factory       |   2 +-
 .../Elasticsearch7DynamicSinkFactoryTest.java      | 234 ++++++++++++
 .../table/Elasticsearch7DynamicSinkITCase.java     | 376 +++++++++++++++++++
 .../table/Elasticsearch7DynamicSinkTest.java       | 298 +++++++++++++++
 28 files changed, 4774 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/AbstractTimeIndexGenerator.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/AbstractTimeIndexGenerator.java
new file mode 100644
index 0000000..6c22cf3
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/AbstractTimeIndexGenerator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+
+import java.time.format.DateTimeFormatter;
+
+/** Abstract class for time-related {@link IndexGenerator}. */
+@Internal
+abstract class AbstractTimeIndexGenerator extends IndexGeneratorBase {
+
+    private final String dateTimeFormat;
+    protected transient DateTimeFormatter dateTimeFormatter;
+
+    public AbstractTimeIndexGenerator(String index, String dateTimeFormat) {
+        super(index);
+        this.dateTimeFormat = dateTimeFormat;
+    }
+
+    @Override
+    public void open() {
+        this.dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat);
+    }
+}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java
new file mode 100644
index 0000000..04c7633
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.FAILURE_HANDLER_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION;
+
+/** Accessor methods for Elasticsearch options. */
+@Internal
+class ElasticsearchConfiguration {
+    protected final ReadableConfig config;
+    private final ClassLoader classLoader;
+
+    ElasticsearchConfiguration(ReadableConfig config, ClassLoader classLoader) {
+        this.config = config;
+        this.classLoader = classLoader;
+    }
+
+    public ActionRequestFailureHandler getFailureHandler() {
+        final ActionRequestFailureHandler failureHandler;
+        String value = config.get(FAILURE_HANDLER_OPTION);
+        switch (value.toUpperCase()) {
+            case "FAIL":
+                failureHandler = new NoOpFailureHandler();
+                break;
+            case "IGNORE":
+                failureHandler = new IgnoringFailureHandler();
+                break;
+            case "RETRY-REJECTED":
+                failureHandler = new RetryRejectedExecutionFailureHandler();
+                break;
+            default:
+                try {
+                    Class<?> failureHandlerClass = Class.forName(value, false, classLoader);
+                    failureHandler =
+                            (ActionRequestFailureHandler)
+                                    InstantiationUtil.instantiate(failureHandlerClass);
+                } catch (ClassNotFoundException e) {
+                    throw new ValidationException(
+                            "Could not instantiate the failure handler class: " + value, e);
+                }
+                break;
+        }
+        return failureHandler;
+    }
+
+    public String getDocumentType() {
+        return config.get(ElasticsearchConnectorOptions.DOCUMENT_TYPE_OPTION);
+    }
+
+    public int getBulkFlushMaxActions() {
+        int maxActions = config.get(ElasticsearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION);
+        // convert 0 to -1, because the Elasticsearch client uses -1 to disable this configuration.
+        return maxActions == 0 ? -1 : maxActions;
+    }
+
+    public long getBulkFlushMaxByteSize() {
+        long maxSize =
+                config.get(ElasticsearchConnectorOptions.BULK_FLASH_MAX_SIZE_OPTION).getBytes();
+        // convert 0 to -1, because the Elasticsearch client uses -1 to disable this configuration.
+        return maxSize == 0 ? -1 : maxSize;
+    }
+
+    public long getBulkFlushInterval() {
+        long interval = config.get(BULK_FLUSH_INTERVAL_OPTION).toMillis();
+        // convert 0 to -1, because the Elasticsearch client uses -1 to disable this configuration.
+        return interval == 0 ? -1 : interval;
+    }
+
+    public Optional<String> getUsername() {
+        return config.getOptional(USERNAME_OPTION);
+    }
+
+    public Optional<String> getPassword() {
+        return config.getOptional(PASSWORD_OPTION);
+    }
+
+    public boolean isBulkFlushBackoffEnabled() {
+        return config.get(BULK_FLUSH_BACKOFF_TYPE_OPTION)
+                != ElasticsearchConnectorOptions.BackOffType.DISABLED;
+    }
+
+    public Optional<ElasticsearchSinkBase.FlushBackoffType> getBulkFlushBackoffType() {
+        switch (config.get(BULK_FLUSH_BACKOFF_TYPE_OPTION)) {
+            case CONSTANT:
+                return Optional.of(ElasticsearchSinkBase.FlushBackoffType.CONSTANT);
+            case EXPONENTIAL:
+                return Optional.of(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
+            default:
+                return Optional.empty();
+        }
+    }
+
+    public Optional<Integer> getBulkFlushBackoffRetries() {
+        return config.getOptional(BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION);
+    }
+
+    public Optional<Long> getBulkFlushBackoffDelay() {
+        return config.getOptional(BULK_FLUSH_BACKOFF_DELAY_OPTION).map(Duration::toMillis);
+    }
+
+    public boolean isDisableFlushOnCheckpoint() {
+        return !config.get(ElasticsearchConnectorOptions.FLUSH_ON_CHECKPOINT_OPTION);
+    }
+
+    public String getIndex() {
+        return config.get(ElasticsearchConnectorOptions.INDEX_OPTION);
+    }
+
+    public String getKeyDelimiter() {
+        return config.get(ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION);
+    }
+
+    public Optional<String> getPathPrefix() {
+        return config.getOptional(ElasticsearchConnectorOptions.CONNECTION_PATH_PREFIX);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ElasticsearchConfiguration that = (ElasticsearchConfiguration) o;
+        return Objects.equals(config, that.config) && Objects.equals(classLoader, that.classLoader);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(config, classLoader);
+    }
+}
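
The default branch of getFailureHandler above instantiates any ActionRequestFailureHandler
subclass whose fully qualified name is given as the 'failure-handler' option value. A minimal
sketch of such a handler (class and package names are hypothetical; it mirrors the built-in
retry-rejected strategy but drops rejected requests instead of re-adding them):

    package com.example; // hypothetical package

    import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    import org.apache.flink.util.ExceptionUtils;

    import org.elasticsearch.action.ActionRequest;
    import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

    /** Drops requests rejected by a saturated bulk queue; rethrows anything else. */
    public class DropRejectionsHandler implements ActionRequestFailureHandler {

        @Override
        public void onFailure(
                ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)
                throws Throwable {
            if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class)
                    .isPresent()) {
                return; // swallow the rejection, dropping the request
            }
            throw failure; // escalate everything else and fail the job
        }
    }

Such a class would be configured with 'failure-handler' = 'com.example.DropRejectionsHandler';
note that InstantiationUtil.instantiate requires a public no-argument constructor.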
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java
new file mode 100644
index 0000000..1b0fc5e
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/** Options for the Elasticsearch connector. */
+@PublicEvolving
+public class ElasticsearchConnectorOptions {
+
+    public static final ConfigOption<List<String>> HOSTS_OPTION =
+            ConfigOptions.key("hosts")
+                    .stringType()
+                    .asList()
+                    .noDefaultValue()
+                    .withDescription("Elasticsearch hosts to connect to.");
+
+    public static final ConfigOption<String> INDEX_OPTION =
+            ConfigOptions.key("index")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Elasticsearch index for every record.");
+
+    public static final ConfigOption<String> DOCUMENT_TYPE_OPTION =
+            ConfigOptions.key("document-type")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Elasticsearch document type.");
+
+    public static final ConfigOption<String> PASSWORD_OPTION =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Password used to connect to Elasticsearch instance.");
+
+    public static final ConfigOption<String> USERNAME_OPTION =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Username used to connect to Elasticsearch instance.");
+
+    public static final ConfigOption<String> KEY_DELIMITER_OPTION =
+            ConfigOptions.key("document-id.key-delimiter")
+                    .stringType()
+                    .defaultValue("_")
+                    .withDescription(
+                            "Delimiter for composite keys e.g., \"$\" would result in IDs \"KEY1$KEY2$KEY3\".");
+
+    public static final ConfigOption<String> FAILURE_HANDLER_OPTION =
+            ConfigOptions.key("failure-handler")
+                    .stringType()
+                    .defaultValue("fail")
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Failure handling strategy in case a request to Elasticsearch fails")
+                                    .list(
+                                            text(
+                                                    "\"fail\" (throws an exception if a request fails and thus causes a job failure)"),
+                                            text(
+                                                    "\"ignore\" (ignores failures and drops the request)"),
+                                            text(
+                                                    "\"retry-rejected\" (re-adds requests that have failed due to queue capacity saturation)"),
+                                            text(
+                                                    "\"class name\" for failure handling with an ActionRequestFailureHandler subclass"))
+                                    .build());
+
+    public static final ConfigOption<Boolean> FLUSH_ON_CHECKPOINT_OPTION =
+            ConfigOptions.key("sink.flush-on-checkpoint")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Whether to flush on checkpoint.");
+
+    public static final ConfigOption<Integer> BULK_FLUSH_MAX_ACTIONS_OPTION =
+            ConfigOptions.key("sink.bulk-flush.max-actions")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription("Maximum number of actions to buffer for each bulk request.");
+
+    public static final ConfigOption<MemorySize> BULK_FLASH_MAX_SIZE_OPTION =
+            ConfigOptions.key("sink.bulk-flush.max-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("2mb"))
+                    .withDescription("Maximum size of buffered actions per bulk request");
+
+    public static final ConfigOption<Duration> BULK_FLUSH_INTERVAL_OPTION =
+            ConfigOptions.key("sink.bulk-flush.interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(1))
+                    .withDescription("Bulk flush interval");
+
+    public static final ConfigOption<BackOffType> BULK_FLUSH_BACKOFF_TYPE_OPTION =
+            ConfigOptions.key("sink.bulk-flush.backoff.strategy")
+                    .enumType(BackOffType.class)
+                    .defaultValue(BackOffType.DISABLED)
+                    .withDescription("Backoff strategy");
+
+    public static final ConfigOption<Integer> BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION =
+            ConfigOptions.key("sink.bulk-flush.backoff.max-retries")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("Maximum number of retries.");
+
+    public static final ConfigOption<Duration> BULK_FLUSH_BACKOFF_DELAY_OPTION =
+            ConfigOptions.key("sink.bulk-flush.backoff.delay")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription("Delay between each backoff attempt.");
+
+    public static final ConfigOption<Duration> CONNECTION_MAX_RETRY_TIMEOUT_OPTION =
+            ConfigOptions.key("connection.max-retry-timeout")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription("Maximum timeout between retries.");
+
+    public static final ConfigOption<String> CONNECTION_PATH_PREFIX =
+            ConfigOptions.key("connection.path-prefix")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Prefix string to be added to every REST communication.");
+
+    public static final ConfigOption<String> FORMAT_OPTION =
+            ConfigOptions.key("format")
+                    .stringType()
+                    .defaultValue("json")
+                    .withDescription(
+                            "The format must produce a valid JSON document. "
+                                    + "Please refer to the documentation on formats for more details.");
+
+    // --------------------------------------------------------------------------------------------
+    // Enums
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Backoff strategy. Extends {@link ElasticsearchSinkBase.FlushBackoffType} with {@code
+     * DISABLED} option.
+     */
+    public enum BackOffType {
+        DISABLED,
+        CONSTANT,
+        EXPONENTIAL
+    }
+
+    private ElasticsearchConnectorOptions() {}
+}
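
A short sketch of how these typed options resolve against a Flink Configuration (values are
illustrative):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions;

    public class OptionsSketch {
        public static void main(String[] args) {
            Configuration config = new Configuration();
            config.setString("sink.bulk-flush.backoff.strategy", "CONSTANT");

            // Enum options are parsed from their string form.
            ElasticsearchConnectorOptions.BackOffType strategy =
                    config.get(ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION);

            // Unset options fall back to their declared defaults.
            boolean flushOnCheckpoint =
                    config.get(ElasticsearchConnectorOptions.FLUSH_ON_CHECKPOINT_OPTION);

            System.out.println(strategy + " / " + flushOnCheckpoint); // CONSTANT / true
        }
    }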
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchValidationUtils.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchValidationUtils.java
new file mode 100644
index 0000000..6452d00
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchValidationUtils.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Utility methods for validating Elasticsearch properties. */
+@Internal
+class ElasticsearchValidationUtils {
+
+    private static final Set<LogicalTypeRoot> ILLEGAL_PRIMARY_KEY_TYPES = new LinkedHashSet<>();
+
+    static {
+        ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.ARRAY);
+        ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.MAP);
+        ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.MULTISET);
+        ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.STRUCTURED_TYPE);
+        ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.ROW);
+        ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.RAW);
+        ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.BINARY);
+        ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.VARBINARY);
+    }
+
+    /**
+     * Checks that the table does not have a primary key defined on illegal types. In Elasticsearch,
+     * the primary key is used to calculate the Elasticsearch document id, which is a string of up
+     * to 512 bytes. It cannot contain whitespace. As of now it is calculated by concatenating the
+     * fields. Certain types do not have a good string representation to be used in this scenario.
+     * The illegal types are mostly {@link LogicalTypeFamily#COLLECTION} types and {@link
+     * LogicalTypeRoot#RAW} type.
+     */
+    public static void validatePrimaryKey(TableSchema schema) {
+        schema.getPrimaryKey()
+                .ifPresent(
+                        key -> {
+                            List<LogicalTypeRoot> illegalTypes =
+                                    key.getColumns().stream()
+                                            .map(
+                                                    fieldName -> {
+                                                        LogicalType logicalType =
+                                                                schema.getFieldDataType(fieldName)
+                                                                        .get()
+                                                                        .getLogicalType();
+                                                        if (logicalType.is(
+                                                                LogicalTypeRoot.DISTINCT_TYPE)) {
+                                                            return ((DistinctType) logicalType)
+                                                                    .getSourceType()
+                                                                    .getTypeRoot();
+                                                        } else {
+                                                            return logicalType.getTypeRoot();
+                                                        }
+                                                    })
+                                            .filter(ILLEGAL_PRIMARY_KEY_TYPES::contains)
+                                            .collect(Collectors.toList());
+
+                            if (!illegalTypes.isEmpty()) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "The table has a primary key on columns of illegal types: %s.\n"
+                                                        + " Elasticsearch sink does not support primary keys on columns of types: %s.",
+                                                illegalTypes, ILLEGAL_PRIMARY_KEY_TYPES));
+                            }
+                        });
+    }
+
+    private ElasticsearchValidationUtils() {}
+}
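
A same-package sketch of the validation above (the utility class is package-private, and the
connector itself still relies on the deprecated TableSchema API):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;

    class ValidationSketch {
        static void run() {
            TableSchema schema =
                    TableSchema.builder()
                            .field("id", DataTypes.BYTES().notNull()) // VARBINARY: illegal key type
                            .field("name", DataTypes.STRING())
                            .primaryKey("id")
                            .build();

            // Throws ValidationException, since VARBINARY is in ILLEGAL_PRIMARY_KEY_TYPES.
            ElasticsearchValidationUtils.validatePrimaryKey(schema);
        }
    }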
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGenerator.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGenerator.java
new file mode 100644
index 0000000..636f340
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGenerator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+
+/** This interface is responsible for generating an index name from a given {@link Row} record. */
+@Internal
+interface IndexGenerator extends Serializable {
+
+    /**
+     * Initializes the index generator; this will be called only once before {@link
+     * #generate(RowData)} is called.
+     */
+    default void open() {}
+
+    /** Generates the index name according to the given row. */
+    String generate(RowData row);
+}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorBase.java
new file mode 100644
index 0000000..adfcaa4
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorBase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Objects;
+
+/** Base class for {@link IndexGenerator}. */
+@Internal
+public abstract class IndexGeneratorBase implements IndexGenerator {
+
+    private static final long serialVersionUID = 1L;
+    protected final String index;
+
+    public IndexGeneratorBase(String index) {
+        this.index = index;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof IndexGeneratorBase)) {
+            return false;
+        }
+        IndexGeneratorBase that = (IndexGeneratorBase) o;
+        return index.equals(that.index);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(index);
+    }
+}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java
new file mode 100644
index 0000000..48f0107
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Factory of {@link IndexGenerator}.
+ *
+ * <p>Flink supports both static index and dynamic index.
+ *
+ * <p>If you want a static index, the option value should be a plain string, e.g.
+ * 'myusers'; all records will then be consistently written into the "myusers" index.
+ *
+ * <p>If you want to have a dynamic index, you can use '{field_name}' to reference a field value in
+ * the record to dynamically generate a target index. You can also use
+ * '{field_name|date_format_string}' to convert a field value of TIMESTAMP/DATE/TIME type into the
+ * format specified by date_format_string. The date_format_string is compatible with {@link
+ * java.text.SimpleDateFormat}. For example, if the option value is 'myusers_{log_ts|yyyy-MM-dd}',
+ * then a record with log_ts field value 2020-03-27 12:25:55 will be written into
+ * "myusers_2020-03-27" index.
+ */
+@Internal
+final class IndexGeneratorFactory {
+
+    private IndexGeneratorFactory() {}
+
+    public static IndexGenerator createIndexGenerator(String index, TableSchema schema) {
+        return createIndexGenerator(index, schema, ZoneId.systemDefault());
+    }
+
+    public static IndexGenerator createIndexGenerator(
+            String index, TableSchema schema, ZoneId localTimeZoneId) {
+        final IndexHelper indexHelper = new IndexHelper();
+        if (indexHelper.checkIsDynamicIndex(index)) {
+            return createRuntimeIndexGenerator(
+                    index,
+                    schema.getFieldNames(),
+                    schema.getFieldDataTypes(),
+                    indexHelper,
+                    localTimeZoneId);
+        } else {
+            return new StaticIndexGenerator(index);
+        }
+    }
+
+    interface DynamicFormatter extends Serializable {
+        String format(@Nonnull Object fieldValue, DateTimeFormatter formatter);
+    }
+
+    private static IndexGenerator createRuntimeIndexGenerator(
+            String index,
+            String[] fieldNames,
+            DataType[] fieldTypes,
+            IndexHelper indexHelper,
+            ZoneId localTimeZoneId) {
+        final String dynamicIndexPatternStr = indexHelper.extractDynamicIndexPatternStr(index);
+        final String indexPrefix = index.substring(0, index.indexOf(dynamicIndexPatternStr));
+        final String indexSuffix =
+                index.substring(indexPrefix.length() + dynamicIndexPatternStr.length());
+
+        if (indexHelper.checkIsDynamicIndexWithSystemTimeFormat(index)) {
+            final String dateTimeFormat =
+                    indexHelper.extractDateFormat(
+                            index, LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+            return new AbstractTimeIndexGenerator(index, dateTimeFormat) {
+                @Override
+                public String generate(RowData row) {
+                    return indexPrefix
+                            .concat(LocalDateTime.now(localTimeZoneId).format(dateTimeFormatter))
+                            .concat(indexSuffix);
+                }
+            };
+        }
+
+        final boolean isDynamicIndexWithFormat = indexHelper.checkIsDynamicIndexWithFormat(index);
+        final int indexFieldPos =
+                indexHelper.extractIndexFieldPos(index, fieldNames, isDynamicIndexWithFormat);
+        final LogicalType indexFieldType = fieldTypes[indexFieldPos].getLogicalType();
+        final LogicalTypeRoot indexFieldLogicalTypeRoot = indexFieldType.getTypeRoot();
+
+        // validate index field type
+        indexHelper.validateIndexFieldType(indexFieldLogicalTypeRoot);
+
+        // time extract dynamic index pattern
+        final RowData.FieldGetter fieldGetter =
+                RowData.createFieldGetter(indexFieldType, indexFieldPos);
+
+        if (isDynamicIndexWithFormat) {
+            final String dateTimeFormat =
+                    indexHelper.extractDateFormat(index, indexFieldLogicalTypeRoot);
+            DynamicFormatter formatFunction =
+                    createFormatFunction(indexFieldType, indexFieldLogicalTypeRoot);
+
+            return new AbstractTimeIndexGenerator(index, dateTimeFormat) {
+                @Override
+                public String generate(RowData row) {
+                    Object fieldOrNull = fieldGetter.getFieldOrNull(row);
+                    final String formattedField;
+                    // TODO we can possibly optimize it to use the nullability of the field
+                    if (fieldOrNull != null) {
+                        formattedField = formatFunction.format(fieldOrNull, dateTimeFormatter);
+                    } else {
+                        formattedField = "null";
+                    }
+                    return indexPrefix.concat(formattedField).concat(indexSuffix);
+                }
+            };
+        }
+        // general dynamic index pattern
+        return new IndexGeneratorBase(index) {
+            @Override
+            public String generate(RowData row) {
+                Object indexField = fieldGetter.getFieldOrNull(row);
+                return indexPrefix
+                        .concat(indexField == null ? "null" : indexField.toString())
+                        .concat(indexSuffix);
+            }
+        };
+    }
+
+    private static DynamicFormatter createFormatFunction(
+            LogicalType indexFieldType, LogicalTypeRoot indexFieldLogicalTypeRoot) {
+        switch (indexFieldLogicalTypeRoot) {
+            case DATE:
+                return (value, dateTimeFormatter) -> {
+                    Integer indexField = (Integer) value;
+                    return LocalDate.ofEpochDay(indexField).format(dateTimeFormatter);
+                };
+            case TIME_WITHOUT_TIME_ZONE:
+                return (value, dateTimeFormatter) -> {
+                    Integer indexField = (Integer) value;
+                    return LocalTime.ofNanoOfDay(indexField * 1_000_000L).format(dateTimeFormatter);
+                };
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return (value, dateTimeFormatter) -> {
+                    TimestampData indexField = (TimestampData) value;
+                    return indexField.toLocalDateTime().format(dateTimeFormatter);
+                };
+            case TIMESTAMP_WITH_TIME_ZONE:
+                throw new UnsupportedOperationException(
+                        "TIMESTAMP_WITH_TIME_ZONE is not supported yet");
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return (value, dateTimeFormatter) -> {
+                    TimestampData indexField = (TimestampData) value;
+                    return indexField.toInstant().atZone(ZoneOffset.UTC).format(dateTimeFormatter);
+                };
+            default:
+                throw new TableException(
+                        String.format(
+                                "Unsupported type '%s' found in Elasticsearch dynamic index field; "
+                                        + "time-related patterns only support the types DATE, TIME, and TIMESTAMP.",
+                                indexFieldType));
+        }
+    }
+
+    /**
+     * Helper class for {@link IndexGeneratorFactory}; it can be used to validate the index field
+     * type and to parse the index format from the pattern.
+     */
+    static class IndexHelper {
+        private static final Pattern dynamicIndexPattern = Pattern.compile("\\{[^\\{\\}]+\\}?");
+        private static final Pattern dynamicIndexTimeExtractPattern =
+                Pattern.compile(".*\\{.+\\|.*\\}.*");
+        private static final Pattern dynamicIndexSystemTimeExtractPattern =
+                Pattern.compile(
+                        ".*\\{\\s*(now\\(\\s*\\)|NOW\\(\\s*\\)|current_timestamp|CURRENT_TIMESTAMP)\\s*\\|.*\\}.*");
+        private static final List<LogicalTypeRoot> supportedTypes = new ArrayList<>();
+        private static final Map<LogicalTypeRoot, String> defaultFormats = new HashMap<>();
+
+        static {
+            // time related types
+            supportedTypes.add(LogicalTypeRoot.DATE);
+            supportedTypes.add(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE);
+            supportedTypes.add(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+            supportedTypes.add(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE);
+            supportedTypes.add(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+            // general types
+            supportedTypes.add(LogicalTypeRoot.VARCHAR);
+            supportedTypes.add(LogicalTypeRoot.CHAR);
+            supportedTypes.add(LogicalTypeRoot.TINYINT);
+            supportedTypes.add(LogicalTypeRoot.INTEGER);
+            supportedTypes.add(LogicalTypeRoot.BIGINT);
+        }
+
+        static {
+            defaultFormats.put(LogicalTypeRoot.DATE, "yyyy_MM_dd");
+            defaultFormats.put(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, "HH_mm_ss");
+            defaultFormats.put(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, "yyyy_MM_dd_HH_mm_ss");
+            defaultFormats.put(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE, "yyyy_MM_dd_HH_mm_ss");
+            defaultFormats.put(
+                    LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, "yyyy_MM_dd_HH_mm_ssX");
+        }
+
+        /** Validates the index field type. */
+        void validateIndexFieldType(LogicalTypeRoot logicalType) {
+            if (!supportedTypes.contains(logicalType)) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Unsupported type %s of index field. " + "Supported types are: %s",
+                                logicalType, supportedTypes));
+            }
+        }
+
+        /** Get the default date format. */
+        String getDefaultFormat(LogicalTypeRoot logicalType) {
+            return defaultFormats.get(logicalType);
+        }
+
+        /** Checks whether dynamic index generation is enabled for the given index pattern. */
+        boolean checkIsDynamicIndex(String index) {
+            final Matcher matcher = dynamicIndexPattern.matcher(index);
+            int count = 0;
+            while (matcher.find()) {
+                count++;
+            }
+            if (count > 1) {
+                throw new TableException(
+                        String.format(
+                                "Chaining dynamic index pattern %s is not supported,"
+                                        + " only support single dynamic index pattern.",
+                                index));
+            }
+            return count == 1;
+        }
+
+        /** Checks whether time-extracting dynamic index generation is enabled for the given index pattern. */
+        boolean checkIsDynamicIndexWithFormat(String index) {
+            return dynamicIndexTimeExtractPattern.matcher(index).matches();
+        }
+
+        /** Checks whether the dynamic index is generated from system time. */
+        boolean checkIsDynamicIndexWithSystemTimeFormat(String index) {
+            return dynamicIndexSystemTimeExtractPattern.matcher(index).matches();
+        }
+
+        /** Extract dynamic index pattern string from index pattern string. */
+        String extractDynamicIndexPatternStr(String index) {
+            int start = index.indexOf("{");
+            int end = index.lastIndexOf("}");
+            return index.substring(start, end + 1);
+        }
+
+        /** Extracts the position of the index field within the given field names. */
+        int extractIndexFieldPos(
+                String index, String[] fieldNames, boolean isDynamicIndexWithFormat) {
+            List<String> fieldList = Arrays.asList(fieldNames);
+            String indexFieldName;
+            if (isDynamicIndexWithFormat) {
+                indexFieldName = index.substring(index.indexOf("{") + 1, index.indexOf("|"));
+            } else {
+                indexFieldName = index.substring(index.indexOf("{") + 1, index.indexOf("}"));
+            }
+            if (!fieldList.contains(indexFieldName)) {
+                throw new TableException(
+                        String.format(
+                                "Unknown field '%s' in index pattern '%s', please check the field name.",
+                                indexFieldName, index));
+            }
+            return fieldList.indexOf(indexFieldName);
+        }
+
+        /** Extract the date-time format from the index pattern string, falling back to the default format. */
+        private String extractDateFormat(String index, LogicalTypeRoot logicalType) {
+            String format = index.substring(index.indexOf("|") + 1, index.indexOf("}"));
+            if ("".equals(format)) {
+                format = getDefaultFormat(logicalType);
+            }
+            return format;
+        }
+    }
+}
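
As a usage sketch of the factory above (the schema and field names here are
hypothetical; the tests further below exercise the same API):

    TableSchema schema =
            TableSchema.builder()
                    .field("item", DataTypes.STRING())
                    .field("log_ts", DataTypes.TIMESTAMP())
                    .build();
    // "my-index"                     -> static index, always writes to "my-index"
    // "index_{item}"                 -> general dynamic index, resolved per row from 'item'
    // "my-index-{log_ts|yyyy-MM-dd}" -> time-extracting dynamic index with explicit format
    IndexGenerator generator =
            IndexGeneratorFactory.createIndexGenerator("my-index-{log_ts|yyyy-MM-dd}", schema);
    generator.open();
    // generator.generate(row) then yields e.g. "my-index-2020-03-18".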
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractor.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractor.java
new file mode 100644
index 0000000..ae7c522
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractor.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.Period;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/** An extractor for an Elasticsearch key from a {@link RowData}. */
+@Internal
+class KeyExtractor implements Function<RowData, String>, Serializable {
+    private final FieldFormatter[] fieldFormatters;
+    private final String keyDelimiter;
+
+    private interface FieldFormatter extends Serializable {
+        String format(RowData rowData);
+    }
+
+    private KeyExtractor(FieldFormatter[] fieldFormatters, String keyDelimiter) {
+        this.fieldFormatters = fieldFormatters;
+        this.keyDelimiter = keyDelimiter;
+    }
+
+    @Override
+    public String apply(RowData rowData) {
+        final StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < fieldFormatters.length; i++) {
+            if (i > 0) {
+                builder.append(keyDelimiter);
+            }
+            final String value = fieldFormatters[i].format(rowData);
+            builder.append(value);
+        }
+        return builder.toString();
+    }
+
+    private static class ColumnWithIndex {
+        public TableColumn column;
+        public int index;
+
+        public ColumnWithIndex(TableColumn column, int index) {
+            this.column = column;
+            this.index = index;
+        }
+
+        public LogicalType getType() {
+            return column.getType().getLogicalType();
+        }
+
+        public int getIndex() {
+            return index;
+        }
+    }
+
+    public static Function<RowData, String> createKeyExtractor(
+            TableSchema schema, String keyDelimiter) {
+        return schema.getPrimaryKey()
+                .map(
+                        key -> {
+                            Map<String, ColumnWithIndex> namesToColumns = new HashMap<>();
+                            List<TableColumn> tableColumns = schema.getTableColumns();
+                            for (int i = 0; i < schema.getFieldCount(); i++) {
+                                TableColumn column = tableColumns.get(i);
+                                namesToColumns.put(
+                                        column.getName(), new ColumnWithIndex(column, i));
+                            }
+
+                            FieldFormatter[] fieldFormatters =
+                                    key.getColumns().stream()
+                                            .map(namesToColumns::get)
+                                            .map(
+                                                    column ->
+                                                            toFormatter(
+                                                                    column.index, column.getType()))
+                                            .toArray(FieldFormatter[]::new);
+
+                            return (Function<RowData, String>)
+                                    new KeyExtractor(fieldFormatters, keyDelimiter);
+                        })
+                .orElseGet(() -> (Function<RowData, String> & Serializable) (row) -> null);
+    }
+
+    private static FieldFormatter toFormatter(int index, LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case DATE:
+                return (row) -> LocalDate.ofEpochDay(row.getInt(index)).toString();
+            case TIME_WITHOUT_TIME_ZONE:
+                return (row) ->
+                        LocalTime.ofNanoOfDay((long) row.getInt(index) * 1_000_000L).toString();
+            case INTERVAL_YEAR_MONTH:
+                return (row) -> Period.ofDays(row.getInt(index)).toString();
+            case INTERVAL_DAY_TIME:
+                return (row) -> Duration.ofMillis(row.getLong(index)).toString();
+            case DISTINCT_TYPE:
+                return toFormatter(index, ((DistinctType) type).getSourceType());
+            default:
+                RowData.FieldGetter fieldGetter = RowData.createFieldGetter(type, index);
+                return (row) -> fieldGetter.getFieldOrNull(row).toString();
+        }
+    }
+}
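
A minimal usage sketch of the extractor (the schema is hypothetical; the tests
further below exercise the same behavior):

    TableSchema schema =
            TableSchema.builder()
                    .field("id", DataTypes.BIGINT().notNull())
                    .field("ts", DataTypes.TIMESTAMP().notNull())
                    .primaryKey("id", "ts")
                    .build();
    // Each primary-key field is formatted individually and joined with the
    // delimiter, so a row (42, 2012-12-12T12:12:12) yields the document id
    // "42_2012-12-12T12:12:12". Without a primary key the extractor returns null.
    Function<RowData, String> keyExtractor = KeyExtractor.createKeyExtractor(schema, "_");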
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RequestFactory.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RequestFactory.java
new file mode 100644
index 0000000..f5b2418
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RequestFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import java.io.Serializable;
+
+/** For version-agnostic creation of {@link ActionRequest}s. */
+@Internal
+interface RequestFactory extends Serializable {
+    /**
+     * Creates an update request to be added to a {@link RequestIndexer}. Note: the type field has
+     * been deprecated since Elasticsearch 7.x and has no effect there.
+     */
+    UpdateRequest createUpdateRequest(
+            String index, String docType, String key, XContentType contentType, byte[] document);
+
+    /**
+     * Creates an index request to be added to a {@link RequestIndexer}. Note: the type field has
+     * been deprecated since Elasticsearch 7.x and has no effect there.
+     */
+    IndexRequest createIndexRequest(
+            String index, String docType, String key, XContentType contentType, byte[] document);
+
+    /**
+     * Creates a delete request to be added to a {@link RequestIndexer}. Note: the type field has
+     * been deprecated since Elasticsearch 7.x and has no effect there.
+     */
+    DeleteRequest createDeleteRequest(String index, String docType, String key);
+}
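
A minimal sketch of how sink-side code drives this interface (the
requestFactory, document bytes, and indexer below are assumed to be in scope;
the concrete Elasticsearch6RequestFactory appears further below):

    IndexRequest request =
            requestFactory.createIndexRequest(
                    "my-index",        // resolved by the IndexGenerator
                    "my-type",         // docType; ignored from Elasticsearch 7.x on
                    null,              // no document id: let Elasticsearch assign one
                    XContentType.JSON,
                    document);
    indexer.add(request);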
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java
new file mode 100644
index 0000000..7fb1e4d
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.function.Function;
+
+/** Sink function for converting upserts into Elasticsearch {@link ActionRequest}s. */
+@Internal
+class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final IndexGenerator indexGenerator;
+    private final String docType;
+    private final SerializationSchema<RowData> serializationSchema;
+    private final XContentType contentType;
+    private final RequestFactory requestFactory;
+    private final Function<RowData, String> createKey;
+
+    public RowElasticsearchSinkFunction(
+            IndexGenerator indexGenerator,
+            @Nullable String docType, // deprecated in Elasticsearch 7.x and later
+            SerializationSchema<RowData> serializationSchema,
+            XContentType contentType,
+            RequestFactory requestFactory,
+            Function<RowData, String> createKey) {
+        this.indexGenerator = Preconditions.checkNotNull(indexGenerator);
+        this.docType = docType;
+        this.serializationSchema = Preconditions.checkNotNull(serializationSchema);
+        this.contentType = Preconditions.checkNotNull(contentType);
+        this.requestFactory = Preconditions.checkNotNull(requestFactory);
+        this.createKey = Preconditions.checkNotNull(createKey);
+    }
+
+    @Override
+    public void open() {
+        indexGenerator.open();
+    }
+
+    @Override
+    public void process(RowData element, RuntimeContext ctx, RequestIndexer indexer) {
+        switch (element.getRowKind()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                processUpsert(element, indexer);
+                break;
+            case UPDATE_BEFORE:
+            case DELETE:
+                processDelete(element, indexer);
+                break;
+            default:
+                throw new TableException("Unsupported message kind: " + element.getRowKind());
+        }
+    }
+
+    private void processUpsert(RowData row, RequestIndexer indexer) {
+        final byte[] document = serializationSchema.serialize(row);
+        final String key = createKey.apply(row);
+        if (key != null) {
+            final UpdateRequest updateRequest =
+                    requestFactory.createUpdateRequest(
+                            indexGenerator.generate(row), docType, key, contentType, document);
+            indexer.add(updateRequest);
+        } else {
+            final IndexRequest indexRequest =
+                    requestFactory.createIndexRequest(
+                            indexGenerator.generate(row), docType, key, contentType, document);
+            indexer.add(indexRequest);
+        }
+    }
+
+    private void processDelete(RowData row, RequestIndexer indexer) {
+        final String key = createKey.apply(row);
+        final DeleteRequest deleteRequest =
+                requestFactory.createDeleteRequest(indexGenerator.generate(row), docType, key);
+        indexer.add(deleteRequest);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        RowElasticsearchSinkFunction that = (RowElasticsearchSinkFunction) o;
+        return Objects.equals(indexGenerator, that.indexGenerator)
+                && Objects.equals(docType, that.docType)
+                && Objects.equals(serializationSchema, that.serializationSchema)
+                && contentType == that.contentType
+                && Objects.equals(requestFactory, that.requestFactory)
+                && Objects.equals(createKey, that.createKey);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                indexGenerator,
+                docType,
+                serializationSchema,
+                contentType,
+                requestFactory,
+                createKey);
+    }
+}
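
In brief, the routing above maps Flink's changelog to Elasticsearch actions:
INSERT and UPDATE_AFTER become an UpdateRequest (upsert) when a document id can
be extracted and a plain IndexRequest otherwise, while UPDATE_BEFORE and DELETE
become a DeleteRequest. A minimal sketch (field values hypothetical):

    GenericRowData row =
            GenericRowData.ofKind(RowKind.DELETE, 1L, StringData.fromString("apple"));
    // process(row, ctx, indexer) emits a DeleteRequest keyed on the extracted id.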
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/StaticIndexGenerator.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/StaticIndexGenerator.java
new file mode 100644
index 0000000..1ffcac4
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/StaticIndexGenerator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+
+/** A static {@link IndexGenerator} which generates a fixed index name. */
+@Internal
+final class StaticIndexGenerator extends IndexGeneratorBase {
+
+    public StaticIndexGenerator(String index) {
+        super(index);
+    }
+
+    public String generate(RowData row) {
+        return index;
+    }
+}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java
new file mode 100644
index 0000000..a5f7759
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.UnsupportedTemporalTypeException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/** Tests for {@link IndexGeneratorFactory}. */
+public class IndexGeneratorFactoryTest extends TestLogger {
+
+    private TableSchema schema;
+    private List<RowData> rows;
+
+    @Before
+    public void prepareData() {
+        schema =
+                new TableSchema.Builder()
+                        .field("id", DataTypes.INT())
+                        .field("item", DataTypes.STRING())
+                        .field("log_ts", DataTypes.BIGINT())
+                        .field("log_date", DataTypes.DATE())
+                        .field("log_time", DataTypes.TIME())
+                        .field("order_timestamp", DataTypes.TIMESTAMP())
+                        .field("local_timestamp", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+                        .field("status", DataTypes.BOOLEAN())
+                        .build();
+
+        rows = new ArrayList<>();
+        rows.add(
+                GenericRowData.of(
+                        1,
+                        StringData.fromString("apple"),
+                        Timestamp.valueOf("2020-03-18 12:12:14").getTime(),
+                        (int) LocalDate.parse("2020-03-18").toEpochDay(),
+                        (int) (LocalTime.parse("12:12:14").toNanoOfDay() / 1_000_000L),
+                        TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-03-18T12:12:14")),
+                        TimestampData.fromInstant(Instant.parse("2020-03-18T12:12:14Z")),
+                        true));
+        rows.add(
+                GenericRowData.of(
+                        2,
+                        StringData.fromString("peanut"),
+                        Timestamp.valueOf("2020-03-19 12:12:14").getTime(),
+                        (int) LocalDate.parse("2020-03-19").toEpochDay(),
+                        (int) (LocalTime.parse("12:22:21").toNanoOfDay() / 1_000_000L),
+                        TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-03-19T12:22:14")),
+                        TimestampData.fromInstant(Instant.parse("2020-03-19T12:12:14Z")),
+                        false));
+    }
+
+    @Test
+    public void testDynamicIndexFromTimestamp() {
+        IndexGenerator indexGenerator =
+                IndexGeneratorFactory.createIndexGenerator(
+                        "{order_timestamp|yyyy_MM_dd_HH-ss}_index", schema);
+        indexGenerator.open();
+        Assert.assertEquals("2020_03_18_12-14_index", indexGenerator.generate(rows.get(0)));
+        IndexGenerator indexGenerator1 =
+                IndexGeneratorFactory.createIndexGenerator(
+                        "{order_timestamp|yyyy_MM_dd_HH_mm}_index", schema);
+        indexGenerator1.open();
+        Assert.assertEquals("2020_03_19_12_22_index", indexGenerator1.generate(rows.get(1)));
+    }
+
+    @Test
+    public void testDynamicIndexFromDate() {
+        IndexGenerator indexGenerator =
+                IndexGeneratorFactory.createIndexGenerator(
+                        "my-index-{log_date|yyyy/MM/dd}", schema);
+        indexGenerator.open();
+        Assert.assertEquals("my-index-2020/03/18", indexGenerator.generate(rows.get(0)));
+        Assert.assertEquals("my-index-2020/03/19", indexGenerator.generate(rows.get(1)));
+    }
+
+    @Test
+    public void testDynamicIndexFromTime() {
+        IndexGenerator indexGenerator =
+                IndexGeneratorFactory.createIndexGenerator("my-index-{log_time|HH-mm}", schema);
+        indexGenerator.open();
+        Assert.assertEquals("my-index-12-12", indexGenerator.generate(rows.get(0)));
+        Assert.assertEquals("my-index-12-22", indexGenerator.generate(rows.get(1)));
+    }
+
+    @Test
+    public void testDynamicIndexDefaultFormat() {
+        IndexGenerator indexGenerator =
+                IndexGeneratorFactory.createIndexGenerator("my-index-{log_time|}", schema);
+        indexGenerator.open();
+        Assert.assertEquals("my-index-12_12_14", indexGenerator.generate(rows.get(0)));
+        Assert.assertEquals("my-index-12_22_21", indexGenerator.generate(rows.get(1)));
+    }
+
+    @Test
+    public void testDynamicIndexFromSystemTime() {
+        List<String> supportedUseCases =
+                Arrays.asList(
+                        "now()",
+                        "NOW()",
+                        "now( )",
+                        "NOW(\t)",
+                        "\t NOW( ) \t",
+                        "current_timestamp",
+                        "CURRENT_TIMESTAMP",
+                        "\tcurrent_timestamp\t",
+                        " current_timestamp ");
+
+        supportedUseCases.stream()
+                .forEach(
+                        f -> {
+                            DateTimeFormatter dateTimeFormatter =
+                                    DateTimeFormatter.ofPattern("yyyy_MM_dd");
+                            IndexGenerator indexGenerator =
+                                    IndexGeneratorFactory.createIndexGenerator(
+                                            String.format("my-index-{%s|yyyy_MM_dd}", f), schema);
+                            indexGenerator.open();
+                            // The date may change during the running of the unit test.
+                            // Generate expected index-name based on the current time
+                            // before and after calling the generate method.
+                            String expectedIndex1 =
+                                    "my-index-" + LocalDateTime.now().format(dateTimeFormatter);
+                            String actualIndex = indexGenerator.generate(rows.get(1));
+                            String expectedIndex2 =
+                                    "my-index-" + LocalDateTime.now().format(dateTimeFormatter);
+                            Assert.assertTrue(
+                                    actualIndex.equals(expectedIndex1)
+                                            || actualIndex.equals(expectedIndex2));
+                        });
+
+        List<String> invalidUseCases =
+                Arrays.asList(
+                        "now",
+                        "now(",
+                        "NOW",
+                        "NOW)",
+                        "current_timestamp()",
+                        "CURRENT_TIMESTAMP()",
+                        "CURRENT_timestamp");
+        invalidUseCases.stream()
+                .forEach(
+                        f -> {
+                            String expectedExceptionMsg =
+                                    String.format(
+                                            "Unknown field '%s' in index pattern 'my-index-{%s|yyyy_MM_dd}',"
+                                                    + " please check the field name.",
+                                            f, f);
+                            try {
+                                IndexGenerator indexGenerator =
+                                        IndexGeneratorFactory.createIndexGenerator(
+                                                String.format("my-index-{%s|yyyy_MM_dd}", f),
+                                                schema);
+                                indexGenerator.open();
+                            } catch (TableException e) {
+                                Assert.assertEquals(expectedExceptionMsg, e.getMessage());
+                            }
+                        });
+    }
+
+    @Test
+    public void testDynamicIndexDefaultFormatTimestampWithLocalTimeZone() {
+        IndexGenerator indexGenerator =
+                IndexGeneratorFactory.createIndexGenerator("my-index-{local_timestamp|}", schema);
+        indexGenerator.open();
+        Assert.assertEquals("my-index-2020_03_18_12_12_14Z", indexGenerator.generate(rows.get(0)));
+        Assert.assertEquals("my-index-2020_03_19_12_12_14Z", indexGenerator.generate(rows.get(1)));
+    }
+
+    @Test
+    public void testGeneralDynamicIndex() {
+        IndexGenerator indexGenerator =
+                IndexGeneratorFactory.createIndexGenerator("index_{item}", schema);
+        indexGenerator.open();
+        Assert.assertEquals("index_apple", indexGenerator.generate(rows.get(0)));
+        Assert.assertEquals("index_peanut", indexGenerator.generate(rows.get(1)));
+    }
+
+    @Test
+    public void testStaticIndex() {
+        IndexGenerator indexGenerator =
+                IndexGeneratorFactory.createIndexGenerator("my-index", schema);
+        indexGenerator.open();
+        Assert.assertEquals("my-index", indexGenerator.generate(rows.get(0)));
+        Assert.assertEquals("my-index", indexGenerator.generate(rows.get(1)));
+    }
+
+    @Test
+    public void testUnknownField() {
+        String expectedExceptionMsg =
+                "Unknown field 'unknown_ts' in index pattern 'my-index-{unknown_ts|yyyy-MM-dd}',"
+                        + " please check the field name.";
+        try {
+            IndexGeneratorFactory.createIndexGenerator("my-index-{unknown_ts|yyyy-MM-dd}", schema);
+        } catch (TableException e) {
+            Assert.assertEquals(expectedExceptionMsg, e.getMessage());
+        }
+    }
+
+    @Test
+    public void testUnsupportedTimeType() {
+        String expectedExceptionMsg =
+                "Unsupported type 'INT' found in Elasticsearch dynamic index field, "
+                        + "time-related pattern only support types are: DATE,TIME,TIMESTAMP.";
+        try {
+            IndexGeneratorFactory.createIndexGenerator("my-index-{id|yyyy-MM-dd}", schema);
+        } catch (TableException e) {
+            Assert.assertEquals(expectedExceptionMsg, e.getMessage());
+        }
+    }
+
+    @Test
+    public void testUnsupportedMultiParametersType() {
+        String expectedExceptionMsg =
+                "Chaining dynamic index pattern my-index-{local_date}-{local_time} is not supported,"
+                        + " only support single dynamic index pattern.";
+        try {
+            IndexGeneratorFactory.createIndexGenerator(
+                    "my-index-{local_date}-{local_time}", schema);
+        } catch (TableException e) {
+            Assert.assertEquals(expectedExceptionMsg, e.getMessage());
+        }
+    }
+
+    @Test
+    public void testDynamicIndexUnsupportedFormat() {
+        String expectedExceptionMsg = "Unsupported field: HourOfDay";
+        try {
+            IndexGeneratorFactory.createIndexGenerator(
+                    "my-index-{log_date|yyyy/MM/dd HH:mm}", schema);
+        } catch (UnsupportedTemporalTypeException e) {
+            Assert.assertEquals(expectedExceptionMsg, e.getMessage());
+        }
+    }
+
+    @Test
+    public void testUnsupportedIndexFieldType() {
+        String expectedExceptionMsg =
+                "Unsupported type BOOLEAN of index field, Supported types are:"
+                        + " [DATE, TIME_WITHOUT_TIME_ZONE, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_TIME_ZONE,"
+                        + " TIMESTAMP_WITH_LOCAL_TIME_ZONE, VARCHAR, CHAR, TINYINT, INTEGER, BIGINT]";
+        try {
+            IndexGeneratorFactory.createIndexGenerator("index_{status}", schema);
+        } catch (IllegalArgumentException e) {
+            Assert.assertEquals(expectedExceptionMsg, e.getMessage());
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractorTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractorTest.java
new file mode 100644
index 0000000..bcfb68d
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractorTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+
+import org.junit.Test;
+
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.function.Function;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/** Tests for {@link KeyExtractor}. */
+public class KeyExtractorTest {
+    @Test
+    public void testSimpleKey() {
+        TableSchema schema =
+                TableSchema.builder()
+                        .field("a", DataTypes.BIGINT().notNull())
+                        .field("b", DataTypes.STRING())
+                        .primaryKey("a")
+                        .build();
+
+        Function<RowData, String> keyExtractor = KeyExtractor.createKeyExtractor(schema, "_");
+
+        String key = keyExtractor.apply(GenericRowData.of(12L, StringData.fromString("ABCD")));
+        assertThat(key, equalTo("12"));
+    }
+
+    @Test
+    public void testNoPrimaryKey() {
+        TableSchema schema =
+                TableSchema.builder()
+                        .field("a", DataTypes.BIGINT().notNull())
+                        .field("b", DataTypes.STRING())
+                        .build();
+
+        Function<RowData, String> keyExtractor = KeyExtractor.createKeyExtractor(schema, "_");
+
+        String key = keyExtractor.apply(GenericRowData.of(12L, StringData.fromString("ABCD")));
+        assertThat(key, nullValue());
+    }
+
+    @Test
+    public void testTwoFieldsKey() {
+        TableSchema schema =
+                TableSchema.builder()
+                        .field("a", DataTypes.BIGINT().notNull())
+                        .field("b", DataTypes.STRING())
+                        .field("c", DataTypes.TIMESTAMP().notNull())
+                        .primaryKey("a", "c")
+                        .build();
+
+        Function<RowData, String> keyExtractor = KeyExtractor.createKeyExtractor(schema, "_");
+
+        String key =
+                keyExtractor.apply(
+                        GenericRowData.of(
+                                12L,
+                                StringData.fromString("ABCD"),
+                                TimestampData.fromLocalDateTime(
+                                        LocalDateTime.parse("2012-12-12T12:12:12"))));
+        assertThat(key, equalTo("12_2012-12-12T12:12:12"));
+    }
+
+    @Test
+    public void testAllTypesKey() {
+        TableSchema schema =
+                TableSchema.builder()
+                        .field("a", DataTypes.TINYINT().notNull())
+                        .field("b", DataTypes.SMALLINT().notNull())
+                        .field("c", DataTypes.INT().notNull())
+                        .field("d", DataTypes.BIGINT().notNull())
+                        .field("e", DataTypes.BOOLEAN().notNull())
+                        .field("f", DataTypes.FLOAT().notNull())
+                        .field("g", DataTypes.DOUBLE().notNull())
+                        .field("h", DataTypes.STRING().notNull())
+                        .field("i", DataTypes.TIMESTAMP().notNull())
+                        .field("j", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE().notNull())
+                        .field("k", DataTypes.TIME().notNull())
+                        .field("l", DataTypes.DATE().notNull())
+                        .primaryKey("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l")
+                        .build();
+
+        Function<RowData, String> keyExtractor = KeyExtractor.createKeyExtractor(schema, "_");
+
+        String key =
+                keyExtractor.apply(
+                        GenericRowData.of(
+                                (byte) 1,
+                                (short) 2,
+                                3,
+                                (long) 4,
+                                true,
+                                1.0f,
+                                2.0d,
+                                StringData.fromString("ABCD"),
+                                TimestampData.fromLocalDateTime(
+                                        LocalDateTime.parse("2012-12-12T12:12:12")),
+                                TimestampData.fromInstant(Instant.parse("2013-01-13T13:13:13Z")),
+                                (int) (LocalTime.parse("14:14:14").toNanoOfDay() / 1_000_000),
+                                (int) LocalDate.parse("2015-05-15").toEpochDay()));
+        assertThat(
+                key,
+                equalTo(
+                        "1_2_3_4_true_1.0_2.0_ABCD_2012-12-12T12:12:12_2013-01-13T13:13:13_14:14:14_2015-05-15"));
+    }
+}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/TestContext.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/TestContext.java
new file mode 100644
index 0000000..4040514
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/TestContext.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/** A utility class for mocking {@link DynamicTableFactory.Context}. */
+class TestContext {
+
+    private ResolvedSchema schema = ResolvedSchema.of(Column.physical("a", DataTypes.TIME()));
+
+    private final Map<String, String> options = new HashMap<>();
+
+    public static TestContext context() {
+        return new TestContext();
+    }
+
+    public TestContext withSchema(ResolvedSchema schema) {
+        this.schema = schema;
+        return this;
+    }
+
+    DynamicTableFactory.Context build() {
+        return new FactoryUtil.DefaultDynamicTableContext(
+                ObjectIdentifier.of("default", "default", "t1"),
+                new ResolvedCatalogTable(
+                        CatalogTable.of(
+                                Schema.newBuilder().fromResolvedSchema(schema).build(),
+                                "mock context",
+                                Collections.emptyList(),
+                                options),
+                        schema),
+                Collections.emptyMap(),
+                new Configuration(),
+                TestContext.class.getClassLoader(),
+                false);
+    }
+
+    public TestContext withOption(String key, String value) {
+        options.put(key, value);
+        return this;
+    }
+}
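
A minimal usage sketch (the option keys and values here are hypothetical):

    DynamicTableFactory.Context context =
            TestContext.context()
                    .withSchema(
                            ResolvedSchema.of(
                                    Column.physical("a", DataTypes.BIGINT().notNull())))
                    .withOption("index", "my-index")
                    .withOption("hosts", "http://localhost:9200")
                    .build();
    // The context can then be handed to a DynamicTableSinkFactory under test.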
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6Configuration.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6Configuration.java
new file mode 100644
index 0000000..8b83321
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6Configuration.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+
+import org.apache.http.HttpHost;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.HOSTS_OPTION;
+
+/** Elasticsearch 6 specific configuration. */
+@Internal
+final class Elasticsearch6Configuration extends ElasticsearchConfiguration {
+    Elasticsearch6Configuration(ReadableConfig config, ClassLoader classLoader) {
+        super(config, classLoader);
+    }
+
+    public List<HttpHost> getHosts() {
+        return config.get(HOSTS_OPTION).stream()
+                .map(Elasticsearch6Configuration::validateAndParseHostsString)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Parse the hosts string into a list of {@link HttpHost}s.
+     *
+     * <p>The hosts string is expected in the following format:
+     *
+     * <pre>
+     *     connector.hosts = http://host_name:9092;http://host_name:9093
+     * </pre>
+     */
+    private static HttpHost validateAndParseHostsString(String host) {
+        try {
+            HttpHost httpHost = HttpHost.create(host);
+            if (httpHost.getPort() < 0) {
+                throw new ValidationException(
+                        String.format(
+                                "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing port.",
+                                host, HOSTS_OPTION.key()));
+            }
+
+            if (httpHost.getSchemeName() == null) {
+                throw new ValidationException(
+                        String.format(
+                                "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing scheme.",
+                                host, HOSTS_OPTION.key()));
+            }
+            return httpHost;
+        } catch (Exception e) {
+            throw new ValidationException(
+                    String.format(
+                            "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'.",
+                            host, HOSTS_OPTION.key()),
+                    e);
+        }
+    }
+}
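
To illustrate the validation above (host values hypothetical): an entry such as
"http://host1:9200" parses cleanly, while one without an explicit port is
rejected, since HttpHost reports -1 for an unspecified port:

    HttpHost parsed = HttpHost.create("http://host1:9200");
    // parsed.getSchemeName() -> "http", parsed.getPort() -> 9200
    HttpHost noPort = HttpHost.create("http://host1");
    // noPort.getPort() -> -1, which validateAndParseHostsString turns into a
    // ValidationException ("Missing port.")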
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
new file mode 100644
index 0000000..0c7c756
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import javax.annotation.Nullable;
+
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * A {@link DynamicTableSink} that describes how to create an {@link ElasticsearchSink} from a
+ * logical description.
+ */
+@PublicEvolving
+final class Elasticsearch6DynamicSink implements DynamicTableSink {
+    @VisibleForTesting
+    static final Elasticsearch6RequestFactory REQUEST_FACTORY = new Elasticsearch6RequestFactory();
+
+    private final EncodingFormat<SerializationSchema<RowData>> format;
+    private final TableSchema schema;
+    private final Elasticsearch6Configuration config;
+    private final ZoneId localTimeZoneId;
+    private final boolean isDynamicIndexWithSystemTime;
+
+    public Elasticsearch6DynamicSink(
+            EncodingFormat<SerializationSchema<RowData>> format,
+            Elasticsearch6Configuration config,
+            TableSchema schema,
+            ZoneId localTimeZoneId) {
+        this(format, config, schema, localTimeZoneId, (ElasticsearchSink.Builder::new));
+    }
+
+    // --------------------------------------------------------------
+    // Hack to make configuration testing possible.
+    //
+    // The code in this block should never be used outside of tests.
+    // Injecting a builder lets us make assertions on it in the
+    // test. We cannot assert everything, though; e.g. it is not
+    // possible to assert flushing on checkpoint, as it is configured
+    // on the sink itself.
+    // --------------------------------------------------------------
+
+    private final ElasticSearchBuilderProvider builderProvider;
+
+    @FunctionalInterface
+    interface ElasticSearchBuilderProvider {
+        ElasticsearchSink.Builder<RowData> createBuilder(
+                List<HttpHost> httpHosts, RowElasticsearchSinkFunction upsertSinkFunction);
+    }
+
+    Elasticsearch6DynamicSink(
+            EncodingFormat<SerializationSchema<RowData>> format,
+            Elasticsearch6Configuration config,
+            TableSchema schema,
+            ZoneId localTimeZoneId,
+            ElasticSearchBuilderProvider builderProvider) {
+        this.format = format;
+        this.schema = schema;
+        this.config = config;
+        this.localTimeZoneId = localTimeZoneId;
+        this.isDynamicIndexWithSystemTime = isDynamicIndexWithSystemTime();
+        this.builderProvider = builderProvider;
+    }
+
+    // --------------------------------------------------------------
+    // End of hack to make configuration testing possible
+    // --------------------------------------------------------------
+
+    public boolean isDynamicIndexWithSystemTime() {
+        IndexGeneratorFactory.IndexHelper indexHelper = new IndexGeneratorFactory.IndexHelper();
+        return indexHelper.checkIsDynamicIndexWithSystemTimeFormat(config.getIndex());
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        ChangelogMode.Builder builder = ChangelogMode.newBuilder();
+        for (RowKind kind : requestedMode.getContainedKinds()) {
+            if (kind != RowKind.UPDATE_BEFORE) {
+                builder.addContainedKind(kind);
+            }
+        }
+        if (isDynamicIndexWithSystemTime && !requestedMode.containsOnly(RowKind.INSERT)) {
+            throw new ValidationException(
+                    "Dynamic indexing based on system time only works on append only stream.");
+        }
+        return builder.build();
+    }
+
+    @Override
+    public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
+        return () -> {
+            SerializationSchema<RowData> format =
+                    this.format.createRuntimeEncoder(context, schema.toRowDataType());
+
+            final RowElasticsearchSinkFunction upsertFunction =
+                    new RowElasticsearchSinkFunction(
+                            IndexGeneratorFactory.createIndexGenerator(
+                                    config.getIndex(), schema, localTimeZoneId),
+                            config.getDocumentType(),
+                            format,
+                            XContentType.JSON,
+                            REQUEST_FACTORY,
+                            KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()));
+
+            final ElasticsearchSink.Builder<RowData> builder =
+                    builderProvider.createBuilder(config.getHosts(), upsertFunction);
+
+            builder.setFailureHandler(config.getFailureHandler());
+            builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
+            builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20));
+            builder.setBulkFlushInterval(config.getBulkFlushInterval());
+            builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
+            config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
+            config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
+            config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
+
+            // we must overwrite the default factory, which is defined with a lambda, because of a
+            // bug in lambda serialization when shading; see FLINK-18006
+            if (config.getUsername().isPresent()
+                    && config.getPassword().isPresent()
+                    && !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())
+                    && !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get())) {
+                builder.setRestClientFactory(
+                        new AuthRestClientFactory(
+                                config.getPathPrefix().orElse(null),
+                                config.getUsername().get(),
+                                config.getPassword().get()));
+            } else {
+                builder.setRestClientFactory(
+                        new DefaultRestClientFactory(config.getPathPrefix().orElse(null)));
+            }
+
+            final ElasticsearchSink<RowData> sink = builder.build();
+
+            if (config.isDisableFlushOnCheckpoint()) {
+                sink.disableFlushOnCheckpoint();
+            }
+
+            return sink;
+        };
+    }
+
+    @Override
+    public DynamicTableSink copy() {
+        return this;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "Elasticsearch6";
+    }
+
+    /** Serializable {@link RestClientFactory} used by the sink. */
+    @VisibleForTesting
+    static class DefaultRestClientFactory implements RestClientFactory {
+
+        private final String pathPrefix;
+
+        public DefaultRestClientFactory(@Nullable String pathPrefix) {
+            this.pathPrefix = pathPrefix;
+        }
+
+        @Override
+        public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
+            if (pathPrefix != null) {
+                restClientBuilder.setPathPrefix(pathPrefix);
+            }
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            DefaultRestClientFactory that = (DefaultRestClientFactory) o;
+            return Objects.equals(pathPrefix, that.pathPrefix);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(pathPrefix);
+        }
+    }
+
+    /** Serializable {@link RestClientFactory} used by the sink which enables authentication. */
+    @VisibleForTesting
+    static class AuthRestClientFactory implements RestClientFactory {
+
+        private final String pathPrefix;
+        private final String username;
+        private final String password;
+        private transient CredentialsProvider credentialsProvider;
+
+        public AuthRestClientFactory(
+                @Nullable String pathPrefix, String username, String password) {
+            this.pathPrefix = pathPrefix;
+            this.password = password;
+            this.username = username;
+        }
+
+        @Override
+        public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
+            if (pathPrefix != null) {
+                restClientBuilder.setPathPrefix(pathPrefix);
+            }
+            if (credentialsProvider == null) {
+                credentialsProvider = new BasicCredentialsProvider();
+                credentialsProvider.setCredentials(
+                        AuthScope.ANY, new UsernamePasswordCredentials(username, password));
+            }
+            restClientBuilder.setHttpClientConfigCallback(
+                    httpAsyncClientBuilder ->
+                            httpAsyncClientBuilder.setDefaultCredentialsProvider(
+                                    credentialsProvider));
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            AuthRestClientFactory that = (AuthRestClientFactory) o;
+            return Objects.equals(pathPrefix, that.pathPrefix)
+                    && Objects.equals(username, that.username)
+                    && Objects.equals(password, that.password);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(pathPrefix, username, password);
+        }
+    }
+
+    /**
+     * Version-specific creation of {@link org.elasticsearch.action.ActionRequest}s used by the
+     * sink.
+     */
+    private static class Elasticsearch6RequestFactory implements RequestFactory {
+        @Override
+        public UpdateRequest createUpdateRequest(
+                String index,
+                String docType,
+                String key,
+                XContentType contentType,
+                byte[] document) {
+            return new UpdateRequest(index, docType, key)
+                    .doc(document, contentType)
+                    .upsert(document, contentType);
+        }
+
+        @Override
+        public IndexRequest createIndexRequest(
+                String index,
+                String docType,
+                String key,
+                XContentType contentType,
+                byte[] document) {
+            return new IndexRequest(index, docType, key).source(document, contentType);
+        }
+
+        @Override
+        public DeleteRequest createDeleteRequest(String index, String docType, String key) {
+            return new DeleteRequest(index, docType, key);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Elasticsearch6DynamicSink that = (Elasticsearch6DynamicSink) o;
+        return Objects.equals(format, that.format)
+                && Objects.equals(schema, that.schema)
+                && Objects.equals(config, that.config)
+                && Objects.equals(builderProvider, that.builderProvider);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(format, schema, config, builderProvider);
+    }
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
new file mode 100644
index 0000000..ff600a7
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.StringUtils;
+
+import java.time.ZoneId;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLASH_MAX_SIZE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_MAX_RETRY_TIMEOUT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_PATH_PREFIX;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.DOCUMENT_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.FAILURE_HANDLER_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.FLUSH_ON_CHECKPOINT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.FORMAT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.HOSTS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION;
+
+/** A {@link DynamicTableSinkFactory} for discovering {@link Elasticsearch6DynamicSink}. */
+@Internal
+public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory {
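+    // Unlike the Elasticsearch 7 factory, version 6 still requires a mapping
+    // type, so 'document-type' is part of the required options.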
+    private static final Set<ConfigOption<?>> requiredOptions =
+            Stream.of(HOSTS_OPTION, INDEX_OPTION, DOCUMENT_TYPE_OPTION).collect(Collectors.toSet());
+    private static final Set<ConfigOption<?>> optionalOptions =
+            Stream.of(
+                            KEY_DELIMITER_OPTION,
+                            FAILURE_HANDLER_OPTION,
+                            FLUSH_ON_CHECKPOINT_OPTION,
+                            BULK_FLASH_MAX_SIZE_OPTION,
+                            BULK_FLUSH_MAX_ACTIONS_OPTION,
+                            BULK_FLUSH_INTERVAL_OPTION,
+                            BULK_FLUSH_BACKOFF_TYPE_OPTION,
+                            BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION,
+                            BULK_FLUSH_BACKOFF_DELAY_OPTION,
+                            CONNECTION_MAX_RETRY_TIMEOUT_OPTION,
+                            CONNECTION_PATH_PREFIX,
+                            FORMAT_OPTION,
+                            PASSWORD_OPTION,
+                            USERNAME_OPTION)
+                    .collect(Collectors.toSet());
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        TableSchema tableSchema = context.getCatalogTable().getSchema();
+        ElasticsearchValidationUtils.validatePrimaryKey(tableSchema);
+        final FactoryUtil.TableFactoryHelper helper =
+                FactoryUtil.createTableFactoryHelper(this, context);
+
+        final EncodingFormat<SerializationSchema<RowData>> format =
+                helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT_OPTION);
+
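+        // Discover the format before calling validate() so that the format's
+        // own options are marked as consumed by the helper.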
+        helper.validate();
+        Configuration configuration = new Configuration();
+        context.getCatalogTable().getOptions().forEach(configuration::setString);
+        Elasticsearch6Configuration config =
+                new Elasticsearch6Configuration(configuration, context.getClassLoader());
+
+        validate(config, configuration);
+
+        return new Elasticsearch6DynamicSink(
+                format,
+                config,
+                TableSchemaUtils.getPhysicalSchema(tableSchema),
+                getLocalTimeZoneId(context.getConfiguration()));
+    }
+
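+    /**
+     * Resolves the session time zone: the default marker value of
+     * 'table.local-time-zone' is interpreted as the system default zone.
+     */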
+    ZoneId getLocalTimeZoneId(ReadableConfig readableConfig) {
+        final String zone = readableConfig.get(TableConfigOptions.LOCAL_TIME_ZONE);
+        final ZoneId zoneId =
+                TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone)
+                        ? ZoneId.systemDefault()
+                        : ZoneId.of(zone);
+
+        return zoneId;
+    }
+
+    private void validate(Elasticsearch6Configuration config, Configuration originalConfiguration) {
+        config.getFailureHandler(); // checks if we can instantiate the custom failure handler
+        config.getHosts(); // validate hosts
+        validate(
+                config.getIndex().length() >= 1,
+                () -> String.format("'%s' must not be empty", INDEX_OPTION.key()));
+        int maxActions = config.getBulkFlushMaxActions();
+        validate(
+                maxActions == -1 || maxActions >= 1,
+                () ->
+                        String.format(
+                                "'%s' must be at least 1. Got: %s",
+                                BULK_FLUSH_MAX_ACTIONS_OPTION.key(), maxActions));
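+        // The sink builder accepts the bulk flush size in whole megabytes only,
+        // hence the MB-granularity check below.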
+        long maxSize = config.getBulkFlushMaxByteSize();
+        long mb1 = 1024 * 1024;
+        validate(
+                maxSize == -1 || (maxSize >= mb1 && maxSize % mb1 == 0),
+                () ->
+                        String.format(
+                                "'%s' must be in MB granularity. Got: %s",
+                                BULK_FLASH_MAX_SIZE_OPTION.key(),
+                                originalConfiguration
+                                        .get(BULK_FLASH_MAX_SIZE_OPTION)
+                                        .toHumanReadableString()));
+        validate(
+                config.getBulkFlushBackoffRetries().map(retries -> retries >= 1).orElse(true),
+                () ->
+                        String.format(
+                                "'%s' must be at least 1. Got: %s",
+                                BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(),
+                                config.getBulkFlushBackoffRetries().get()));
+        if (config.getUsername().isPresent()
+                && !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())) {
+            validate(
+                    config.getPassword().isPresent()
+                            && !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get()),
+                    () ->
+                            String.format(
+                                    "'%s' and '%s' must be set at the same time. Got: username '%s' and password '%s'",
+                                    USERNAME_OPTION.key(),
+                                    PASSWORD_OPTION.key(),
+                                    config.getUsername().get(),
+                                    config.getPassword().orElse("")));
+        }
+    }
+
+    private static void validate(boolean condition, Supplier<String> message) {
+        if (!condition) {
+            throw new ValidationException(message.get());
+        }
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return "elasticsearch-6";
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return requiredOptions;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return optionalOptions;
+    }
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connectors/flink-connector-elasticsearch6/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index bb5a894..29a8593 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.connector.elasticsearch.table.Elasticsearch6DynamicSinkFactory
+org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch6DynamicSinkFactory
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java
new file mode 100644
index 0000000..e99abbe
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.TestContext.context;
+
+/** Tests for validation in {@link Elasticsearch6DynamicSinkFactory}. */
+public class Elasticsearch6DynamicSinkFactoryTest extends TestLogger {
+    @Rule public ExpectedException thrown = ExpectedException.none();
+
+    @Test
+    public void validateEmptyConfiguration() {
+        Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage(
+                "One or more required options are missing.\n"
+                        + "\n"
+                        + "Missing required options are:\n"
+                        + "\n"
+                        + "document-type\n"
+                        + "hosts\n"
+                        + "index");
+        sinkFactory.createDynamicTableSink(context().build());
+    }
+
+    @Test
+    public void validateWrongIndex() {
+        Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage("'index' must not be empty");
+        sinkFactory.createDynamicTableSink(
+                context()
+                        .withOption("index", "")
+                        .withOption("document-type", "MyType")
+                        .withOption("hosts", "http://localhost:12345")
+                        .build());
+    }
+
+    @Test
+    public void validateWrongHosts() {
+        Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage(
+                "Could not parse host 'wrong-host' in option 'hosts'. It should follow the format 'http://host_name:port'.");
+        sinkFactory.createDynamicTableSink(
+                context()
+                        .withOption("index", "MyIndex")
+                        .withOption("document-type", "MyType")
+                        .withOption("hosts", "wrong-host")
+                        .build());
+    }
+
+    @Test
+    public void validateWrongFlushSize() {
+        Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage(
+                "'sink.bulk-flush.max-size' must be in MB granularity. Got: 1024 bytes");
+        sinkFactory.createDynamicTableSink(
+                context()
+                        .withOption(ElasticsearchConnectorOptions.INDEX_OPTION.key(), "MyIndex")
+                        .withOption(
+                                ElasticsearchConnectorOptions.DOCUMENT_TYPE_OPTION.key(), "MyType")
+                        .withOption(
+                                ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+                                "http://localhost:1234")
+                        .withOption(
+                                ElasticsearchConnectorOptions.BULK_FLASH_MAX_SIZE_OPTION.key(),
+                                "1kb")
+                        .build());
+    }
+
+    @Test
+    public void validateWrongRetries() {
+        Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage("'sink.bulk-flush.backoff.max-retries' must be at least 1. Got: 0");
+        sinkFactory.createDynamicTableSink(
+                context()
+                        .withOption(ElasticsearchConnectorOptions.INDEX_OPTION.key(), "MyIndex")
+                        .withOption(
+                                ElasticsearchConnectorOptions.DOCUMENT_TYPE_OPTION.key(), "MyType")
+                        .withOption(
+                                ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+                                "http://localhost:1234")
+                        .withOption(
+                                ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION
+                                        .key(),
+                                "0")
+                        .build());
+    }
+
+    @Test
+    public void validateWrongMaxActions() {
+        Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage("'sink.bulk-flush.max-actions' must be at least 1. Got: -2");
+        sinkFactory.createDynamicTableSink(
+                context()
+                        .withOption(ElasticsearchConnectorOptions.INDEX_OPTION.key(), "MyIndex")
+                        .withOption(
+                                ElasticsearchConnectorOptions.DOCUMENT_TYPE_OPTION.key(), "MyType")
+                        .withOption(
+                                ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+                                "http://localhost:1234")
+                        .withOption(
+                                ElasticsearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION.key(),
+                                "-2")
+                        .build());
+    }
+
+    @Test
+    public void validateWrongBackoffDelay() {
+        Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage("Invalid value for option 'sink.bulk-flush.backoff.delay'.");
+        sinkFactory.createDynamicTableSink(
+                context()
+                        .withOption(ElasticsearchConnectorOptions.INDEX_OPTION.key(), "MyIndex")
+                        .withOption(
+                                ElasticsearchConnectorOptions.DOCUMENT_TYPE_OPTION.key(), "MyType")
+                        .withOption(
+                                ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+                                "http://localhost:1234")
+                        .withOption(
+                                ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION.key(),
+                                "-1s")
+                        .build());
+    }
+
+    @Test
+    public void validatePrimaryKeyOnIllegalColumn() {
+        Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage(
+                "The table has a primary key on columns of illegal types: "
+                        + "[ARRAY, MAP, MULTISET, ROW, RAW, VARBINARY].\n"
+                        + " Elasticsearch sink does not support primary keys on columns of types: "
+                        + "[ARRAY, MAP, MULTISET, STRUCTURED_TYPE, ROW, RAW, BINARY, VARBINARY].");
+        sinkFactory.createDynamicTableSink(
+                context()
+                        .withSchema(
+                                new ResolvedSchema(
+                                        Arrays.asList(
+                                                Column.physical("a", DataTypes.BIGINT().notNull()),
+                                                Column.physical(
+                                                        "b",
+                                                        DataTypes.ARRAY(
+                                                                        DataTypes.BIGINT()
+                                                                                .notNull())
+                                                                .notNull()),
+                                                Column.physical(
+                                                        "c",
+                                                        DataTypes.MAP(
+                                                                        DataTypes.BIGINT(),
+                                                                        DataTypes.STRING())
+                                                                .notNull()),
+                                                Column.physical(
+                                                        "d",
+                                                        DataTypes.MULTISET(
+                                                                        DataTypes.BIGINT()
+                                                                                .notNull())
+                                                                .notNull()),
+                                                Column.physical(
+                                                        "e",
+                                                        DataTypes.ROW(
+                                                                        DataTypes.FIELD(
+                                                                                "a",
+                                                                                DataTypes.BIGINT()))
+                                                                .notNull()),
+                                                Column.physical(
+                                                        "f",
+                                                        DataTypes.RAW(
+                                                                        Void.class,
+                                                                        VoidSerializer.INSTANCE)
+                                                                .notNull()),
+                                                Column.physical("g", DataTypes.BYTES().notNull())),
+                                        Collections.emptyList(),
+                                        UniqueConstraint.primaryKey(
+                                                "name",
+                                                Arrays.asList("a", "b", "c", "d", "e", "f", "g"))))
+                        .withOption(ElasticsearchConnectorOptions.INDEX_OPTION.key(), "MyIndex")
+                        .withOption(
+                                ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+                                "http://localhost:1234")
+                        .withOption(
+                                ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION.key(),
+                                "1s")
+                        .build());
+    }
+
+    @Test
+    public void validateWrongCredential() {
+        Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage(
+                "'username' and 'password' must be set at the same time. Got: username 'username' and password ''");
+        sinkFactory.createDynamicTableSink(
+                context()
+                        .withOption(ElasticsearchConnectorOptions.INDEX_OPTION.key(), "MyIndex")
+                        .withOption(
+                                ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+                                "http://localhost:1234")
+                        .withOption(
+                                ElasticsearchConnectorOptions.DOCUMENT_TYPE_OPTION.key(), "MyType")
+                        .withOption(ElasticsearchConnectorOptions.USERNAME_OPTION.key(), "username")
+                        .withOption(ElasticsearchConnectorOptions.PASSWORD_OPTION.key(), "")
+                        .build());
+    }
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
new file mode 100644
index 0000000..0ebc52b
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.TestLogger;
+
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.TestContext.context;
+import static org.apache.flink.table.api.Expressions.row;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/** IT tests for {@link Elasticsearch6DynamicSink}. */
+public class Elasticsearch6DynamicSinkITCase extends TestLogger {
+
+    @ClassRule
+    public static ElasticsearchContainer elasticsearchContainer =
+            new ElasticsearchContainer(DockerImageName.parse(DockerImageVersions.ELASTICSEARCH_6));
+
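+    // The (deprecated) TransportClient is only used by the tests to look up the
+    // written documents; the sink itself talks to Elasticsearch via REST.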
+    @SuppressWarnings("deprecation")
+    protected final Client getClient() {
+        TransportAddress transportAddress =
+                new TransportAddress(elasticsearchContainer.getTcpHost());
+        String expectedClusterName = "docker-cluster";
+        Settings settings = Settings.builder().put("cluster.name", expectedClusterName).build();
+        return new PreBuiltTransportClient(settings).addTransportAddress(transportAddress);
+    }
+
+    @Test
+    public void testWritingDocuments() throws Exception {
+        ResolvedSchema schema =
+                new ResolvedSchema(
+                        Arrays.asList(
+                                Column.physical("a", DataTypes.BIGINT().notNull()),
+                                Column.physical("b", DataTypes.TIME()),
+                                Column.physical("c", DataTypes.STRING().notNull()),
+                                Column.physical("d", DataTypes.FLOAT()),
+                                Column.physical("e", DataTypes.TINYINT().notNull()),
+                                Column.physical("f", DataTypes.DATE()),
+                                Column.physical("g", DataTypes.TIMESTAMP().notNull())),
+                        Collections.emptyList(),
+                        UniqueConstraint.primaryKey("name", Arrays.asList("a", "g")));
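+        // With the primary key (a, g) and the default key delimiter '_', the
+        // sink derives the document id "1_2012-12-12T12:12:12" queried below.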
+        GenericRowData rowData =
+                GenericRowData.of(
+                        1L,
+                        12345,
+                        StringData.fromString("ABCDE"),
+                        12.12f,
+                        (byte) 2,
+                        12345,
+                        TimestampData.fromLocalDateTime(
+                                LocalDateTime.parse("2012-12-12T12:12:12")));
+
+        String index = "writing-documents";
+        String myType = "MyType";
+        Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+        SinkFunctionProvider sinkRuntimeProvider =
+                (SinkFunctionProvider)
+                        sinkFactory
+                                .createDynamicTableSink(
+                                        context()
+                                                .withSchema(schema)
+                                                .withOption(
+                                                        ElasticsearchConnectorOptions.INDEX_OPTION
+                                                                .key(),
+                                                        index)
+                                                .withOption(
+                                                        ElasticsearchConnectorOptions
+                                                                .DOCUMENT_TYPE_OPTION
+                                                                .key(),
+                                                        myType)
+                                                .withOption(
+                                                        ElasticsearchConnectorOptions.HOSTS_OPTION
+                                                                .key(),
+                                                        elasticsearchContainer.getHttpHostAddress())
+                                                .withOption(
+                                                        ElasticsearchConnectorOptions
+                                                                .FLUSH_ON_CHECKPOINT_OPTION
+                                                                .key(),
+                                                        "false")
+                                                .build())
+                                .getSinkRuntimeProvider(new MockContext());
+
+        SinkFunction<RowData> sinkFunction = sinkRuntimeProvider.createSinkFunction();
+        StreamExecutionEnvironment environment =
+                StreamExecutionEnvironment.getExecutionEnvironment();
+        environment.setParallelism(4);
+
+        rowData.setRowKind(RowKind.UPDATE_AFTER);
+        environment.<RowData>fromElements(rowData).addSink(sinkFunction);
+        environment.execute();
+
+        Client client = getClient();
+        Map<String, Object> response =
+                client.get(new GetRequest(index, myType, "1_2012-12-12T12:12:12"))
+                        .actionGet()
+                        .getSource();
+        Map<Object, Object> expectedMap = new HashMap<>();
+        expectedMap.put("a", 1);
+        expectedMap.put("b", "00:00:12");
+        expectedMap.put("c", "ABCDE");
+        expectedMap.put("d", 12.12d);
+        expectedMap.put("e", 2);
+        expectedMap.put("f", "2003-10-20");
+        expectedMap.put("g", "2012-12-12 12:12:12");
+        assertThat(response, equalTo(expectedMap));
+    }
+
+    @Test
+    public void testWritingDocumentsFromTableApi() throws Exception {
+        TableEnvironment tableEnvironment =
+                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+
+        String index = "table-api";
+        String myType = "MyType";
+        tableEnvironment.executeSql(
+                "CREATE TABLE esTable ("
+                        + "a BIGINT NOT NULL,\n"
+                        + "b TIME,\n"
+                        + "c STRING NOT NULL,\n"
+                        + "d FLOAT,\n"
+                        + "e TINYINT NOT NULL,\n"
+                        + "f DATE,\n"
+                        + "g TIMESTAMP NOT NULL,\n"
+                        + "h as a + 2,\n"
+                        + "PRIMARY KEY (a, g) NOT ENFORCED\n"
+                        + ")\n"
+                        + "WITH (\n"
+                        + String.format("'%s'='%s',\n", "connector", "elasticsearch-6")
+                        + String.format(
+                                "'%s'='%s',\n",
+                                ElasticsearchConnectorOptions.INDEX_OPTION.key(), index)
+                        + String.format(
+                                "'%s'='%s',\n",
+                                ElasticsearchConnectorOptions.DOCUMENT_TYPE_OPTION.key(), myType)
+                        + String.format(
+                                "'%s'='%s',\n",
+                                ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+                                elasticsearchContainer.getHttpHostAddress())
+                        + String.format(
+                                "'%s'='%s'\n",
+                                ElasticsearchConnectorOptions.FLUSH_ON_CHECKPOINT_OPTION.key(),
+                                "false")
+                        + ")");
+
+        tableEnvironment
+                .fromValues(
+                        row(
+                                1L,
+                                LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+                                "ABCDE",
+                                12.12f,
+                                (byte) 2,
+                                LocalDate.ofEpochDay(12345),
+                                LocalDateTime.parse("2012-12-12T12:12:12")))
+                .executeInsert("esTable")
+                .await();
+
+        Client client = getClient();
+        Map<String, Object> response =
+                client.get(new GetRequest(index, myType, "1_2012-12-12T12:12:12"))
+                        .actionGet()
+                        .getSource();
+        Map<Object, Object> expectedMap = new HashMap<>();
+        expectedMap.put("a", 1);
+        expectedMap.put("b", "00:00:12");
+        expectedMap.put("c", "ABCDE");
+        expectedMap.put("d", 12.12d);
+        expectedMap.put("e", 2);
+        expectedMap.put("f", "2003-10-20");
+        expectedMap.put("g", "2012-12-12 12:12:12");
+        assertThat(response, equalTo(expectedMap));
+    }
+
+    @Test
+    public void testWritingDocumentsNoPrimaryKey() throws Exception {
+        TableEnvironment tableEnvironment =
+                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+
+        String index = "no-primary-key";
+        String myType = "MyType";
+        tableEnvironment.executeSql(
+                "CREATE TABLE esTable ("
+                        + "a BIGINT NOT NULL,\n"
+                        + "b TIME,\n"
+                        + "c STRING NOT NULL,\n"
+                        + "d FLOAT,\n"
+                        + "e TINYINT NOT NULL,\n"
+                        + "f DATE,\n"
+                        + "g TIMESTAMP NOT NULL\n"
+                        + ")\n"
+                        + "WITH (\n"
+                        + String.format("'%s'='%s',\n", "connector", "elasticsearch-6")
+                        + String.format(
+                                "'%s'='%s',\n",
+                                ElasticsearchConnectorOptions.INDEX_OPTION.key(), index)
+                        + String.format(
+                                "'%s'='%s',\n",
+                                ElasticsearchConnectorOptions.DOCUMENT_TYPE_OPTION.key(), myType)
+                        + String.format(
+                                "'%s'='%s',\n",
+                                ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+                                elasticsearchContainer.getHttpHostAddress())
+                        + String.format(
+                                "'%s'='%s'\n",
+                                ElasticsearchConnectorOptions.FLUSH_ON_CHECKPOINT_OPTION.key(),
+                                "false")
+                        + ")");
+
+        tableEnvironment
+                .fromValues(
+                        row(
+                                1L,
+                                LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+                                "ABCDE",
+                                12.12f,
+                                (byte) 2,
+                                LocalDate.ofEpochDay(12345),
+                                LocalDateTime.parse("2012-12-12T12:12:12")),
+                        row(
+                                2L,
+                                LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+                                "FGHIJK",
+                                13.13f,
+                                (byte) 4,
+                                LocalDate.ofEpochDay(12345),
+                                LocalDateTime.parse("2013-12-12T13:13:13")))
+                .executeInsert("esTable")
+                .await();
+
+        Client client = getClient();
+
+        // The search API does not return documents that have not been indexed
+        // yet, so we might need to query the index a few times.
+        Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30));
+        SearchHits hits;
+        do {
+            hits = client.prepareSearch(index).execute().actionGet().getHits();
+            if (hits.getTotalHits() < 2) {
+                Thread.sleep(200);
+            }
+        } while (hits.getTotalHits() < 2 && deadline.hasTimeLeft());
+
+        if (hits.getTotalHits() < 2) {
+            throw new AssertionError("Could not retrieve results from Elasticsearch.");
+        }
+
+        HashSet<Map<String, Object>> resultSet = new HashSet<>();
+        resultSet.add(hits.getAt(0).getSourceAsMap());
+        resultSet.add(hits.getAt(1).getSourceAsMap());
+        Map<Object, Object> expectedMap1 = new HashMap<>();
+        expectedMap1.put("a", 1);
+        expectedMap1.put("b", "00:00:12");
+        expectedMap1.put("c", "ABCDE");
+        expectedMap1.put("d", 12.12d);
+        expectedMap1.put("e", 2);
+        expectedMap1.put("f", "2003-10-20");
+        expectedMap1.put("g", "2012-12-12 12:12:12");
+        Map<Object, Object> expectedMap2 = new HashMap<>();
+        expectedMap2.put("a", 2);
+        expectedMap2.put("b", "00:00:12");
+        expectedMap2.put("c", "FGHIJK");
+        expectedMap2.put("d", 13.13d);
+        expectedMap2.put("e", 4);
+        expectedMap2.put("f", "2003-10-20");
+        expectedMap2.put("g", "2013-12-12 13:13:13");
+        HashSet<Map<Object, Object>> expectedSet = new HashSet<>();
+        expectedSet.add(expectedMap1);
+        expectedSet.add(expectedMap2);
+        assertThat(resultSet, equalTo(expectedSet));
+    }
+
+    @Test
+    public void testWritingDocumentsWithDynamicIndex() throws Exception {
+        TableEnvironment tableEnvironment =
+                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+
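+        // '{b|yyyy-MM-dd}' resolves the index per record from the timestamp
+        // column 'b', formatted with the given date pattern.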
+        String index = "dynamic-index-{b|yyyy-MM-dd}";
+        String myType = "MyType";
+        tableEnvironment.executeSql(
+                "CREATE TABLE esTable ("
+                        + "a BIGINT NOT NULL,\n"
+                        + "b TIMESTAMP NOT NULL,\n"
+                        + "PRIMARY KEY (a) NOT ENFORCED\n"
+                        + ")\n"
+                        + "WITH (\n"
+                        + String.format("'%s'='%s',\n", "connector", "elasticsearch-6")
+                        + String.format(
+                                "'%s'='%s',\n",
+                                ElasticsearchConnectorOptions.INDEX_OPTION.key(), index)
+                        + String.format(
+                                "'%s'='%s',\n",
+                                ElasticsearchConnectorOptions.DOCUMENT_TYPE_OPTION.key(), myType)
+                        + String.format(
+                                "'%s'='%s',\n",
+                                ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+                                elasticsearchContainer.getHttpHostAddress())
+                        + String.format(
+                                "'%s'='%s'\n",
+                                ElasticsearchConnectorOptions.FLUSH_ON_CHECKPOINT_OPTION.key(),
+                                "false")
+                        + ")");
+
+        tableEnvironment
+                .fromValues(row(1L, LocalDateTime.parse("2012-12-12T12:12:12")))
+                .executeInsert("esTable")
+                .await();
+
+        Client client = getClient();
+        Map<String, Object> response =
+                client.get(new GetRequest("dynamic-index-2012-12-12", myType, "1"))
+                        .actionGet()
+                        .getSource();
+        Map<Object, Object> expectedMap = new HashMap<>();
+        expectedMap.put("a", 1);
+        expectedMap.put("b", "2012-12-12 12:12:12");
+        assertThat(response, equalTo(expectedMap));
+    }
+
+    private static class MockContext implements DynamicTableSink.Context {
+        @Override
+        public boolean isBounded() {
+            return false;
+        }
+
+        @Override
+        public TypeInformation<?> createTypeInformation(DataType consumedDataType) {
+            return null;
+        }
+
+        @Override
+        public TypeInformation<?> createTypeInformation(LogicalType consumedLogicalType) {
+            return null;
+        }
+
+        @Override
+        public DynamicTableSink.DataStructureConverter createDataStructureConverter(
+                DataType consumedDataType) {
+            return null;
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java
new file mode 100644
index 0000000..f8ab0ab
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionRequest;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.time.ZoneId;
+import java.util.List;
+
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+/** Tests for {@link Elasticsearch6DynamicSink} parameters. */
+public class Elasticsearch6DynamicSinkTest extends TestLogger {
+
+    private static final String FIELD_KEY = "key";
+    private static final String FIELD_FRUIT_NAME = "fruit_name";
+    private static final String FIELD_COUNT = "count";
+    private static final String FIELD_TS = "ts";
+
+    private static final String HOSTNAME = "host1";
+    private static final int PORT = 1234;
+    private static final String SCHEMA = "https";
+    private static final String INDEX = "MyIndex";
+    private static final String DOC_TYPE = "MyType";
+    private static final String USERNAME = "username";
+    private static final String PASSWORD = "password";
+
+    @Test
+    public void testBuilder() {
+        final TableSchema schema = createTestSchema();
+
+        BuilderProvider provider = new BuilderProvider();
+        final Elasticsearch6DynamicSink testSink =
+                new Elasticsearch6DynamicSink(
+                        new DummyEncodingFormat(),
+                        new Elasticsearch6Configuration(
+                                getConfig(), this.getClass().getClassLoader()),
+                        schema,
+                        ZoneId.systemDefault(),
+                        provider);
+
+        testSink.getSinkRuntimeProvider(new MockSinkContext()).createSinkFunction();
+
+        verify(provider.builderSpy).setFailureHandler(new DummyFailureHandler());
+        verify(provider.builderSpy).setBulkFlushBackoff(true);
+        verify(provider.builderSpy)
+                .setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
+        verify(provider.builderSpy).setBulkFlushBackoffDelay(123);
+        verify(provider.builderSpy).setBulkFlushBackoffRetries(3);
+        verify(provider.builderSpy).setBulkFlushInterval(100);
+        verify(provider.builderSpy).setBulkFlushMaxActions(1000);
+        verify(provider.builderSpy).setBulkFlushMaxSizeMb(1);
+        verify(provider.builderSpy)
+                .setRestClientFactory(
+                        new Elasticsearch6DynamicSink.DefaultRestClientFactory("/myapp"));
+        verify(provider.sinkSpy).disableFlushOnCheckpoint();
+    }
+
+    @Test
+    public void testDefaultConfig() {
+        final TableSchema schema = createTestSchema();
+        Configuration configuration = new Configuration();
+        configuration.setString(ElasticsearchConnectorOptions.INDEX_OPTION.key(), INDEX);
+        configuration.setString(ElasticsearchConnectorOptions.DOCUMENT_TYPE_OPTION.key(), DOC_TYPE);
+        configuration.setString(
+                ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+                SCHEMA + "://" + HOSTNAME + ":" + PORT);
+
+        BuilderProvider provider = new BuilderProvider();
+        final Elasticsearch6DynamicSink testSink =
+                new Elasticsearch6DynamicSink(
+                        new DummyEncodingFormat(),
+                        new Elasticsearch6Configuration(
+                                configuration, this.getClass().getClassLoader()),
+                        schema,
+                        ZoneId.systemDefault(),
+                        provider);
+
+        testSink.getSinkRuntimeProvider(new MockSinkContext()).createSinkFunction();
+
+        verify(provider.builderSpy).setFailureHandler(new NoOpFailureHandler());
+        verify(provider.builderSpy).setBulkFlushBackoff(false);
+        verify(provider.builderSpy).setBulkFlushInterval(1000);
+        verify(provider.builderSpy).setBulkFlushMaxActions(1000);
+        verify(provider.builderSpy).setBulkFlushMaxSizeMb(2);
+        verify(provider.builderSpy)
+                .setRestClientFactory(new Elasticsearch6DynamicSink.DefaultRestClientFactory(null));
+        verify(provider.sinkSpy, never()).disableFlushOnCheckpoint();
+    }
+
+    @Test
+    public void testAuthConfig() {
+        final TableSchema schema = createTestSchema();
+        Configuration configuration = new Configuration();
+        configuration.setString(ElasticsearchConnectorOptions.INDEX_OPTION.key(), INDEX);
+        configuration.setString(ElasticsearchConnectorOptions.DOCUMENT_TYPE_OPTION.key(), DOC_TYPE);
+        configuration.setString(
+                ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+                SCHEMA + "://" + HOSTNAME + ":" + PORT);
+        configuration.setString(ElasticsearchConnectorOptions.USERNAME_OPTION.key(), USERNAME);
+        configuration.setString(ElasticsearchConnectorOptions.PASSWORD_OPTION.key(), PASSWORD);
+
+        BuilderProvider provider = new BuilderProvider();
+        final Elasticsearch6DynamicSink testSink =
+                new Elasticsearch6DynamicSink(
+                        new DummyEncodingFormat(),
+                        new Elasticsearch6Configuration(
+                                configuration, this.getClass().getClassLoader()),
+                        schema,
+                        ZoneId.systemDefault(),
+                        provider);
+
+        testSink.getSinkRuntimeProvider(new MockSinkContext()).createSinkFunction();
+
+        verify(provider.builderSpy).setFailureHandler(new NoOpFailureHandler());
+        verify(provider.builderSpy).setBulkFlushBackoff(false);
+        verify(provider.builderSpy).setBulkFlushInterval(1000);
+        verify(provider.builderSpy).setBulkFlushMaxActions(1000);
+        verify(provider.builderSpy).setBulkFlushMaxSizeMb(2);
+        verify(provider.builderSpy)
+                .setRestClientFactory(
+                        new Elasticsearch6DynamicSink.AuthRestClientFactory(
+                                null, USERNAME, PASSWORD));
+        verify(provider.sinkSpy, never()).disableFlushOnCheckpoint();
+    }
+
+    private Configuration getConfig() {
+        Configuration configuration = new Configuration();
+        configuration.setString(ElasticsearchConnectorOptions.INDEX_OPTION.key(), INDEX);
+        configuration.setString(ElasticsearchConnectorOptions.DOCUMENT_TYPE_OPTION.key(), DOC_TYPE);
+        configuration.setString(
+                ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+                SCHEMA + "://" + HOSTNAME + ":" + PORT);
+        configuration.setString(
+                ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION.key(), "exponential");
+        configuration.setString(
+                ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION.key(), "123");
+        configuration.setString(
+                ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(), "3");
+        configuration.setString(
+                ElasticsearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION.key(), "100");
+        configuration.setString(
+                ElasticsearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION.key(), "1000");
+        configuration.setString(
+                ElasticsearchConnectorOptions.BULK_FLASH_MAX_SIZE_OPTION.key(), "1mb");
+        configuration.setString(
+                ElasticsearchConnectorOptions.CONNECTION_PATH_PREFIX.key(), "/myapp");
+        configuration.setString(
+                ElasticsearchConnectorOptions.FAILURE_HANDLER_OPTION.key(),
+                DummyFailureHandler.class.getName());
+        configuration.setString(
+                ElasticsearchConnectorOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false");
+        return configuration;
+    }
+
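+    /**
+     * Wraps the real {@link ElasticsearchSink.Builder} in a Mockito spy so the
+     * tests can verify which configuration methods the sink applies, and also
+     * captures a spy on the built sink.
+     */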
+    private static class BuilderProvider
+            implements Elasticsearch6DynamicSink.ElasticSearchBuilderProvider {
+        public ElasticsearchSink.Builder<RowData> builderSpy;
+        public ElasticsearchSink<RowData> sinkSpy;
+
+        @Override
+        public ElasticsearchSink.Builder<RowData> createBuilder(
+                List<HttpHost> httpHosts, RowElasticsearchSinkFunction upsertSinkFunction) {
+            builderSpy =
+                    Mockito.spy(new ElasticsearchSink.Builder<>(httpHosts, upsertSinkFunction));
+            doAnswer(
+                            invocation -> {
+                                sinkSpy =
+                                        Mockito.spy(
+                                                (ElasticsearchSink<RowData>)
+                                                        invocation.callRealMethod());
+                                return sinkSpy;
+                            })
+                    .when(builderSpy)
+                    .build();
+
+            return builderSpy;
+        }
+    }
+
+    private TableSchema createTestSchema() {
+        return TableSchema.builder()
+                .field(FIELD_KEY, DataTypes.BIGINT())
+                .field(FIELD_FRUIT_NAME, DataTypes.STRING())
+                .field(FIELD_COUNT, DataTypes.DECIMAL(10, 4))
+                .field(FIELD_TS, DataTypes.TIMESTAMP(3))
+                .build();
+    }
+
+    private static class DummySerializationSchema implements SerializationSchema<RowData> {
+
+        private static final DummySerializationSchema INSTANCE = new DummySerializationSchema();
+
+        @Override
+        public byte[] serialize(RowData element) {
+            return new byte[0];
+        }
+    }
+
+    private static class DummyEncodingFormat
+            implements EncodingFormat<SerializationSchema<RowData>> {
+        @Override
+        public SerializationSchema<RowData> createRuntimeEncoder(
+                DynamicTableSink.Context context, DataType consumedDataType) {
+            return DummySerializationSchema.INSTANCE;
+        }
+
+        @Override
+        public ChangelogMode getChangelogMode() {
+            return null;
+        }
+    }
+
+    private static class MockSinkContext implements DynamicTableSink.Context {
+        @Override
+        public boolean isBounded() {
+            return false;
+        }
+
+        @Override
+        public TypeInformation<?> createTypeInformation(DataType consumedDataType) {
+            return null;
+        }
+
+        @Override
+        public TypeInformation<?> createTypeInformation(LogicalType consumedLogicalType) {
+            return null;
+        }
+
+        @Override
+        public DynamicTableSink.DataStructureConverter createDataStructureConverter(
+                DataType consumedDataType) {
+            return null;
+        }
+    }
+
+    /** Custom failure handler for testing. */
+    public static class DummyFailureHandler implements ActionRequestFailureHandler {
+
+        @Override
+        public void onFailure(
+                ActionRequest action,
+                Throwable failure,
+                int restStatusCode,
+                RequestIndexer indexer) {
+            // do nothing
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return o instanceof DummyFailureHandler;
+        }
+
+        @Override
+        public int hashCode() {
+            return DummyFailureHandler.class.hashCode();
+        }
+    }
+}
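
Note on the test above: the verify(...) assertions pass freshly constructed arguments such as new NoOpFailureHandler(). This works because Mockito matches arguments via equals(), which is exactly why DummyFailureHandler and the RestClientFactory implementations in this patch define value-style equals()/hashCode(). A minimal, self-contained sketch of that matching behaviour (class name hypothetical, plain Mockito, no Flink types):

    import static org.mockito.Mockito.spy;
    import static org.mockito.Mockito.verify;

    import java.util.ArrayList;
    import java.util.List;

    public class EqualsBasedVerifySketch {
        public static void main(String[] args) {
            List<String> listSpy = spy(new ArrayList<>());
            listSpy.add(new String("hello"));
            // Passes even though the argument is a different instance:
            // Mockito compares arguments with equals(), just like the
            // setFailureHandler(new NoOpFailureHandler()) check above.
            verify(listSpy).add(new String("hello"));
        }
    }
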
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7Configuration.java b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7Configuration.java
new file mode 100644
index 0000000..6bd28cf
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7Configuration.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+
+import org.apache.http.HttpHost;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.HOSTS_OPTION;
+
+/** Elasticsearch 7 specific configuration. */
+@Internal
+final class Elasticsearch7Configuration extends ElasticsearchConfiguration {
+    Elasticsearch7Configuration(ReadableConfig config, ClassLoader classLoader) {
+        super(config, classLoader);
+    }
+
+    public List<HttpHost> getHosts() {
+        return config.get(HOSTS_OPTION).stream()
+                .map(Elasticsearch7Configuration::validateAndParseHostsString)
+                .collect(Collectors.toList());
+    }
+
+    private static HttpHost validateAndParseHostsString(String host) {
+        try {
+            HttpHost httpHost = HttpHost.create(host);
+            if (httpHost.getPort() < 0) {
+                throw new ValidationException(
+                        String.format(
+                                "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing port.",
+                                host, HOSTS_OPTION.key()));
+            }
+
+            if (httpHost.getSchemeName() == null) {
+                throw new ValidationException(
+                        String.format(
+                                "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing scheme.",
+                                host, HOSTS_OPTION.key()));
+            }
+            return httpHost;
+        } catch (Exception e) {
+            throw new ValidationException(
+                    String.format(
+                            "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'.",
+                            host, HOSTS_OPTION.key()),
+                    e);
+        }
+    }
+}
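
For context on validateAndParseHostsString above: HttpHost.create() only throws on syntactically broken input, while a missing port merely surfaces as getPort() == -1, which is why the configuration checks for it explicitly. A minimal sketch of an accepted and a rejected form (class name hypothetical):

    import org.apache.http.HttpHost;

    public class HostParsingSketch {
        public static void main(String[] args) {
            // Well-formed: scheme, host name and port are all present.
            HttpHost ok = HttpHost.create("http://localhost:9200");
            System.out.println(
                    ok.getSchemeName() + "://" + ok.getHostName() + ":" + ok.getPort());

            // Without an explicit port, HttpHost.create() reports -1, which
            // Elasticsearch7Configuration rejects with a ValidationException.
            HttpHost noPort = HttpHost.create("localhost");
            System.out.println("port when missing: " + noPort.getPort()); // -1
        }
    }
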
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
new file mode 100644
index 0000000..1926e44
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import javax.annotation.Nullable;
+
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * A {@link DynamicTableSink} that describes how to create an {@link ElasticsearchSink} from a
+ * logical description.
+ */
+@Internal
+final class Elasticsearch7DynamicSink implements DynamicTableSink {
+    @VisibleForTesting
+    static final Elasticsearch7RequestFactory REQUEST_FACTORY = new Elasticsearch7RequestFactory();
+
+    private final EncodingFormat<SerializationSchema<RowData>> format;
+    private final TableSchema schema;
+    private final Elasticsearch7Configuration config;
+    private final ZoneId localTimeZoneId;
+    private final boolean isDynamicIndexWithSystemTime;
+
+    public Elasticsearch7DynamicSink(
+            EncodingFormat<SerializationSchema<RowData>> format,
+            Elasticsearch7Configuration config,
+            TableSchema schema,
+            ZoneId localTimeZoneId) {
+        this(format, config, schema, localTimeZoneId, (ElasticsearchSink.Builder::new));
+    }
+
+    // --------------------------------------------------------------
+    // Hack to make configuration testing possible.
+    //
+    // The code in this block should never be used outside of tests.
+    // By injecting a builder we can make assertions on it in the
+    // test. We cannot assert everything though, e.g. it is not
+    // possible to assert flushing on checkpoint, as it is configured
+    // on the sink itself.
+    // --------------------------------------------------------------
+
+    private final ElasticSearchBuilderProvider builderProvider;
+
+    @FunctionalInterface
+    interface ElasticSearchBuilderProvider {
+        ElasticsearchSink.Builder<RowData> createBuilder(
+                List<HttpHost> httpHosts, RowElasticsearchSinkFunction upsertSinkFunction);
+    }
+
+    Elasticsearch7DynamicSink(
+            EncodingFormat<SerializationSchema<RowData>> format,
+            Elasticsearch7Configuration config,
+            TableSchema schema,
+            ZoneId localTimeZoneId,
+            ElasticSearchBuilderProvider builderProvider) {
+        this.format = format;
+        this.schema = schema;
+        this.config = config;
+        this.localTimeZoneId = localTimeZoneId;
+        this.isDynamicIndexWithSystemTime = isDynamicIndexWithSystemTime();
+        this.builderProvider = builderProvider;
+    }
+
+    // --------------------------------------------------------------
+    // End of hack to make configuration testing possible
+    // --------------------------------------------------------------
+
+    public boolean isDynamicIndexWithSystemTime() {
+        IndexGeneratorFactory.IndexHelper indexHelper = new IndexGeneratorFactory.IndexHelper();
+        return indexHelper.checkIsDynamicIndexWithSystemTimeFormat(config.getIndex());
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        ChangelogMode.Builder builder = ChangelogMode.newBuilder();
+        for (RowKind kind : requestedMode.getContainedKinds()) {
+            if (kind != RowKind.UPDATE_BEFORE) {
+                builder.addContainedKind(kind);
+            }
+        }
+        if (isDynamicIndexWithSystemTime && !requestedMode.containsOnly(RowKind.INSERT)) {
+            throw new ValidationException(
+                    "Dynamic indexing based on system time only works on append only stream.");
+        }
+        return builder.build();
+    }
+
+    @Override
+    public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
+        return () -> {
+            SerializationSchema<RowData> format =
+                    this.format.createRuntimeEncoder(context, schema.toRowDataType());
+
+            final RowElasticsearchSinkFunction upsertFunction =
+                    new RowElasticsearchSinkFunction(
+                            IndexGeneratorFactory.createIndexGenerator(
+                                    config.getIndex(), schema, localTimeZoneId),
+                            null, // this is deprecated in es 7+
+                            format,
+                            XContentType.JSON,
+                            REQUEST_FACTORY,
+                            KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()));
+
+            final ElasticsearchSink.Builder<RowData> builder =
+                    builderProvider.createBuilder(config.getHosts(), upsertFunction);
+
+            builder.setFailureHandler(config.getFailureHandler());
+            builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
+            builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20));
+            builder.setBulkFlushInterval(config.getBulkFlushInterval());
+            builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
+            config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
+            config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
+            config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
+
+            // we must overwrite the default factory, which is defined with a lambda, because of
+            // a bug in lambda serialization when shading; see FLINK-18006
+            if (config.getUsername().isPresent()
+                    && config.getPassword().isPresent()
+                    && !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())
+                    && !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get())) {
+                builder.setRestClientFactory(
+                        new AuthRestClientFactory(
+                                config.getPathPrefix().orElse(null),
+                                config.getUsername().get(),
+                                config.getPassword().get()));
+            } else {
+                builder.setRestClientFactory(
+                        new DefaultRestClientFactory(config.getPathPrefix().orElse(null)));
+            }
+
+            final ElasticsearchSink<RowData> sink = builder.build();
+
+            if (config.isDisableFlushOnCheckpoint()) {
+                sink.disableFlushOnCheckpoint();
+            }
+
+            return sink;
+        };
+    }
+
+    @Override
+    public DynamicTableSink copy() {
+        return this;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "Elasticsearch7";
+    }
+
+    /** Serializable {@link RestClientFactory} used by the sink. */
+    @VisibleForTesting
+    static class DefaultRestClientFactory implements RestClientFactory {
+
+        private final String pathPrefix;
+
+        public DefaultRestClientFactory(@Nullable String pathPrefix) {
+            this.pathPrefix = pathPrefix;
+        }
+
+        @Override
+        public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
+            if (pathPrefix != null) {
+                restClientBuilder.setPathPrefix(pathPrefix);
+            }
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            DefaultRestClientFactory that = (DefaultRestClientFactory) o;
+            return Objects.equals(pathPrefix, that.pathPrefix);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(pathPrefix);
+        }
+    }
+
+    /** Serializable {@link RestClientFactory} used by the sink which enables authentication. */
+    @VisibleForTesting
+    static class AuthRestClientFactory implements RestClientFactory {
+
+        private final String pathPrefix;
+        private final String username;
+        private final String password;
+        private transient CredentialsProvider credentialsProvider;
+
+        public AuthRestClientFactory(
+                @Nullable String pathPrefix, String username, String password) {
+            this.pathPrefix = pathPrefix;
+            this.password = password;
+            this.username = username;
+        }
+
+        @Override
+        public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
+            if (pathPrefix != null) {
+                restClientBuilder.setPathPrefix(pathPrefix);
+            }
+            if (credentialsProvider == null) {
+                credentialsProvider = new BasicCredentialsProvider();
+                credentialsProvider.setCredentials(
+                        AuthScope.ANY, new UsernamePasswordCredentials(username, password));
+            }
+            restClientBuilder.setHttpClientConfigCallback(
+                    httpAsyncClientBuilder ->
+                            httpAsyncClientBuilder.setDefaultCredentialsProvider(
+                                    credentialsProvider));
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            AuthRestClientFactory that = (AuthRestClientFactory) o;
+            return Objects.equals(pathPrefix, that.pathPrefix)
+                    && Objects.equals(username, that.username)
+                    && Objects.equals(password, that.password);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(pathPrefix, password, username);
+        }
+    }
+
+    /**
+     * Version-specific creation of {@link org.elasticsearch.action.ActionRequest}s used by the
+     * sink.
+     */
+    private static class Elasticsearch7RequestFactory implements RequestFactory {
+        @Override
+        public UpdateRequest createUpdateRequest(
+                String index,
+                String docType,
+                String key,
+                XContentType contentType,
+                byte[] document) {
+            return new UpdateRequest(index, key)
+                    .doc(document, contentType)
+                    .upsert(document, contentType);
+        }
+
+        @Override
+        public IndexRequest createIndexRequest(
+                String index,
+                String docType,
+                String key,
+                XContentType contentType,
+                byte[] document) {
+            return new IndexRequest(index).id(key).source(document, contentType);
+        }
+
+        @Override
+        public DeleteRequest createDeleteRequest(String index, String docType, String key) {
+            return new DeleteRequest(index, key);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Elasticsearch7DynamicSink that = (Elasticsearch7DynamicSink) o;
+        return Objects.equals(format, that.format)
+                && Objects.equals(schema, that.schema)
+                && Objects.equals(config, that.config)
+                && Objects.equals(builderProvider, that.builderProvider);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(format, schema, config, builderProvider);
+    }
+}
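
The sink above is not instantiated directly; it is created by the factory in the next file when a table declares connector 'elasticsearch-7'. A minimal sketch of that SQL entry point, mirroring the ITCase later in this patch (table name and option values are illustrative only):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class Elasticsearch7SqlSketch {
        public static void main(String[] args) throws Exception {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            tEnv.executeSql(
                    "CREATE TABLE esTable ("
                            + "  a BIGINT NOT NULL,"
                            + "  c STRING NOT NULL,"
                            // with a PRIMARY KEY the sink issues upserts;
                            // without one it issues plain index requests
                            + "  PRIMARY KEY (a) NOT ENFORCED"
                            + ") WITH ("
                            + "  'connector' = 'elasticsearch-7',"
                            + "  'hosts' = 'http://localhost:9200',"
                            + "  'index' = 'my-index'"
                            + ")");
            tEnv.executeSql("INSERT INTO esTable VALUES (1, 'ABCDE')").await();
        }
    }
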
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java
new file mode 100644
index 0000000..50c37f0
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.StringUtils;
+
+import java.time.ZoneId;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLASH_MAX_SIZE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_MAX_RETRY_TIMEOUT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_PATH_PREFIX;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.FAILURE_HANDLER_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.FLUSH_ON_CHECKPOINT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.FORMAT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.HOSTS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION;
+
+/** A {@link DynamicTableSinkFactory} for discovering {@link Elasticsearch7DynamicSink}. */
+@Internal
+public class Elasticsearch7DynamicSinkFactory implements DynamicTableSinkFactory {
+    private static final Set<ConfigOption<?>> requiredOptions =
+            Stream.of(HOSTS_OPTION, INDEX_OPTION).collect(Collectors.toSet());
+    private static final Set<ConfigOption<?>> optionalOptions =
+            Stream.of(
+                            KEY_DELIMITER_OPTION,
+                            FAILURE_HANDLER_OPTION,
+                            FLUSH_ON_CHECKPOINT_OPTION,
+                            BULK_FLASH_MAX_SIZE_OPTION,
+                            BULK_FLUSH_MAX_ACTIONS_OPTION,
+                            BULK_FLUSH_INTERVAL_OPTION,
+                            BULK_FLUSH_BACKOFF_TYPE_OPTION,
+                            BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION,
+                            BULK_FLUSH_BACKOFF_DELAY_OPTION,
+                            CONNECTION_MAX_RETRY_TIMEOUT_OPTION,
+                            CONNECTION_PATH_PREFIX,
+                            FORMAT_OPTION,
+                            PASSWORD_OPTION,
+                            USERNAME_OPTION)
+                    .collect(Collectors.toSet());
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        TableSchema tableSchema = context.getCatalogTable().getSchema();
+        ElasticsearchValidationUtils.validatePrimaryKey(tableSchema);
+
+        final FactoryUtil.TableFactoryHelper helper =
+                FactoryUtil.createTableFactoryHelper(this, context);
+
+        final EncodingFormat<SerializationSchema<RowData>> format =
+                helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT_OPTION);
+
+        helper.validate();
+        Configuration configuration = new Configuration();
+        context.getCatalogTable().getOptions().forEach(configuration::setString);
+        Elasticsearch7Configuration config =
+                new Elasticsearch7Configuration(configuration, context.getClassLoader());
+
+        validate(config, configuration);
+
+        return new Elasticsearch7DynamicSink(
+                format,
+                config,
+                TableSchemaUtils.getPhysicalSchema(tableSchema),
+                getLocalTimeZoneId(context.getConfiguration()));
+    }
+
+    ZoneId getLocalTimeZoneId(ReadableConfig readableConfig) {
+        final String zone = readableConfig.get(TableConfigOptions.LOCAL_TIME_ZONE);
+        final ZoneId zoneId =
+                TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone)
+                        ? ZoneId.systemDefault()
+                        : ZoneId.of(zone);
+
+        return zoneId;
+    }
+
+    private void validate(Elasticsearch7Configuration config, Configuration originalConfiguration) {
+        config.getFailureHandler(); // checks if we can instantiate the custom failure handler
+        config.getHosts(); // validate hosts
+        validate(
+                config.getIndex().length() >= 1,
+                () -> String.format("'%s' must not be empty", INDEX_OPTION.key()));
+        int maxActions = config.getBulkFlushMaxActions();
+        validate(
+                maxActions == -1 || maxActions >= 1,
+                () ->
+                        String.format(
+                                "'%s' must be at least 1. Got: %s",
+                                BULK_FLUSH_MAX_ACTIONS_OPTION.key(), maxActions));
+        long maxSize = config.getBulkFlushMaxByteSize();
+        long mb1 = 1024 * 1024;
+        validate(
+                maxSize == -1 || (maxSize >= mb1 && maxSize % mb1 == 0),
+                () ->
+                        String.format(
+                                "'%s' must be in MB granularity. Got: %s",
+                                BULK_FLASH_MAX_SIZE_OPTION.key(),
+                                originalConfiguration
+                                        .get(BULK_FLASH_MAX_SIZE_OPTION)
+                                        .toHumanReadableString()));
+        validate(
+                config.getBulkFlushBackoffRetries().map(retries -> retries >= 1).orElse(true),
+                () ->
+                        String.format(
+                                "'%s' must be at least 1. Got: %s",
+                                BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(),
+                                config.getBulkFlushBackoffRetries().get()));
+        if (config.getUsername().isPresent()
+                && !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())) {
+            validate(
+                    config.getPassword().isPresent()
+                            && !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get()),
+                    () ->
+                            String.format(
+                                    "'%s' and '%s' must be set at the same time. Got: username '%s' and password '%s'",
+                                    USERNAME_OPTION.key(),
+                                    PASSWORD_OPTION.key(),
+                                    config.getUsername().get(),
+                                    config.getPassword().orElse("")));
+        }
+    }
+
+    private static void validate(boolean condition, Supplier<String> message) {
+        if (!condition) {
+            throw new ValidationException(message.get());
+        }
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return "elasticsearch-7";
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return requiredOptions;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return optionalOptions;
+    }
+}
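
Of the rules enforced in validate() above, the 'MB granularity' check on 'sink.bulk-flush.max-size' is the least obvious: the configured byte size must be -1 (disabled) or a positive whole multiple of 1 MB. A small sketch of the same arithmetic (class and helper names hypothetical):

    public class BulkFlushSizeRuleSketch {
        // Mirrors the check in Elasticsearch7DynamicSinkFactory#validate:
        // -1 disables the limit; otherwise the size must be a whole number of MB.
        static boolean isValidBulkFlushMaxSize(long bytes) {
            long oneMb = 1024L * 1024L;
            return bytes == -1 || (bytes >= oneMb && bytes % oneMb == 0);
        }

        public static void main(String[] args) {
            System.out.println(isValidBulkFlushMaxSize(2L * 1024 * 1024)); // true  ("2mb")
            System.out.println(isValidBulkFlushMaxSize(1024L));            // false ("1kb")
            System.out.println(isValidBulkFlushMaxSize(-1L));              // true  (disabled)
        }
    }
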
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connectors/flink-connector-elasticsearch7/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 9e189e3..10e4846 100644
--- a/flink-connectors/flink-connector-elasticsearch7/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-connectors/flink-connector-elasticsearch7/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.connector.elasticsearch.table.Elasticsearch7DynamicSinkFactory
+org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7DynamicSinkFactory
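
This one-line change is what actually re-wires table discovery: Flink locates DynamicTableSinkFactory implementations through the Java SPI, so the services file must name the class's new package or the 'elasticsearch-7' identifier would no longer resolve. A sketch of the underlying lookup (Flink's FactoryUtil layers identifier matching and validation on top of this):

    import java.util.ServiceLoader;

    import org.apache.flink.table.factories.Factory;

    public class FactoryDiscoverySketch {
        public static void main(String[] args) {
            // Lists every factory registered via META-INF/services, including
            // the relocated Elasticsearch7DynamicSinkFactory.
            for (Factory factory : ServiceLoader.load(Factory.class)) {
                System.out.println(
                        factory.factoryIdentifier() + " -> " + factory.getClass().getName());
            }
        }
    }
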
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactoryTest.java b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactoryTest.java
new file mode 100644
index 0000000..3ff21ed
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactoryTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.TestContext.context;
+
+/** Tests for validation in {@link Elasticsearch7DynamicSinkFactory}. */
+public class Elasticsearch7DynamicSinkFactoryTest extends TestLogger {
+    @Rule public ExpectedException thrown = ExpectedException.none();
+
+    @Test
+    public void validateEmptyConfiguration() {
+        Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory();
+
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage(
+                "One or more required options are missing.\n"
+                        + "\n"
+                        + "Missing required options are:\n"
+                        + "\n"
+                        + "hosts\n"
+                        + "index");
+        sinkFactory.createDynamicTableSink(context().build());
+    }
+
+    @Test
+    public void validateWrongIndex() {
+        Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory();
+
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage("'index' must not be empty");
+        sinkFactory.createDynamicTableSink(
+                context()
+                        .withOption("index", "")
+                        .withOption("hosts", "http://localhost:12345")
+                        .build());
+    }
+
+    @Test
+    public void validateWrongHosts() {
+        Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory();
+
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage(
+                "Could not parse host 'wrong-host' in option 'hosts'. It should follow the format 'http://host_name:port'.");
+        sinkFactory.createDynamicTableSink(
+                context().withOption("index", "MyIndex").withOption("hosts", "wrong-host").build());
+    }
+
+    @Test
+    public void validateWrongFlushSize() {
+        Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory();
+
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage(
+                "'sink.bulk-flush.max-size' must be in MB granularity. Got: 1024 bytes");
+        sinkFactory.createDynamicTableSink(
+                context()
+                        .withOption(ElasticsearchConnectorOptions.INDEX_OPTION.key(), "MyIndex")
+                        .withOption(
+                                ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+                                "http://localhost:1234")
+                        .withOption(
+                                ElasticsearchConnectorOptions.BULK_FLASH_MAX_SIZE_OPTION.key(),
+                                "1kb")
+                        .build());
+    }
+
+    @Test
+    public void validateWrongRetries() {
+        Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory();
+
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage("'sink.bulk-flush.backoff.max-retries' must be at least 1. Got: 0");
+        sinkFactory.createDynamicTableSink(
+                context()
+                        .withOption(ElasticsearchConnectorOptions.INDEX_OPTION.key(), "MyIndex")
+                        .withOption(
+                                ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+                                "http://localhost:1234")
+                        .withOption(
+                                ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION
+                                        .key(),
+                                "0")
+                        .build());
+    }
+
+    @Test
+    public void validateWrongMaxActions() {
+        Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory();
+
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage("'sink.bulk-flush.max-actions' must be at least 1. Got: -2");
+        sinkFactory.createDynamicTableSink(
+                context()
+                        .withOption(ElasticsearchConnectorOptions.INDEX_OPTION.key(), "MyIndex")
+                        .withOption(
+                                ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+                                "http://localhost:1234")
+                        .withOption(
+                                ElasticsearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION.key(),
+                                "-2")
+                        .build());
+    }
+
+    @Test
+    public void validateWrongBackoffDelay() {
+        Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory();
+
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage("Invalid value for option 'sink.bulk-flush.backoff.delay'.");
+        sinkFactory.createDynamicTableSink(
+                context()
+                        .withOption(ElasticsearchConnectorOptions.INDEX_OPTION.key(), "MyIndex")
+                        .withOption(
+                                ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+                                "http://localhost:1234")
+                        .withOption(
+                                ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION.key(),
+                                "-1s")
+                        .build());
+    }
+
+    @Test
+    public void validatePrimaryKeyOnIllegalColumn() {
+        Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory();
+
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage(
+                "The table has a primary key on columns of illegal types: "
+                        + "[ARRAY, MAP, MULTISET, ROW, RAW, VARBINARY].\n"
+                        + " Elasticsearch sink does not support primary keys on columns of types: "
+                        + "[ARRAY, MAP, MULTISET, STRUCTURED_TYPE, ROW, RAW, BINARY, VARBINARY].");
+        sinkFactory.createDynamicTableSink(
+                context()
+                        .withSchema(
+                                new ResolvedSchema(
+                                        Arrays.asList(
+                                                Column.physical("a", DataTypes.BIGINT().notNull()),
+                                                Column.physical(
+                                                        "b",
+                                                        DataTypes.ARRAY(
+                                                                        DataTypes.BIGINT()
+                                                                                .notNull())
+                                                                .notNull()),
+                                                Column.physical(
+                                                        "c",
+                                                        DataTypes.MAP(
+                                                                        DataTypes.BIGINT(),
+                                                                        DataTypes.STRING())
+                                                                .notNull()),
+                                                Column.physical(
+                                                        "d",
+                                                        DataTypes.MULTISET(
+                                                                        DataTypes.BIGINT()
+                                                                                .notNull())
+                                                                .notNull()),
+                                                Column.physical(
+                                                        "e",
+                                                        DataTypes.ROW(
+                                                                        DataTypes.FIELD(
+                                                                                "a",
+                                                                                DataTypes.BIGINT()))
+                                                                .notNull()),
+                                                Column.physical(
+                                                        "f",
+                                                        DataTypes.RAW(
+                                                                        Void.class,
+                                                                        VoidSerializer.INSTANCE)
+                                                                .notNull()),
+                                                Column.physical("g", DataTypes.BYTES().notNull())),
+                                        Collections.emptyList(),
+                                        UniqueConstraint.primaryKey(
+                                                "name",
+                                                Arrays.asList("a", "b", "c", "d", "e", "f", "g"))))
+                        .withOption(ElasticsearchConnectorOptions.INDEX_OPTION.key(), "MyIndex")
+                        .withOption(
+                                ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+                                "http://localhost:1234")
+                        .withOption(
+                                ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION.key(),
+                                "1s")
+                        .build());
+    }
+
+    @Test
+    public void validateWrongCredential() {
+        Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory();
+
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage(
+                "'username' and 'password' must be set at the same time. Got: username 'username' and password ''");
+        sinkFactory.createDynamicTableSink(
+                context()
+                        .withOption(ElasticsearchConnectorOptions.INDEX_OPTION.key(), "MyIndex")
+                        .withOption(
+                                ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+                                "http://localhost:1234")
+                        .withOption(ElasticsearchConnectorOptions.USERNAME_OPTION.key(), "username")
+                        .withOption(ElasticsearchConnectorOptions.PASSWORD_OPTION.key(), "")
+                        .build());
+    }
+}
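
The ITCase that follows fetches documents by id '1_2012-12-12T12:12:12': that id is produced by KeyExtractor joining the primary-key fields (a, g) with the default key delimiter '_'. A minimal sketch of the id construction, using the field values from the test (this is not KeyExtractor's actual code):

    import java.time.LocalDateTime;
    import java.util.StringJoiner;

    public class DocumentIdSketch {
        public static void main(String[] args) {
            // Primary key (a, g) joined with the default key delimiter "_".
            long a = 1L;
            LocalDateTime g = LocalDateTime.parse("2012-12-12T12:12:12");
            StringJoiner id = new StringJoiner("_");
            id.add(Long.toString(a));
            id.add(g.toString());
            System.out.println(id); // 1_2012-12-12T12:12:12
        }
    }
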
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
new file mode 100644
index 0000000..fdf7a3a
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.TestLogger;
+
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.TestContext.context;
+import static org.apache.flink.table.api.Expressions.row;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/** IT tests for {@link Elasticsearch7DynamicSink}. */
+public class Elasticsearch7DynamicSinkITCase extends TestLogger {
+
+    @ClassRule
+    public static ElasticsearchContainer elasticsearchContainer =
+            new ElasticsearchContainer(DockerImageName.parse(DockerImageVersions.ELASTICSEARCH_7));
+
+    @SuppressWarnings("deprecation")
+    protected final Client getClient() {
+        TransportAddress transportAddress =
+                new TransportAddress(elasticsearchContainer.getTcpHost());
+        String expectedClusterName = "docker-cluster";
+        Settings settings = Settings.builder().put("cluster.name", expectedClusterName).build();
+        return new PreBuiltTransportClient(settings).addTransportAddress(transportAddress);
+    }
+
+    @Test
+    public void testWritingDocuments() throws Exception {
+        ResolvedSchema schema =
+                new ResolvedSchema(
+                        Arrays.asList(
+                                Column.physical("a", DataTypes.BIGINT().notNull()),
+                                Column.physical("b", DataTypes.TIME()),
+                                Column.physical("c", DataTypes.STRING().notNull()),
+                                Column.physical("d", DataTypes.FLOAT()),
+                                Column.physical("e", DataTypes.TINYINT().notNull()),
+                                Column.physical("f", DataTypes.DATE()),
+                                Column.physical("g", DataTypes.TIMESTAMP().notNull())),
+                        Collections.emptyList(),
+                        UniqueConstraint.primaryKey("name", Arrays.asList("a", "g")));
+
+        GenericRowData rowData =
+                GenericRowData.of(
+                        1L,
+                        12345,
+                        StringData.fromString("ABCDE"),
+                        12.12f,
+                        (byte) 2,
+                        12345,
+                        TimestampData.fromLocalDateTime(
+                                LocalDateTime.parse("2012-12-12T12:12:12")));
+
+        String index = "writing-documents";
+        Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory();
+
+        SinkFunctionProvider sinkRuntimeProvider =
+                (SinkFunctionProvider)
+                        sinkFactory
+                                .createDynamicTableSink(
+                                        context()
+                                                .withSchema(schema)
+                                                .withOption(
+                                                        ElasticsearchConnectorOptions.INDEX_OPTION
+                                                                .key(),
+                                                        index)
+                                                .withOption(
+                                                        ElasticsearchConnectorOptions.HOSTS_OPTION
+                                                                .key(),
+                                                        elasticsearchContainer.getHttpHostAddress())
+                                                .withOption(
+                                                        ElasticsearchConnectorOptions
+                                                                .FLUSH_ON_CHECKPOINT_OPTION
+                                                                .key(),
+                                                        "false")
+                                                .build())
+                                .getSinkRuntimeProvider(new MockContext());
+
+        SinkFunction<RowData> sinkFunction = sinkRuntimeProvider.createSinkFunction();
+        StreamExecutionEnvironment environment =
+                StreamExecutionEnvironment.getExecutionEnvironment();
+        environment.setParallelism(4);
+
+        rowData.setRowKind(RowKind.UPDATE_AFTER);
+        environment.<RowData>fromElements(rowData).addSink(sinkFunction);
+        environment.execute();
+
+        Client client = getClient();
+        Map<String, Object> response =
+                client.get(new GetRequest(index, "1_2012-12-12T12:12:12")).actionGet().getSource();
+        Map<Object, Object> expectedMap = new HashMap<>();
+        expectedMap.put("a", 1);
+        expectedMap.put("b", "00:00:12");
+        expectedMap.put("c", "ABCDE");
+        expectedMap.put("d", 12.12d);
+        expectedMap.put("e", 2);
+        expectedMap.put("f", "2003-10-20");
+        expectedMap.put("g", "2012-12-12 12:12:12");
+        assertThat(response, equalTo(expectedMap));
+    }
+
+    @Test
+    public void testWritingDocumentsFromTableApi() throws Exception {
+        TableEnvironment tableEnvironment =
+                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+
+        String index = "table-api";
+        tableEnvironment.executeSql(
+                "CREATE TABLE esTable ("
+                        + "a BIGINT NOT NULL,\n"
+                        + "b TIME,\n"
+                        + "c STRING NOT NULL,\n"
+                        + "d FLOAT,\n"
+                        + "e TINYINT NOT NULL,\n"
+                        + "f DATE,\n"
+                        + "g TIMESTAMP NOT NULL,"
+                        + "h as a + 2,\n"
+                        + "PRIMARY KEY (a, g) NOT ENFORCED\n"
+                        + ")\n"
+                        + "WITH (\n"
+                        + String.format("'%s'='%s',\n", "connector", "elasticsearch-7")
+                        + String.format(
+                                "'%s'='%s',\n",
+                                ElasticsearchConnectorOptions.INDEX_OPTION.key(), index)
+                        + String.format(
+                                "'%s'='%s',\n",
+                                ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+                                elasticsearchContainer.getHttpHostAddress())
+                        + String.format(
+                                "'%s'='%s'\n",
+                                ElasticsearchConnectorOptions.FLUSH_ON_CHECKPOINT_OPTION.key(),
+                                "false")
+                        + ")");
+
+        tableEnvironment
+                .fromValues(
+                        row(
+                                1L,
+                                LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+                                "ABCDE",
+                                12.12f,
+                                (byte) 2,
+                                LocalDate.ofEpochDay(12345),
+                                LocalDateTime.parse("2012-12-12T12:12:12")))
+                .executeInsert("esTable")
+                .await();
+
+        Client client = getClient();
+        Map<String, Object> response =
+                client.get(new GetRequest(index, "1_2012-12-12T12:12:12")).actionGet().getSource();
+        Map<Object, Object> expectedMap = new HashMap<>();
+        expectedMap.put("a", 1);
+        expectedMap.put("b", "00:00:12");
+        expectedMap.put("c", "ABCDE");
+        expectedMap.put("d", 12.12d);
+        expectedMap.put("e", 2);
+        expectedMap.put("f", "2003-10-20");
+        expectedMap.put("g", "2012-12-12 12:12:12");
+        assertThat(response, equalTo(expectedMap));
+    }
+
+    @Test
+    public void testWritingDocumentsNoPrimaryKey() throws Exception {
+        TableEnvironment tableEnvironment =
+                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+
+        String index = "no-primary-key";
+        tableEnvironment.executeSql(
+                "CREATE TABLE esTable ("
+                        + "a BIGINT NOT NULL,\n"
+                        + "b TIME,\n"
+                        + "c STRING NOT NULL,\n"
+                        + "d FLOAT,\n"
+                        + "e TINYINT NOT NULL,\n"
+                        + "f DATE,\n"
+                        + "g TIMESTAMP NOT NULL\n"
+                        + ")\n"
+                        + "WITH (\n"
+                        + String.format("'%s'='%s',\n", "connector", "elasticsearch-7")
+                        + String.format(
+                                "'%s'='%s',\n",
+                                ElasticsearchConnectorOptions.INDEX_OPTION.key(), index)
+                        + String.format(
+                                "'%s'='%s',\n",
+                                ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+                                elasticsearchContainer.getHttpHostAddress())
+                        + String.format(
+                                "'%s'='%s'\n",
+                                ElasticsearchConnectorOptions.FLUSH_ON_CHECKPOINT_OPTION.key(),
+                                "false")
+                        + ")");
+
+        tableEnvironment
+                .fromValues(
+                        row(
+                                1L,
+                                LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+                                "ABCDE",
+                                12.12f,
+                                (byte) 2,
+                                LocalDate.ofEpochDay(12345),
+                                LocalDateTime.parse("2012-12-12T12:12:12")),
+                        row(
+                                2L,
+                                LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+                                "FGHIJK",
+                                13.13f,
+                                (byte) 4,
+                                LocalDate.ofEpochDay(12345),
+                                LocalDateTime.parse("2013-12-12T13:13:13")))
+                .executeInsert("esTable")
+                .await();
+
+        Client client = getClient();
+
+        // The search API does not return documents that have not been indexed yet, so we may
+        // need to query the index a few times before both rows become visible.
+        Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30));
+        SearchHits hits;
+        do {
+            hits = client.prepareSearch(index).execute().actionGet().getHits();
+            if (hits.getTotalHits().value < 2) {
+                Thread.sleep(200);
+            }
+        } while (hits.getTotalHits().value < 2 && deadline.hasTimeLeft());
+
+        if (hits.getTotalHits().value < 2) {
+            throw new AssertionError("Could not retrieve results from Elasticsearch.");
+        }
+
+        HashSet<Map<String, Object>> resultSet = new HashSet<>();
+        resultSet.add(hits.getAt(0).getSourceAsMap());
+        resultSet.add(hits.getAt(1).getSourceAsMap());
+        Map<Object, Object> expectedMap1 = new HashMap<>();
+        expectedMap1.put("a", 1);
+        expectedMap1.put("b", "00:00:12");
+        expectedMap1.put("c", "ABCDE");
+        expectedMap1.put("d", 12.12d);
+        expectedMap1.put("e", 2);
+        expectedMap1.put("f", "2003-10-20");
+        expectedMap1.put("g", "2012-12-12 12:12:12");
+        Map<Object, Object> expectedMap2 = new HashMap<>();
+        expectedMap2.put("a", 2);
+        expectedMap2.put("b", "00:00:12");
+        expectedMap2.put("c", "FGHIJK");
+        expectedMap2.put("d", 13.13d);
+        expectedMap2.put("e", 4);
+        expectedMap2.put("f", "2003-10-20");
+        expectedMap2.put("g", "2013-12-12 13:13:13");
+        HashSet<Map<Object, Object>> expectedSet = new HashSet<>();
+        expectedSet.add(expectedMap1);
+        expectedSet.add(expectedMap2);
+        assertThat(resultSet, equalTo(expectedSet));
+    }
+
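+    /**
+     * Uses the index pattern 'dynamic-index-{b|yyyy-MM-dd}' and checks that the row is routed to
+     * the index derived from the value of column b.
+     */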
+    @Test
+    public void testWritingDocumentsWithDynamicIndex() throws Exception {
+        TableEnvironment tableEnvironment =
+                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+
+        String index = "dynamic-index-{b|yyyy-MM-dd}";
+        tableEnvironment.executeSql(
+                "CREATE TABLE esTable ("
+                        + "a BIGINT NOT NULL,\n"
+                        + "b TIMESTAMP NOT NULL,\n"
+                        + "PRIMARY KEY (a) NOT ENFORCED\n"
+                        + ")\n"
+                        + "WITH (\n"
+                        + String.format("'%s'='%s',\n", "connector", "elasticsearch-7")
+                        + String.format(
+                                "'%s'='%s',\n",
+                                ElasticsearchConnectorOptions.INDEX_OPTION.key(), index)
+                        + String.format(
+                                "'%s'='%s',\n",
+                                ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+                                elasticsearchContainer.getHttpHostAddress())
+                        + String.format(
+                                "'%s'='%s'\n",
+                                ElasticsearchConnectorOptions.FLUSH_ON_CHECKPOINT_OPTION.key(),
+                                "false")
+                        + ")");
+
+        tableEnvironment
+                .fromValues(row(1L, LocalDateTime.parse("2012-12-12T12:12:12")))
+                .executeInsert("esTable")
+                .await();
+
+        Client client = getClient();
+        Map<String, Object> response =
+                client.get(new GetRequest("dynamic-index-2012-12-12", "1")).actionGet().getSource();
+        Map<Object, Object> expectedMap = new HashMap<>();
+        expectedMap.put("a", 1);
+        expectedMap.put("b", "2012-12-12 12:12:12");
+        assertThat(response, equalTo(expectedMap));
+    }
+
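+    /** A no-op {@link DynamicTableSink.Context} whose methods return placeholder values. */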
+    private static class MockContext implements DynamicTableSink.Context {
+        @Override
+        public boolean isBounded() {
+            return false;
+        }
+
+        @Override
+        public TypeInformation<?> createTypeInformation(DataType consumedDataType) {
+            return null;
+        }
+
+        @Override
+        public TypeInformation<?> createTypeInformation(LogicalType consumedLogicalType) {
+            return null;
+        }
+
+        @Override
+        public DynamicTableSink.DataStructureConverter createDataStructureConverter(
+                DataType consumedDataType) {
+            return null;
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java
new file mode 100644
index 0000000..2928c8d
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionRequest;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.time.ZoneId;
+import java.util.List;
+
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+/** Tests for {@link Elasticsearch7DynamicSink} parameters. */
+public class Elasticsearch7DynamicSinkTest extends TestLogger {
+
+    private static final String FIELD_KEY = "key";
+    private static final String FIELD_FRUIT_NAME = "fruit_name";
+    private static final String FIELD_COUNT = "count";
+    private static final String FIELD_TS = "ts";
+
+    private static final String HOSTNAME = "host1";
+    private static final int PORT = 1234;
+    private static final String SCHEMA = "https";
+    private static final String INDEX = "MyIndex";
+    private static final String DOC_TYPE = "MyType";
+    private static final String USERNAME = "username";
+    private static final String PASSWORD = "password";
+
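+    /**
+     * Verifies that every option configured in {@link #getConfig()} is forwarded to the
+     * underlying {@link ElasticsearchSink.Builder}.
+     */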
+    @Test
+    public void testBuilder() {
+        final TableSchema schema = createTestSchema();
+
+        BuilderProvider provider = new BuilderProvider();
+        final Elasticsearch7DynamicSink testSink =
+                new Elasticsearch7DynamicSink(
+                        new DummyEncodingFormat(),
+                        new Elasticsearch7Configuration(
+                                getConfig(), this.getClass().getClassLoader()),
+                        schema,
+                        ZoneId.systemDefault(),
+                        provider);
+
+        testSink.getSinkRuntimeProvider(new MockSinkContext()).createSinkFunction();
+
+        verify(provider.builderSpy).setFailureHandler(new DummyFailureHandler());
+        verify(provider.builderSpy).setBulkFlushBackoff(true);
+        verify(provider.builderSpy)
+                .setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
+        verify(provider.builderSpy).setBulkFlushBackoffDelay(123);
+        verify(provider.builderSpy).setBulkFlushBackoffRetries(3);
+        verify(provider.builderSpy).setBulkFlushInterval(100);
+        verify(provider.builderSpy).setBulkFlushMaxActions(1000);
+        verify(provider.builderSpy).setBulkFlushMaxSizeMb(1);
+        verify(provider.builderSpy)
+                .setRestClientFactory(
+                        new Elasticsearch7DynamicSink.DefaultRestClientFactory("/myapp"));
+        verify(provider.sinkSpy).disableFlushOnCheckpoint();
+    }
+
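+    /** Verifies the builder defaults that apply when only the required options are set. */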
+    @Test
+    public void testDefaultConfig() {
+        final TableSchema schema = createTestSchema();
+        Configuration configuration = new Configuration();
+        configuration.setString(ElasticsearchConnectorOptions.INDEX_OPTION.key(), INDEX);
+        configuration.setString(ElasticsearchConnectorOptions.DOCUMENT_TYPE_OPTION.key(), DOC_TYPE);
+        configuration.setString(
+                ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+                SCHEMA + "://" + HOSTNAME + ":" + PORT);
+
+        BuilderProvider provider = new BuilderProvider();
+        final Elasticsearch7DynamicSink testSink =
+                new Elasticsearch7DynamicSink(
+                        new DummyEncodingFormat(),
+                        new Elasticsearch7Configuration(
+                                configuration, this.getClass().getClassLoader()),
+                        schema,
+                        ZoneId.systemDefault(),
+                        provider);
+
+        testSink.getSinkRuntimeProvider(new MockSinkContext()).createSinkFunction();
+
+        verify(provider.builderSpy).setFailureHandler(new NoOpFailureHandler());
+        verify(provider.builderSpy).setBulkFlushBackoff(false);
+        verify(provider.builderSpy).setBulkFlushInterval(1000);
+        verify(provider.builderSpy).setBulkFlushMaxActions(1000);
+        verify(provider.builderSpy).setBulkFlushMaxSizeMb(2);
+        verify(provider.builderSpy)
+                .setRestClientFactory(new Elasticsearch7DynamicSink.DefaultRestClientFactory(null));
+        verify(provider.sinkSpy, never()).disableFlushOnCheckpoint();
+    }
+
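+    /**
+     * Verifies that setting the username and password options selects the {@link
+     * Elasticsearch7DynamicSink.AuthRestClientFactory}.
+     */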
+    @Test
+    public void testAuthConfig() {
+        final TableSchema schema = createTestSchema();
+        Configuration configuration = new Configuration();
+        configuration.setString(ElasticsearchConnectorOptions.INDEX_OPTION.key(), INDEX);
+        configuration.setString(ElasticsearchConnectorOptions.DOCUMENT_TYPE_OPTION.key(), DOC_TYPE);
+        configuration.setString(
+                ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+                SCHEMA + "://" + HOSTNAME + ":" + PORT);
+        configuration.setString(ElasticsearchConnectorOptions.USERNAME_OPTION.key(), USERNAME);
+        configuration.setString(ElasticsearchConnectorOptions.PASSWORD_OPTION.key(), PASSWORD);
+
+        BuilderProvider provider = new BuilderProvider();
+        final Elasticsearch7DynamicSink testSink =
+                new Elasticsearch7DynamicSink(
+                        new DummyEncodingFormat(),
+                        new Elasticsearch7Configuration(
+                                configuration, this.getClass().getClassLoader()),
+                        schema,
+                        ZoneId.systemDefault(),
+                        provider);
+
+        testSink.getSinkRuntimeProvider(new MockSinkContext()).createSinkFunction();
+
+        verify(provider.builderSpy).setFailureHandler(new NoOpFailureHandler());
+        verify(provider.builderSpy).setBulkFlushBackoff(false);
+        verify(provider.builderSpy).setBulkFlushInterval(1000);
+        verify(provider.builderSpy).setBulkFlushMaxActions(1000);
+        verify(provider.builderSpy).setBulkFlushMaxSizeMb(2);
+        verify(provider.builderSpy)
+                .setRestClientFactory(
+                        new Elasticsearch7DynamicSink.AuthRestClientFactory(
+                                null, USERNAME, PASSWORD));
+        verify(provider.sinkSpy, never()).disableFlushOnCheckpoint();
+    }
+
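+    /** Connector options covering the bulk-flush, failure-handler, and connection settings. */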
+    private Configuration getConfig() {
+        Configuration configuration = new Configuration();
+        configuration.setString(ElasticsearchConnectorOptions.INDEX_OPTION.key(), INDEX);
+        configuration.setString(ElasticsearchConnectorOptions.DOCUMENT_TYPE_OPTION.key(), DOC_TYPE);
+        configuration.setString(
+                ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+                SCHEMA + "://" + HOSTNAME + ":" + PORT);
+        configuration.setString(
+                ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION.key(), "exponential");
+        configuration.setString(
+                ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION.key(), "123");
+        configuration.setString(
+                ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(), "3");
+        configuration.setString(
+                ElasticsearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION.key(), "100");
+        configuration.setString(
+                ElasticsearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION.key(), "1000");
+        configuration.setString(
+                ElasticsearchConnectorOptions.BULK_FLASH_MAX_SIZE_OPTION.key(), "1mb");
+        configuration.setString(
+                ElasticsearchConnectorOptions.CONNECTION_PATH_PREFIX.key(), "/myapp");
+        configuration.setString(
+                ElasticsearchConnectorOptions.FAILURE_HANDLER_OPTION.key(),
+                DummyFailureHandler.class.getName());
+        configuration.setString(
+                ElasticsearchConnectorOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false");
+        return configuration;
+    }
+
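+    /**
+     * Injects Mockito spies for the sink builder and the built sink so that the tests can verify
+     * which configuration methods were invoked.
+     */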
+    private static class BuilderProvider
+            implements Elasticsearch7DynamicSink.ElasticSearchBuilderProvider {
+        public ElasticsearchSink.Builder<RowData> builderSpy;
+        public ElasticsearchSink<RowData> sinkSpy;
+
+        @Override
+        public ElasticsearchSink.Builder<RowData> createBuilder(
+                List<HttpHost> httpHosts, RowElasticsearchSinkFunction upsertSinkFunction) {
+            builderSpy =
+                    Mockito.spy(new ElasticsearchSink.Builder<>(httpHosts, upsertSinkFunction));
+            doAnswer(
+                            invocation -> {
+                                sinkSpy =
+                                        Mockito.spy(
+                                                (ElasticsearchSink<RowData>)
+                                                        invocation.callRealMethod());
+                                return sinkSpy;
+                            })
+                    .when(builderSpy)
+                    .build();
+
+            return builderSpy;
+        }
+    }
+
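+    /** Builds the four-column schema shared by the tests in this class. */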
+    private TableSchema createTestSchema() {
+        return TableSchema.builder()
+                .field(FIELD_KEY, DataTypes.BIGINT())
+                .field(FIELD_FRUIT_NAME, DataTypes.STRING())
+                .field(FIELD_COUNT, DataTypes.DECIMAL(10, 4))
+                .field(FIELD_TS, DataTypes.TIMESTAMP(3))
+                .build();
+    }
+
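+    /** Serialization schema stub that produces empty payloads; the tests never inspect them. */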
+    private static class DummySerializationSchema implements SerializationSchema<RowData> {
+
+        private static final DummySerializationSchema INSTANCE = new DummySerializationSchema();
+
+        @Override
+        public byte[] serialize(RowData element) {
+            return new byte[0];
+        }
+    }
+
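+    /** Encoding format stub whose runtime encoder is the {@link DummySerializationSchema}. */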
+    private static class DummyEncodingFormat
+            implements EncodingFormat<SerializationSchema<RowData>> {
+        @Override
+        public SerializationSchema<RowData> createRuntimeEncoder(
+                DynamicTableSink.Context context, DataType consumedDataType) {
+            return DummySerializationSchema.INSTANCE;
+        }
+
+        @Override
+        public ChangelogMode getChangelogMode() {
+            return null;
+        }
+    }
+
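+    /** A {@link DynamicTableSink.Context} stub used only to obtain the sink runtime provider. */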
+    private static class MockSinkContext implements DynamicTableSink.Context {
+        @Override
+        public boolean isBounded() {
+            return false;
+        }
+
+        @Override
+        public TypeInformation<?> createTypeInformation(DataType consumedDataType) {
+            return null;
+        }
+
+        @Override
+        public TypeInformation<?> createTypeInformation(LogicalType consumedLogicalType) {
+            return null;
+        }
+
+        @Override
+        public DynamicTableSink.DataStructureConverter createDataStructureConverter(
+                DataType consumedDataType) {
+            return null;
+        }
+    }
+
+    /** Custom failure handler for testing. */
+    public static class DummyFailureHandler implements ActionRequestFailureHandler {
+
+        @Override
+        public void onFailure(
+                ActionRequest action,
+                Throwable failure,
+                int restStatusCode,
+                RequestIndexer indexer) {
+            // do nothing
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return o instanceof DummyFailureHandler;
+        }
+
+        @Override
+        public int hashCode() {
+            return DummyFailureHandler.class.hashCode();
+        }
+    }
+}