Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/07 11:00:52 UTC

[inlong] branch master updated: [INLONG-4659][Sort] Support field routing for Elasticsearch connector (#4849)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fac488d9 [INLONG-4659][Sort] Support field routing for Elasticsearch connector (#4849)
2fac488d9 is described below

commit 2fac488d9d279022a12a9b5a1737b537e616b986
Author: Oneal65 <li...@foxmail.com>
AuthorDate: Thu Jul 7 19:00:47 2022 +0800

    [INLONG-4659][Sort] Support field routing for Elasticsearch connector (#4849)
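
    For context: the new 'routing.field-name' option tells the sink which
    column's value to use as the Elasticsearch routing key. A minimal sketch
    of a job using it follows; the table name, columns, and host are
    illustrative, not taken from this commit.

        import org.apache.flink.table.api.EnvironmentSettings;
        import org.apache.flink.table.api.TableEnvironment;

        public class RoutingJobSketch {
            public static void main(String[] args) {
                TableEnvironment tEnv =
                        TableEnvironment.create(
                                EnvironmentSettings.newInstance()
                                        .useBlinkPlanner()
                                        .inStreamingMode()
                                        .build());
                tEnv.executeSql(
                        "CREATE TABLE es_sink (\n"
                                + "  id BIGINT NOT NULL,\n"
                                + "  region STRING,\n"
                                + "  PRIMARY KEY (id) NOT ENFORCED\n"
                                + ") WITH (\n"
                                + "  'connector' = 'elasticsearch-7-inlong',\n"
                                + "  'hosts' = 'http://localhost:9200',\n"
                                + "  'index' = 'my_index',\n"
                                // documents are routed to shards by the 'region' value
                                + "  'routing.field-name' = 'region'\n"
                                + ")");
            }
        }
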
---
 .idea/vcs.xml                                      |   4 +-
 .../protocol/node/load/ElasticsearchLoadNode.java  |   8 +-
 .../sort-connectors/elasticsearch-6/pom.xml        | 109 ++++-
 .../table/Elasticsearch6Configuration.java         |  80 ++++
 .../table/Elasticsearch6DynamicSink.java           | 324 ++++++++++++++
 .../table/Elasticsearch6DynamicSinkFactory.java    | 173 ++++++++
 .../org.apache.flink.table.factories.Factory       |  16 +
 .../table/Elasticsearch6DynamicSinkITCase.java     | 483 +++++++++++++++++++++
 .../src/test/resources/log4j2-test.properties      |  28 ++
 .../sort-connectors/elasticsearch-7/pom.xml        | 108 ++++-
 .../table/Elasticsearch7Configuration.java         |  71 +++
 .../table/Elasticsearch7DynamicSink.java           | 325 ++++++++++++++
 .../table/Elasticsearch7DynamicSinkFactory.java    | 173 ++++++++
 .../org.apache.flink.table.factories.Factory       |  16 +
 .../table/Elasticsearch7DynamicSinkITCase.java     | 451 +++++++++++++++++++
 .../src/test/resources/log4j2-test.properties      |  28 ++
 .../sort-connectors/elasticsearch-base/pom.xml     | 175 ++++++++
 .../table/AbstractTimeIndexGenerator.java          |  38 ++
 .../table/ElasticsearchConfiguration.java          | 170 ++++++++
 .../elasticsearch/table/ElasticsearchOptions.java  | 162 +++++++
 .../table/ElasticsearchValidationUtils.java        |  99 +++++
 .../sort/elasticsearch/table/IndexGenerator.java   |  39 ++
 .../elasticsearch/table/IndexGeneratorBase.java    |  49 +++
 .../elasticsearch/table/IndexGeneratorFactory.java | 275 ++++++++++++
 .../sort/elasticsearch/table/KeyExtractor.java     | 128 ++++++
 .../sort/elasticsearch/table/RequestFactory.java   |  52 +++
 .../sort/elasticsearch/table/RoutingExtractor.java |  65 +++
 .../table/RowElasticsearchSinkFunction.java        | 153 +++++++
 .../elasticsearch/table/StaticIndexGenerator.java  |  34 ++
 .../sort/elasticsearch/table/TestContext.java      |  71 +++
 .../src/test/resources/log4j2-test.properties      |  28 ++
 inlong-sort/sort-connectors/pom.xml                |   1 +
 .../sort/parser/ElasticsearchSqlParseTest.java     |   4 +-
 licenses/inlong-sort-connectors/LICENSE            |  23 +
 pom.xml                                            |  11 +-
 35 files changed, 3948 insertions(+), 26 deletions(-)

diff --git a/.idea/vcs.xml b/.idea/vcs.xml
index e8d2bb62c..cf5cd1de8 100644
--- a/.idea/vcs.xml
+++ b/.idea/vcs.xml
@@ -19,7 +19,7 @@
   <component name="VcsDirectoryMappings">
     <mapping directory="$PROJECT_DIR$" vcs="Git" />
   </component>
-   <component name="IssueNavigationConfiguration">
+  <component name="IssueNavigationConfiguration">
     <option name="links">
       <list>
         <IssueNavigationLink>
@@ -33,4 +33,4 @@
       </list>
     </option>
   </component>
-</project>
+</project>
\ No newline at end of file
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/ElasticsearchLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/ElasticsearchLoadNode.java
index c3d400b73..8cfc4ccaf 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/ElasticsearchLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/ElasticsearchLoadNode.java
@@ -97,18 +97,22 @@ public class ElasticsearchLoadNode extends LoadNode implements Serializable {
         this.version = version;
     }
 
+    /**
+     * To enable field routing, set the 'routing.field-name' option.
+     */
     @Override
     public Map<String, String> tableOptions() {
         Map<String, String> options = super.tableOptions();
-        options.put("connector", "elasticsearch-7");
+        options.put("connector", "elasticsearch-7-inlong");
         if (version == 6) {
-            options.put("connector", "elasticsearch-6");
+            options.put("connector", "elasticsearch-6-inlong");
             options.put("document-type", documentType);
         }
         options.put("hosts", hosts);
         options.put("index", index);
         options.put("password", password);
         options.put("username", username);
+        options.put("routing.field-name", primaryKey);
         return options;
     }
 
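The net effect of the hunk above: the connector identifiers gain an "-inlong"
suffix and the routing option is populated from the node's primary key. A
standalone sketch of the resulting option map (simplified; the real method
also copies hosts, index, and credentials):

    import java.util.HashMap;
    import java.util.Map;

    public class RoutingOptionsSketch {
        // Mirrors the tableOptions() logic above; not the real class.
        static Map<String, String> tableOptions(int version, String primaryKey) {
            Map<String, String> options = new HashMap<>();
            options.put("connector", "elasticsearch-7-inlong");
            if (version == 6) {
                options.put("connector", "elasticsearch-6-inlong");
            }
            options.put("routing.field-name", primaryKey);
            return options;
        }

        public static void main(String[] args) {
            // prints {connector=elasticsearch-7-inlong, routing.field-name=id}
            System.out.println(tableOptions(7, "id"));
        }
    }
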
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/pom.xml b/inlong-sort/sort-connectors/elasticsearch-6/pom.xml
index c6039ed70..1360dca65 100644
--- a/inlong-sort/sort-connectors/elasticsearch-6/pom.xml
+++ b/inlong-sort/sort-connectors/elasticsearch-6/pom.xml
@@ -31,32 +31,125 @@
     <name>Apache InLong - Sort-connector-elasticsearch6</name>
     <packaging>jar</packaging>
 
-
-    <properties>
-        <elasticsearch.version>6.8.17</elasticsearch.version>
-    </properties>
-
     <dependencies>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
             <version>${flink.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-elasticsearch-base</artifactId>
+            <version>1.3.0-SNAPSHOT</version>
+            <exclusions>
+                <!-- Elasticsearch Java Client has been moved to a different module in 5.x -->
+                <exclusion>
+                    <groupId>org.elasticsearch</groupId>
+                    <artifactId>elasticsearch</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
         <dependency>
             <groupId>org.elasticsearch.client</groupId>
             <artifactId>elasticsearch-rest-high-level-client</artifactId>
-            <version>${elasticsearch.version}</version>
+            <version>${elasticsearch6.version}</version>
         </dependency>
         <dependency>
             <groupId>org.elasticsearch.client</groupId>
             <artifactId>elasticsearch-rest-client</artifactId>
-            <version>${elasticsearch.version}</version>
+            <version>${elasticsearch6.version}</version>
         </dependency>
         <dependency>
             <groupId>org.elasticsearch</groupId>
             <artifactId>elasticsearch</artifactId>
-            <version>${elasticsearch.version}</version>
+            <version>${elasticsearch6.version}</version>
+        </dependency>
+
+        <!-- test dependencies -->
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <version>1.15.1</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_2.12</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.12</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-elasticsearch-base</artifactId>
+            <version>1.3.0-SNAPSHOT</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.elasticsearch</groupId>
+                    <artifactId>elasticsearch</artifactId>
+                </exclusion>
+            </exclusions>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <!--
+            Include the Elasticsearch transport dependency for tests. Netty3 is no longer used in 6.x.
+        -->
+        <dependency>
+            <groupId>org.elasticsearch.client</groupId>
+            <artifactId>transport</artifactId>
+            <version>${elasticsearch6.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.elasticsearch.plugin</groupId>
+            <artifactId>transport-netty4-client</artifactId>
+            <version>${elasticsearch6.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Elasticsearch table descriptor testing -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_2.12</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Table API integration tests -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner-blink_2.12</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Elasticsearch table sink factory testing -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
 
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6Configuration.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6Configuration.java
new file mode 100644
index 000000000..3d8212b16
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6Configuration.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch6.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.inlong.sort.elasticsearch.table.ElasticsearchConfiguration;
+
+import org.apache.http.HttpHost;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
+
+/** Elasticsearch 6 specific configuration. */
+@Internal
+final class Elasticsearch6Configuration extends ElasticsearchConfiguration {
+    Elasticsearch6Configuration(ReadableConfig config, ClassLoader classLoader) {
+        super(config, classLoader);
+    }
+
+    public List<HttpHost> getHosts() {
+        return config.get(HOSTS_OPTION).stream()
+                .map(Elasticsearch6Configuration::validateAndParseHostsString)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Validate and parse a single host string into an {@link HttpHost}.
+     *
+     * <p>Each entry of the 'hosts' option is expected in the following format:
+     *
+     * <pre>
+     *     hosts = http://host_name:9200;http://host_name:9201
+     * </pre>
+     */
+    private static HttpHost validateAndParseHostsString(String host) {
+        try {
+            HttpHost httpHost = HttpHost.create(host);
+            if (httpHost.getPort() < 0) {
+                throw new ValidationException(
+                        String.format(
+                                "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing port.",
+                                host, HOSTS_OPTION.key()));
+            }
+
+            if (httpHost.getSchemeName() == null) {
+                throw new ValidationException(
+                        String.format(
+                                "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing scheme.",
+                                host, HOSTS_OPTION.key()));
+            }
+            return httpHost;
+        } catch (Exception e) {
+            throw new ValidationException(
+                    String.format(
+                            "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'.",
+                            host, HOSTS_OPTION.key()),
+                    e);
+        }
+    }
+}
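
A note on the validation above: HttpHost.create() accepts a host string
without an explicit port, so it is the port check that actually rejects it.
A standalone sketch (assumes Apache HttpCore on the classpath):

    import org.apache.http.HttpHost;

    public class HostParsingSketch {
        public static void main(String[] args) {
            // Scheme and port present: passes the checks above.
            HttpHost ok = HttpHost.create("http://localhost:9200");
            System.out.println(ok.getSchemeName() + " / " + ok.getPort()); // http / 9200

            // Missing port: create() still succeeds, but getPort() returns -1,
            // which the configuration above rejects with a ValidationException.
            HttpHost noPort = HttpHost.create("http://localhost");
            System.out.println(noPort.getPort()); // -1
        }
    }
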
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
new file mode 100644
index 000000000..af2ec05d8
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch6.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.inlong.sort.elasticsearch.table.RequestFactory;
+import org.apache.inlong.sort.elasticsearch.table.IndexGeneratorFactory;
+import org.apache.inlong.sort.elasticsearch.table.KeyExtractor;
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.inlong.sort.elasticsearch.table.RoutingExtractor;
+import org.apache.inlong.sort.elasticsearch.table.RowElasticsearchSinkFunction;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * A {@link DynamicTableSink} that describes how to create a {@link ElasticsearchSink} from a
+ * logical description.
+ */
+@PublicEvolving
+final class Elasticsearch6DynamicSink implements DynamicTableSink {
+    @VisibleForTesting
+    static final Elasticsearch6RequestFactory REQUEST_FACTORY = new Elasticsearch6RequestFactory();
+
+    private final EncodingFormat<SerializationSchema<RowData>> format;
+    private final TableSchema schema;
+    private final Elasticsearch6Configuration config;
+
+    public Elasticsearch6DynamicSink(
+            EncodingFormat<SerializationSchema<RowData>> format,
+            Elasticsearch6Configuration config,
+            TableSchema schema) {
+        this(format, config, schema, (ElasticsearchSink.Builder::new));
+    }
+
+    // --------------------------------------------------------------
+    // Hack to make configuration testing possible.
+    //
+    // The code in this block should never be used outside of tests.
+    // Injecting a builder lets us make assertions on it in tests. We
+    // cannot assert everything, though; e.g. it is not possible to
+    // assert flushing on checkpoint, as that is configured on the
+    // sink itself.
+    // --------------------------------------------------------------
+
+    private final ElasticSearchBuilderProvider builderProvider;
+
+    @FunctionalInterface
+    interface ElasticSearchBuilderProvider {
+        ElasticsearchSink.Builder<RowData> createBuilder(
+                List<HttpHost> httpHosts, RowElasticsearchSinkFunction upsertSinkFunction);
+    }
+
+    Elasticsearch6DynamicSink(
+            EncodingFormat<SerializationSchema<RowData>> format,
+            Elasticsearch6Configuration config,
+            TableSchema schema,
+            ElasticSearchBuilderProvider builderProvider) {
+        this.format = format;
+        this.schema = schema;
+        this.config = config;
+        this.builderProvider = builderProvider;
+    }
+
+    // --------------------------------------------------------------
+    // End of hack to make configuration testing possible
+    // --------------------------------------------------------------
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        ChangelogMode.Builder builder = ChangelogMode.newBuilder();
+        for (RowKind kind : requestedMode.getContainedKinds()) {
+            if (kind != RowKind.UPDATE_BEFORE) {
+                builder.addContainedKind(kind);
+            }
+        }
+        return builder.build();
+    }
+
+    @Override
+    public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
+        return () -> {
+            SerializationSchema<RowData> format =
+                    this.format.createRuntimeEncoder(context, schema.toRowDataType());
+
+            final RowElasticsearchSinkFunction upsertFunction =
+                    new RowElasticsearchSinkFunction(
+                            IndexGeneratorFactory.createIndexGenerator(config.getIndex(), schema),
+                            config.getDocumentType(),
+                            format,
+                            XContentType.JSON,
+                            REQUEST_FACTORY,
+                            KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()),
+                            RoutingExtractor.createRoutingExtractor(
+                                    schema, config.getRoutingField().orElse(null)));
+
+            final ElasticsearchSink.Builder<RowData> builder =
+                    builderProvider.createBuilder(config.getHosts(), upsertFunction);
+
+            builder.setFailureHandler(config.getFailureHandler());
+            builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
+            builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20));
+            builder.setBulkFlushInterval(config.getBulkFlushInterval());
+            builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
+            config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
+            config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
+            config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
+
+            // We must overwrite the default factory, which is defined with a lambda,
+            // because of a bug in lambda serialization when shading; see FLINK-18006.
+            if (config.getUsername().isPresent()
+                    && config.getPassword().isPresent()
+                    && !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())
+                    && !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get())) {
+                builder.setRestClientFactory(
+                        new AuthRestClientFactory(
+                                config.getPathPrefix().orElse(null),
+                                config.getUsername().get(),
+                                config.getPassword().get()));
+            } else {
+                builder.setRestClientFactory(
+                        new DefaultRestClientFactory(config.getPathPrefix().orElse(null)));
+            }
+
+            final ElasticsearchSink<RowData> sink = builder.build();
+
+            if (config.isDisableFlushOnCheckpoint()) {
+                sink.disableFlushOnCheckpoint();
+            }
+
+            return sink;
+        };
+    }
+
+    @Override
+    public DynamicTableSink copy() {
+        return this;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "Elasticsearch6";
+    }
+
+    /** Serializable {@link RestClientFactory} used by the sink. */
+    @VisibleForTesting
+    static class DefaultRestClientFactory implements RestClientFactory {
+
+        private final String pathPrefix;
+
+        public DefaultRestClientFactory(@Nullable String pathPrefix) {
+            this.pathPrefix = pathPrefix;
+        }
+
+        @Override
+        public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
+            if (pathPrefix != null) {
+                restClientBuilder.setPathPrefix(pathPrefix);
+            }
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            DefaultRestClientFactory that = (DefaultRestClientFactory) o;
+            return Objects.equals(pathPrefix, that.pathPrefix);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(pathPrefix);
+        }
+    }
+
+    /** Serializable {@link RestClientFactory} used by the sink, which enables authentication. */
+    @VisibleForTesting
+    static class AuthRestClientFactory implements RestClientFactory {
+
+        private final String pathPrefix;
+        private final String username;
+        private final String password;
+        private transient CredentialsProvider credentialsProvider;
+
+        public AuthRestClientFactory(
+                @Nullable String pathPrefix, String username, String password) {
+            this.pathPrefix = pathPrefix;
+            this.password = password;
+            this.username = username;
+        }
+
+        @Override
+        public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
+            if (pathPrefix != null) {
+                restClientBuilder.setPathPrefix(pathPrefix);
+            }
+            if (credentialsProvider == null) {
+                credentialsProvider = new BasicCredentialsProvider();
+                credentialsProvider.setCredentials(
+                        AuthScope.ANY, new UsernamePasswordCredentials(username, password));
+            }
+            restClientBuilder.setHttpClientConfigCallback(
+                    httpAsyncClientBuilder ->
+                            httpAsyncClientBuilder.setDefaultCredentialsProvider(
+                                    credentialsProvider));
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            AuthRestClientFactory that = (AuthRestClientFactory) o;
+            return Objects.equals(pathPrefix, that.pathPrefix)
+                    && Objects.equals(username, that.username)
+                    && Objects.equals(password, that.password);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(pathPrefix, username, password);
+        }
+    }
+
+    /**
+     * Version-specific creation of {@link org.elasticsearch.action.ActionRequest}s used by the
+     * sink.
+     */
+    private static class Elasticsearch6RequestFactory implements RequestFactory {
+        @Override
+        public UpdateRequest createUpdateRequest(
+                String index,
+                String docType,
+                String key,
+                XContentType contentType,
+                byte[] document) {
+            return new UpdateRequest(index, docType, key)
+                    .doc(document, contentType)
+                    .upsert(document, contentType);
+        }
+
+        @Override
+        public IndexRequest createIndexRequest(
+                String index,
+                String docType,
+                String key,
+                XContentType contentType,
+                byte[] document) {
+            return new IndexRequest(index, docType, key).source(document, contentType);
+        }
+
+        @Override
+        public DeleteRequest createDeleteRequest(String index, String docType, String key) {
+            return new DeleteRequest(index, docType, key);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Elasticsearch6DynamicSink that = (Elasticsearch6DynamicSink) o;
+        return Objects.equals(format, that.format)
+                && Objects.equals(schema, that.schema)
+                && Objects.equals(config, that.config)
+                && Objects.equals(builderProvider, that.builderProvider);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(format, schema, config, builderProvider);
+    }
+}
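
The RoutingExtractor used above lives in the new elasticsearch-base module
and its body is not shown in this mail. Conceptually it resolves the
configured field's position in the schema and reads that field from each
row; a rough sketch (field-position lookup omitted, names hypothetical):

    import java.io.Serializable;
    import java.util.function.Function;
    import org.apache.flink.table.data.RowData;

    public class RoutingSketch {
        // A null field position means "no routing": requests then keep
        // Elasticsearch's default shard routing (by document id).
        static Function<RowData, String> createRoutingExtractor(Integer fieldPos) {
            if (fieldPos == null) {
                return row -> null;
            }
            return (Function<RowData, String> & Serializable)
                    row -> row.isNullAt(fieldPos)
                            ? null
                            : row.getString(fieldPos).toString();
        }
    }
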
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java
new file mode 100644
index 000000000..4a49939ec
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch6.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.StringUtils;
+import org.apache.inlong.sort.elasticsearch.table.ElasticsearchValidationUtils;
+
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.CONNECTION_MAX_RETRY_TIMEOUT_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.CONNECTION_PATH_PREFIX;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.DOCUMENT_TYPE_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.FAILURE_HANDLER_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.FORMAT_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.INDEX_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.KEY_DELIMITER_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.PASSWORD_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.ROUTING_FIELD_NAME;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.USERNAME_OPTION;
+
+/** A {@link DynamicTableSinkFactory} for discovering {@link Elasticsearch6DynamicSink}. */
+@Internal
+public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory {
+    private static final Set<ConfigOption<?>> requiredOptions =
+            Stream.of(HOSTS_OPTION, INDEX_OPTION, DOCUMENT_TYPE_OPTION).collect(Collectors.toSet());
+    private static final Set<ConfigOption<?>> optionalOptions =
+            Stream.of(
+                    KEY_DELIMITER_OPTION,
+                    ROUTING_FIELD_NAME,
+                    FAILURE_HANDLER_OPTION,
+                    FLUSH_ON_CHECKPOINT_OPTION,
+                    BULK_FLASH_MAX_SIZE_OPTION,
+                    BULK_FLUSH_MAX_ACTIONS_OPTION,
+                    BULK_FLUSH_INTERVAL_OPTION,
+                    BULK_FLUSH_BACKOFF_TYPE_OPTION,
+                    BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION,
+                    BULK_FLUSH_BACKOFF_DELAY_OPTION,
+                    CONNECTION_MAX_RETRY_TIMEOUT_OPTION,
+                    CONNECTION_PATH_PREFIX,
+                    FORMAT_OPTION,
+                    PASSWORD_OPTION,
+                    USERNAME_OPTION)
+                    .collect(Collectors.toSet());
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        TableSchema tableSchema = context.getCatalogTable().getSchema();
+        ElasticsearchValidationUtils.validatePrimaryKey(tableSchema);
+        final FactoryUtil.TableFactoryHelper helper =
+                FactoryUtil.createTableFactoryHelper(this, context);
+
+        final EncodingFormat<SerializationSchema<RowData>> format =
+                helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT_OPTION);
+
+        helper.validate();
+        Configuration configuration = new Configuration();
+        context.getCatalogTable().getOptions().forEach(configuration::setString);
+        Elasticsearch6Configuration config =
+                new Elasticsearch6Configuration(configuration, context.getClassLoader());
+
+        validate(config, configuration);
+
+        return new Elasticsearch6DynamicSink(
+                format, config, TableSchemaUtils.getPhysicalSchema(tableSchema));
+    }
+
+    private void validate(Elasticsearch6Configuration config, Configuration originalConfiguration) {
+        config.getFailureHandler(); // checks if we can instantiate the custom failure handler
+        config.getHosts(); // validate hosts
+        validate(
+                config.getIndex().length() >= 1,
+                () -> String.format("'%s' must not be empty", INDEX_OPTION.key()));
+        int maxActions = config.getBulkFlushMaxActions();
+        validate(
+                maxActions == -1 || maxActions >= 1,
+                () ->
+                        String.format(
+                                "'%s' must be at least 1. Got: %s",
+                                BULK_FLUSH_MAX_ACTIONS_OPTION.key(), maxActions));
+        long maxSize = config.getBulkFlushMaxByteSize();
+        long mb1 = 1024 * 1024;
+        validate(
+                maxSize == -1 || (maxSize >= mb1 && maxSize % mb1 == 0),
+                () ->
+                        String.format(
+                                "'%s' must be in MB granularity. Got: %s",
+                                BULK_FLASH_MAX_SIZE_OPTION.key(),
+                                originalConfiguration
+                                        .get(BULK_FLASH_MAX_SIZE_OPTION)
+                                        .toHumanReadableString()));
+        validate(
+                config.getBulkFlushBackoffRetries().map(retries -> retries >= 1).orElse(true),
+                () ->
+                        String.format(
+                                "'%s' must be at least 1. Got: %s",
+                                BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(),
+                                config.getBulkFlushBackoffRetries().get()));
+        if (config.getUsername().isPresent()
+                && !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())) {
+            validate(
+                    config.getPassword().isPresent()
+                            && !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get()),
+                    () ->
+                            String.format(
+                                    "'%s' and '%s' must be set at the same time. Got: username '%s' and password '%s'",
+                                    USERNAME_OPTION.key(),
+                                    PASSWORD_OPTION.key(),
+                                    config.getUsername().get(),
+                                    config.getPassword().orElse("")));
+        }
+    }
+
+    private static void validate(boolean condition, Supplier<String> message) {
+        if (!condition) {
+            throw new ValidationException(message.get());
+        }
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return "elasticsearch-6-inlong";
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return requiredOptions;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return optionalOptions;
+    }
+}
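
For clarity, the BULK_FLASH_MAX_SIZE_OPTION rule enforced above accepts only
-1 (disabled) or a whole number of megabytes. A standalone sketch of the
same predicate with illustrative values:

    public class BulkSizeCheckSketch {
        public static void main(String[] args) {
            long mb1 = 1024 * 1024;
            long[] samples = {-1, mb1, 3 * mb1, 1500 * 1024}; // last one is rejected
            for (long maxSize : samples) {
                // same condition as in validate() above
                boolean valid = maxSize == -1 || (maxSize >= mb1 && maxSize % mb1 == 0);
                System.out.println(maxSize + " -> " + (valid ? "accepted" : "rejected"));
            }
        }
    }
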
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-connectors/elasticsearch-6/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..944dbf0cf
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.elasticsearch6.table.Elasticsearch6DynamicSinkFactory
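
The services file above registers the factory for Java SPI discovery; Flink's
FactoryUtil matches the table's 'connector' option against each factory's
factoryIdentifier(). A rough sketch of the lookup (what FactoryUtil does
internally is more involved):

    import java.util.ServiceLoader;
    import org.apache.flink.table.factories.Factory;

    public class SpiDiscoverySketch {
        public static void main(String[] args) {
            // Scans META-INF/services/org.apache.flink.table.factories.Factory
            for (Factory factory : ServiceLoader.load(Factory.class)) {
                if ("elasticsearch-6-inlong".equals(factory.factoryIdentifier())) {
                    System.out.println("Found: " + factory.getClass().getName());
                }
            }
        }
    }
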
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/test/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkITCase.java b/inlong-sort/sort-connectors/elasticsearch-6/src/test/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkITCase.java
new file mode 100644
index 000000000..970edbcc8
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/test/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkITCase.java
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch6.table;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.apache.inlong.sort.elasticsearch.table.TestContext.context;
+import static org.apache.flink.table.api.Expressions.row;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/** IT tests for {@link Elasticsearch6DynamicSink}. */
+public class Elasticsearch6DynamicSinkITCase {
+
+    @ClassRule
+    public static ElasticsearchContainer elasticsearchContainer =
+            new ElasticsearchContainer(
+                    DockerImageName.parse("docker.elastic.co/elasticsearch/elasticsearch-oss")
+                            .withTag("6.3.1"));
+
+    @SuppressWarnings("deprecation")
+    protected final Client getClient() {
+        TransportAddress transportAddress =
+                new TransportAddress(elasticsearchContainer.getTcpHost());
+        String expectedClusterName = "docker-cluster";
+        Settings settings = Settings.builder().put("cluster.name", expectedClusterName).build();
+        return new PreBuiltTransportClient(settings).addTransportAddress(transportAddress);
+    }
+
+    @Test
+    public void testWritingDocuments() throws Exception {
+        ResolvedSchema schema =
+                new ResolvedSchema(
+                        Arrays.asList(
+                                Column.physical("a", DataTypes.BIGINT().notNull()),
+                                Column.physical("b", DataTypes.TIME()),
+                                Column.physical("c", DataTypes.STRING().notNull()),
+                                Column.physical("d", DataTypes.FLOAT()),
+                                Column.physical("e", DataTypes.TINYINT().notNull()),
+                                Column.physical("f", DataTypes.DATE()),
+                                Column.physical("g", DataTypes.TIMESTAMP().notNull())),
+                        Collections.emptyList(),
+                        UniqueConstraint.primaryKey("name", Arrays.asList("a", "g")));
+        GenericRowData rowData =
+                GenericRowData.of(
+                        1L,
+                        12345,
+                        StringData.fromString("ABCDE"),
+                        12.12f,
+                        (byte) 2,
+                        12345,
+                        TimestampData.fromLocalDateTime(
+                                LocalDateTime.parse("2012-12-12T12:12:12")));
+
+        String index = "writing-documents";
+        String myType = "MyType";
+        Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+        SinkFunctionProvider sinkRuntimeProvider =
+                (SinkFunctionProvider)
+                        sinkFactory
+                                .createDynamicTableSink(
+                                        context()
+                                                .withSchema(schema)
+                                                .withOption(
+                                                        ElasticsearchOptions.INDEX_OPTION.key(),
+                                                        index)
+                                                .withOption(
+                                                        ElasticsearchOptions.DOCUMENT_TYPE_OPTION
+                                                                .key(),
+                                                        myType)
+                                                .withOption(
+                                                        ElasticsearchOptions.HOSTS_OPTION.key(),
+                                                        elasticsearchContainer.getHttpHostAddress())
+                                                .withOption(
+                                                        ElasticsearchOptions
+                                                                .FLUSH_ON_CHECKPOINT_OPTION
+                                                                .key(),
+                                                        "false")
+                                                .build())
+                                .getSinkRuntimeProvider(new MockContext());
+
+        SinkFunction<RowData> sinkFunction = sinkRuntimeProvider.createSinkFunction();
+        StreamExecutionEnvironment environment =
+                StreamExecutionEnvironment.getExecutionEnvironment();
+        rowData.setRowKind(RowKind.UPDATE_AFTER);
+        environment.<RowData>fromElements(rowData).addSink(sinkFunction);
+        environment.execute();
+
+        Client client = getClient();
+
+        Map<Object, Object> expectedMap = new HashMap<>();
+        expectedMap.put("a", 1);
+        expectedMap.put("b", "00:00:12");
+        expectedMap.put("c", "ABCDE");
+        expectedMap.put("d", 12.12d);
+        expectedMap.put("e", 2);
+        expectedMap.put("f", "2003-10-20");
+        expectedMap.put("g", "2012-12-12 12:12:12");
+        Map<String, Object> response =
+                client.get(new GetRequest(index, myType, "1_2012-12-12T12:12:12"))
+                        .actionGet()
+                        .getSource();
+        assertThat(response, equalTo(expectedMap));
+    }
+
+    @Test
+    public void testWritingDocumentsFromTableApi() throws Exception {
+        TableEnvironment tableEnvironment =
+                TableEnvironment.create(
+                        EnvironmentSettings.newInstance()
+                                .useBlinkPlanner()
+                                .inStreamingMode()
+                                .build());
+
+        String index = "table-api";
+        String myType = "MyType";
+        tableEnvironment.executeSql(
+                "CREATE TABLE esTable ("
+                        + "a BIGINT NOT NULL,\n"
+                        + "b TIME,\n"
+                        + "c STRING NOT NULL,\n"
+                        + "d FLOAT,\n"
+                        + "e TINYINT NOT NULL,\n"
+                        + "f DATE,\n"
+                        + "g TIMESTAMP NOT NULL,\n"
+                        + "h as a + 2,\n"
+                        + "PRIMARY KEY (a, g) NOT ENFORCED\n"
+                        + ")\n"
+                        + "WITH (\n"
+                        + String.format("'%s'='%s',\n", "connector", "elasticsearch-6-inlong")
+                        + String.format(
+                                "'%s'='%s',\n", ElasticsearchOptions.INDEX_OPTION.key(), index)
+                        + String.format(
+                                "'%s'='%s',\n",
+                                ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), myType)
+                        + String.format(
+                                "'%s'='%s',\n",
+                                ElasticsearchOptions.HOSTS_OPTION.key(),
+                                elasticsearchContainer.getHttpHostAddress())
+                        + String.format(
+                                "'%s'='%s'\n",
+                                ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false")
+                        + ")");
+
+        tableEnvironment
+                .fromValues(
+                        row(
+                                1L,
+                                LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+                                "ABCDE",
+                                12.12f,
+                                (byte) 2,
+                                LocalDate.ofEpochDay(12345),
+                                LocalDateTime.parse("2012-12-12T12:12:12")))
+                .executeInsert("esTable")
+                .await();
+
+        Client client = getClient();
+        Map<Object, Object> expectedMap = new HashMap<>();
+        expectedMap.put("a", 1);
+        expectedMap.put("b", "00:00:12");
+        expectedMap.put("c", "ABCDE");
+        expectedMap.put("d", 12.12d);
+        expectedMap.put("e", 2);
+        expectedMap.put("f", "2003-10-20");
+        expectedMap.put("g", "2012-12-12 12:12:12");
+        Map<String, Object> response =
+                client.get(new GetRequest(index, myType, "1_2012-12-12T12:12:12"))
+                        .actionGet()
+                        .getSource();
+        assertThat(response, equalTo(expectedMap));
+    }
+
+    @Test
+    public void testWritingDocumentsNoPrimaryKey() throws Exception {
+        TableEnvironment tableEnvironment =
+                TableEnvironment.create(
+                        EnvironmentSettings.newInstance()
+                                .useBlinkPlanner()
+                                .inStreamingMode()
+                                .build());
+
+        String index = "no-primary-key";
+        String myType = "MyType";
+        tableEnvironment.executeSql(
+                "CREATE TABLE esTable ("
+                        + "a BIGINT NOT NULL,\n"
+                        + "b TIME,\n"
+                        + "c STRING NOT NULL,\n"
+                        + "d FLOAT,\n"
+                        + "e TINYINT NOT NULL,\n"
+                        + "f DATE,\n"
+                        + "g TIMESTAMP NOT NULL\n"
+                        + ")\n"
+                        + "WITH (\n"
+                        + String.format("'%s'='%s',\n", "connector", "elasticsearch-6-inlong")
+                        + String.format(
+                                "'%s'='%s',\n", ElasticsearchOptions.INDEX_OPTION.key(), index)
+                        + String.format(
+                                "'%s'='%s',\n",
+                                ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), myType)
+                        + String.format(
+                                "'%s'='%s',\n",
+                                ElasticsearchOptions.HOSTS_OPTION.key(),
+                                elasticsearchContainer.getHttpHostAddress())
+                        + String.format(
+                                "'%s'='%s'\n",
+                                ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false")
+                        + ")");
+
+        tableEnvironment
+                .fromValues(
+                        row(
+                                1L,
+                                LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+                                "ABCDE",
+                                12.12f,
+                                (byte) 2,
+                                LocalDate.ofEpochDay(12345),
+                                LocalDateTime.parse("2012-12-12T12:12:12")),
+                        row(
+                                2L,
+                                LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+                                "FGHIJK",
+                                13.13f,
+                                (byte) 4,
+                                LocalDate.ofEpochDay(12345),
+                                LocalDateTime.parse("2013-12-12T13:13:13")))
+                .executeInsert("esTable")
+                .await();
+
+        Client client = getClient();
+
+        // The search API does not return documents that have not been indexed yet,
+        // so we might need to query the index a few times.
+        Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30));
+        SearchHits hits;
+        do {
+            hits = client.prepareSearch(index).execute().actionGet().getHits();
+            if (hits.getTotalHits() < 2) {
+                Thread.sleep(200);
+            }
+        } while (hits.getTotalHits() < 2 && deadline.hasTimeLeft());
+
+        if (hits.getTotalHits() < 2) {
+            throw new AssertionError("Could not retrieve results from Elasticsearch.");
+        }
+
+        HashSet<Map<String, Object>> resultSet = new HashSet<>();
+        resultSet.add(hits.getAt(0).getSourceAsMap());
+        resultSet.add(hits.getAt(1).getSourceAsMap());
+        Map<Object, Object> expectedMap1 = new HashMap<>();
+        expectedMap1.put("a", 1);
+        expectedMap1.put("b", "00:00:12");
+        expectedMap1.put("c", "ABCDE");
+        expectedMap1.put("d", 12.12d);
+        expectedMap1.put("e", 2);
+        expectedMap1.put("f", "2003-10-20");
+        expectedMap1.put("g", "2012-12-12 12:12:12");
+        Map<Object, Object> expectedMap2 = new HashMap<>();
+        expectedMap2.put("a", 2);
+        expectedMap2.put("b", "00:00:12");
+        expectedMap2.put("c", "FGHIJK");
+        expectedMap2.put("d", 13.13d);
+        expectedMap2.put("e", 4);
+        expectedMap2.put("f", "2003-10-20");
+        expectedMap2.put("g", "2013-12-12 13:13:13");
+        HashSet<Map<Object, Object>> expectedSet = new HashSet<>();
+        expectedSet.add(expectedMap1);
+        expectedSet.add(expectedMap2);
+        assertThat(resultSet, equalTo(expectedSet));
+    }
+
+    @Test
+    public void testWritingDocumentsWithDynamicIndex() throws Exception {
+        TableEnvironment tableEnvironment =
+                TableEnvironment.create(
+                        EnvironmentSettings.newInstance()
+                                .useBlinkPlanner()
+                                .inStreamingMode()
+                                .build());
+
+        String index = "dynamic-index-{b|yyyy-MM-dd}";
+        String myType = "MyType";
+        tableEnvironment.executeSql(
+                "CREATE TABLE esTable ("
+                        + "a BIGINT NOT NULL,\n"
+                        + "b TIMESTAMP NOT NULL,\n"
+                        + "PRIMARY KEY (a) NOT ENFORCED\n"
+                        + ")\n"
+                        + "WITH (\n"
+                        + String.format("'%s'='%s',\n", "connector", "elasticsearch-6-inlong")
+                        + String.format(
+                                "'%s'='%s',\n", ElasticsearchOptions.INDEX_OPTION.key(), index)
+                        + String.format(
+                                "'%s'='%s',\n",
+                                ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), myType)
+                        + String.format(
+                                "'%s'='%s',\n",
+                                ElasticsearchOptions.HOSTS_OPTION.key(),
+                                elasticsearchContainer.getHttpHostAddress())
+                        + String.format(
+                                "'%s'='%s'\n",
+                                ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false")
+                        + ")");
+
+        tableEnvironment
+                .fromValues(row(1L, LocalDateTime.parse("2012-12-12T12:12:12")))
+                .executeInsert("esTable")
+                .await();
+
+        Client client = getClient();
+        Map<String, Object> response =
+                client.get(new GetRequest("dynamic-index-2012-12-12", myType, "1"))
+                        .actionGet()
+                        .getSource();
+        Map<Object, Object> expectedMap = new HashMap<>();
+        expectedMap.put("a", 1);
+        expectedMap.put("b", "2012-12-12 12:12:12");
+        assertThat(response, equalTo(expectedMap));
+    }
+
+    /**
+     * Tests ES field routing, following the upstream
+     * <a href="https://github.com/apache/flink/pull/14493/files">Flink PR</a> and
+     * <a href="https://github.com/apache/flink/tree/release-1.13.2-rc2">flink release-1.13.2-rc2</a>.
+     * A routing usage sketch follows this file.
+     */
+    @Test
+    public void testWritingDocumentsWithRouting() throws Exception {
+        ResolvedSchema schema =
+                new ResolvedSchema(
+                        Arrays.asList(
+                                Column.physical("a", DataTypes.BIGINT().notNull()),
+                                Column.physical("b", DataTypes.TIME()),
+                                Column.physical("c", DataTypes.STRING().notNull()),
+                                Column.physical("d", DataTypes.FLOAT()),
+                                Column.physical("e", DataTypes.TINYINT().notNull()),
+                                Column.physical("f", DataTypes.DATE()),
+                                Column.physical("g", DataTypes.TIMESTAMP().notNull())),
+                        Collections.emptyList(),
+                        UniqueConstraint.primaryKey("name", Arrays.asList("a", "g")));
+
+        GenericRowData rowData =
+                GenericRowData.of(
+                        1L,
+                        12345,
+                        StringData.fromString("ABCDE"),
+                        12.12f,
+                        (byte) 2,
+                        12345,
+                        TimestampData.fromLocalDateTime(
+                                LocalDateTime.parse("2012-12-12T12:12:12")));
+
+        String index = "writing-documents";
+        String myType = "MyType";
+        Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+        SinkFunctionProvider sinkRuntimeProvider =
+                (SinkFunctionProvider)
+                        sinkFactory
+                                .createDynamicTableSink(
+                                        context()
+                                                .withSchema(schema)
+                                                .withOption(
+                                                        ElasticsearchOptions.INDEX_OPTION.key(),
+                                                        index)
+                                                .withOption(
+                                                        ElasticsearchOptions.DOCUMENT_TYPE_OPTION
+                                                                .key(),
+                                                        myType)
+                                                .withOption(
+                                                        ElasticsearchOptions.ROUTING_FIELD_NAME
+                                                                .key(),
+                                                        "c")
+                                                .withOption(
+                                                        ElasticsearchOptions.HOSTS_OPTION.key(),
+                                                        elasticsearchContainer.getHttpHostAddress())
+                                                .withOption(
+                                                        ElasticsearchOptions
+                                                                .FLUSH_ON_CHECKPOINT_OPTION
+                                                                .key(),
+                                                        "false")
+                                                .build())
+                                .getSinkRuntimeProvider(new MockContext());
+
+        SinkFunction<RowData> sinkFunction = sinkRuntimeProvider.createSinkFunction();
+        StreamExecutionEnvironment environment =
+                StreamExecutionEnvironment.getExecutionEnvironment();
+        rowData.setRowKind(RowKind.UPDATE_AFTER);
+        environment.<RowData>fromElements(rowData).addSink(sinkFunction);
+        environment.execute();
+
+        Client client = getClient();
+        Map<Object, Object> expectedMap = new HashMap<>();
+        expectedMap.put("a", 1);
+        expectedMap.put("b", "00:00:12");
+        expectedMap.put("c", "ABCDE");
+        expectedMap.put("d", 12.12d);
+        expectedMap.put("e", 2);
+        expectedMap.put("f", "2003-10-20");
+        expectedMap.put("g", "2012-12-12 12:12:12");
+        GetResponse response =
+                client.get(new GetRequest(index, myType, "1_2012-12-12T12:12:12").routing("ABCDE"))
+                        .actionGet();
+        assertThat(response.getSource(), equalTo(expectedMap));
+    }
+
+    private static class MockContext implements DynamicTableSink.Context {
+        @Override
+        public boolean isBounded() {
+            return false;
+        }
+
+        @Override
+        public TypeInformation<?> createTypeInformation(DataType consumedDataType) {
+            return null;
+        }
+
+        @Override
+        public DynamicTableSink.DataStructureConverter createDataStructureConverter(
+                DataType consumedDataType) {
+            return null;
+        }
+    }
+}
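
For context, a minimal, hypothetical sketch of how the new routing option can be used
from the Table API against the elasticsearch-6-inlong connector, mirroring the DDL built
in the tests above. The index name, document type, and host address are placeholder
assumptions; the option keys are read from ElasticsearchOptions rather than hard-coded.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions;

    public class RoutingUsageSketch {
        public static void main(String[] args) {
            TableEnvironment env =
                    TableEnvironment.create(
                            EnvironmentSettings.newInstance()
                                    .useBlinkPlanner()
                                    .inStreamingMode()
                                    .build());
            // Route each document to a shard by the value of column 'c'
            env.executeSql(
                    "CREATE TABLE esTable (\n"
                            + "a BIGINT NOT NULL,\n"
                            + "c STRING NOT NULL,\n"
                            + "PRIMARY KEY (a) NOT ENFORCED\n"
                            + ") WITH (\n"
                            + "'connector'='elasticsearch-6-inlong',\n"
                            + String.format("'%s'='routing-sketch',\n",
                                    ElasticsearchOptions.INDEX_OPTION.key())
                            + String.format("'%s'='MyType',\n",
                                    ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key())
                            + String.format("'%s'='c',\n",
                                    ElasticsearchOptions.ROUTING_FIELD_NAME.key())
                            + String.format("'%s'='http://localhost:9200'\n",
                                    ElasticsearchOptions.HOSTS_OPTION.key())
                            + ")");
        }
    }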
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/test/resources/log4j2-test.properties b/inlong-sort/sort-connectors/elasticsearch-6/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..835c2ec9a
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set the root logger level to OFF so build logs are not flooded;
+# set it manually to INFO for debugging purposes.
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/pom.xml b/inlong-sort/sort-connectors/elasticsearch-7/pom.xml
index b8ef3cc9c..c807f1e97 100644
--- a/inlong-sort/sort-connectors/elasticsearch-7/pom.xml
+++ b/inlong-sort/sort-connectors/elasticsearch-7/pom.xml
@@ -31,31 +31,119 @@
     <name>Apache InLong - Sort-connector-elasticsearch7</name>
     <packaging>jar</packaging>
 
-    <properties>
-        <elasticsearch.version>7.9.2</elasticsearch.version>
-    </properties>
-
     <dependencies>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
             <version>${flink.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-elasticsearch-base</artifactId>
+            <version>1.3.0-SNAPSHOT</version>
+            <exclusions>
+                <!-- Elasticsearch Java Client has been moved to a different module in 5.x -->
+                <exclusion>
+                    <groupId>org.elasticsearch</groupId>
+                    <artifactId>elasticsearch</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
         <dependency>
             <groupId>org.elasticsearch.client</groupId>
             <artifactId>elasticsearch-rest-high-level-client</artifactId>
-            <version>${elasticsearch.version}</version>
+            <version>${elasticsearch7.version}</version>
         </dependency>
         <dependency>
             <groupId>org.elasticsearch.client</groupId>
             <artifactId>elasticsearch-rest-client</artifactId>
-            <version>${elasticsearch.version}</version>
+            <version>${elasticsearch7.version}</version>
         </dependency>
         <dependency>
             <groupId>org.elasticsearch</groupId>
             <artifactId>elasticsearch</artifactId>
-            <version>${elasticsearch.version}</version>
+            <version>${elasticsearch7.version}</version>
+        </dependency>
+
+        <!-- test dependencies -->
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <version>1.15.1</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_2.12</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.12</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-elasticsearch-base</artifactId>
+            <version>1.3.0-SNAPSHOT</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.elasticsearch</groupId>
+                    <artifactId>elasticsearch</artifactId>
+                </exclusion>
+            </exclusions>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <!--
+            Including the elasticsearch transport dependency for tests; Netty3 is no longer present in 7.x.
+        -->
+        <dependency>
+            <groupId>org.elasticsearch.client</groupId>
+            <artifactId>transport</artifactId>
+            <version>${elasticsearch7.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.elasticsearch.plugin</groupId>
+            <artifactId>transport-netty4-client</artifactId>
+            <version>${elasticsearch7.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Elasticsearch table descriptor testing -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_2.12</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
         </dependency>
+
+        <!-- Table API integration tests -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner-blink_2.12</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Elasticsearch table sink factory testing -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
 
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7Configuration.java b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7Configuration.java
new file mode 100644
index 000000000..64b93a732
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7Configuration.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch7.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.inlong.sort.elasticsearch.table.ElasticsearchConfiguration;
+
+import org.apache.http.HttpHost;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
+
+/** Elasticsearch 7 specific configuration. */
+@Internal
+final class Elasticsearch7Configuration extends ElasticsearchConfiguration {
+    Elasticsearch7Configuration(ReadableConfig config, ClassLoader classLoader) {
+        super(config, classLoader);
+    }
+
+    public List<HttpHost> getHosts() {
+        return config.get(HOSTS_OPTION).stream()
+                .map(Elasticsearch7Configuration::validateAndParseHostsString)
+                .collect(Collectors.toList());
+    }
+
+    private static HttpHost validateAndParseHostsString(String host) {
+        try {
+            HttpHost httpHost = HttpHost.create(host);
+            if (httpHost.getPort() < 0) {
+                throw new ValidationException(
+                        String.format(
+                                "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing port.",
+                                host, HOSTS_OPTION.key()));
+            }
+
+            if (httpHost.getSchemeName() == null) {
+                throw new ValidationException(
+                        String.format(
+                                "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing scheme.",
+                                host, HOSTS_OPTION.key()));
+            }
+            return httpHost;
+        } catch (Exception e) {
+            throw new ValidationException(
+                    String.format(
+                            "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'.",
+                            host, HOSTS_OPTION.key()),
+                    e);
+        }
+    }
+}
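
As a side note on the validation above: HttpHost.create() reports a missing port as -1,
which is what the first check catches. A minimal standalone sketch, where the host
strings are arbitrary examples:

    import org.apache.http.HttpHost;

    public class HostParsingSketch {
        public static void main(String[] args) {
            // Scheme and port both present: passes validation
            HttpHost ok = HttpHost.create("http://localhost:9200");
            System.out.println(ok.getSchemeName() + ":" + ok.getPort()); // http:9200

            // Port missing: HttpHost reports -1, so the configuration above
            // raises a ValidationException for this host string
            HttpHost noPort = HttpHost.create("http://localhost");
            System.out.println(noPort.getPort()); // -1
        }
    }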
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
new file mode 100644
index 000000000..be09fa99a
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch7.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.inlong.sort.elasticsearch.table.IndexGeneratorFactory;
+import org.apache.inlong.sort.elasticsearch.table.KeyExtractor;
+import org.apache.inlong.sort.elasticsearch.table.RequestFactory;
+import org.apache.inlong.sort.elasticsearch.table.RoutingExtractor;
+import org.apache.inlong.sort.elasticsearch.table.RowElasticsearchSinkFunction;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * A {@link DynamicTableSink} that describes how to create a {@link ElasticsearchSink} from a
+ * logical description.
+ */
+@Internal
+final class Elasticsearch7DynamicSink implements DynamicTableSink {
+    @VisibleForTesting
+    static final Elasticsearch7RequestFactory REQUEST_FACTORY =
+            new Elasticsearch7DynamicSink.Elasticsearch7RequestFactory();
+
+    private final EncodingFormat<SerializationSchema<RowData>> format;
+    private final TableSchema schema;
+    private final Elasticsearch7Configuration config;
+
+    public Elasticsearch7DynamicSink(
+            EncodingFormat<SerializationSchema<RowData>> format,
+            Elasticsearch7Configuration config,
+            TableSchema schema) {
+        this(format, config, schema, (ElasticsearchSink.Builder::new));
+    }
+
+    // --------------------------------------------------------------
+    // Hack to make configuration testing possible.
+    //
+    // The code in this block should never be used outside of tests.
+    // By injecting a builder we can make assertions on it in tests. We
+    // cannot assert everything though; e.g. it is not possible to assert
+    // flushing on checkpoint, as that is configured on the sink
+    // itself.
+    // --------------------------------------------------------------
+
+    private final ElasticSearchBuilderProvider builderProvider;
+
+    @FunctionalInterface
+    interface ElasticSearchBuilderProvider {
+        ElasticsearchSink.Builder<RowData> createBuilder(
+                List<HttpHost> httpHosts, RowElasticsearchSinkFunction upsertSinkFunction);
+    }
+
+    Elasticsearch7DynamicSink(
+            EncodingFormat<SerializationSchema<RowData>> format,
+            Elasticsearch7Configuration config,
+            TableSchema schema,
+            ElasticSearchBuilderProvider builderProvider) {
+        this.format = format;
+        this.schema = schema;
+        this.config = config;
+        this.builderProvider = builderProvider;
+    }
+
+    // --------------------------------------------------------------
+    // End of hack to make configuration testing possible
+    // --------------------------------------------------------------
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        ChangelogMode.Builder builder = ChangelogMode.newBuilder();
+        for (RowKind kind : requestedMode.getContainedKinds()) {
+            if (kind != RowKind.UPDATE_BEFORE) {
+                builder.addContainedKind(kind);
+            }
+        }
+        return builder.build();
+    }
+
+    @Override
+    public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
+        return () -> {
+            SerializationSchema<RowData> format =
+                    this.format.createRuntimeEncoder(context, schema.toRowDataType());
+
+            final RowElasticsearchSinkFunction upsertFunction =
+                    new RowElasticsearchSinkFunction(
+                            IndexGeneratorFactory.createIndexGenerator(config.getIndex(), schema),
+                            null, // the document type is deprecated in ES 7+
+                            format,
+                            XContentType.JSON,
+                            REQUEST_FACTORY,
+                            KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()),
+                            RoutingExtractor.createRoutingExtractor(
+                                    schema, config.getRoutingField().orElse(null)));
+
+            final ElasticsearchSink.Builder<RowData> builder =
+                    builderProvider.createBuilder(config.getHosts(), upsertFunction);
+
+            builder.setFailureHandler(config.getFailureHandler());
+            builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
+            builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20));
+            builder.setBulkFlushInterval(config.getBulkFlushInterval());
+            builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
+            config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
+            config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
+            config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
+
+            // we must overwrite the default factory, which is defined with a lambda,
+            // because of a bug in lambda serialization when shading; see FLINK-18006
+            if (config.getUsername().isPresent()
+                    && config.getPassword().isPresent()
+                    && !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())
+                    && !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get())) {
+                builder.setRestClientFactory(
+                        new AuthRestClientFactory(
+                                config.getPathPrefix().orElse(null),
+                                config.getUsername().get(),
+                                config.getPassword().get()));
+            } else {
+                builder.setRestClientFactory(
+                        new DefaultRestClientFactory(config.getPathPrefix().orElse(null)));
+            }
+
+            final ElasticsearchSink<RowData> sink = builder.build();
+
+            if (config.isDisableFlushOnCheckpoint()) {
+                sink.disableFlushOnCheckpoint();
+            }
+
+            return sink;
+        };
+    }
+
+    @Override
+    public DynamicTableSink copy() {
+        return this;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "Elasticsearch7";
+    }
+
+    /** Serializable {@link RestClientFactory} used by the sink. */
+    @VisibleForTesting
+    static class DefaultRestClientFactory implements RestClientFactory {
+
+        private final String pathPrefix;
+
+        public DefaultRestClientFactory(@Nullable String pathPrefix) {
+            this.pathPrefix = pathPrefix;
+        }
+
+        @Override
+        public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
+            if (pathPrefix != null) {
+                restClientBuilder.setPathPrefix(pathPrefix);
+            }
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            DefaultRestClientFactory that = (DefaultRestClientFactory) o;
+            return Objects.equals(pathPrefix, that.pathPrefix);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(pathPrefix);
+        }
+    }
+
+    /** Serializable {@link RestClientFactory} used by the sink, which enables authentication. */
+    @VisibleForTesting
+    static class AuthRestClientFactory implements RestClientFactory {
+
+        private final String pathPrefix;
+        private final String username;
+        private final String password;
+        private transient CredentialsProvider credentialsProvider;
+
+        public AuthRestClientFactory(
+                @Nullable String pathPrefix, String username, String password) {
+            this.pathPrefix = pathPrefix;
+            this.password = password;
+            this.username = username;
+        }
+
+        @Override
+        public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
+            if (pathPrefix != null) {
+                restClientBuilder.setPathPrefix(pathPrefix);
+            }
+            if (credentialsProvider == null) {
+                credentialsProvider = new BasicCredentialsProvider();
+                credentialsProvider.setCredentials(
+                        AuthScope.ANY, new UsernamePasswordCredentials(username, password));
+            }
+            restClientBuilder.setHttpClientConfigCallback(
+                    httpAsyncClientBuilder ->
+                            httpAsyncClientBuilder.setDefaultCredentialsProvider(
+                                    credentialsProvider));
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            AuthRestClientFactory that = (AuthRestClientFactory) o;
+            return Objects.equals(pathPrefix, that.pathPrefix)
+                    && Objects.equals(username, that.username)
+                    && Objects.equals(password, that.password);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(pathPrefix, password, username);
+        }
+    }
+
+    /**
+     * Version-specific creation of {@link org.elasticsearch.action.ActionRequest}s used by the
+     * sink.
+     */
+    private static class Elasticsearch7RequestFactory implements RequestFactory {
+        @Override
+        public UpdateRequest createUpdateRequest(
+                String index,
+                String docType,
+                String key,
+                XContentType contentType,
+                byte[] document) {
+            return new UpdateRequest(index, key)
+                    .doc(document, contentType)
+                    .upsert(document, contentType);
+        }
+
+        @Override
+        public IndexRequest createIndexRequest(
+                String index,
+                String docType,
+                String key,
+                XContentType contentType,
+                byte[] document) {
+            return new IndexRequest(index).id(key).source(document, contentType);
+        }
+
+        @Override
+        public DeleteRequest createDeleteRequest(String index, String docType, String key) {
+            return new DeleteRequest(index, key);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Elasticsearch7DynamicSink that = (Elasticsearch7DynamicSink) o;
+        return Objects.equals(format, that.format)
+                && Objects.equals(schema, that.schema)
+                && Objects.equals(config, that.config)
+                && Objects.equals(builderProvider, that.builderProvider);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(format, schema, config, builderProvider);
+    }
+}
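
The changelog handling above drops UPDATE_BEFORE rows, since the sink turns
UPDATE_AFTER into an idempotent upsert keyed by the document id. A small
self-contained sketch of the same filtering, for illustration only:

    import org.apache.flink.table.connector.ChangelogMode;
    import org.apache.flink.types.RowKind;

    public class ChangelogModeSketch {
        // Mirrors Elasticsearch7DynamicSink#getChangelogMode: every requested
        // kind except UPDATE_BEFORE is passed through
        static ChangelogMode accepted(ChangelogMode requestedMode) {
            ChangelogMode.Builder builder = ChangelogMode.newBuilder();
            for (RowKind kind : requestedMode.getContainedKinds()) {
                if (kind != RowKind.UPDATE_BEFORE) {
                    builder.addContainedKind(kind);
                }
            }
            return builder.build();
        }

        public static void main(String[] args) {
            // Prints INSERT, UPDATE_AFTER, DELETE; UPDATE_BEFORE is filtered out
            System.out.println(accepted(ChangelogMode.all()).getContainedKinds());
        }
    }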
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java
new file mode 100644
index 000000000..8bbdca6af
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch7.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.StringUtils;
+import org.apache.inlong.sort.elasticsearch.table.ElasticsearchValidationUtils;
+
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.CONNECTION_MAX_RETRY_TIMEOUT_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.CONNECTION_PATH_PREFIX;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.FAILURE_HANDLER_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.FORMAT_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.INDEX_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.KEY_DELIMITER_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.PASSWORD_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.ROUTING_FIELD_NAME;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.USERNAME_OPTION;
+
+/** A {@link DynamicTableSinkFactory} for discovering {@link Elasticsearch7DynamicSink}. */
+@Internal
+public class Elasticsearch7DynamicSinkFactory implements DynamicTableSinkFactory {
+    private static final Set<ConfigOption<?>> requiredOptions =
+            Stream.of(HOSTS_OPTION, INDEX_OPTION).collect(Collectors.toSet());
+    private static final Set<ConfigOption<?>> optionalOptions =
+            Stream.of(
+                    KEY_DELIMITER_OPTION,
+                    ROUTING_FIELD_NAME,
+                    FAILURE_HANDLER_OPTION,
+                    FLUSH_ON_CHECKPOINT_OPTION,
+                    BULK_FLASH_MAX_SIZE_OPTION,
+                    BULK_FLUSH_MAX_ACTIONS_OPTION,
+                    BULK_FLUSH_INTERVAL_OPTION,
+                    BULK_FLUSH_BACKOFF_TYPE_OPTION,
+                    BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION,
+                    BULK_FLUSH_BACKOFF_DELAY_OPTION,
+                    CONNECTION_MAX_RETRY_TIMEOUT_OPTION,
+                    CONNECTION_PATH_PREFIX,
+                    FORMAT_OPTION,
+                    PASSWORD_OPTION,
+                    USERNAME_OPTION)
+                    .collect(Collectors.toSet());
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        TableSchema tableSchema = context.getCatalogTable().getSchema();
+        ElasticsearchValidationUtils.validatePrimaryKey(tableSchema);
+
+        final FactoryUtil.TableFactoryHelper helper =
+                FactoryUtil.createTableFactoryHelper(this, context);
+
+        final EncodingFormat<SerializationSchema<RowData>> format =
+                helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT_OPTION);
+
+        helper.validate();
+        Configuration configuration = new Configuration();
+        context.getCatalogTable().getOptions().forEach(configuration::setString);
+        Elasticsearch7Configuration config =
+                new Elasticsearch7Configuration(configuration, context.getClassLoader());
+
+        validate(config, configuration);
+
+        return new Elasticsearch7DynamicSink(
+                format, config, TableSchemaUtils.getPhysicalSchema(tableSchema));
+    }
+
+    private void validate(Elasticsearch7Configuration config, Configuration originalConfiguration) {
+        config.getFailureHandler(); // checks if we can instantiate the custom failure handler
+        config.getHosts(); // validate hosts
+        validate(
+                config.getIndex().length() >= 1,
+                () -> String.format("'%s' must not be empty", INDEX_OPTION.key()));
+        int maxActions = config.getBulkFlushMaxActions();
+        validate(
+                maxActions == -1 || maxActions >= 1,
+                () ->
+                        String.format(
+                                "'%s' must be at least 1. Got: %s",
+                                BULK_FLUSH_MAX_ACTIONS_OPTION.key(), maxActions));
+        long maxSize = config.getBulkFlushMaxByteSize();
+        long mb1 = 1024 * 1024;
+        validate(
+                maxSize == -1 || (maxSize >= mb1 && maxSize % mb1 == 0),
+                () ->
+                        String.format(
+                                "'%s' must be in MB granularity. Got: %s",
+                                BULK_FLASH_MAX_SIZE_OPTION.key(),
+                                originalConfiguration
+                                        .get(BULK_FLASH_MAX_SIZE_OPTION)
+                                        .toHumanReadableString()));
+        validate(
+                config.getBulkFlushBackoffRetries().map(retries -> retries >= 1).orElse(true),
+                () ->
+                        String.format(
+                                "'%s' must be at least 1. Got: %s",
+                                BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(),
+                                config.getBulkFlushBackoffRetries().get()));
+        if (config.getUsername().isPresent()
+                && !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())) {
+            validate(
+                    config.getPassword().isPresent()
+                            && !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get()),
+                    () ->
+                            String.format(
+                                    "'%s' and '%s' must be set at the same time. Got: username '%s' and password '%s'",
+                                    USERNAME_OPTION.key(),
+                                    PASSWORD_OPTION.key(),
+                                    config.getUsername().get(),
+                                    config.getPassword().orElse("")));
+        }
+    }
+
+    private static void validate(boolean condition, Supplier<String> message) {
+        if (!condition) {
+            throw new ValidationException(message.get());
+        }
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return "elasticsearch-7-inlong";
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return requiredOptions;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return optionalOptions;
+    }
+}
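
A note on the bulk-size check in validate() above: the value must be -1 (disabled) or a
whole number of mebibytes, because the sink later converts it with bytes >> 20 for
setBulkFlushMaxSizeMb. A minimal arithmetic sketch:

    public class BulkSizeSketch {
        // Mirrors the factory check: -1 disables the limit; otherwise the
        // configured byte size must be a positive multiple of 1 MiB
        static boolean isMbGranularity(long bytes) {
            long oneMb = 1024 * 1024;
            return bytes == -1 || (bytes >= oneMb && bytes % oneMb == 0);
        }

        public static void main(String[] args) {
            System.out.println(isMbGranularity(2L * 1024 * 1024)); // true
            System.out.println(isMbGranularity(1536L * 1024));     // false (1.5 MiB)
            // The sink forwards whole MiB to the builder: 2 MiB >> 20 == 2
            System.out.println((2L * 1024 * 1024) >> 20);          // 2
        }
    }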
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-connectors/elasticsearch-7/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..54507bd7f
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.elasticsearch7.table.Elasticsearch7DynamicSinkFactory
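
For background, Flink resolves this entry through the standard Java ServiceLoader
mechanism; FactoryUtil then matches factoryIdentifier() against the table's 'connector'
option. A rough sketch of that discovery step, assuming the connector jar is on the
classpath:

    import java.util.ServiceLoader;

    import org.apache.flink.table.factories.Factory;

    public class FactoryDiscoverySketch {
        public static void main(String[] args) {
            // Scan META-INF/services/org.apache.flink.table.factories.Factory entries
            for (Factory factory : ServiceLoader.load(Factory.class)) {
                if ("elasticsearch-7-inlong".equals(factory.factoryIdentifier())) {
                    System.out.println("Found: " + factory.getClass().getName());
                }
            }
        }
    }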
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/test/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkITCase.java b/inlong-sort/sort-connectors/elasticsearch-7/src/test/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkITCase.java
new file mode 100644
index 000000000..9cf07a127
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-7/src/test/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkITCase.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch7.table;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.apache.flink.table.api.Expressions.row;
+import static org.apache.inlong.sort.elasticsearch.table.TestContext.context;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/** IT tests for {@link Elasticsearch7DynamicSink}. */
+public class Elasticsearch7DynamicSinkITCase {
+
+    @ClassRule
+    public static ElasticsearchContainer elasticsearchContainer =
+            new ElasticsearchContainer(
+                    DockerImageName.parse("docker.elastic.co/elasticsearch/elasticsearch-oss")
+                            .withTag("7.5.1"));
+
+    @SuppressWarnings("deprecation")
+    protected final Client getClient() {
+        TransportAddress transportAddress =
+                new TransportAddress(elasticsearchContainer.getTcpHost());
+        String expectedClusterName = "docker-cluster";
+        Settings settings = Settings.builder().put("cluster.name", expectedClusterName).build();
+        return new PreBuiltTransportClient(settings).addTransportAddress(transportAddress);
+    }
+
+    @Test
+    public void testWritingDocuments() throws Exception {
+        ResolvedSchema schema =
+                new ResolvedSchema(
+                        Arrays.asList(
+                                Column.physical("a", DataTypes.BIGINT().notNull()),
+                                Column.physical("b", DataTypes.TIME()),
+                                Column.physical("c", DataTypes.STRING().notNull()),
+                                Column.physical("d", DataTypes.FLOAT()),
+                                Column.physical("e", DataTypes.TINYINT().notNull()),
+                                Column.physical("f", DataTypes.DATE()),
+                                Column.physical("g", DataTypes.TIMESTAMP().notNull())),
+                        Collections.emptyList(),
+                        UniqueConstraint.primaryKey("name", Arrays.asList("a", "g")));
+
+        GenericRowData rowData =
+                GenericRowData.of(
+                        1L,
+                        12345,
+                        StringData.fromString("ABCDE"),
+                        12.12f,
+                        (byte) 2,
+                        12345,
+                        TimestampData.fromLocalDateTime(
+                                LocalDateTime.parse("2012-12-12T12:12:12")));
+
+        String index = "writing-documents";
+        Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory();
+
+        SinkFunctionProvider sinkRuntimeProvider =
+                (SinkFunctionProvider)
+                        sinkFactory
+                                .createDynamicTableSink(
+                                        context()
+                                                .withSchema(schema)
+                                                .withOption(
+                                                        ElasticsearchOptions.INDEX_OPTION.key(),
+                                                        index)
+                                                .withOption(
+                                                        ElasticsearchOptions.HOSTS_OPTION.key(),
+                                                        elasticsearchContainer.getHttpHostAddress())
+                                                .withOption(
+                                                        ElasticsearchOptions
+                                                                .FLUSH_ON_CHECKPOINT_OPTION
+                                                                .key(),
+                                                        "false")
+                                                .build())
+                                .getSinkRuntimeProvider(new MockContext());
+
+        SinkFunction<RowData> sinkFunction = sinkRuntimeProvider.createSinkFunction();
+        StreamExecutionEnvironment environment =
+                StreamExecutionEnvironment.getExecutionEnvironment();
+        rowData.setRowKind(RowKind.UPDATE_AFTER);
+        environment.<RowData>fromElements(rowData).addSink(sinkFunction);
+        environment.execute();
+
+        Client client = getClient();
+
+        Map<Object, Object> expectedMap = new HashMap<>();
+        expectedMap.put("a", 1);
+        expectedMap.put("b", "00:00:12");
+        expectedMap.put("c", "ABCDE");
+        expectedMap.put("d", 12.12d);
+        expectedMap.put("e", 2);
+        expectedMap.put("f", "2003-10-20");
+        expectedMap.put("g", "2012-12-12 12:12:12");
+        Map<String, Object> response =
+                client.get(new GetRequest(index, "1_2012-12-12T12:12:12")).actionGet().getSource();
+        assertThat(response, equalTo(expectedMap));
+    }
+
+    @Test
+    public void testWritingDocumentsFromTableApi() throws Exception {
+        TableEnvironment tableEnvironment =
+                TableEnvironment.create(
+                        EnvironmentSettings.newInstance()
+                                .useBlinkPlanner()
+                                .inStreamingMode()
+                                .build());
+
+        String index = "table-api";
+        tableEnvironment.executeSql(
+                "CREATE TABLE esTable ("
+                        + "a BIGINT NOT NULL,\n"
+                        + "b TIME,\n"
+                        + "c STRING NOT NULL,\n"
+                        + "d FLOAT,\n"
+                        + "e TINYINT NOT NULL,\n"
+                        + "f DATE,\n"
+                        + "g TIMESTAMP NOT NULL,"
+                        + "h as a + 2,\n"
+                        + "PRIMARY KEY (a, g) NOT ENFORCED\n"
+                        + ")\n"
+                        + "WITH (\n"
+                        + String.format("'%s'='%s',\n", "connector", "elasticsearch-7-inlong")
+                        + String.format(
+                                "'%s'='%s',\n", ElasticsearchOptions.INDEX_OPTION.key(), index)
+                        + String.format(
+                                "'%s'='%s',\n",
+                                ElasticsearchOptions.HOSTS_OPTION.key(),
+                                elasticsearchContainer.getHttpHostAddress())
+                        + String.format(
+                                "'%s'='%s'\n",
+                                ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false")
+                        + ")");
+
+        tableEnvironment
+                .fromValues(
+                        row(
+                                1L,
+                                LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+                                "ABCDE",
+                                12.12f,
+                                (byte) 2,
+                                LocalDate.ofEpochDay(12345),
+                                LocalDateTime.parse("2012-12-12T12:12:12")))
+                .executeInsert("esTable")
+                .await();
+
+        Client client = getClient();
+
+        Map<Object, Object> expectedMap = new HashMap<>();
+        expectedMap.put("a", 1);
+        expectedMap.put("b", "00:00:12");
+        expectedMap.put("c", "ABCDE");
+        expectedMap.put("d", 12.12d);
+        expectedMap.put("e", 2);
+        expectedMap.put("f", "2003-10-20");
+        expectedMap.put("g", "2012-12-12 12:12:12");
+        Map<String, Object> response =
+                client.get(new GetRequest(index, "1_2012-12-12T12:12:12")).actionGet().getSource();
+        assertThat(response, equalTo(expectedMap));
+    }
+
+    @Test
+    public void testWritingDocumentsNoPrimaryKey() throws Exception {
+        TableEnvironment tableEnvironment =
+                TableEnvironment.create(
+                        EnvironmentSettings.newInstance()
+                                .useBlinkPlanner()
+                                .inStreamingMode()
+                                .build());
+
+        String index = "no-primary-key";
+        tableEnvironment.executeSql(
+                "CREATE TABLE esTable ("
+                        + "a BIGINT NOT NULL,\n"
+                        + "b TIME,\n"
+                        + "c STRING NOT NULL,\n"
+                        + "d FLOAT,\n"
+                        + "e TINYINT NOT NULL,\n"
+                        + "f DATE,\n"
+                        + "g TIMESTAMP NOT NULL\n"
+                        + ")\n"
+                        + "WITH (\n"
+                        + String.format("'%s'='%s',\n", "connector", "elasticsearch-7-inlong")
+                        + String.format(
+                                "'%s'='%s',\n", ElasticsearchOptions.INDEX_OPTION.key(), index)
+                        + String.format(
+                                "'%s'='%s',\n",
+                                ElasticsearchOptions.HOSTS_OPTION.key(),
+                                elasticsearchContainer.getHttpHostAddress())
+                        + String.format(
+                                "'%s'='%s'\n",
+                                ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false")
+                        + ")");
+
+        tableEnvironment
+                .fromValues(
+                        row(
+                                1L,
+                                LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+                                "ABCDE",
+                                12.12f,
+                                (byte) 2,
+                                LocalDate.ofEpochDay(12345),
+                                LocalDateTime.parse("2012-12-12T12:12:12")),
+                        row(
+                                2L,
+                                LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+                                "FGHIJK",
+                                13.13f,
+                                (byte) 4,
+                                LocalDate.ofEpochDay(12345),
+                                LocalDateTime.parse("2013-12-12T13:13:13")))
+                .executeInsert("esTable")
+                .await();
+
+        Client client = getClient();
+
+        // The search API does not return documents that have not been indexed yet,
+        // so we might need to query the index a few times
+        Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30));
+        SearchHits hits;
+        do {
+            hits = client.prepareSearch(index).execute().actionGet().getHits();
+            if (hits.getTotalHits().value < 2) {
+                Thread.sleep(200);
+            }
+        } while (hits.getTotalHits().value < 2 && deadline.hasTimeLeft());
+
+        if (hits.getTotalHits().value < 2) {
+            throw new AssertionError("Could not retrieve results from Elasticsearch.");
+        }
+
+        HashSet<Map<String, Object>> resultSet = new HashSet<>();
+        resultSet.add(hits.getAt(0).getSourceAsMap());
+        resultSet.add(hits.getAt(1).getSourceAsMap());
+        Map<Object, Object> expectedMap1 = new HashMap<>();
+        expectedMap1.put("a", 1);
+        expectedMap1.put("b", "00:00:12");
+        expectedMap1.put("c", "ABCDE");
+        expectedMap1.put("d", 12.12d);
+        expectedMap1.put("e", 2);
+        expectedMap1.put("f", "2003-10-20");
+        expectedMap1.put("g", "2012-12-12 12:12:12");
+        Map<Object, Object> expectedMap2 = new HashMap<>();
+        expectedMap2.put("a", 2);
+        expectedMap2.put("b", "00:00:12");
+        expectedMap2.put("c", "FGHIJK");
+        expectedMap2.put("d", 13.13d);
+        expectedMap2.put("e", 4);
+        expectedMap2.put("f", "2003-10-20");
+        expectedMap2.put("g", "2013-12-12 13:13:13");
+        HashSet<Map<Object, Object>> expectedSet = new HashSet<>();
+        expectedSet.add(expectedMap1);
+        expectedSet.add(expectedMap2);
+        assertThat(resultSet, equalTo(expectedSet));
+    }
+
+    @Test
+    public void testWritingDocumentsWithDynamicIndex() throws Exception {
+        TableEnvironment tableEnvironment =
+                TableEnvironment.create(
+                        EnvironmentSettings.newInstance()
+                                .useBlinkPlanner()
+                                .inStreamingMode()
+                                .build());
+
+        String index = "dynamic-index-{b|yyyy-MM-dd}";
+        tableEnvironment.executeSql(
+                "CREATE TABLE esTable ("
+                        + "a BIGINT NOT NULL,\n"
+                        + "b TIMESTAMP NOT NULL,\n"
+                        + "PRIMARY KEY (a) NOT ENFORCED\n"
+                        + ")\n"
+                        + "WITH (\n"
+                        + String.format("'%s'='%s',\n", "connector", "elasticsearch-7-inlong")
+                        + String.format(
+                                "'%s'='%s',\n", ElasticsearchOptions.INDEX_OPTION.key(), index)
+                        + String.format(
+                                "'%s'='%s',\n",
+                                ElasticsearchOptions.HOSTS_OPTION.key(),
+                                elasticsearchContainer.getHttpHostAddress())
+                        + String.format(
+                                "'%s'='%s'\n",
+                                ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false")
+                        + ")");
+
+        tableEnvironment
+                .fromValues(row(1L, LocalDateTime.parse("2012-12-12T12:12:12")))
+                .executeInsert("esTable")
+                .await();
+
+        Client client = getClient();
+        Map<String, Object> response =
+                client.get(new GetRequest("dynamic-index-2012-12-12", "1")).actionGet().getSource();
+        Map<Object, Object> expectedMap = new HashMap<>();
+        expectedMap.put("a", 1);
+        expectedMap.put("b", "2012-12-12 12:12:12");
+        assertThat(response, equalTo(expectedMap));
+    }
+
+    @Test
+    public void testWritingDocumentsWithRouting() throws Exception {
+        ResolvedSchema schema =
+                new ResolvedSchema(
+                        Arrays.asList(
+                                Column.physical("a", DataTypes.BIGINT().notNull()),
+                                Column.physical("b", DataTypes.TIME()),
+                                Column.physical("c", DataTypes.STRING().notNull()),
+                                Column.physical("d", DataTypes.FLOAT()),
+                                Column.physical("e", DataTypes.TINYINT().notNull()),
+                                Column.physical("f", DataTypes.DATE()),
+                                Column.physical("g", DataTypes.TIMESTAMP().notNull())),
+                        Collections.emptyList(),
+                        UniqueConstraint.primaryKey("name", Arrays.asList("a", "g")));
+
+        GenericRowData rowData =
+                GenericRowData.of(
+                        1L,
+                        12345,
+                        StringData.fromString("ABCDE"),
+                        12.12f,
+                        (byte) 2,
+                        12345,
+                        TimestampData.fromLocalDateTime(
+                                LocalDateTime.parse("2012-12-12T12:12:12")));
+
+        String index = "writing-documents";
+        Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory();
+
+        SinkFunctionProvider sinkRuntimeProvider =
+                (SinkFunctionProvider)
+                        sinkFactory
+                                .createDynamicTableSink(
+                                        context()
+                                                .withSchema(schema)
+                                                .withOption(
+                                                        ElasticsearchOptions.INDEX_OPTION.key(),
+                                                        index)
+                                                .withOption(
+                                                        ElasticsearchOptions.ROUTING_FIELD_NAME
+                                                                .key(),
+                                                        "c")
+                                                .withOption(
+                                                        ElasticsearchOptions.HOSTS_OPTION.key(),
+                                                        elasticsearchContainer.getHttpHostAddress())
+                                                .withOption(
+                                                        ElasticsearchOptions
+                                                                .FLUSH_ON_CHECKPOINT_OPTION
+                                                                .key(),
+                                                        "false")
+                                                .build())
+                                .getSinkRuntimeProvider(new MockContext());
+
+        SinkFunction<RowData> sinkFunction = sinkRuntimeProvider.createSinkFunction();
+        StreamExecutionEnvironment environment =
+                StreamExecutionEnvironment.getExecutionEnvironment();
+        rowData.setRowKind(RowKind.UPDATE_AFTER);
+        environment.<RowData>fromElements(rowData).addSink(sinkFunction);
+        environment.execute();
+
+        Client client = getClient();
+        Map<Object, Object> expectedMap = new HashMap<>();
+        expectedMap.put("a", 1);
+        expectedMap.put("b", "00:00:12");
+        expectedMap.put("c", "ABCDE");
+        expectedMap.put("d", 12.12d);
+        expectedMap.put("e", 2);
+        expectedMap.put("f", "2003-10-20");
+        expectedMap.put("g", "2012-12-12 12:12:12");
+        GetResponse response =
+                client.get(new GetRequest(index, "1_2012-12-12T12:12:12").routing("ABCDE"))
+                        .actionGet();
+        assertThat(response.getSource(), equalTo(expectedMap));
+    }
+
+    private static class MockContext implements DynamicTableSink.Context {
+        @Override
+        public boolean isBounded() {
+            return false;
+        }
+
+        @Override
+        public TypeInformation<?> createTypeInformation(DataType consumedDataType) {
+            return null;
+        }
+
+        @Override
+        public DynamicTableSink.DataStructureConverter createDataStructureConverter(
+                DataType consumedDataType) {
+            return null;
+        }
+    }
+}
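
For reference, the routing test above maps onto plain SQL usage. A minimal sketch
(table name, columns, and host are illustrative, not part of this commit):

    // route each document by the value of column "c"
    tableEnvironment.executeSql(
            "CREATE TABLE es_sink ("
                    + "a BIGINT NOT NULL,\n"
                    + "c STRING NOT NULL,\n"
                    + "PRIMARY KEY (a) NOT ENFORCED\n"
                    + ") WITH (\n"
                    + "'connector' = 'elasticsearch-7-inlong',\n"
                    + "'hosts' = 'http://localhost:9200',\n"
                    + "'index' = 'users',\n"
                    + "'routing.field-name' = 'c'\n"
                    + ")");

Documents that share the same value of "c" are then routed to the same shard, which is
what the GetRequest(...).routing("ABCDE") lookup in the test relies on.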
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/test/resources/log4j2-test.properties b/inlong-sort/sort-connectors/elasticsearch-7/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..835c2ec9a
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-7/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/pom.xml b/inlong-sort/sort-connectors/elasticsearch-base/pom.xml
new file mode 100644
index 000000000..36b6cfeb4
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/pom.xml
@@ -0,0 +1,175 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~  Licensed to the Apache Software Foundation (ASF) under one
+  ~  or more contributor license agreements.  See the NOTICE file
+  ~  distributed with this work for additional information
+  ~  regarding copyright ownership.  The ASF licenses this file
+  ~  to you under the Apache License, Version 2.0 (the
+  ~  "License"); you may not use this file except in compliance
+  ~  with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>sort-connectors</artifactId>
+        <groupId>org.apache.inlong</groupId>
+        <version>1.3.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>sort-connector-elasticsearch-base</artifactId>
+    <name>Apache InLong - Sort-connector-elasticsearch-base</name>
+    <packaging>jar</packaging>
+
+
+    <dependencies>
+
+        <!-- core dependencies -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-elasticsearch-base_2.12</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.12</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <version>${elasticsearch6.version}</version>
+            <!--
+            FLINK-7133: Excluding all org.ow2.asm from elasticsearch dependencies because
+            1. from the POV of client they are optional,
+            2. the version configured by default at the time of writing this comment (1.7.1) depends on asm 4.1
+               and when it is shaded into elasticsearch-base artifact it conflicts with newer shaded versions of asm
+               resulting in errors at the runtime when application is executed locally, e.g. from IDE.
+            -->
+            <exclusions>
+                <exclusion>
+                    <groupId>org.ow2.asm</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- Table ecosystem -->
+        <!-- Projects depending on this project won't depend on flink-table-*. -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+            <optional>true</optional>
+        </dependency>
+        <!-- A planner dependency won't be necessary once FLIP-32 has been completed. -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_2.12</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+            <optional>true</optional>
+        </dependency>
+
+        <!-- test dependencies -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_2.12</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime_2.12</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.12</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
+        <!-- Elasticsearch table descriptor testing -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_2.12</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Elasticsearch table descriptor testing -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Elasticsearch table sink factory testing -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!--
+            Including Log4j2 dependencies for tests is required for the
+            embedded Elasticsearch nodes used in tests to run correctly.
+        -->
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/AbstractTimeIndexGenerator.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/AbstractTimeIndexGenerator.java
new file mode 100644
index 000000000..2f7eebd88
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/AbstractTimeIndexGenerator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import java.time.format.DateTimeFormatter;
+
+/** Abstract class for time related {@link IndexGenerator}. */
+public abstract class AbstractTimeIndexGenerator extends IndexGeneratorBase {
+
+    private final String dateTimeFormat;
+    protected transient DateTimeFormatter dateTimeFormatter;
+
+    public AbstractTimeIndexGenerator(String index, String dateTimeFormat) {
+        super(index);
+        this.dateTimeFormat = dateTimeFormat;
+    }
+
+    @Override
+    public void open() {
+        this.dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat);
+    }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java
new file mode 100644
index 000000000..86ed88875
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.FAILURE_HANDLER_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.PASSWORD_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.USERNAME_OPTION;
+
+/** Accessor methods for Elasticsearch options. */
+public class ElasticsearchConfiguration {
+    protected final ReadableConfig config;
+    private final ClassLoader classLoader;
+
+    public ElasticsearchConfiguration(ReadableConfig config, ClassLoader classLoader) {
+        this.config = config;
+        this.classLoader = classLoader;
+    }
+
+    public ActionRequestFailureHandler getFailureHandler() {
+        final ActionRequestFailureHandler failureHandler;
+        String value = config.get(FAILURE_HANDLER_OPTION);
+        switch (value.toUpperCase()) {
+            case "FAIL":
+                failureHandler = new NoOpFailureHandler();
+                break;
+            case "IGNORE":
+                failureHandler = new IgnoringFailureHandler();
+                break;
+            case "RETRY-REJECTED":
+                failureHandler = new RetryRejectedExecutionFailureHandler();
+                break;
+            default:
+                try {
+                    Class<?> failureHandlerClass = Class.forName(value, false, classLoader);
+                    failureHandler =
+                            (ActionRequestFailureHandler)
+                                    InstantiationUtil.instantiate(failureHandlerClass);
+                } catch (ClassNotFoundException e) {
+                    throw new ValidationException(
+                            "Could not instantiate the failure handler class: " + value, e);
+                }
+                break;
+        }
+        return failureHandler;
+    }
+
+    public String getDocumentType() {
+        return config.get(ElasticsearchOptions.DOCUMENT_TYPE_OPTION);
+    }
+
+    public int getBulkFlushMaxActions() {
+        int maxActions = config.get(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION);
+        // convert 0 to -1, because the Elasticsearch client uses -1 to disable this configuration.
+        return maxActions == 0 ? -1 : maxActions;
+    }
+
+    public long getBulkFlushMaxByteSize() {
+        long maxSize = config.get(ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION).getBytes();
+        // convert 0 to -1, because the Elasticsearch client uses -1 to disable this configuration.
+        return maxSize == 0 ? -1 : maxSize;
+    }
+
+    public long getBulkFlushInterval() {
+        long interval = config.get(BULK_FLUSH_INTERVAL_OPTION).toMillis();
+        // convert 0 to -1, because the Elasticsearch client uses -1 to disable this configuration.
+        return interval == 0 ? -1 : interval;
+    }
+
+    public Optional<String> getUsername() {
+        return config.getOptional(USERNAME_OPTION);
+    }
+
+    public Optional<String> getPassword() {
+        return config.getOptional(PASSWORD_OPTION);
+    }
+
+    public boolean isBulkFlushBackoffEnabled() {
+        return config.get(BULK_FLUSH_BACKOFF_TYPE_OPTION)
+                != ElasticsearchOptions.BackOffType.DISABLED;
+    }
+
+    public Optional<ElasticsearchSinkBase.FlushBackoffType> getBulkFlushBackoffType() {
+        switch (config.get(BULK_FLUSH_BACKOFF_TYPE_OPTION)) {
+            case CONSTANT:
+                return Optional.of(ElasticsearchSinkBase.FlushBackoffType.CONSTANT);
+            case EXPONENTIAL:
+                return Optional.of(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
+            default:
+                return Optional.empty();
+        }
+    }
+
+    public Optional<Integer> getBulkFlushBackoffRetries() {
+        return config.getOptional(BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION);
+    }
+
+    public Optional<Long> getBulkFlushBackoffDelay() {
+        return config.getOptional(BULK_FLUSH_BACKOFF_DELAY_OPTION).map(Duration::toMillis);
+    }
+
+    public boolean isDisableFlushOnCheckpoint() {
+        return !config.get(ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION);
+    }
+
+    public String getIndex() {
+        return config.get(ElasticsearchOptions.INDEX_OPTION);
+    }
+
+    public String getKeyDelimiter() {
+        return config.get(ElasticsearchOptions.KEY_DELIMITER_OPTION);
+    }
+
+    public Optional<String> getRoutingField() {
+        return config.getOptional(ElasticsearchOptions.ROUTING_FIELD_NAME);
+    }
+
+    public Optional<String> getPathPrefix() {
+        return config.getOptional(ElasticsearchOptions.CONNECTION_PATH_PREFIX);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ElasticsearchConfiguration that = (ElasticsearchConfiguration) o;
+        return Objects.equals(config, that.config) && Objects.equals(classLoader, that.classLoader);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(config, classLoader);
+    }
+}
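
The accessors above normalize the raw options for the sink (note the 0-to--1
conversions). A minimal sketch of how they would feed the sink builder, assuming the
method names of Flink's ElasticsearchSink.Builder API:

    builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
    builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20));
    builder.setBulkFlushInterval(config.getBulkFlushInterval());
    builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
    config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
    config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
    config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);

A -1 returned by the max-actions, max-size, and interval getters disables the
corresponding flush trigger in the Elasticsearch client.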
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchOptions.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchOptions.java
new file mode 100644
index 000000000..0d74c2c3b
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchOptions.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/**
+ * Options for {@link org.apache.flink.table.factories.DynamicTableSinkFactory} for Elasticsearch.
+ */
+public class ElasticsearchOptions {
+    /**
+     * Backoff strategy. Extends {@link ElasticsearchSinkBase.FlushBackoffType} with {@code
+     * DISABLED} option.
+     */
+    public enum BackOffType {
+        DISABLED,
+        CONSTANT,
+        EXPONENTIAL
+    }
+
+    public static final ConfigOption<List<String>> HOSTS_OPTION =
+            ConfigOptions.key("hosts")
+                    .stringType()
+                    .asList()
+                    .noDefaultValue()
+                    .withDescription("Elasticsearch hosts to connect to.");
+    public static final ConfigOption<String> INDEX_OPTION =
+            ConfigOptions.key("index")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Elasticsearch index for every record.");
+    public static final ConfigOption<String> DOCUMENT_TYPE_OPTION =
+            ConfigOptions.key("document-type")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Elasticsearch document type.");
+    public static final ConfigOption<String> PASSWORD_OPTION =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Password used to connect to Elasticsearch instance.");
+    public static final ConfigOption<String> USERNAME_OPTION =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Username used to connect to Elasticsearch instance.");
+    public static final ConfigOption<String> KEY_DELIMITER_OPTION =
+            ConfigOptions.key("document-id.key-delimiter")
+                    .stringType()
+                    .defaultValue("_")
+                    .withDescription(
+                            "Delimiter for composite keys e.g., \"$\" would result in IDs \"KEY1$KEY2$KEY3\".");
+
+    public static final ConfigOption<String> ROUTING_FIELD_NAME =
+            ConfigOptions.key("routing.field-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Elasticsearch routing filed.");
+
+    public static final ConfigOption<String> FAILURE_HANDLER_OPTION =
+            ConfigOptions.key("failure-handler")
+                    .stringType()
+                    .defaultValue("fail")
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Failure handling strategy in case a request to Elasticsearch fails")
+                                    .list(
+                                            text(
+                                                    "\"fail\" (throws an exception if a request fails "
+                                                            + "and thus causes a job failure)"),
+                                            text(
+                                                    "\"ignore\" (ignores failures and drops the request)"),
+                                            text(
+                                                    "\"retry-rejected\" (re-adds requests that have failed "
+                                                            + "due to queue capacity saturation)"),
+                                            text(
+                                                    "\"class name\" for failure handling with "
+                                                            + "a ActionRequestFailureHandler subclass"))
+                                    .build());
+    public static final ConfigOption<Boolean> FLUSH_ON_CHECKPOINT_OPTION =
+            ConfigOptions.key("sink.flush-on-checkpoint")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Disables flushing on checkpoint");
+    public static final ConfigOption<Integer> BULK_FLUSH_MAX_ACTIONS_OPTION =
+            ConfigOptions.key("sink.bulk-flush.max-actions")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription("Maximum number of actions to buffer for each bulk request.");
+    public static final ConfigOption<MemorySize> BULK_FLASH_MAX_SIZE_OPTION =
+            ConfigOptions.key("sink.bulk-flush.max-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("2mb"))
+                    .withDescription("Maximum size of buffered actions per bulk request");
+    public static final ConfigOption<Duration> BULK_FLUSH_INTERVAL_OPTION =
+            ConfigOptions.key("sink.bulk-flush.interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(1))
+                    .withDescription("Bulk flush interval");
+    public static final ConfigOption<BackOffType> BULK_FLUSH_BACKOFF_TYPE_OPTION =
+            ConfigOptions.key("sink.bulk-flush.backoff.strategy")
+                    .enumType(BackOffType.class)
+                    .defaultValue(BackOffType.DISABLED)
+                    .withDescription("Backoff strategy");
+    public static final ConfigOption<Integer> BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION =
+            ConfigOptions.key("sink.bulk-flush.backoff.max-retries")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("Maximum number of retries.");
+    public static final ConfigOption<Duration> BULK_FLUSH_BACKOFF_DELAY_OPTION =
+            ConfigOptions.key("sink.bulk-flush.backoff.delay")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription("Delay between each backoff attempt.");
+    public static final ConfigOption<Duration> CONNECTION_MAX_RETRY_TIMEOUT_OPTION =
+            ConfigOptions.key("connection.max-retry-timeout")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription("Maximum timeout between retries.");
+    public static final ConfigOption<String> CONNECTION_PATH_PREFIX =
+            ConfigOptions.key("connection.path-prefix")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Prefix string to be added to every REST communication.");
+    public static final ConfigOption<String> FORMAT_OPTION =
+            ConfigOptions.key("format")
+                    .stringType()
+                    .defaultValue("json")
+                    .withDescription(
+                            "The format must produce a valid JSON document. "
+                                    + "Please refer to the documentation on formats for more details.");
+
+    private ElasticsearchOptions() {
+
+    }
+}
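
These ConfigOptions are read through Flink's Configuration. A quick sketch of how a
few of them are consumed (keys and values illustrative):

    Configuration conf = new Configuration();
    conf.setString("sink.bulk-flush.max-actions", "500");
    conf.setString("sink.bulk-flush.backoff.strategy", "CONSTANT");

    int maxActions = conf.get(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION); // 500
    ElasticsearchOptions.BackOffType strategy =
            conf.get(ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION);         // CONSTANT
    Duration interval = conf.get(ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION); // default 1s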
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchValidationUtils.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchValidationUtils.java
new file mode 100644
index 000000000..d4564e33a
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchValidationUtils.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
+
+/** Utility methods for validating Elasticsearch properties. */
+public class ElasticsearchValidationUtils {
+
+    private static final Set<LogicalTypeRoot> ILLEGAL_PRIMARY_KEY_TYPES = new LinkedHashSet<>();
+
+    static {
+        ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.ARRAY);
+        ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.MAP);
+        ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.MULTISET);
+        ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.STRUCTURED_TYPE);
+        ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.ROW);
+        ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.RAW);
+        ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.BINARY);
+        ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.VARBINARY);
+    }
+
+    /**
+     * Checks that the table does not have a primary key defined on illegal types. In Elasticsearch
+     * the primary key is used to calculate the Elasticsearch document id, which is a string of up
+     * to 512 bytes. It cannot contain whitespace. As of now it is calculated by concatenating the
+     * fields. Certain types do not have a good string representation to be used in this scenario.
+     * The illegal types are mostly {@link LogicalTypeFamily#COLLECTION} types and {@link
+     * LogicalTypeRoot#RAW} type.
+     */
+    public static void validatePrimaryKey(TableSchema schema) {
+        schema.getPrimaryKey()
+                .ifPresent(
+                        key -> {
+                            List<LogicalTypeRoot> illegalTypes =
+                                    key.getColumns().stream()
+                                            .map(
+                                                    fieldName -> {
+                                                        LogicalType logicalType =
+                                                                schema.getFieldDataType(fieldName)
+                                                                        .get()
+                                                                        .getLogicalType();
+                                                        if (hasRoot(
+                                                                logicalType,
+                                                                LogicalTypeRoot.DISTINCT_TYPE)) {
+                                                            return ((DistinctType) logicalType)
+                                                                    .getSourceType()
+                                                                    .getTypeRoot();
+                                                        } else {
+                                                            return logicalType.getTypeRoot();
+                                                        }
+                                                    })
+                                            .filter(ILLEGAL_PRIMARY_KEY_TYPES::contains)
+                                            .collect(Collectors.toList());
+
+                            if (!illegalTypes.isEmpty()) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "The table has a primary key on columns of illegal types: %s.\n"
+                                                        + " Elasticsearch sink does not support "
+                                                        + "primary keys on columns of types: %s.",
+                                                illegalTypes, ILLEGAL_PRIMARY_KEY_TYPES));
+                            }
+                        });
+    }
+
+    private ElasticsearchValidationUtils() {
+
+    }
+}
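
For illustration, a hedged sketch of schemas that pass and fail this validation
(the schemas are hypothetical):

    TableSchema ok = TableSchema.builder()
            .field("id", DataTypes.BIGINT().notNull())
            .primaryKey("id")
            .build();
    ElasticsearchValidationUtils.validatePrimaryKey(ok);    // passes: BIGINT keys are legal

    TableSchema bad = TableSchema.builder()
            .field("payload", DataTypes.BYTES().notNull())  // BYTES maps to VARBINARY
            .primaryKey("payload")
            .build();
    ElasticsearchValidationUtils.validatePrimaryKey(bad);   // throws ValidationException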
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGenerator.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGenerator.java
new file mode 100644
index 000000000..8e175fe51
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGenerator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+
+/** This interface is responsible for generating the index name from a given {@link Row} record. */
+public interface IndexGenerator extends Serializable {
+
+    /**
+     * Initialize the index generator; this will be called only once before {@link
+     * #generate(RowData)} is called.
+     */
+    default void open() {
+
+    }
+
+    /** Generate the index name according to the given row. */
+    String generate(RowData row);
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java
new file mode 100644
index 000000000..39fbe2d65
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import java.util.Objects;
+
+/** Base class for {@link IndexGenerator}. */
+public abstract class IndexGeneratorBase implements IndexGenerator {
+
+    private static final long serialVersionUID = 1L;
+    protected final String index;
+
+    public IndexGeneratorBase(String index) {
+        this.index = index;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof IndexGeneratorBase)) {
+            return false;
+        }
+        IndexGeneratorBase that = (IndexGeneratorBase) o;
+        return index.equals(that.index);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(index);
+    }
+}
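
Concrete generators only implement generate(); the base class carries the raw index
string and equality. A minimal sketch of a custom subclass (the class and its suffix
logic are illustrative, not shipped by this commit):

    public class SuffixIndexGenerator extends IndexGeneratorBase {
        private static final long serialVersionUID = 1L;
        private final String suffix;

        public SuffixIndexGenerator(String index, String suffix) {
            super(index);
            this.suffix = suffix;
        }

        @Override
        public String generate(org.apache.flink.table.data.RowData row) {
            // every record is written to "<index><suffix>", independent of row content
            return index + suffix;
        }
    }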
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java
new file mode 100644
index 000000000..968fe2706
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Factory of {@link IndexGenerator}.
+ *
+ * <p>Flink supports both static index and dynamic index.
+ *
+ * <p>If you want to have a static index, this option value should be a plain string, e.g.
+ * 'myusers'; all records will then be consistently written into the "myusers" index.
+ *
+ * <p>If you want to have a dynamic index, you can use '{field_name}' to reference a field value in
+ * the record to dynamically generate a target index. You can also use
+ * '{field_name|date_format_string}' to convert a field value of TIMESTAMP/DATE/TIME type into the
+ * format specified by date_format_string. The date_format_string is compatible with {@link
+ * java.text.SimpleDateFormat}. For example, if the option value is 'myusers_{log_ts|yyyy-MM-dd}',
+ * then a record with log_ts field value 2020-03-27 12:25:55 will be written into
+ * "myusers_2020-03-27" index.
+ */
+public final class IndexGeneratorFactory {
+
+    private IndexGeneratorFactory() {
+
+    }
+
+    public static IndexGenerator createIndexGenerator(String index, TableSchema schema) {
+        final IndexHelper indexHelper = new IndexHelper();
+        if (indexHelper.checkIsDynamicIndex(index)) {
+            return createRuntimeIndexGenerator(
+                    index, schema.getFieldNames(), schema.getFieldDataTypes(), indexHelper);
+        } else {
+            return new StaticIndexGenerator(index);
+        }
+    }
+
+    interface DynamicFormatter extends Serializable {
+        String format(@Nonnull Object fieldValue, DateTimeFormatter formatter);
+    }
+
+    private static IndexGenerator createRuntimeIndexGenerator(
+            String index, String[] fieldNames, DataType[] fieldTypes, IndexHelper indexHelper) {
+        final String dynamicIndexPatternStr = indexHelper.extractDynamicIndexPatternStr(index);
+        final String indexPrefix = index.substring(0, index.indexOf(dynamicIndexPatternStr));
+        final String indexSuffix =
+                index.substring(indexPrefix.length() + dynamicIndexPatternStr.length());
+
+        final boolean isDynamicIndexWithFormat = indexHelper.checkIsDynamicIndexWithFormat(index);
+        final int indexFieldPos =
+                indexHelper.extractIndexFieldPos(index, fieldNames, isDynamicIndexWithFormat);
+        final LogicalType indexFieldType = fieldTypes[indexFieldPos].getLogicalType();
+        final LogicalTypeRoot indexFieldLogicalTypeRoot = indexFieldType.getTypeRoot();
+
+        // validate index field type
+        indexHelper.validateIndexFieldType(indexFieldLogicalTypeRoot);
+
+        // time extract dynamic index pattern
+        final RowData.FieldGetter fieldGetter =
+                RowData.createFieldGetter(indexFieldType, indexFieldPos);
+
+        if (isDynamicIndexWithFormat) {
+            final String dateTimeFormat =
+                    indexHelper.extractDateFormat(index, indexFieldLogicalTypeRoot);
+            DynamicFormatter formatFunction =
+                    createFormatFunction(indexFieldType, indexFieldLogicalTypeRoot);
+
+            return new AbstractTimeIndexGenerator(index, dateTimeFormat) {
+                @Override
+                public String generate(RowData row) {
+                    Object fieldOrNull = fieldGetter.getFieldOrNull(row);
+                    final String formattedField;
+                    // TODO we can possibly optimize it to use the nullability of the field
+                    if (fieldOrNull != null) {
+                        formattedField = formatFunction.format(fieldOrNull, dateTimeFormatter);
+                    } else {
+                        formattedField = "null";
+                    }
+                    return indexPrefix.concat(formattedField).concat(indexSuffix);
+                }
+            };
+        }
+        // general dynamic index pattern
+        return new IndexGeneratorBase(index) {
+            @Override
+            public String generate(RowData row) {
+                Object indexField = fieldGetter.getFieldOrNull(row);
+                return indexPrefix
+                        .concat(indexField == null ? "null" : indexField.toString())
+                        .concat(indexSuffix);
+            }
+        };
+    }
+
+    private static DynamicFormatter createFormatFunction(
+            LogicalType indexFieldType, LogicalTypeRoot indexFieldLogicalTypeRoot) {
+        switch (indexFieldLogicalTypeRoot) {
+            case DATE:
+                return (value, dateTimeFormatter) -> {
+                    Integer indexField = (Integer) value;
+                    return LocalDate.ofEpochDay(indexField).format(dateTimeFormatter);
+                };
+            case TIME_WITHOUT_TIME_ZONE:
+                return (value, dateTimeFormatter) -> {
+                    Integer indexField = (Integer) value;
+                    return LocalTime.ofNanoOfDay(indexField * 1_000_000L).format(dateTimeFormatter);
+                };
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return (value, dateTimeFormatter) -> {
+                    TimestampData indexField = (TimestampData) value;
+                    return indexField.toLocalDateTime().format(dateTimeFormatter);
+                };
+            case TIMESTAMP_WITH_TIME_ZONE:
+                throw new UnsupportedOperationException(
+                        "TIMESTAMP_WITH_TIME_ZONE is not supported yet");
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return (value, dateTimeFormatter) -> {
+                    TimestampData indexField = (TimestampData) value;
+                    return indexField.toInstant().atZone(ZoneOffset.UTC).format(dateTimeFormatter);
+                };
+            default:
+                throw new TableException(
+                        String.format(
+                                "Unsupported type '%s' found in Elasticsearch dynamic index field, "
+                                        + "time-related pattern only support types are: DATE,TIME,TIMESTAMP.",
+                                indexFieldType));
+        }
+    }
+
+    /**
+     * Helper class for {@link IndexGeneratorFactory}; it can be used to validate the index field
+     * type and to parse the index format from the pattern.
+     */
+    private static class IndexHelper {
+        private static final Pattern dynamicIndexPattern = Pattern.compile("\\{[^\\{\\}]+\\}?");
+        private static final Pattern dynamicIndexTimeExtractPattern =
+                Pattern.compile(".*\\{.+\\|.*\\}.*");
+        private static final List<LogicalTypeRoot> supportedTypes = new ArrayList<>();
+        private static final Map<LogicalTypeRoot, String> defaultFormats = new HashMap<>();
+
+        static {
+            // time related types
+            supportedTypes.add(LogicalTypeRoot.DATE);
+            supportedTypes.add(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE);
+            supportedTypes.add(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+            supportedTypes.add(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE);
+            supportedTypes.add(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+            // general types
+            supportedTypes.add(LogicalTypeRoot.VARCHAR);
+            supportedTypes.add(LogicalTypeRoot.CHAR);
+            supportedTypes.add(LogicalTypeRoot.TINYINT);
+            supportedTypes.add(LogicalTypeRoot.INTEGER);
+            supportedTypes.add(LogicalTypeRoot.BIGINT);
+        }
+
+        static {
+            defaultFormats.put(LogicalTypeRoot.DATE, "yyyy_MM_dd");
+            defaultFormats.put(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, "HH_mm_ss");
+            defaultFormats.put(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, "yyyy_MM_dd_HH_mm_ss");
+            defaultFormats.put(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE, "yyyy_MM_dd_HH_mm_ss");
+            defaultFormats.put(
+                    LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, "yyyy_MM_dd_HH_mm_ssX");
+        }
+
+        /** Validate the index field type. */
+        void validateIndexFieldType(LogicalTypeRoot logicalType) {
+            if (!supportedTypes.contains(logicalType)) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Unsupported type %s of index field, " + "Supported types are: %s",
+                                logicalType, supportedTypes));
+            }
+        }
+
+        /** Get the default date format. */
+        String getDefaultFormat(LogicalTypeRoot logicalType) {
+            return defaultFormats.get(logicalType);
+        }
+
+        /** Check whether general dynamic index is enabled for the given index pattern. */
+        boolean checkIsDynamicIndex(String index) {
+            final Matcher matcher = dynamicIndexPattern.matcher(index);
+            int count = 0;
+            while (matcher.find()) {
+                count++;
+            }
+            if (count > 1) {
+                throw new TableException(
+                        String.format(
+                                "Chaining dynamic index pattern %s is not supported,"
+                                        + " only support single dynamic index pattern.",
+                                index));
+            }
+            return count == 1;
+        }
+
+        /** Check whether time-extracting dynamic index is enabled for the given index pattern. */
+        boolean checkIsDynamicIndexWithFormat(String index) {
+            return dynamicIndexTimeExtractPattern.matcher(index).matches();
+        }
+
+        /** Extract dynamic index pattern string from index pattern string. */
+        String extractDynamicIndexPatternStr(String index) {
+            int start = index.indexOf("{");
+            int end = index.lastIndexOf("}");
+            return index.substring(start, end + 1);
+        }
+
+        /** Extract the position of the index field within the given field names. */
+        int extractIndexFieldPos(
+                String index, String[] fieldNames, boolean isDynamicIndexWithFormat) {
+            List<String> fieldList = Arrays.asList(fieldNames);
+            String indexFieldName;
+            if (isDynamicIndexWithFormat) {
+                indexFieldName = index.substring(index.indexOf("{") + 1, index.indexOf("|"));
+            } else {
+                indexFieldName = index.substring(index.indexOf("{") + 1, index.indexOf("}"));
+            }
+            if (!fieldList.contains(indexFieldName)) {
+                throw new TableException(
+                        String.format(
+                                "Unknown field '%s' in index pattern '%s', please check the field name.",
+                                indexFieldName, index));
+            }
+            return fieldList.indexOf(indexFieldName);
+        }
+
+        /** Extract the date-time format from the index pattern string, falling back to the default. */
+        private String extractDateFormat(String index, LogicalTypeRoot logicalType) {
+            String format = index.substring(index.indexOf("|") + 1, index.indexOf("}"));
+            if ("".equals(format)) {
+                format = getDefaultFormat(logicalType);
+            }
+            return format;
+        }
+    }
+}
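
Putting the factory together, a hedged usage sketch (schema is a TableSchema containing
the referenced fields, row is a RowData; both are assumed):

    // static index: every record goes to "myusers"
    IndexGenerator staticGen = IndexGeneratorFactory.createIndexGenerator("myusers", schema);

    // dynamic index: take the index name from the "region" field of each record
    IndexGenerator fieldGen = IndexGeneratorFactory.createIndexGenerator("idx-{region}", schema);

    // time-extracting dynamic index: format the "log_ts" TIMESTAMP field per record
    IndexGenerator timeGen =
            IndexGeneratorFactory.createIndexGenerator("logs-{log_ts|yyyy-MM-dd}", schema);
    timeGen.open();                         // initializes the DateTimeFormatter
    String target = timeGen.generate(row);  // e.g. "logs-2020-03-27"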
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/KeyExtractor.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/KeyExtractor.java
new file mode 100644
index 000000000..a1b74eb1a
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/KeyExtractor.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.Period;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/** An extractor for an Elasticsearch key from a {@link RowData}. */
+public class KeyExtractor implements Function<RowData, String>, Serializable {
+    private final FieldFormatter[] fieldFormatters;
+    private final String keyDelimiter;
+
+    private interface FieldFormatter extends Serializable {
+        String format(RowData rowData);
+    }
+
+    private KeyExtractor(FieldFormatter[] fieldFormatters, String keyDelimiter) {
+        this.fieldFormatters = fieldFormatters;
+        this.keyDelimiter = keyDelimiter;
+    }
+
+    @Override
+    public String apply(RowData rowData) {
+        final StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < fieldFormatters.length; i++) {
+            if (i > 0) {
+                builder.append(keyDelimiter);
+            }
+            final String value = fieldFormatters[i].format(rowData);
+            builder.append(value);
+        }
+        return builder.toString();
+    }
+
+    private static class ColumnWithIndex {
+        public TableColumn column;
+        public int index;
+
+        public ColumnWithIndex(TableColumn column, int index) {
+            this.column = column;
+            this.index = index;
+        }
+
+        public LogicalType getType() {
+            return column.getType().getLogicalType();
+        }
+
+        public int getIndex() {
+            return index;
+        }
+    }
+
+    public static Function<RowData, String> createKeyExtractor(
+            TableSchema schema, String keyDelimiter) {
+        return schema.getPrimaryKey()
+                .map(
+                        key -> {
+                            Map<String, ColumnWithIndex> namesToColumns = new HashMap<>();
+                            List<TableColumn> tableColumns = schema.getTableColumns();
+                            for (int i = 0; i < schema.getFieldCount(); i++) {
+                                TableColumn column = tableColumns.get(i);
+                                namesToColumns.put(
+                                        column.getName(), new ColumnWithIndex(column, i));
+                            }
+
+                            FieldFormatter[] fieldFormatters =
+                                    key.getColumns().stream()
+                                            .map(namesToColumns::get)
+                                            .map(
+                                                    column ->
+                                                            toFormatter(
+                                                                    column.index, column.getType()))
+                                            .toArray(FieldFormatter[]::new);
+
+                            return (Function<RowData, String>)
+                                    new KeyExtractor(fieldFormatters, keyDelimiter);
+                        })
+                .orElseGet(() -> (Function<RowData, String> & Serializable) (row) -> null);
+    }
+
+    private static FieldFormatter toFormatter(int index, LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case DATE:
+                return (row) -> LocalDate.ofEpochDay(row.getInt(index)).toString();
+            case TIME_WITHOUT_TIME_ZONE:
+                return (row) ->
+                        LocalTime.ofNanoOfDay((long) row.getInt(index) * 1_000_000L).toString();
+            case INTERVAL_YEAR_MONTH:
+                return (row) -> Period.ofDays(row.getInt(index)).toString();
+            case INTERVAL_DAY_TIME:
+                return (row) -> Duration.ofMillis(row.getLong(index)).toString();
+            case DISTINCT_TYPE:
+                return toFormatter(index, ((DistinctType) type).getSourceType());
+            default:
+                RowData.FieldGetter fieldGetter = RowData.createFieldGetter(type, index);
+                return (row) -> fieldGetter.getFieldOrNull(row).toString();
+        }
+    }
+}
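
A usage sketch for the key extractor (schema and delimiter are illustrative; TableSchema.builder() is the Flink 1.13 API):

    TableSchema schema = TableSchema.builder()
            .field("id", DataTypes.BIGINT().notNull())
            .field("name", DataTypes.STRING())
            .primaryKey("id")
            .build();
    Function<RowData, String> createKey = KeyExtractor.createKeyExtractor(schema, "_");
    // For a row with id = 42 the Elasticsearch document id becomes "42"; with a
    // composite key (id, name) the parts are joined by the delimiter: "42_bob".
    // Without a primary key the function returns null and the sink falls back to
    // plain index requests (see RowElasticsearchSinkFunction below).
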
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RequestFactory.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RequestFactory.java
new file mode 100644
index 000000000..70df59792
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RequestFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import java.io.Serializable;
+
+/** A factory for creating {@link ActionRequest}s in a version-agnostic way. */
+public interface RequestFactory extends Serializable {
+    /**
+     * Creates an update request to be added to a {@link RequestIndexer}. Note: the type field has
+     * been deprecated since Elasticsearch 7.x and has no effect there.
+     */
+    UpdateRequest createUpdateRequest(
+            String index, String docType, String key, XContentType contentType, byte[] document);
+
+    /**
+     * Creates an index request to be added to a {@link RequestIndexer}. Note: the type field has
+     * been deprecated since Elasticsearch 7.x and has no effect there.
+     */
+    IndexRequest createIndexRequest(
+            String index, String docType, String key, XContentType contentType, byte[] document);
+
+    /**
+     * Creates a delete request to be added to a {@link RequestIndexer}. Note: the type field has
+     * been deprecated since Elasticsearch 7.x and has no effect there.
+     */
+    DeleteRequest createDeleteRequest(String index, String docType, String key);
+}
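
A minimal Elasticsearch-7-style implementation sketch of this interface (class name hypothetical; the concrete factories ship in the elasticsearch-6/-7 modules of this patch):

    // Hypothetical sketch; docType is ignored because the type field has no effect on ES 7+.
    public class Es7RequestFactory implements RequestFactory {
        @Override
        public UpdateRequest createUpdateRequest(String index, String docType, String key,
                XContentType contentType, byte[] document) {
            return new UpdateRequest(index, key)
                    .doc(document, contentType)
                    .upsert(document, contentType);
        }

        @Override
        public IndexRequest createIndexRequest(String index, String docType, String key,
                XContentType contentType, byte[] document) {
            return new IndexRequest(index).id(key).source(document, contentType);
        }

        @Override
        public DeleteRequest createDeleteRequest(String index, String docType, String key) {
            return new DeleteRequest(index, key);
        }
    }
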
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RoutingExtractor.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RoutingExtractor.java
new file mode 100644
index 000000000..4ddcd544a
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RoutingExtractor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * An extractor for an Elasticsearch routing value from a {@link RowData}.
+ */
+@Internal
+public class RoutingExtractor {
+    private RoutingExtractor() {
+    }
+
+    public static Function<RowData, String> createRoutingExtractor(
+            TableSchema schema,
+            @Nullable String fieldName) {
+        if (fieldName == null) {
+            return null;
+        }
+        List<TableColumn> tableColumns = schema.getTableColumns();
+        for (int i = 0; i < schema.getFieldCount(); i++) {
+            TableColumn column = tableColumns.get(i);
+            if (column.getName().equals(fieldName)) {
+                RowData.FieldGetter fieldGetter = RowData.createFieldGetter(
+                        column.getType().getLogicalType(),
+                        i);
+                return (Function<RowData, String> & Serializable) (row) -> {
+                    Object fieldOrNull = fieldGetter.getFieldOrNull(row);
+                    if (fieldOrNull != null) {
+                        return fieldOrNull.toString();
+                    } else {
+                        return null;
+                    }
+                };
+            }
+        }
+        throw new IllegalArgumentException("Field " + fieldName + " does not exist in the table schema.");
+    }
+}
\ No newline at end of file
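
A usage sketch for the routing extractor (the field name is an assumption; the table option that supplies it is declared in ElasticsearchOptions, not shown here):

    Function<RowData, String> createRouting =
            RoutingExtractor.createRoutingExtractor(schema, "customer_id"); // field assumed
    // Returns null when no routing field is configured. Otherwise the extracted value
    // is applied to every Index/Update/Delete request via DocWriteRequest#routing,
    // so all documents with the same field value land on the same shard.
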
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
new file mode 100644
index 000000000..2e227ad55
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.function.Function;
+
+/** Sink function for converting upserts into Elasticsearch {@link ActionRequest}s. */
+public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final IndexGenerator indexGenerator;
+    private final String docType;
+    private final SerializationSchema<RowData> serializationSchema;
+    private final XContentType contentType;
+    private final RequestFactory requestFactory;
+    private final Function<RowData, String> createKey;
+
+    private final Function<RowData, String> createRouting;
+
+    public RowElasticsearchSinkFunction(
+            IndexGenerator indexGenerator,
+            @Nullable String docType, // deprecated in Elasticsearch 7+
+            SerializationSchema<RowData> serializationSchema,
+            XContentType contentType,
+            RequestFactory requestFactory,
+            Function<RowData, String> createKey,
+            @Nullable Function<RowData, String> createRouting) {
+        this.indexGenerator = Preconditions.checkNotNull(indexGenerator);
+        this.docType = docType;
+        this.serializationSchema = Preconditions.checkNotNull(serializationSchema);
+        this.contentType = Preconditions.checkNotNull(contentType);
+        this.requestFactory = Preconditions.checkNotNull(requestFactory);
+        this.createKey = Preconditions.checkNotNull(createKey);
+        this.createRouting = createRouting;
+    }
+
+    @Override
+    public void open() {
+        indexGenerator.open();
+    }
+
+    @Override
+    public void process(RowData element, RuntimeContext ctx, RequestIndexer indexer) {
+        switch (element.getRowKind()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                processUpsert(element, indexer);
+                break;
+            case UPDATE_BEFORE:
+            case DELETE:
+                processDelete(element, indexer);
+                break;
+            default:
+                throw new TableException("Unsupported message kind: " + element.getRowKind());
+        }
+    }
+
+    private void processUpsert(RowData row, RequestIndexer indexer) {
+        final byte[] document = serializationSchema.serialize(row);
+        final String key = createKey.apply(row);
+        if (key != null) {
+            final UpdateRequest updateRequest =
+                    requestFactory.createUpdateRequest(
+                            indexGenerator.generate(row), docType, key, contentType, document);
+            addRouting(updateRequest, row);
+            indexer.add(updateRequest);
+        } else {
+            final IndexRequest indexRequest =
+                    requestFactory.createIndexRequest(
+                            indexGenerator.generate(row), docType, key, contentType, document);
+            addRouting(indexRequest, row);
+            indexer.add(indexRequest);
+        }
+    }
+
+    private void processDelete(RowData row, RequestIndexer indexer) {
+        final String key = createKey.apply(row);
+        final DeleteRequest deleteRequest =
+                requestFactory.createDeleteRequest(indexGenerator.generate(row), docType, key);
+        addRouting(deleteRequest, row);
+        indexer.add(deleteRequest);
+    }
+
+    private void addRouting(DocWriteRequest<?> request, RowData row) {
+        if (null != createRouting) {
+            String routing = createRouting.apply(row);
+            request.routing(routing);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        RowElasticsearchSinkFunction that = (RowElasticsearchSinkFunction) o;
+        return Objects.equals(indexGenerator, that.indexGenerator)
+                && Objects.equals(docType, that.docType)
+                && Objects.equals(serializationSchema, that.serializationSchema)
+                && contentType == that.contentType
+                && Objects.equals(requestFactory, that.requestFactory)
+                && Objects.equals(createKey, that.createKey)
+                && Objects.equals(createRouting, that.createRouting);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                indexGenerator,
+                docType,
+                serializationSchema,
+                contentType,
+                requestFactory,
+                createKey,
+                createRouting);
+    }
+}
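
How the pieces fit together, as a hedged wiring sketch (the real wiring lives in the version-specific DynamicSink builders; serializationSchema, requestFactory, keyDelimiter, and routingFieldName are placeholders):

    RowElasticsearchSinkFunction sinkFunction =
            new RowElasticsearchSinkFunction(
                    IndexGeneratorFactory.createIndexGenerator(index, schema),
                    null,                        // docType: ignored on ES 7+
                    serializationSchema,         // e.g. a JSON RowData serializer
                    XContentType.JSON,
                    requestFactory,              // version-specific RequestFactory
                    KeyExtractor.createKeyExtractor(schema, keyDelimiter),
                    RoutingExtractor.createRoutingExtractor(schema, routingFieldName));
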
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/StaticIndexGenerator.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/StaticIndexGenerator.java
new file mode 100644
index 000000000..fdacd5626
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/StaticIndexGenerator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.table.data.RowData;
+
+/** A static {@link IndexGenerator} that generates a fixed index name. */
+public final class StaticIndexGenerator extends IndexGeneratorBase {
+
+    public StaticIndexGenerator(String index) {
+        super(index);
+    }
+
+    @Override
+    public String generate(RowData row) {
+        return index;
+    }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/test/java/org/apache/inlong/sort/elasticsearch/table/TestContext.java b/inlong-sort/sort-connectors/elasticsearch-base/src/test/java/org/apache/inlong/sort/elasticsearch/table/TestContext.java
new file mode 100644
index 000000000..e10b9b0c3
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/test/java/org/apache/inlong/sort/elasticsearch/table/TestContext.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/** A utility class for mocking {@link DynamicTableFactory.Context}. */
+public class TestContext {
+
+    private ResolvedSchema schema = ResolvedSchema.of(Column.physical("a", DataTypes.TIME()));
+
+    private final Map<String, String> options = new HashMap<>();
+
+    public static TestContext context() {
+        return new TestContext();
+    }
+
+    public TestContext withSchema(ResolvedSchema schema) {
+        this.schema = schema;
+        return this;
+    }
+
+    public DynamicTableFactory.Context build() {
+        return new FactoryUtil.DefaultDynamicTableContext(
+                ObjectIdentifier.of("default", "default", "t1"),
+                new ResolvedCatalogTable(
+                        CatalogTable.of(
+                                Schema.newBuilder().fromResolvedSchema(schema).build(),
+                                "mock context",
+                                Collections.emptyList(),
+                                options),
+                        schema),
+                new Configuration(),
+                TestContext.class.getClassLoader(),
+                false);
+    }
+
+    public TestContext withOption(String key, String value) {
+        options.put(key, value);
+        return this;
+    }
+}
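
A sketch of how a factory test might use this context (option keys and values are assumptions for illustration):

    DynamicTableFactory.Context context =
            TestContext.context()
                    .withOption("connector", "elasticsearch-7")   // assumed key/value
                    .withOption("hosts", "http://localhost:9200")
                    .withOption("index", "my_index")
                    .build();
    // The factory under test then consumes this context in createDynamicTableSink(context).
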
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/test/resources/log4j2-test.properties b/inlong-sort/sort-connectors/elasticsearch-base/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..835c2ec9a
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
#  limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/inlong-sort/sort-connectors/pom.xml b/inlong-sort/sort-connectors/pom.xml
index e687ba679..2a4e4902e 100644
--- a/inlong-sort/sort-connectors/pom.xml
+++ b/inlong-sort/sort-connectors/pom.xml
@@ -48,6 +48,7 @@
         <module>mongodb-cdc</module>
         <module>sqlserver-cdc</module>
         <module>oracle-cdc</module>
+        <module>elasticsearch-base</module>
         <module>elasticsearch-6</module>
         <module>elasticsearch-7</module>
     </modules>
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java
index e965cc42f..4ed00fb72 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java
@@ -54,7 +54,7 @@ public class ElasticsearchSqlParseTest extends AbstractTestBase {
         Map<String, String> map = new HashMap<>();
         return new MySqlExtractNode("1", "mysql_input", fields,
             null, map, "age",
-            Collections.singletonList("user"), "localhost", "root", "Eminem@123456",
+            Collections.singletonList("user"), "localhost", "root", "888888",
             "test", null, null,
             true, null);
     }
@@ -72,7 +72,7 @@ public class ElasticsearchSqlParseTest extends AbstractTestBase {
         return new ElasticsearchLoadNode("2", "kafka_output", fields, relations, null, null,
             2, null,
             "test", "http://localhost:9200",
-            "my_admin", "my_password", null, "age", 7);
+            "elastic", "my_password", null, "age", 7);
     }
 
     private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 23c2037d5..7f2471260 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -480,6 +480,29 @@
  Source  : pulsar-flink-connector_2.11 1.13.6.1-rc9 (Please note that the software have been modified.)
  License : https://github.com/streamnative/pulsar-flink/blob/master/LICENSE
 
+ 1.3.8 inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6Configuration.java
+       inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
+       inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java
+       inlong-sort/sort-connectors/elasticsearch-6/src/test/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkITCase.java
+       inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7Configuration.java
+       inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
+       inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java
+       inlong-sort/sort-connectors/elasticsearch-7/src/test/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkITCase.java
+       inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/AbstractTimeIndexGenerator.java
+       inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java
+       inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java
+       inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/KeyExtractor.java
+       inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchOptions.java
+       inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RequestFactory.java
+       inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchValidationUtils.java
+       inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGenerator.java
+       inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+       inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java
+       inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/StaticIndexGenerator.java
+       inlong-sort/sort-connectors/elasticsearch-base/src/test/java/org/apache/inlong/sort/elasticsearch/table/TestContext.java
+ Source  : flink-connector-elasticsearch 1.13.2 (Please note that the software has been modified.)
+ License : https://github.com/apache/flink/blob/release-1.13.2-rc2/LICENSE
+
 =======================================================================
 Apache InLong Subcomponents:
 
diff --git a/pom.xml b/pom.xml
index 3aff01ba1..6a30758f7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -155,7 +155,8 @@
         <mybatis.starter.version>2.1.3</mybatis.starter.version>
         <mybatis.version>3.5.9</mybatis.version>
         <druid.version>1.2.6</druid.version>
-        <elasticsearch.version>6.8.23</elasticsearch.version>
+        <elasticsearch6.version>6.8.17</elasticsearch6.version>
+        <elasticsearch7.version>7.9.2</elasticsearch7.version>
         <shiro.version>1.8.0</shiro.version>
 
         <snappy.version>1.1.8.4</snappy.version>
@@ -845,22 +846,22 @@
             <dependency>
                 <groupId>org.elasticsearch.client</groupId>
                 <artifactId>elasticsearch-rest-high-level-client</artifactId>
-                <version>${elasticsearch.version}</version>
+                <version>${elasticsearch6.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.elasticsearch.client</groupId>
                 <artifactId>elasticsearch-rest-client</artifactId>
-                <version>${elasticsearch.version}</version>
+                <version>${elasticsearch6.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.elasticsearch</groupId>
                 <artifactId>elasticsearch</artifactId>
-                <version>${elasticsearch.version}</version>
+                <version>${elasticsearch6.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.elasticsearch.client</groupId>
                 <artifactId>elasticsearch-rest-client-sniffer</artifactId>
-                <version>${elasticsearch.version}</version>
+                <version>${elasticsearch6.version}</version>
             </dependency>
 
             <dependency>