Posted to commits@inlong.apache.org by zi...@apache.org on 2022/08/10 11:42:51 UTC

[inlong] branch master updated: [INLONG-5205][Sort] Add reporting metrics for Elasticsearch 6 connector (#5374)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 7247af95d [INLONG-5205][Sort] Add reporting metrics for Elasticsearch 6 connector (#5374)
7247af95d is described below

commit 7247af95dbe54bce4a86203cc12537374a591c3c
Author: Oneal65 <li...@foxmail.com>
AuthorDate: Wed Aug 10 19:42:46 2022 +0800

    [INLONG-5205][Sort] Add reporting metrics for Elasticsearch 6 connector (#5374)
---
 .../sort-connectors/elasticsearch-6/pom.xml        |   5 +
 .../Elasticsearch6ApiCallBridge.java               | 147 ++++++
 .../Elasticsearch6BulkProcessorIndexer.java        |  85 ++++
 .../sort/elasticsearch6/ElasticsearchSink.java     | 279 ++++++++++
 .../table/Elasticsearch6DynamicSink.java           | 105 ++--
 .../table/Elasticsearch6DynamicSinkFactory.java    |   8 +-
 .../Elasticsearch7ApiCallBridge.java               | 151 ++++++
 .../Elasticsearch7BulkProcessorIndexer.java        |  86 ++++
 .../sort/elasticsearch7/ElasticsearchSink.java     | 265 ++++++++++
 .../table/Elasticsearch7DynamicSink.java           |  91 ++--
 .../sort-connectors/elasticsearch-base/pom.xml     |   5 +
 .../elasticsearch/BufferingNoOpRequestIndexer.java |  76 +++
 .../elasticsearch/ElasticsearchApiCallBridge.java  | 116 +++++
 .../sort/elasticsearch/ElasticsearchSinkBase.java  | 559 +++++++++++++++++++++
 .../elasticsearch/ElasticsearchSinkFunction.java   |  62 +++
 .../PreElasticsearch6BulkProcessorIndexer.java     |  88 ++++
 .../table/ElasticsearchConfiguration.java          |   7 +-
 .../elasticsearch/table/ElasticsearchOptions.java  |  23 +-
 .../table/RowElasticsearchSinkFunction.java        |  41 +-
 19 files changed, 2087 insertions(+), 112 deletions(-)

diff --git a/inlong-sort/sort-connectors/elasticsearch-6/pom.xml b/inlong-sort/sort-connectors/elasticsearch-6/pom.xml
index 1f3c65852..efd2ad903 100644
--- a/inlong-sort/sort-connectors/elasticsearch-6/pom.xml
+++ b/inlong-sort/sort-connectors/elasticsearch-6/pom.xml
@@ -48,6 +48,11 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.elasticsearch.client</groupId>
             <artifactId>elasticsearch-rest-high-level-client</artifactId>
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/Elasticsearch6ApiCallBridge.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/Elasticsearch6ApiCallBridge.java
new file mode 100644
index 000000000..c8fe3ecbf
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/Elasticsearch6ApiCallBridge.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch6;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
+import org.apache.flink.util.Preconditions;
+import org.apache.http.HttpHost;
+import org.apache.inlong.sort.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkBase;
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 6 and later versions.
+ */
+@Internal
+public class Elasticsearch6ApiCallBridge
+        implements ElasticsearchApiCallBridge<RestHighLevelClient> {
+
+    private static final long serialVersionUID = -5222683870097809633L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6ApiCallBridge.class);
+
+    /**
+     * User-provided HTTP Host.
+     */
+    private final List<HttpHost> httpHosts;
+
+    /**
+     * The factory to configure the rest client.
+     */
+    private final RestClientFactory restClientFactory;
+
+    Elasticsearch6ApiCallBridge(List<HttpHost> httpHosts, RestClientFactory restClientFactory) {
+        Preconditions.checkArgument(httpHosts != null && !httpHosts.isEmpty());
+        this.httpHosts = httpHosts;
+        this.restClientFactory = Preconditions.checkNotNull(restClientFactory);
+    }
+
+    @Override
+    public RestHighLevelClient createClient(Map<String, String> clientConfig) {
+        RestClientBuilder builder =
+                RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()]));
+        restClientFactory.configureRestClientBuilder(builder);
+
+        RestHighLevelClient rhlClient = new RestHighLevelClient(builder);
+
+        return rhlClient;
+    }
+
+    @Override
+    public BulkProcessor.Builder createBulkProcessorBuilder(
+            RestHighLevelClient client, BulkProcessor.Listener listener) {
+        return BulkProcessor.builder(client::bulkAsync, listener);
+    }
+
+    @Override
+    public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
+        if (!bulkItemResponse.isFailed()) {
+            return null;
+        } else {
+            return bulkItemResponse.getFailure().getCause();
+        }
+    }
+
+    @Override
+    public void configureBulkProcessorBackoff(
+            BulkProcessor.Builder builder,
+            @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
+
+        BackoffPolicy backoffPolicy;
+        if (flushBackoffPolicy != null) {
+            switch (flushBackoffPolicy.getBackoffType()) {
+                case CONSTANT:
+                    backoffPolicy =
+                            BackoffPolicy.constantBackoff(
+                                    new TimeValue(flushBackoffPolicy.getDelayMillis()),
+                                    flushBackoffPolicy.getMaxRetryCount());
+                    break;
+                case EXPONENTIAL:
+                default:
+                    backoffPolicy =
+                            BackoffPolicy.exponentialBackoff(
+                                    new TimeValue(flushBackoffPolicy.getDelayMillis()),
+                                    flushBackoffPolicy.getMaxRetryCount());
+            }
+        } else {
+            backoffPolicy = BackoffPolicy.noBackoff();
+        }
+
+        builder.setBackoffPolicy(backoffPolicy);
+    }
+
+    @Override
+    public RequestIndexer createBulkProcessorIndexer(
+            BulkProcessor bulkProcessor,
+            boolean flushOnCheckpoint,
+            AtomicLong numPendingRequestsRef) {
+        return new Elasticsearch6BulkProcessorIndexer(
+                bulkProcessor, flushOnCheckpoint, numPendingRequestsRef);
+    }
+
+    @Override
+    public void verifyClientConnection(RestHighLevelClient client) throws IOException {
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Pinging Elasticsearch cluster via hosts {} ...", httpHosts);
+        }
+
+        if (!client.ping()) {
+            throw new RuntimeException("There are no reachable Elasticsearch nodes!");
+        }
+
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Elasticsearch RestHighLevelClient is connected to {}", httpHosts.toString());
+        }
+    }
+}
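
For reference, a minimal sketch (not part of the commit) of what the three branches in configureBulkProcessorBackoff above produce, using the Elasticsearch BackoffPolicy factory methods directly; the 1000 ms delay and 3 retries are made-up example values:

    import org.elasticsearch.action.bulk.BackoffPolicy;
    import org.elasticsearch.common.unit.TimeValue;

    class BackoffPolicyExamples {
        static void examples() {
            // CONSTANT: wait a fixed 1000 ms between up to 3 retries of a rejected bulk request.
            BackoffPolicy constant = BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(1000), 3);
            // EXPONENTIAL (also the default branch): start at 1000 ms and grow the wait on each of 3 retries.
            BackoffPolicy exponential = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 3);
            // No flush backoff policy configured: the BulkProcessor does not retry rejected bulk requests.
            BackoffPolicy none = BackoffPolicy.noBackoff();
        }
    }
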
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/Elasticsearch6BulkProcessorIndexer.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/Elasticsearch6BulkProcessorIndexer.java
new file mode 100644
index 000000000..388dd122c
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/Elasticsearch6BulkProcessorIndexer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch6;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}. {@link ActionRequest
+ * ActionRequests} will be buffered before sending a bulk request to the Elasticsearch cluster.
+ *
+ * <p>Note: This class is binary compatible to Elasticsearch 6.
+ */
+@Internal
+class Elasticsearch6BulkProcessorIndexer implements RequestIndexer {
+
+    private final BulkProcessor bulkProcessor;
+    private final boolean flushOnCheckpoint;
+    private final AtomicLong numPendingRequestsRef;
+
+    Elasticsearch6BulkProcessorIndexer(
+            BulkProcessor bulkProcessor,
+            boolean flushOnCheckpoint,
+            AtomicLong numPendingRequestsRef) {
+        this.bulkProcessor = checkNotNull(bulkProcessor);
+        this.flushOnCheckpoint = flushOnCheckpoint;
+        this.numPendingRequestsRef = checkNotNull(numPendingRequestsRef);
+    }
+
+    @Override
+    public void add(DeleteRequest... deleteRequests) {
+        for (DeleteRequest deleteRequest : deleteRequests) {
+            if (flushOnCheckpoint) {
+                numPendingRequestsRef.getAndIncrement();
+            }
+            this.bulkProcessor.add(deleteRequest);
+        }
+    }
+
+    @Override
+    public void add(IndexRequest... indexRequests) {
+        for (IndexRequest indexRequest : indexRequests) {
+            if (flushOnCheckpoint) {
+                numPendingRequestsRef.getAndIncrement();
+            }
+            this.bulkProcessor.add(indexRequest);
+        }
+    }
+
+    @Override
+    public void add(UpdateRequest... updateRequests) {
+        for (UpdateRequest updateRequest : updateRequests) {
+            if (flushOnCheckpoint) {
+                numPendingRequestsRef.getAndIncrement();
+            }
+            this.bulkProcessor.add(updateRequest);
+        }
+    }
+}
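
A rough sketch (an assumption about the surrounding sink base, not code from this commit) of why the indexer increments the shared counter: with flush-on-checkpoint enabled, the sink can block at checkpoint time until the BulkProcessor listener has decremented the counter back to zero for every buffered request:

    import java.util.concurrent.atomic.AtomicLong;

    class PendingRequestAccounting {

        private final AtomicLong numPendingRequests = new AtomicLong();

        // Called by the indexer for each ActionRequest handed to the BulkProcessor.
        void onRequestBuffered() {
            numPendingRequests.incrementAndGet();
        }

        // Called from the BulkProcessor listener after a bulk of 'completedItems' requests finishes.
        void onBulkFinished(int completedItems) {
            numPendingRequests.addAndGet(-completedItems);
        }

        // Polled while snapshotting state; the sink keeps flushing until nothing is in flight.
        boolean safeToCheckpoint() {
            return numPendingRequests.get() == 0;
        }
    }
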
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java
new file mode 100644
index 000000000..c30787d7c
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch6;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
+import org.apache.flink.util.Preconditions;
+import org.apache.http.HttpHost;
+import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkBase;
+import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Elasticsearch 6.x sink that requests multiple {@link ActionRequest ActionRequests} against a
+ * cluster for each incoming element.
+ *
+ * <p>The sink internally uses a {@link RestHighLevelClient} to communicate with an Elasticsearch
+ * cluster. The sink will fail if no cluster can be connected to using the provided transport
+ * addresses passed to the constructor.
+ *
+ * <p>Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest
+ * ActionRequests}. This will buffer elements before sending a request to the cluster. The behaviour
+ * of the {@code BulkProcessor} can be configured using these config keys:
+ *
+ * <ul>
+ *   <li>{@code bulk.flush.max.actions}: Maximum amount of elements to buffer
+ *   <li>{@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer
+ *   <li>{@code bulk.flush.interval.ms}: Interval at which to flush data regardless of the other two
+ *       settings in milliseconds
+ * </ul>
+ *
+ * <p>You also have to provide an {@link ElasticsearchSinkFunction}. This is used to create multiple
+ * {@link ActionRequest ActionRequests} for each incoming element. See the class level documentation
+ * of {@link ElasticsearchSinkFunction} for an example.
+ *
+ * @param <T> Type of the elements handled by this sink
+ */
+@PublicEvolving
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevelClient> {
+
+    private static final long serialVersionUID = 1L;
+
+    private ElasticsearchSink(
+            Map<String, String> bulkRequestsConfig,
+            List<HttpHost> httpHosts,
+            ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+            ActionRequestFailureHandler failureHandler,
+            RestClientFactory restClientFactory,
+            String inLongMetric) {
+
+        super(
+                new Elasticsearch6ApiCallBridge(httpHosts, restClientFactory),
+                bulkRequestsConfig,
+                elasticsearchSinkFunction,
+                failureHandler,
+                inLongMetric);
+    }
+
+    /**
+     * A builder for creating an {@link ElasticsearchSink}.
+     *
+     * @param <T> Type of the elements handled by the sink this builder creates.
+     */
+    @PublicEvolving
+    public static class Builder<T> {
+
+        private final List<HttpHost> httpHosts;
+        private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+        private Map<String, String> bulkRequestsConfig = new HashMap<>();
+        private ActionRequestFailureHandler failureHandler = new NoOpFailureHandler();
+        private RestClientFactory restClientFactory = restClientBuilder -> {
+        };
+        private String inLongMetric = null;
+
+        /**
+         * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link
+         * RestHighLevelClient}.
+         *
+         * @param httpHosts The list of {@link HttpHost} to which the {@link RestHighLevelClient}
+         *         connects.
+         * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest}
+         *         from the incoming element.
+         */
+        public Builder(
+                List<HttpHost> httpHosts, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+            this.httpHosts = Preconditions.checkNotNull(httpHosts);
+            this.elasticsearchSinkFunction = Preconditions.checkNotNull(elasticsearchSinkFunction);
+        }
+
+        /**
+         * Sets the InLong metric label used for reporting metrics.
+         * @param inLongMetric the InLong metric label
+         */
+        public void setInLongMetric(String inLongMetric) {
+            this.inLongMetric = inLongMetric;
+        }
+
+        /**
+         * Sets the maximum number of actions to buffer for each bulk request. You can pass -1 to
+         * disable it.
+         *
+         * @param numMaxActions the maximum number of actions to buffer per bulk request.
+         */
+        public void setBulkFlushMaxActions(int numMaxActions) {
+            Preconditions.checkArgument(
+                    numMaxActions == -1 || numMaxActions > 0,
+                    "Max number of buffered actions must be larger than 0.");
+
+            this.bulkRequestsConfig.put(
+                    CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, String.valueOf(numMaxActions));
+        }
+
+        /**
+         * Sets the maximum size of buffered actions, in mb, per bulk request. You can pass -1 to
+         * disable it.
+         *
+         * @param maxSizeMb the maximum size of buffered actions, in mb.
+         */
+        public void setBulkFlushMaxSizeMb(int maxSizeMb) {
+            Preconditions.checkArgument(
+                    maxSizeMb == -1 || maxSizeMb > 0,
+                    "Max size of buffered actions must be larger than 0.");
+
+            this.bulkRequestsConfig.put(
+                    CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, String.valueOf(maxSizeMb));
+        }
+
+        /**
+         * Sets the bulk flush interval, in milliseconds. You can pass -1 to disable it.
+         *
+         * @param intervalMillis the bulk flush interval, in milliseconds.
+         */
+        public void setBulkFlushInterval(long intervalMillis) {
+            Preconditions.checkArgument(
+                    intervalMillis == -1 || intervalMillis >= 0,
+                    "Interval (in milliseconds) between each flush must be larger than or equal to 0.");
+
+            this.bulkRequestsConfig.put(
+                    CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, String.valueOf(intervalMillis));
+        }
+
+        /**
+         * Sets whether or not to enable bulk flush backoff behaviour.
+         *
+         * @param enabled whether or not to enable backoffs.
+         */
+        public void setBulkFlushBackoff(boolean enabled) {
+            this.bulkRequestsConfig.put(
+                    CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, String.valueOf(enabled));
+        }
+
+        /**
+         * Sets the type of backoff to use when flushing bulk requests.
+         *
+         * @param flushBackoffType the backoff type to use.
+         */
+        public void setBulkFlushBackoffType(FlushBackoffType flushBackoffType) {
+            this.bulkRequestsConfig.put(
+                    CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE,
+                    Preconditions.checkNotNull(flushBackoffType).toString());
+        }
+
+        /**
+         * Sets the maximum number of retries for a backoff attempt when flushing bulk requests.
+         *
+         * @param maxRetries the maximum number of retries for a backoff attempt when flushing bulk
+         *         requests
+         */
+        public void setBulkFlushBackoffRetries(int maxRetries) {
+            Preconditions.checkArgument(
+                    maxRetries > 0, "Max number of backoff attempts must be larger than 0.");
+
+            this.bulkRequestsConfig.put(
+                    CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES, String.valueOf(maxRetries));
+        }
+
+        /**
+         * Sets the amount of delay between each backoff attempt when flushing bulk requests, in
+         * milliseconds.
+         *
+         * @param delayMillis the amount of delay between each backoff attempt when flushing bulk
+         *         requests, in milliseconds.
+         */
+        public void setBulkFlushBackoffDelay(long delayMillis) {
+            Preconditions.checkArgument(
+                    delayMillis >= 0,
+                    "Delay (in milliseconds) between each backoff attempt must be larger than or equal to 0.");
+            this.bulkRequestsConfig.put(
+                    CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY, String.valueOf(delayMillis));
+        }
+
+        /**
+         * Sets a failure handler for action requests.
+         *
+         * @param failureHandler This is used to handle failed {@link ActionRequest}.
+         */
+        public void setFailureHandler(ActionRequestFailureHandler failureHandler) {
+            this.failureHandler = Preconditions.checkNotNull(failureHandler);
+        }
+
+        /**
+         * Sets a REST client factory for custom client configuration.
+         *
+         * @param restClientFactory the factory that configures the rest client.
+         */
+        public void setRestClientFactory(RestClientFactory restClientFactory) {
+            this.restClientFactory = Preconditions.checkNotNull(restClientFactory);
+        }
+
+        /**
+         * Creates the Elasticsearch sink.
+         *
+         * @return the created Elasticsearch sink.
+         */
+        public ElasticsearchSink<T> build() {
+            return new ElasticsearchSink<>(
+                    bulkRequestsConfig,
+                    httpHosts,
+                    elasticsearchSinkFunction,
+                    failureHandler,
+                    restClientFactory,
+                    inLongMetric
+                    );
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            Builder<?> builder = (Builder<?>) o;
+            return Objects.equals(httpHosts, builder.httpHosts)
+                    && Objects.equals(elasticsearchSinkFunction, builder.elasticsearchSinkFunction)
+                    && Objects.equals(bulkRequestsConfig, builder.bulkRequestsConfig)
+                    && Objects.equals(failureHandler, builder.failureHandler)
+                    && Objects.equals(restClientFactory, builder.restClientFactory)
+                    && Objects.equals(inLongMetric, builder.inLongMetric);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(
+                    httpHosts,
+                    elasticsearchSinkFunction,
+                    bulkRequestsConfig,
+                    failureHandler,
+                    restClientFactory,
+                    inLongMetric);
+        }
+    }
+}
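
A hedged usage sketch of the builder defined above (not part of the commit). It assumes that org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction keeps the single process(element, runtimeContext, indexer) method of the Flink connector it is derived from, so a lambda can be used; the host, index and metric label values are placeholders:

    import java.util.Collections;
    import java.util.List;

    import org.apache.http.HttpHost;
    import org.apache.inlong.sort.elasticsearch6.ElasticsearchSink;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.common.xcontent.XContentType;

    public class Es6SinkExample {

        public static ElasticsearchSink<String> createSink() {
            List<HttpHost> hosts = Collections.singletonList(new HttpHost("127.0.0.1", 9200, "http"));

            // Assumes a single process(element, ctx, indexer) abstract method on ElasticsearchSinkFunction.
            ElasticsearchSink.Builder<String> builder = new ElasticsearchSink.Builder<>(
                    hosts,
                    (element, ctx, indexer) -> indexer.add(
                            new IndexRequest("my-index", "_doc").source(element, XContentType.JSON)));

            builder.setBulkFlushMaxActions(1000);
            // New in this commit: the InLong metric label that turns on metric reporting
            // (placeholder value; the expected label format comes from sort-connector-base).
            builder.setInLongMetric("groupId&streamId&nodeId");
            return builder.build();
        }
    }
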
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
index af2ec05d8..af9841148 100644
--- a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
@@ -21,10 +21,6 @@ package org.apache.inlong.sort.elasticsearch6.table;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.inlong.sort.elasticsearch.table.RequestFactory;
-import org.apache.inlong.sort.elasticsearch.table.IndexGeneratorFactory;
-import org.apache.inlong.sort.elasticsearch.table.KeyExtractor;
-import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
 import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
@@ -34,14 +30,17 @@ import org.apache.flink.table.connector.sink.SinkFunctionProvider;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.StringUtils;
-
 import org.apache.http.HttpHost;
 import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.inlong.sort.elasticsearch.table.IndexGeneratorFactory;
+import org.apache.inlong.sort.elasticsearch.table.KeyExtractor;
+import org.apache.inlong.sort.elasticsearch.table.RequestFactory;
 import org.apache.inlong.sort.elasticsearch.table.RoutingExtractor;
 import org.apache.inlong.sort.elasticsearch.table.RowElasticsearchSinkFunction;
+import org.apache.inlong.sort.elasticsearch6.ElasticsearchSink;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.update.UpdateRequest;
@@ -49,7 +48,6 @@ import org.elasticsearch.client.RestClientBuilder;
 import org.elasticsearch.common.xcontent.XContentType;
 
 import javax.annotation.Nullable;
-
 import java.util.List;
 import java.util.Objects;
 
@@ -59,19 +57,15 @@ import java.util.Objects;
  */
 @PublicEvolving
 final class Elasticsearch6DynamicSink implements DynamicTableSink {
+
     @VisibleForTesting
     static final Elasticsearch6RequestFactory REQUEST_FACTORY = new Elasticsearch6RequestFactory();
 
     private final EncodingFormat<SerializationSchema<RowData>> format;
     private final TableSchema schema;
     private final Elasticsearch6Configuration config;
-
-    public Elasticsearch6DynamicSink(
-            EncodingFormat<SerializationSchema<RowData>> format,
-            Elasticsearch6Configuration config,
-            TableSchema schema) {
-        this(format, config, schema, (ElasticsearchSink.Builder::new));
-    }
+    private final String inLongMetric;
+    private final ElasticSearchBuilderProvider builderProvider;
 
     // --------------------------------------------------------------
     // Hack to make configuration testing possible.
@@ -83,29 +77,27 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
     // on the sink itself.
     // --------------------------------------------------------------
 
-    private final ElasticSearchBuilderProvider builderProvider;
-
-    @FunctionalInterface
-    interface ElasticSearchBuilderProvider {
-        ElasticsearchSink.Builder<RowData> createBuilder(
-                List<HttpHost> httpHosts, RowElasticsearchSinkFunction upsertSinkFunction);
+    public Elasticsearch6DynamicSink(
+            EncodingFormat<SerializationSchema<RowData>> format,
+            Elasticsearch6Configuration config,
+            TableSchema schema,
+            String inLongMetric) {
+        this(format, config, schema, (ElasticsearchSink.Builder::new), inLongMetric);
     }
 
     Elasticsearch6DynamicSink(
             EncodingFormat<SerializationSchema<RowData>> format,
             Elasticsearch6Configuration config,
             TableSchema schema,
-            ElasticSearchBuilderProvider builderProvider) {
+            ElasticSearchBuilderProvider builderProvider,
+            String inLongMetric) {
         this.format = format;
         this.schema = schema;
         this.config = config;
         this.builderProvider = builderProvider;
+        this.inLongMetric = inLongMetric;
     }
 
-    // --------------------------------------------------------------
-    // End of hack to make configuration testing possible
-    // --------------------------------------------------------------
-
     @Override
     public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
         ChangelogMode.Builder builder = ChangelogMode.newBuilder();
@@ -117,6 +109,10 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
         return builder.build();
     }
 
+    // --------------------------------------------------------------
+    // End of hack to make configuration testing possible
+    // --------------------------------------------------------------
+
     @Override
     public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
         return () -> {
@@ -132,7 +128,8 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
                             REQUEST_FACTORY,
                             KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()),
                             RoutingExtractor.createRoutingExtractor(
-                                    schema, config.getRoutingField().orElse(null)));
+                                    schema, config.getRoutingField().orElse(null)),
+                            inLongMetric);
 
             final ElasticsearchSink.Builder<RowData> builder =
                     builderProvider.createBuilder(config.getHosts(), upsertFunction);
@@ -142,6 +139,7 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
             builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20));
             builder.setBulkFlushInterval(config.getBulkFlushInterval());
             builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
+            builder.setInLongMetric(inLongMetric);
             config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
             config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
             config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
@@ -182,7 +180,37 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
         return "Elasticsearch6";
     }
 
-    /** Serializable {@link RestClientFactory} used by the sink. */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Elasticsearch6DynamicSink that = (Elasticsearch6DynamicSink) o;
+        return Objects.equals(format, that.format)
+                && Objects.equals(schema, that.schema)
+                && Objects.equals(config, that.config)
+                && Objects.equals(builderProvider, that.builderProvider)
+                && Objects.equals(inLongMetric, that.inLongMetric);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(format, schema, config, builderProvider, inLongMetric);
+    }
+
+    @FunctionalInterface
+    interface ElasticSearchBuilderProvider {
+
+        ElasticsearchSink.Builder<RowData> createBuilder(
+                List<HttpHost> httpHosts, RowElasticsearchSinkFunction upsertSinkFunction);
+    }
+
+    /**
+     * Serializable {@link RestClientFactory} used by the sink.
+     */
     @VisibleForTesting
     static class DefaultRestClientFactory implements RestClientFactory {
 
@@ -217,7 +245,9 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
         }
     }
 
-    /** Serializable {@link RestClientFactory} used by the sink which enable authentication. */
+    /**
+     * Serializable {@link RestClientFactory} used by the sink which enables authentication.
+     */
     @VisibleForTesting
     static class AuthRestClientFactory implements RestClientFactory {
 
@@ -274,6 +304,7 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
      * sink.
      */
     private static class Elasticsearch6RequestFactory implements RequestFactory {
+
         @Override
         public UpdateRequest createUpdateRequest(
                 String index,
@@ -301,24 +332,4 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
             return new DeleteRequest(index, docType, key);
         }
     }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        Elasticsearch6DynamicSink that = (Elasticsearch6DynamicSink) o;
-        return Objects.equals(format, that.format)
-                && Objects.equals(schema, that.schema)
-                && Objects.equals(config, that.config)
-                && Objects.equals(builderProvider, that.builderProvider);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(format, schema, config, builderProvider);
-    }
 }
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java
index 4a49939ec..61be6506b 100644
--- a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java
@@ -53,6 +53,7 @@ import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.FL
 import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.FORMAT_OPTION;
 import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
 import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.INDEX_OPTION;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.KEY_DELIMITER_OPTION;
 import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.ROUTING_FIELD_NAME;
 import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.PASSWORD_OPTION;
@@ -79,7 +80,8 @@ public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory
                     CONNECTION_PATH_PREFIX,
                     FORMAT_OPTION,
                     PASSWORD_OPTION,
-                    USERNAME_OPTION)
+                    USERNAME_OPTION,
+                    INLONG_METRIC)
                     .collect(Collectors.toSet());
 
     @Override
@@ -100,8 +102,10 @@ public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory
 
         validate(config, configuration);
 
+        String inLongMetric = helper.getOptions().get(INLONG_METRIC);
+
         return new Elasticsearch6DynamicSink(
-                format, config, TableSchemaUtils.getPhysicalSchema(tableSchema));
+                format, config, TableSchemaUtils.getPhysicalSchema(tableSchema), inLongMetric);
     }
 
     private void validate(Elasticsearch6Configuration config, Configuration originalConfiguration) {
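
For the SQL/Table path, a hedged sketch (not part of the commit) of how the new option reaches the factory above from a DDL. The connector identifier 'elasticsearch-6-inlong' and the option key 'inlong.metric' are assumptions based on the naming used elsewhere in InLong Sort and on the Constants.INLONG_METRIC import above; the metric label is a placeholder:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class Es6MetricDdlExample {

        public static void main(String[] args) {
            TableEnvironment tableEnv =
                    TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

            // 'connector' and 'inlong.metric' are assumed names; 'hosts', 'index' and
            // 'document-type' mirror the options imported by the factory above.
            tableEnv.executeSql(
                    "CREATE TABLE es_sink (\n"
                            + "  id STRING,\n"
                            + "  cnt BIGINT\n"
                            + ") WITH (\n"
                            + "  'connector' = 'elasticsearch-6-inlong',\n"
                            + "  'hosts' = 'http://127.0.0.1:9200',\n"
                            + "  'index' = 'my-index',\n"
                            + "  'document-type' = '_doc',\n"
                            + "  'inlong.metric' = 'groupId&streamId&nodeId'\n"
                            + ")");
        }
    }
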
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/Elasticsearch7ApiCallBridge.java b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/Elasticsearch7ApiCallBridge.java
new file mode 100644
index 000000000..f43708245
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/Elasticsearch7ApiCallBridge.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch7;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
+import org.apache.flink.util.Preconditions;
+import org.apache.http.HttpHost;
+import org.apache.inlong.sort.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkBase;
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 7 and later versions.
+ */
+@Internal
+public class Elasticsearch7ApiCallBridge
+        implements ElasticsearchApiCallBridge<RestHighLevelClient> {
+
+    private static final long serialVersionUID = -5222683870097809633L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch7ApiCallBridge.class);
+
+    /**
+     * User-provided HTTP Host.
+     */
+    private final List<HttpHost> httpHosts;
+
+    /**
+     * The factory to configure the rest client.
+     */
+    private final RestClientFactory restClientFactory;
+
+    Elasticsearch7ApiCallBridge(List<HttpHost> httpHosts, RestClientFactory restClientFactory) {
+        Preconditions.checkArgument(httpHosts != null && !httpHosts.isEmpty());
+        this.httpHosts = httpHosts;
+        this.restClientFactory = Preconditions.checkNotNull(restClientFactory);
+    }
+
+    @Override
+    public RestHighLevelClient createClient(Map<String, String> clientConfig) {
+        RestClientBuilder builder =
+                RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()]));
+        restClientFactory.configureRestClientBuilder(builder);
+
+        RestHighLevelClient rhlClient = new RestHighLevelClient(builder);
+
+        return rhlClient;
+    }
+
+    @Override
+    public BulkProcessor.Builder createBulkProcessorBuilder(
+            RestHighLevelClient client, BulkProcessor.Listener listener) {
+        return BulkProcessor.builder(
+                (request, bulkListener) ->
+                        client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
+                listener);
+    }
+
+    @Override
+    public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
+        if (!bulkItemResponse.isFailed()) {
+            return null;
+        } else {
+            return bulkItemResponse.getFailure().getCause();
+        }
+    }
+
+    @Override
+    public void configureBulkProcessorBackoff(
+            BulkProcessor.Builder builder,
+            @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
+
+        BackoffPolicy backoffPolicy;
+        if (flushBackoffPolicy != null) {
+            switch (flushBackoffPolicy.getBackoffType()) {
+                case CONSTANT:
+                    backoffPolicy =
+                            BackoffPolicy.constantBackoff(
+                                    new TimeValue(flushBackoffPolicy.getDelayMillis()),
+                                    flushBackoffPolicy.getMaxRetryCount());
+                    break;
+                case EXPONENTIAL:
+                default:
+                    backoffPolicy =
+                            BackoffPolicy.exponentialBackoff(
+                                    new TimeValue(flushBackoffPolicy.getDelayMillis()),
+                                    flushBackoffPolicy.getMaxRetryCount());
+            }
+        } else {
+            backoffPolicy = BackoffPolicy.noBackoff();
+        }
+
+        builder.setBackoffPolicy(backoffPolicy);
+    }
+
+    @Override
+    public RequestIndexer createBulkProcessorIndexer(
+            BulkProcessor bulkProcessor,
+            boolean flushOnCheckpoint,
+            AtomicLong numPendingRequestsRef) {
+        return new Elasticsearch7BulkProcessorIndexer(
+                bulkProcessor, flushOnCheckpoint, numPendingRequestsRef);
+    }
+
+    @Override
+    public void verifyClientConnection(RestHighLevelClient client) throws IOException {
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Pinging Elasticsearch cluster via hosts {} ...", httpHosts);
+        }
+
+        if (!client.ping(RequestOptions.DEFAULT)) {
+            throw new RuntimeException("There are no reachable Elasticsearch nodes!");
+        }
+
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Elasticsearch RestHighLevelClient is connected to {}", httpHosts.toString());
+        }
+    }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/Elasticsearch7BulkProcessorIndexer.java b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/Elasticsearch7BulkProcessorIndexer.java
new file mode 100644
index 000000000..9094048a1
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/Elasticsearch7BulkProcessorIndexer.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch7;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}. {@link ActionRequest
+ * ActionRequests} will be buffered before sending a bulk request to the Elasticsearch cluster.
+ *
+ * <p>Note: This class is binary compatible to Elasticsearch 7.
+ */
+@Internal
+class Elasticsearch7BulkProcessorIndexer implements RequestIndexer {
+
+    private final BulkProcessor bulkProcessor;
+    private final boolean flushOnCheckpoint;
+    private final AtomicLong numPendingRequestsRef;
+
+    Elasticsearch7BulkProcessorIndexer(
+            BulkProcessor bulkProcessor,
+            boolean flushOnCheckpoint,
+            AtomicLong numPendingRequestsRef) {
+        this.bulkProcessor = checkNotNull(bulkProcessor);
+        this.flushOnCheckpoint = flushOnCheckpoint;
+        this.numPendingRequestsRef = checkNotNull(numPendingRequestsRef);
+    }
+
+    @Override
+    public void add(DeleteRequest... deleteRequests) {
+        for (DeleteRequest deleteRequest : deleteRequests) {
+            if (flushOnCheckpoint) {
+                numPendingRequestsRef.getAndIncrement();
+            }
+            this.bulkProcessor.add(deleteRequest);
+        }
+    }
+
+    @Override
+    public void add(IndexRequest... indexRequests) {
+        for (IndexRequest indexRequest : indexRequests) {
+            if (flushOnCheckpoint) {
+                numPendingRequestsRef.getAndIncrement();
+            }
+            this.bulkProcessor.add(indexRequest);
+        }
+    }
+
+    @Override
+    public void add(UpdateRequest... updateRequests) {
+        for (UpdateRequest updateRequest : updateRequests) {
+            if (flushOnCheckpoint) {
+                numPendingRequestsRef.getAndIncrement();
+            }
+            this.bulkProcessor.add(updateRequest);
+        }
+    }
+}
+
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java
new file mode 100644
index 000000000..595d6da1c
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch7;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkBase;
+import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Elasticsearch 7.x sink that requests multiple {@link ActionRequest ActionRequests} against a
+ * cluster for each incoming element.
+ *
+ * <p>The sink internally uses a {@link RestHighLevelClient} to communicate with an Elasticsearch
+ * cluster. The sink will fail if no cluster can be connected to using the provided transport
+ * addresses passed to the constructor.
+ *
+ * <p>Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest
+ * ActionRequests}. This will buffer elements before sending a request to the cluster. The behaviour
+ * of the {@code BulkProcessor} can be configured using these config keys:
+ *
+ * <ul>
+ *   <li>{@code bulk.flush.max.actions}: Maximum amount of elements to buffer
+ *   <li>{@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer
+ *   <li>{@code bulk.flush.interval.ms}: Interval at which to flush data regardless of the other two
+ *       settings in milliseconds
+ * </ul>
+ *
+ * <p>You also have to provide an {@link ElasticsearchSinkFunction}. This is used to create multiple
+ * {@link ActionRequest ActionRequests} for each incoming element. See the class level documentation
+ * of {@link ElasticsearchSinkFunction} for an example.
+ *
+ * @param <T> Type of the elements handled by this sink
+ */
+@PublicEvolving
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevelClient> {
+
+    private static final long serialVersionUID = 1L;
+
+    private ElasticsearchSink(
+            Map<String, String> bulkRequestsConfig,
+            List<HttpHost> httpHosts,
+            ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+            ActionRequestFailureHandler failureHandler,
+            RestClientFactory restClientFactory) {
+
+        super(
+                new Elasticsearch7ApiCallBridge(httpHosts, restClientFactory),
+                bulkRequestsConfig,
+                elasticsearchSinkFunction,
+                failureHandler,
+                null);
+    }
+
+    /**
+     * A builder for creating an {@link ElasticsearchSink}.
+     *
+     * @param <T> Type of the elements handled by the sink this builder creates.
+     */
+    @PublicEvolving
+    public static class Builder<T> {
+
+        private final List<HttpHost> httpHosts;
+        private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+        private Map<String, String> bulkRequestsConfig = new HashMap<>();
+        private ActionRequestFailureHandler failureHandler = new NoOpFailureHandler();
+        private RestClientFactory restClientFactory = restClientBuilder -> {};
+
+        /**
+         * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link
+         * RestHighLevelClient}.
+         *
+         * @param httpHosts The list of {@link HttpHost} to which the {@link RestHighLevelClient}
+         *     connects.
+         * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest}
+         *     from the incoming element.
+         */
+        public Builder(
+                List<HttpHost> httpHosts, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+            this.httpHosts = Preconditions.checkNotNull(httpHosts);
+            this.elasticsearchSinkFunction = Preconditions.checkNotNull(elasticsearchSinkFunction);
+        }
+
+        /**
+         * Sets the maximum number of actions to buffer for each bulk request. You can pass -1 to
+         * disable it.
+         *
+         * @param numMaxActions the maximum number of actions to buffer per bulk request.
+         */
+        public void setBulkFlushMaxActions(int numMaxActions) {
+            Preconditions.checkArgument(
+                    numMaxActions == -1 || numMaxActions > 0,
+                    "Max number of buffered actions must be larger than 0.");
+
+            this.bulkRequestsConfig.put(
+                    CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, String.valueOf(numMaxActions));
+        }
+
+        /**
+         * Sets the maximum size of buffered actions, in mb, per bulk request. You can pass -1 to
+         * disable it.
+         *
+         * @param maxSizeMb the maximum size of buffered actions, in mb.
+         */
+        public void setBulkFlushMaxSizeMb(int maxSizeMb) {
+            Preconditions.checkArgument(
+                    maxSizeMb == -1 || maxSizeMb > 0,
+                    "Max size of buffered actions must be larger than 0.");
+
+            this.bulkRequestsConfig.put(
+                    CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, String.valueOf(maxSizeMb));
+        }
+
+        /**
+         * Sets the bulk flush interval, in milliseconds. You can pass -1 to disable it.
+         *
+         * @param intervalMillis the bulk flush interval, in milliseconds.
+         */
+        public void setBulkFlushInterval(long intervalMillis) {
+            Preconditions.checkArgument(
+                    intervalMillis == -1 || intervalMillis >= 0,
+                    "Interval (in milliseconds) between each flush must be larger than or equal to 0.");
+
+            this.bulkRequestsConfig.put(
+                    CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, String.valueOf(intervalMillis));
+        }
+
+        /**
+         * Sets whether or not to enable bulk flush backoff behaviour.
+         *
+         * @param enabled whether or not to enable backoffs.
+         */
+        public void setBulkFlushBackoff(boolean enabled) {
+            this.bulkRequestsConfig.put(
+                    CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, String.valueOf(enabled));
+        }
+
+        /**
+         * Sets the type of backoff to use when flushing bulk requests.
+         *
+         * @param flushBackoffType the backoff type to use.
+         */
+        public void setBulkFlushBackoffType(FlushBackoffType flushBackoffType) {
+            this.bulkRequestsConfig.put(
+                    CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE,
+                    Preconditions.checkNotNull(flushBackoffType).toString());
+        }
+
+        /**
+         * Sets the maximum number of retries for a backoff attempt when flushing bulk requests.
+         *
+         * @param maxRetries the maximum number of retries for a backoff attempt when flushing bulk
+         *     requests
+         */
+        public void setBulkFlushBackoffRetries(int maxRetries) {
+            Preconditions.checkArgument(
+                    maxRetries > 0, "Max number of backoff attempts must be larger than 0.");
+
+            this.bulkRequestsConfig.put(
+                    CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES, String.valueOf(maxRetries));
+        }
+
+        /**
+         * Sets the amount of delay between each backoff attempt when flushing bulk requests, in
+         * milliseconds.
+         *
+         * @param delayMillis the amount of delay between each backoff attempt when flushing bulk
+         *     requests, in milliseconds.
+         */
+        public void setBulkFlushBackoffDelay(long delayMillis) {
+            Preconditions.checkArgument(
+                    delayMillis >= 0,
+                    "Delay (in milliseconds) between each backoff attempt must be larger than or equal to 0.");
+            this.bulkRequestsConfig.put(
+                    CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY, String.valueOf(delayMillis));
+        }
+
+        /**
+         * Sets a failure handler for action requests.
+         *
+         * @param failureHandler the handler used for failed {@link ActionRequest ActionRequests}.
+         */
+        public void setFailureHandler(ActionRequestFailureHandler failureHandler) {
+            this.failureHandler = Preconditions.checkNotNull(failureHandler);
+        }
+
+        /**
+         * Sets a REST client factory for custom client configuration.
+         *
+         * @param restClientFactory the factory that configures the rest client.
+         */
+        public void setRestClientFactory(RestClientFactory restClientFactory) {
+            this.restClientFactory = Preconditions.checkNotNull(restClientFactory);
+        }
+
+        /**
+         * Creates the Elasticsearch sink.
+         *
+         * @return the created Elasticsearch sink.
+         */
+        public ElasticsearchSink<T> build() {
+            return new ElasticsearchSink<>(
+                    bulkRequestsConfig,
+                    httpHosts,
+                    elasticsearchSinkFunction,
+                    failureHandler,
+                    restClientFactory);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            Builder<?> builder = (Builder<?>) o;
+            return Objects.equals(httpHosts, builder.httpHosts)
+                    && Objects.equals(elasticsearchSinkFunction, builder.elasticsearchSinkFunction)
+                    && Objects.equals(bulkRequestsConfig, builder.bulkRequestsConfig)
+                    && Objects.equals(failureHandler, builder.failureHandler)
+                    && Objects.equals(restClientFactory, builder.restClientFactory);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(
+                    httpHosts,
+                    elasticsearchSinkFunction,
+                    bulkRequestsConfig,
+                    failureHandler,
+                    restClientFactory);
+        }
+    }
+}
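
The builder above mirrors the upstream Flink ElasticsearchSink.Builder API. A rough usage sketch follows; it is illustrative only and not part of this commit. The host, index name and lambda body are placeholders, the usual imports (HttpHost, IndexRequest, the classes of this module) are assumed, and the Builder constructor is assumed to keep the upstream signature of hosts plus an ElasticsearchSinkFunction:

    List<HttpHost> hosts = Collections.singletonList(new HttpHost("127.0.0.1", 9200, "http"));

    // assumed constructor: Builder(List<HttpHost>, ElasticsearchSinkFunction<T>)
    ElasticsearchSink.Builder<String> builder = new ElasticsearchSink.Builder<>(
            hosts,
            (element, ctx, indexer) ->
                    indexer.add(new IndexRequest("demo_index").source("raw", element)));

    builder.setBulkFlushMaxActions(1000);      // flush after 1000 buffered actions
    builder.setBulkFlushMaxSizeMb(5);          // ... or 5 MB of buffered data
    builder.setBulkFlushInterval(60_000L);     // ... or every 60 s, whichever comes first
    builder.setBulkFlushBackoff(true);
    builder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
    builder.setBulkFlushBackoffRetries(3);
    builder.setBulkFlushBackoffDelay(100L);

    ElasticsearchSink<String> sink = builder.build();

The backoff setters are stored as the bulk.flush.* keys in bulkRequestsConfig and consumed by ElasticsearchSinkBase further below.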
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
index be09fa99a..bddfa3ebe 100644
--- a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
+++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
@@ -21,7 +21,6 @@ package org.apache.inlong.sort.elasticsearch7.table;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
 import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
@@ -31,7 +30,6 @@ import org.apache.flink.table.connector.sink.SinkFunctionProvider;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.StringUtils;
-
 import org.apache.http.HttpHost;
 import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.UsernamePasswordCredentials;
@@ -42,6 +40,7 @@ import org.apache.inlong.sort.elasticsearch.table.KeyExtractor;
 import org.apache.inlong.sort.elasticsearch.table.RequestFactory;
 import org.apache.inlong.sort.elasticsearch.table.RoutingExtractor;
 import org.apache.inlong.sort.elasticsearch.table.RowElasticsearchSinkFunction;
+import org.apache.inlong.sort.elasticsearch7.ElasticsearchSink;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.update.UpdateRequest;
@@ -49,7 +48,6 @@ import org.elasticsearch.client.RestClientBuilder;
 import org.elasticsearch.common.xcontent.XContentType;
 
 import javax.annotation.Nullable;
-
 import java.util.List;
 import java.util.Objects;
 
@@ -59,6 +57,7 @@ import java.util.Objects;
  */
 @Internal
 final class Elasticsearch7DynamicSink implements DynamicTableSink {
+
     @VisibleForTesting
     static final Elasticsearch7RequestFactory REQUEST_FACTORY =
             new Elasticsearch7DynamicSink.Elasticsearch7RequestFactory();
@@ -66,13 +65,7 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
     private final EncodingFormat<SerializationSchema<RowData>> format;
     private final TableSchema schema;
     private final Elasticsearch7Configuration config;
-
-    public Elasticsearch7DynamicSink(
-            EncodingFormat<SerializationSchema<RowData>> format,
-            Elasticsearch7Configuration config,
-            TableSchema schema) {
-        this(format, config, schema, (ElasticsearchSink.Builder::new));
-    }
+    private final ElasticSearchBuilderProvider builderProvider;
 
     // --------------------------------------------------------------
     // Hack to make configuration testing possible.
@@ -84,12 +77,11 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
     // on the sink itself.
     // --------------------------------------------------------------
 
-    private final ElasticSearchBuilderProvider builderProvider;
-
-    @FunctionalInterface
-    interface ElasticSearchBuilderProvider {
-        ElasticsearchSink.Builder<RowData> createBuilder(
-                List<HttpHost> httpHosts, RowElasticsearchSinkFunction upsertSinkFunction);
+    public Elasticsearch7DynamicSink(
+            EncodingFormat<SerializationSchema<RowData>> format,
+            Elasticsearch7Configuration config,
+            TableSchema schema) {
+        this(format, config, schema, (ElasticsearchSink.Builder::new));
     }
 
     Elasticsearch7DynamicSink(
@@ -103,10 +95,6 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
         this.builderProvider = builderProvider;
     }
 
-    // --------------------------------------------------------------
-    // End of hack to make configuration testing possible
-    // --------------------------------------------------------------
-
     @Override
     public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
         ChangelogMode.Builder builder = ChangelogMode.newBuilder();
@@ -118,6 +106,10 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
         return builder.build();
     }
 
+    // --------------------------------------------------------------
+    // End of hack to make configuration testing possible
+    // --------------------------------------------------------------
+
     @Override
     public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
         return () -> {
@@ -133,7 +125,8 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
                             REQUEST_FACTORY,
                             KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()),
                             RoutingExtractor.createRoutingExtractor(
-                                    schema, config.getRoutingField().orElse(null)));
+                                    schema, config.getRoutingField().orElse(null)),
+                            null);
 
             final ElasticsearchSink.Builder<RowData> builder =
                     builderProvider.createBuilder(config.getHosts(), upsertFunction);
@@ -183,7 +176,36 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
         return "Elasticsearch7";
     }
 
-    /** Serializable {@link RestClientFactory} used by the sink. */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Elasticsearch7DynamicSink that = (Elasticsearch7DynamicSink) o;
+        return Objects.equals(format, that.format)
+                && Objects.equals(schema, that.schema)
+                && Objects.equals(config, that.config)
+                && Objects.equals(builderProvider, that.builderProvider);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(format, schema, config, builderProvider);
+    }
+
+    @FunctionalInterface
+    interface ElasticSearchBuilderProvider {
+
+        ElasticsearchSink.Builder<RowData> createBuilder(
+                List<HttpHost> httpHosts, RowElasticsearchSinkFunction upsertSinkFunction);
+    }
+
+    /**
+     * Serializable {@link RestClientFactory} used by the sink.
+     */
     @VisibleForTesting
     static class DefaultRestClientFactory implements RestClientFactory {
 
@@ -218,7 +240,9 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
         }
     }
 
-    /** Serializable {@link RestClientFactory} used by the sink which enable authentication. */
+    /**
+     * Serializable {@link RestClientFactory} used by the sink which enables authentication.
+     */
     @VisibleForTesting
     static class AuthRestClientFactory implements RestClientFactory {
 
@@ -275,6 +299,7 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
      * sink.
      */
     private static class Elasticsearch7RequestFactory implements RequestFactory {
+
         @Override
         public UpdateRequest createUpdateRequest(
                 String index,
@@ -302,24 +327,4 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
             return new DeleteRequest(index, key);
         }
     }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        Elasticsearch7DynamicSink that = (Elasticsearch7DynamicSink) o;
-        return Objects.equals(format, that.format)
-                && Objects.equals(schema, that.schema)
-                && Objects.equals(config, that.config)
-                && Objects.equals(builderProvider, that.builderProvider);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(format, schema, config, builderProvider);
-    }
 }
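
The relocated ElasticSearchBuilderProvider keeps the documented testing hack intact: a test can construct the sink through the package-private constructor and hand in a provider that intercepts the builder before getSinkRuntimeProvider() configures it. A hedged sketch (the provider body is hypothetical, and the Builder constructor is assumed to take the hosts and the sink function, matching the provider's functional signature):

    Elasticsearch7DynamicSink testSink = new Elasticsearch7DynamicSink(
            format, config, schema,
            (httpHosts, upsertFunction) -> {
                // wrap or spy on the builder here to assert which bulk-flush
                // settings the sink applies to it
                return new ElasticsearchSink.Builder<>(httpHosts, upsertFunction);
            });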
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/pom.xml b/inlong-sort/sort-connectors/elasticsearch-base/pom.xml
index eaadca70e..146a76e9d 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/pom.xml
+++ b/inlong-sort/sort-connectors/elasticsearch-base/pom.xml
@@ -31,6 +31,11 @@
     <packaging>jar</packaging>
 
     <dependencies>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <!-- core dependencies -->
         <dependency>
             <groupId>org.apache.flink</groupId>
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/BufferingNoOpRequestIndexer.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/BufferingNoOpRequestIndexer.java
new file mode 100644
index 000000000..12b6e29ee
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/BufferingNoOpRequestIndexer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Collections;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Implementation of a {@link RequestIndexer} that buffers {@link ActionRequest ActionRequests}
+ * before re-sending them to the Elasticsearch cluster upon request.
+ */
+@Internal
+@NotThreadSafe
+class BufferingNoOpRequestIndexer implements RequestIndexer {
+
+    private ConcurrentLinkedQueue<ActionRequest> bufferedRequests;
+
+    BufferingNoOpRequestIndexer() {
+        this.bufferedRequests = new ConcurrentLinkedQueue<ActionRequest>();
+    }
+
+    @Override
+    public void add(DeleteRequest... deleteRequests) {
+        Collections.addAll(bufferedRequests, deleteRequests);
+    }
+
+    @Override
+    public void add(IndexRequest... indexRequests) {
+        Collections.addAll(bufferedRequests, indexRequests);
+    }
+
+    @Override
+    public void add(UpdateRequest... updateRequests) {
+        Collections.addAll(bufferedRequests, updateRequests);
+    }
+
+    void processBufferedRequests(RequestIndexer actualIndexer) {
+        for (ActionRequest request : bufferedRequests) {
+            if (request instanceof IndexRequest) {
+                actualIndexer.add((IndexRequest) request);
+            } else if (request instanceof DeleteRequest) {
+                actualIndexer.add((DeleteRequest) request);
+            } else if (request instanceof UpdateRequest) {
+                actualIndexer.add((UpdateRequest) request);
+            }
+        }
+
+        bufferedRequests.clear();
+    }
+}
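
This buffering indexer is what ElasticsearchSinkBase hands to the user's ActionRequestFailureHandler: whatever the handler re-adds is replayed into the real BulkProcessor indexer on the next checkAsyncErrorsAndRequests() call. Flink's bundled RetryRejectedExecutionFailureHandler follows this pattern; a hand-rolled variant might look roughly like the sketch below (illustrative only; it assumes index-only traffic for the cast):

    import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    import org.apache.flink.util.ExceptionUtils;
    import org.elasticsearch.action.ActionRequest;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

    public class RetryRejectedOnlyHandler implements ActionRequestFailureHandler {
        @Override
        public void onFailure(ActionRequest action, Throwable failure,
                int restStatusCode, RequestIndexer indexer) throws Throwable {
            if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
                // re-queue into the (buffering) indexer; it is re-sent later
                indexer.add((IndexRequest) action);
            } else {
                // anything else fails the sink
                throw failure;
            }
        }
    }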
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchApiCallBridge.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchApiCallBridge.java
new file mode 100644
index 000000000..34c448518
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchApiCallBridge.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * An {@link ElasticsearchApiCallBridge} is used to bridge incompatible Elasticsearch Java API calls
+ * across different versions. This includes calls to create Elasticsearch clients, handle failed
+ * item responses, etc. Any incompatible Elasticsearch Java APIs should be bridged using this
+ * interface.
+ *
+ * <p>Implementations are allowed to be stateful. For example, for Elasticsearch 1.x, since
+ * connecting via an embedded node is allowed, the call bridge will hold reference to the created
+ * embedded node. Each instance of the sink will hold exactly one instance of the call bridge, and
+ * state cleanup is performed when the sink is closed.
+ *
+ * @param <C> The Elasticsearch client, that implements {@link AutoCloseable}.
+ */
+@Internal
+public interface ElasticsearchApiCallBridge<C extends AutoCloseable> extends Serializable {
+
+    /**
+     * Creates an Elasticsearch client implementing {@link AutoCloseable}.
+     *
+     * @param clientConfig The configuration to use when constructing the client.
+     * @return The created client.
+     */
+    C createClient(Map<String, String> clientConfig);
+
+    /**
+     * Creates a {@link BulkProcessor.Builder} for creating the bulk processor.
+     *
+     * @param client the Elasticsearch client.
+     * @param listener the bulk processor listener.
+     * @return the bulk processor builder.
+     */
+    BulkProcessor.Builder createBulkProcessorBuilder(C client, BulkProcessor.Listener listener);
+
+    /**
+     * Extracts the cause of failure of a bulk item action.
+     *
+     * @param bulkItemResponse the bulk item response to extract cause of failure
+     * @return the extracted {@link Throwable} from the response ({@code null} if the response is
+     *     successful).
+     */
+    @Nullable
+    Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse);
+
+    /**
+     * Set backoff-related configurations on the provided {@link BulkProcessor.Builder}. The builder
+     * will be later on used to instantiate the actual {@link BulkProcessor}.
+     *
+     * @param builder the {@link BulkProcessor.Builder} to configure.
+     * @param flushBackoffPolicy user-provided backoff retry settings ({@code null} if the user
+     *     disabled backoff retries).
+     */
+    void configureBulkProcessorBackoff(
+            BulkProcessor.Builder builder,
+            @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy);
+
+    /**
+     * Verify the client connection by making a test request/ping to the Elasticsearch cluster.
+     *
+     * <p>Called by {@link ElasticsearchSinkBase#open(org.apache.flink.configuration.Configuration)}
+     * after creating the client. This makes sure the underlying client is closed if the connection
+     * is not successful, preventing a thread leak.
+     *
+     * @param client the Elasticsearch client.
+     */
+    void verifyClientConnection(C client) throws IOException;
+
+    /**
+     * Creates a {@link RequestIndexer} that works with the given {@link BulkProcessor} in a
+     * binary-compatible way.
+     */
+    default RequestIndexer createBulkProcessorIndexer(
+            BulkProcessor bulkProcessor,
+            boolean flushOnCheckpoint,
+            AtomicLong numPendingRequestsRef) {
+        return new PreElasticsearch6BulkProcessorIndexer(
+                bulkProcessor, flushOnCheckpoint, numPendingRequestsRef);
+    }
+
+    /** Perform any necessary state cleanup. */
+    default void cleanup() {
+        // nothing to cleanup by default
+    }
+}
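
To make the bridge contract concrete, a stripped-down implementation for the high-level REST client could look roughly like the following. This is a sketch only, assuming the 6.4+/7.x client API (bulkAsync and ping taking RequestOptions); the real Elasticsearch6/7 bridges added by this commit additionally carry the user-provided host list, the RestClientFactory and the backoff translation:

    import org.apache.http.HttpHost;
    import org.elasticsearch.action.bulk.BulkItemResponse;
    import org.elasticsearch.action.bulk.BulkProcessor;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestHighLevelClient;

    import java.io.IOException;
    import java.util.Map;

    public class SketchApiCallBridge implements ElasticsearchApiCallBridge<RestHighLevelClient> {

        @Override
        public RestHighLevelClient createClient(Map<String, String> clientConfig) {
            // host is a placeholder; the real bridges build this from user-provided HttpHosts
            return new RestHighLevelClient(
                    RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")));
        }

        @Override
        public BulkProcessor.Builder createBulkProcessorBuilder(
                RestHighLevelClient client, BulkProcessor.Listener listener) {
            return BulkProcessor.builder(
                    (request, bulkListener) ->
                            client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                    listener);
        }

        @Override
        public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse response) {
            return response.isFailed() ? response.getFailure().getCause() : null;
        }

        @Override
        public void configureBulkProcessorBackoff(
                BulkProcessor.Builder builder,
                ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
            // map the policy to BackoffPolicy.noBackoff(), constantBackoff(...) or
            // exponentialBackoff(...) and pass it to builder.setBackoffPolicy(...)
        }

        @Override
        public void verifyClientConnection(RestHighLevelClient client) throws IOException {
            client.ping(RequestOptions.DEFAULT);
        }
    }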
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
new file mode 100644
index 000000000..4dd35d060
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.rest.RestStatus;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all Flink Elasticsearch Sinks.
+ *
+ * <p>This class implements the common behaviour across Elasticsearch versions, such as the use of
+ * an internal {@link BulkProcessor} to buffer multiple {@link ActionRequest}s before sending the
+ * requests to the cluster, as well as passing input records to the user provided {@link
+ * ElasticsearchSinkFunction} for processing.
+ *
+ * <p>The version specific API calls for different Elasticsearch versions should be defined by a
+ * concrete implementation of a {@link ElasticsearchApiCallBridge}, which is provided to the
+ * constructor of this class. This call bridge is used, for example, to create an Elasticsearch
+ * {@link Client}, handle failed item responses, etc.
+ *
+ * @param <T> Type of the elements handled by this sink
+ * @param <C> Type of the Elasticsearch client, which implements {@link AutoCloseable}
+ */
+@Internal
+public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends RichSinkFunction<T>
+        implements CheckpointedFunction {
+
+    public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
+
+    // ------------------------------------------------------------------------
+    //  Internal bulk processor configuration
+    // ------------------------------------------------------------------------
+    public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
+    public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+    public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = "bulk.flush.backoff.enable";
+    public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = "bulk.flush.backoff.type";
+    public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = "bulk.flush.backoff.retries";
+    public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = "bulk.flush.backoff.delay";
+    private static final long serialVersionUID = -1007596293618451942L;
+    private final Integer bulkProcessorFlushMaxActions;
+    private final Integer bulkProcessorFlushMaxSizeMb;
+    private final Long bulkProcessorFlushIntervalMillis;
+    private final BulkFlushBackoffPolicy bulkProcessorFlushBackoffPolicy;
+    /**
+     * The config map that contains configuration for the bulk flushing behaviours.
+     *
+     * <p>For {@link org.elasticsearch.client.transport.TransportClient} based implementations, this
+     * config map would also contain Elasticsearch-shipped configuration, and therefore this config
+     * map would also be forwarded when creating the Elasticsearch client.
+     */
+    private final Map<String, String> userConfig;
+    /**
+     * The function that is used to construct multiple {@link ActionRequest ActionRequests} from
+     * each incoming element.
+     */
+    private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+    // ------------------------------------------------------------------------
+    //  User-facing API and configuration
+    // ------------------------------------------------------------------------
+    /**
+     * User-provided handler for failed {@link ActionRequest ActionRequests}.
+     */
+    private final ActionRequestFailureHandler failureHandler;
+    /**
+     * Call bridge for version-specific Elasticsearch API calls.
+     */
+    private final ElasticsearchApiCallBridge<C> callBridge;
+    /**
+     * This is set from inside the {@link BulkProcessor.Listener} if a {@link Throwable} was thrown
+     * in callbacks and the user considered it should fail the sink via the {@link
+     * ActionRequestFailureHandler#onFailure(ActionRequest, Throwable, int, RequestIndexer)} method.
+     *
+     * <p>Errors will be checked and rethrown before processing each input element, and when the
+     * sink is closed.
+     */
+    private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
+    private final String inLongMetric;
+    /**
+     * If true, the producer will wait until all outstanding action requests have been sent to
+     * Elasticsearch.
+     */
+    private boolean flushOnCheckpoint = true;
+    /**
+     * Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest
+     * ActionRequests}.
+     */
+    private transient RequestIndexer requestIndexer;
+
+    // ------------------------------------------------------------------------
+    //  Internals for the Flink Elasticsearch Sink
+    // ------------------------------------------------------------------------
+    /**
+     * Provided to the {@link ActionRequestFailureHandler} to allow users to re-index failed
+     * requests.
+     */
+    private transient BufferingNoOpRequestIndexer failureRequestIndexer;
+    /**
+     * Number of pending action requests not yet acknowledged by Elasticsearch. This value is
+     * maintained only if {@link ElasticsearchSinkBase#flushOnCheckpoint} is {@code true}.
+     *
+     * <p>This is incremented whenever the user adds (or re-adds through the {@link
+     * ActionRequestFailureHandler}) requests to the {@link RequestIndexer}. It is decremented for
+     * each completed request of a bulk request, in {@link BulkProcessor.Listener#afterBulk(long,
+     * BulkRequest, BulkResponse)} and {@link BulkProcessor.Listener#afterBulk(long, BulkRequest,
+     * Throwable)}.
+     */
+    private AtomicLong numPendingRequests = new AtomicLong(0);
+    /**
+     * Elasticsearch client created using the call bridge.
+     */
+    private transient C client;
+    /**
+     * Bulk processor to buffer and send requests to Elasticsearch, created using the client.
+     */
+    private transient BulkProcessor bulkProcessor;
+    private SinkMetricData sinkMetricData;
+
+    public ElasticsearchSinkBase(
+            ElasticsearchApiCallBridge<C> callBridge,
+            Map<String, String> userConfig,
+            ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+            ActionRequestFailureHandler failureHandler,
+            String inLongMetric) {
+        this.inLongMetric = inLongMetric;
+        this.callBridge = checkNotNull(callBridge);
+        this.elasticsearchSinkFunction = checkNotNull(elasticsearchSinkFunction);
+        this.failureHandler = checkNotNull(failureHandler);
+        // eagerly check that the user-provided sink function and failure handler are serializable;
+        // otherwise users would merely get the non-informative error message
+        // "ElasticsearchSinkBase is not serializable"
+
+        checkArgument(
+                InstantiationUtil.isSerializable(elasticsearchSinkFunction),
+                "The implementation of the provided ElasticsearchSinkFunction is not serializable. "
+                        + "The object probably contains or references non-serializable fields.");
+
+        checkArgument(
+                InstantiationUtil.isSerializable(failureHandler),
+                "The implementation of the provided ActionRequestFailureHandler is not serializable. "
+                        + "The object probably contains or references non-serializable fields.");
+
+        // extract and remove bulk processor related configuration from the user-provided config,
+        // so that the resulting user config only contains configuration related to the
+        // Elasticsearch client.
+
+        checkNotNull(userConfig);
+
+        // copy config so we can remove entries without side-effects
+        userConfig = new HashMap<>(userConfig);
+
+        ParameterTool params = ParameterTool.fromMap(userConfig);
+
+        if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
+            bulkProcessorFlushMaxActions = params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS);
+            userConfig.remove(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS);
+        } else {
+            bulkProcessorFlushMaxActions = null;
+        }
+
+        if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
+            bulkProcessorFlushMaxSizeMb = params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB);
+            userConfig.remove(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB);
+        } else {
+            bulkProcessorFlushMaxSizeMb = null;
+        }
+
+        if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
+            bulkProcessorFlushIntervalMillis = params.getLong(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
+            userConfig.remove(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
+        } else {
+            bulkProcessorFlushIntervalMillis = null;
+        }
+
+        boolean bulkProcessorFlushBackoffEnable =
+                params.getBoolean(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, true);
+        userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE);
+
+        if (bulkProcessorFlushBackoffEnable) {
+            this.bulkProcessorFlushBackoffPolicy = new BulkFlushBackoffPolicy();
+
+            if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE)) {
+                bulkProcessorFlushBackoffPolicy.setBackoffType(
+                        FlushBackoffType.valueOf(params.get(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE)));
+                userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE);
+            }
+
+            if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES)) {
+                bulkProcessorFlushBackoffPolicy.setMaxRetryCount(
+                        params.getInt(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES));
+                userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES);
+            }
+
+            if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY)) {
+                bulkProcessorFlushBackoffPolicy.setDelayMillis(
+                        params.getLong(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY));
+                userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY);
+            }
+
+        } else {
+            bulkProcessorFlushBackoffPolicy = null;
+        }
+
+        this.userConfig = userConfig;
+    }
+
+    /**
+     * Disable flushing on checkpoint. When disabled, the sink will not wait for all pending action
+     * requests to be acknowledged by Elasticsearch on checkpoints.
+     *
+     * <p>NOTE: If flushing on checkpoint is disabled, the Flink Elasticsearch Sink does NOT provide
+     * any strong guarantees for at-least-once delivery of action requests.
+     */
+    public void disableFlushOnCheckpoint() {
+        this.flushOnCheckpoint = false;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        client = callBridge.createClient(userConfig);
+        sinkMetricData = new SinkMetricData(getRuntimeContext().getMetricGroup());
+        if (inLongMetric != null && !inLongMetric.isEmpty()) {
+            String[] inLongMetricArray = inLongMetric.split("&");
+            String groupId = inLongMetricArray[0];
+            String streamId = inLongMetricArray[1];
+            String nodeId = inLongMetricArray[2];
+            sinkMetricData.registerMetricsForDirtyBytes(groupId, streamId, nodeId, "dirtyBytes");
+            sinkMetricData.registerMetricsForDirtyRecords(groupId, streamId, nodeId, "dirtyRecords");
+            sinkMetricData.registerMetricsForNumBytesOut(groupId, streamId, nodeId, "numBytesOut");
+            sinkMetricData.registerMetricsForNumRecordsOut(groupId, streamId, nodeId, "numRecordsOut");
+            sinkMetricData.registerMetricsForNumBytesOutPerSecond(groupId, streamId, nodeId,
+                    "numBytesOutPerSecond");
+            sinkMetricData.registerMetricsForNumRecordsOutPerSecond(groupId, streamId, nodeId,
+                    "numRecordsOutPerSecond");
+        }
+        callBridge.verifyClientConnection(client);
+        bulkProcessor = buildBulkProcessor(new BulkProcessorListener(sinkMetricData));
+        requestIndexer =
+                callBridge.createBulkProcessorIndexer(
+                        bulkProcessor, flushOnCheckpoint, numPendingRequests);
+        failureRequestIndexer = new BufferingNoOpRequestIndexer();
+        elasticsearchSinkFunction.open(getRuntimeContext());
+    }
+
+    @Override
+    public void invoke(T value, Context context) throws Exception {
+        checkAsyncErrorsAndRequests();
+        elasticsearchSinkFunction.process(value, getRuntimeContext(), requestIndexer);
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        // no initialization needed
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        checkAsyncErrorsAndRequests();
+
+        if (flushOnCheckpoint) {
+            while (numPendingRequests.get() != 0) {
+                bulkProcessor.flush();
+                checkAsyncErrorsAndRequests();
+            }
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        elasticsearchSinkFunction.close();
+        if (bulkProcessor != null) {
+            bulkProcessor.close();
+            bulkProcessor = null;
+        }
+
+        if (client != null) {
+            client.close();
+            client = null;
+        }
+
+        callBridge.cleanup();
+
+        // make sure any errors from callbacks are rethrown
+        checkErrorAndRethrow();
+    }
+
+    /**
+     * Build the {@link BulkProcessor}.
+     *
+     * <p>Note: this is exposed for testing purposes.
+     */
+    @VisibleForTesting
+    protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
+        checkNotNull(listener);
+
+        BulkProcessor.Builder bulkProcessorBuilder =
+                callBridge.createBulkProcessorBuilder(client, listener);
+
+        // This makes flush() blocking
+        bulkProcessorBuilder.setConcurrentRequests(0);
+
+        if (bulkProcessorFlushMaxActions != null) {
+            bulkProcessorBuilder.setBulkActions(bulkProcessorFlushMaxActions);
+        }
+
+        if (bulkProcessorFlushMaxSizeMb != null) {
+            configureBulkSize(bulkProcessorBuilder);
+        }
+
+        if (bulkProcessorFlushIntervalMillis != null) {
+            configureFlushInterval(bulkProcessorBuilder);
+        }
+
+        // if backoff retrying is disabled, bulkProcessorFlushBackoffPolicy will be null
+        callBridge.configureBulkProcessorBackoff(
+                bulkProcessorBuilder, bulkProcessorFlushBackoffPolicy);
+
+        return bulkProcessorBuilder.build();
+    }
+
+    private void configureBulkSize(BulkProcessor.Builder bulkProcessorBuilder) {
+        final ByteSizeUnit sizeUnit;
+        if (bulkProcessorFlushMaxSizeMb == -1) {
+            // bulk size can be disabled with -1, however the ByteSizeValue constructor accepts -1
+            // only with BYTES as the size unit
+            sizeUnit = ByteSizeUnit.BYTES;
+        } else {
+            sizeUnit = ByteSizeUnit.MB;
+        }
+        bulkProcessorBuilder.setBulkSize(new ByteSizeValue(bulkProcessorFlushMaxSizeMb, sizeUnit));
+    }
+
+    private void configureFlushInterval(BulkProcessor.Builder bulkProcessorBuilder) {
+        if (bulkProcessorFlushIntervalMillis == -1) {
+            bulkProcessorBuilder.setFlushInterval(null);
+        } else {
+            bulkProcessorBuilder.setFlushInterval(
+                    TimeValue.timeValueMillis(bulkProcessorFlushIntervalMillis));
+        }
+    }
+
+    private void checkErrorAndRethrow() {
+        Throwable cause = failureThrowable.get();
+        if (cause != null) {
+            throw new RuntimeException("An error occurred in ElasticsearchSink.", cause);
+        }
+    }
+
+    private void checkAsyncErrorsAndRequests() {
+        checkErrorAndRethrow();
+        failureRequestIndexer.processBufferedRequests(requestIndexer);
+    }
+
+    @VisibleForTesting
+    long getNumPendingRequests() {
+        if (flushOnCheckpoint) {
+            return numPendingRequests.get();
+        } else {
+            throw new UnsupportedOperationException(
+                    "The number of pending requests is not maintained when flushing on checkpoint is disabled.");
+        }
+    }
+
+    /**
+     * Used to control whether the retry delay should increase exponentially or remain constant.
+     */
+    @PublicEvolving
+    public enum FlushBackoffType {
+        CONSTANT,
+        EXPONENTIAL
+    }
+
+    /**
+     * Provides a backoff policy for bulk requests. Whenever a bulk request is rejected due to
+     * resource constraints (i.e. the client's internal thread pool is full), the backoff policy
+     * decides how long the bulk processor will wait before the operation is retried internally.
+     *
+     * <p>This is a proxy for version specific backoff policies.
+     */
+    public static class BulkFlushBackoffPolicy implements Serializable {
+
+        private static final long serialVersionUID = -6022851996101826049L;
+
+        // the default values follow the Elasticsearch default settings for BulkProcessor
+        private FlushBackoffType backoffType = FlushBackoffType.EXPONENTIAL;
+        private int maxRetryCount = 8;
+        private long delayMillis = 50;
+
+        public FlushBackoffType getBackoffType() {
+            return backoffType;
+        }
+
+        public void setBackoffType(FlushBackoffType backoffType) {
+            this.backoffType = checkNotNull(backoffType);
+        }
+
+        public int getMaxRetryCount() {
+            return maxRetryCount;
+        }
+
+        public void setMaxRetryCount(int maxRetryCount) {
+            checkArgument(maxRetryCount >= 0);
+            this.maxRetryCount = maxRetryCount;
+        }
+
+        public long getDelayMillis() {
+            return delayMillis;
+        }
+
+        public void setDelayMillis(long delayMillis) {
+            checkArgument(delayMillis >= 0);
+            this.delayMillis = delayMillis;
+        }
+    }
+
+    private class BulkProcessorListener implements BulkProcessor.Listener {
+
+        private SinkMetricData sinkMetricData;
+
+        public BulkProcessorListener(SinkMetricData sinkMetricData) {
+            this.sinkMetricData = sinkMetricData;
+        }
+
+        @Override
+        public void beforeBulk(long executionId, BulkRequest request) {
+        }
+
+        @Override
+        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+            if (response.hasFailures()) {
+                BulkItemResponse itemResponse;
+                Throwable failure;
+                RestStatus restStatus;
+                DocWriteRequest actionRequest;
+
+                try {
+                    for (int i = 0; i < response.getItems().length; i++) {
+                        itemResponse = response.getItems()[i];
+                        failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
+                        if (failure != null) {
+                            restStatus = itemResponse.getFailure().getStatus();
+                            actionRequest = request.requests().get(i);
+                            if (sinkMetricData.getDirtyRecords() != null) {
+                                sinkMetricData.getDirtyRecords().inc();
+                            }
+                            if (restStatus == null) {
+                                if (actionRequest instanceof ActionRequest) {
+                                    failureHandler.onFailure(
+                                            (ActionRequest) actionRequest,
+                                            failure,
+                                            -1,
+                                            failureRequestIndexer);
+                                } else {
+                                    throw new UnsupportedOperationException(
+                                            "The sink currently only supports ActionRequests");
+                                }
+                            } else {
+                                if (actionRequest instanceof ActionRequest) {
+                                    failureHandler.onFailure(
+                                            (ActionRequest) actionRequest,
+                                            failure,
+                                            restStatus.getStatus(),
+                                            failureRequestIndexer);
+                                } else {
+                                    throw new UnsupportedOperationException(
+                                            "The sink currently only supports ActionRequests");
+                                }
+                            }
+                        }
+                        if (sinkMetricData.getNumRecordsOut() != null) {
+                            sinkMetricData.getNumRecordsOut().inc();
+                        }
+                    }
+                } catch (Throwable t) {
+                    // fail the sink and skip the rest of the items
+                    // if the failure handler decides to throw an exception
+                    failureThrowable.compareAndSet(null, t);
+                }
+            }
+
+            if (flushOnCheckpoint) {
+                numPendingRequests.getAndAdd(-request.numberOfActions());
+            }
+        }
+
+        @Override
+        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+            try {
+                for (DocWriteRequest writeRequest : request.requests()) {
+                    if (sinkMetricData.getDirtyRecords() != null) {
+                        sinkMetricData.getDirtyRecords().inc();
+                    }
+                    if (writeRequest instanceof ActionRequest) {
+                        failureHandler.onFailure(
+                                (ActionRequest) writeRequest, failure, -1, failureRequestIndexer);
+                    } else {
+                        throw new UnsupportedOperationException(
+                                "The sink currently only supports ActionRequests");
+                    }
+                }
+            } catch (Throwable t) {
+                // fail the sink and skip the rest of the items
+                // if the failure handler decides to throw an exception
+                failureThrowable.compareAndSet(null, t);
+            }
+
+            if (flushOnCheckpoint) {
+                numPendingRequests.getAndAdd(-request.numberOfActions());
+            }
+        }
+    }
+}
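
The open() method above expects the inLongMetric label to carry the InLong group, stream and node identifiers joined by '&'; an empty or null label simply skips metric registration. A small illustration (the identifier values are placeholders):

    // "groupId&streamId&nodeId", in the order open() splits them
    String inLongMetric = String.join("&", "test_group", "test_stream", "es_sink_node");

With a non-empty label the sink registers dirtyBytes, dirtyRecords, numBytesOut, numRecordsOut and the two per-second meters on the task's metric group, and BulkProcessorListener updates them from the bulk responses.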
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java
new file mode 100644
index 000000000..dc4bf5af1
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.elasticsearch.action.ActionRequest;
+
+import java.io.Serializable;
+
+/**
+ * Creates multiple {@link ActionRequest ActionRequests} from an element in a stream.
+ *
+ * <p>This is used by sinks to prepare elements for sending them to Elasticsearch.
+ *
+ * @param <T> The type of the element handled by this {@code ElasticsearchSinkFunction}
+ */
+@PublicEvolving
+public interface ElasticsearchSinkFunction<T> extends Serializable, Function {
+
+    /**
+     * Initialization method for the function. It is called once before the actual working process
+     * methods.
+     */
+    default void open(RuntimeContext ctx) throws Exception {
+    }
+
+    /**
+     * Tear-down method for the function. It is called when the sink closes.
+     */
+    default void close() throws Exception {
+    }
+
+    /**
+     * Process the incoming element to produce multiple {@link ActionRequest ActionRequests}. The
+     * produced requests should be added to the provided {@link RequestIndexer}.
+     *
+     * @param element incoming element to process
+     * @param ctx runtime context containing information about the sink instance
+     * @param indexer request indexer that {@code ActionRequest} should be added to
+     */
+    void process(T element, RuntimeContext ctx, RequestIndexer indexer);
+}
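
Relative to the Flink-shipped ElasticsearchSinkFunction, the open() hook now receives the RuntimeContext, which is what lets RowElasticsearchSinkFunction further below register InLong metrics on the sink's metric group. A minimal hand-written implementation might look like the following sketch (index name and payload layout are placeholders):

    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    import org.elasticsearch.action.index.IndexRequest;

    public class StringIndexingFunction implements ElasticsearchSinkFunction<String> {

        @Override
        public void open(RuntimeContext ctx) {
            // e.g. inspect subtask information or register metrics via ctx.getMetricGroup()
        }

        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(new IndexRequest("demo_index").source("raw", element));
        }
    }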
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/PreElasticsearch6BulkProcessorIndexer.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/PreElasticsearch6BulkProcessorIndexer.java
new file mode 100644
index 000000000..be7b9668c
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/PreElasticsearch6BulkProcessorIndexer.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}. {@link ActionRequest
+ * ActionRequests} will be buffered before sending a bulk request to the Elasticsearch cluster.
+ *
+ * @deprecated This class is not binary compatible with newer Elasticsearch 6+ versions (specifically,
+ *     {@link #add(UpdateRequest...)}). However, this module is currently compiled against a very
+ *     old Elasticsearch version.
+ */
+@Deprecated
+@Internal
+class PreElasticsearch6BulkProcessorIndexer implements RequestIndexer {
+
+    private final BulkProcessor bulkProcessor;
+    private final boolean flushOnCheckpoint;
+    private final AtomicLong numPendingRequestsRef;
+
+    PreElasticsearch6BulkProcessorIndexer(
+            BulkProcessor bulkProcessor,
+            boolean flushOnCheckpoint,
+            AtomicLong numPendingRequestsRef) {
+        this.bulkProcessor = checkNotNull(bulkProcessor);
+        this.flushOnCheckpoint = flushOnCheckpoint;
+        this.numPendingRequestsRef = checkNotNull(numPendingRequestsRef);
+    }
+
+    @Override
+    public void add(DeleteRequest... deleteRequests) {
+        for (DeleteRequest deleteRequest : deleteRequests) {
+            if (flushOnCheckpoint) {
+                numPendingRequestsRef.getAndIncrement();
+            }
+            this.bulkProcessor.add(deleteRequest);
+        }
+    }
+
+    @Override
+    public void add(IndexRequest... indexRequests) {
+        for (IndexRequest indexRequest : indexRequests) {
+            if (flushOnCheckpoint) {
+                numPendingRequestsRef.getAndIncrement();
+            }
+            this.bulkProcessor.add(indexRequest);
+        }
+    }
+
+    @Override
+    public void add(UpdateRequest... updateRequests) {
+        for (UpdateRequest updateRequest : updateRequests) {
+            if (flushOnCheckpoint) {
+                numPendingRequestsRef.getAndIncrement();
+            }
+            this.bulkProcessor.add(updateRequest);
+        }
+    }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java
index 86ed88875..07d8adbf9 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java
@@ -20,12 +20,12 @@ package org.apache.inlong.sort.elasticsearch.table;
 
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkBase;
 
 import java.time.Duration;
 import java.util.Objects;
@@ -39,8 +39,11 @@ import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.FA
 import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.PASSWORD_OPTION;
 import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.USERNAME_OPTION;
 
-/** Accessor methods to elasticsearch options. */
+/**
+ * Accessor methods to Elasticsearch options.
+ */
 public class ElasticsearchConfiguration {
+
     protected final ReadableConfig config;
     private final ClassLoader classLoader;
 
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchOptions.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchOptions.java
index 0d74c2c3b..6954bd36a 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchOptions.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchOptions.java
@@ -22,7 +22,7 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.description.Description;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkBase;
 
 import java.time.Duration;
 import java.util.List;
@@ -33,15 +33,6 @@ import static org.apache.flink.configuration.description.TextElement.text;
  * Options for {@link org.apache.flink.table.factories.DynamicTableSinkFactory} for Elasticsearch.
  */
 public class ElasticsearchOptions {
-    /**
-     * Backoff strategy. Extends {@link ElasticsearchSinkBase.FlushBackoffType} with {@code
-     * DISABLED} option.
-     */
-    public enum BackOffType {
-        DISABLED,
-        CONSTANT,
-        EXPONENTIAL
-    }
 
     public static final ConfigOption<List<String>> HOSTS_OPTION =
             ConfigOptions.key("hosts")
@@ -75,13 +66,11 @@ public class ElasticsearchOptions {
                     .defaultValue("_")
                     .withDescription(
                             "Delimiter for composite keys e.g., \"$\" would result in IDs \"KEY1$KEY2$KEY3\".");
-
     public static final ConfigOption<String> ROUTING_FIELD_NAME =
             ConfigOptions.key("routing.field-name")
                     .stringType()
                     .noDefaultValue()
                     .withDescription("Elasticsearch routing filed.");
-
     public static final ConfigOption<String> FAILURE_HANDLER_OPTION =
             ConfigOptions.key("failure-handler")
                     .stringType()
@@ -159,4 +148,14 @@ public class ElasticsearchOptions {
     private ElasticsearchOptions() {
 
     }
+
+    /**
+     * Backoff strategy. Extends {@link ElasticsearchSinkBase.FlushBackoffType} with {@code
+     * DISABLED} option.
+     */
+    public enum BackOffType {
+        DISABLED,
+        CONSTANT,
+        EXPONENTIAL
+    }
 }
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
index 2e227ad55..8af55faaf 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -20,12 +20,12 @@ package org.apache.inlong.sort.elasticsearch.table;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Preconditions;
-
+import org.apache.inlong.sort.base.metric.SinkMetricData;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.delete.DeleteRequest;
@@ -49,9 +49,16 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
     private final XContentType contentType;
     private final RequestFactory requestFactory;
     private final Function<RowData, String> createKey;
+    private final String inLongMetric;
 
     private final Function<RowData, String> createRouting;
 
+    private transient RuntimeContext runtimeContext;
+
+    private SinkMetricData sinkMetricData;
+    private Long dataSize = 0L;
+    private Long rowSize = 0L;
+
     public RowElasticsearchSinkFunction(
             IndexGenerator indexGenerator,
             @Nullable String docType, // this is deprecated in es 7+
@@ -59,7 +66,8 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
             XContentType contentType,
             RequestFactory requestFactory,
             Function<RowData, String> createKey,
-            @Nullable Function<RowData, String> createRouting) {
+            @Nullable Function<RowData, String> createRouting,
+            String inLongMetric) {
         this.indexGenerator = Preconditions.checkNotNull(indexGenerator);
         this.docType = docType;
         this.serializationSchema = Preconditions.checkNotNull(serializationSchema);
@@ -67,11 +75,27 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
         this.requestFactory = Preconditions.checkNotNull(requestFactory);
         this.createKey = Preconditions.checkNotNull(createKey);
         this.createRouting = createRouting;
+        this.inLongMetric = inLongMetric;
     }
 
     @Override
-    public void open() {
+    public void open(RuntimeContext ctx) {
         indexGenerator.open();
+        this.runtimeContext = ctx;
+        sinkMetricData = new SinkMetricData(runtimeContext.getMetricGroup());
+        if (inLongMetric != null && !inLongMetric.isEmpty()) {
+            String[] inLongMetricArray = inLongMetric.split("&");
+            String groupId = inLongMetricArray[0];
+            String streamId = inLongMetricArray[1];
+            String nodeId = inLongMetricArray[2];
+            sinkMetricData.registerMetricsForDirtyBytes(groupId, streamId, nodeId, "dirtyBytes");
+            sinkMetricData.registerMetricsForDirtyRecords(groupId, streamId, nodeId, "dirtyRecords");
+            sinkMetricData.registerMetricsForNumBytesOut(groupId, streamId, nodeId, "numBytesOut");
+            sinkMetricData.registerMetricsForNumRecordsOut(groupId, streamId, nodeId, "numRecordsOut");
+            sinkMetricData.registerMetricsForNumBytesOutPerSecond(groupId, streamId, nodeId, "numBytesOutPerSecond");
+            sinkMetricData.registerMetricsForNumRecordsOutPerSecond(groupId, streamId, nodeId,
+                    "numRecordsOutPerSecond");
+        }
     }
 
     @Override
@@ -93,6 +117,9 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
     private void processUpsert(RowData row, RequestIndexer indexer) {
         final byte[] document = serializationSchema.serialize(row);
         final String key = createKey.apply(row);
+        if (sinkMetricData.getNumBytesOut() != null) {
+            sinkMetricData.getNumBytesOut().inc(document.length);
+        }
         if (key != null) {
             final UpdateRequest updateRequest =
                     requestFactory.createUpdateRequest(
@@ -137,7 +164,8 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
                 && Objects.equals(serializationSchema, that.serializationSchema)
                 && contentType == that.contentType
                 && Objects.equals(requestFactory, that.requestFactory)
-                && Objects.equals(createKey, that.createKey);
+                && Objects.equals(createKey, that.createKey)
+                && Objects.equals(inLongMetric, that.inLongMetric);
     }
 
     @Override
@@ -148,6 +176,7 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
                 serializationSchema,
                 contentType,
                 requestFactory,
-                createKey);
+                createKey,
+                inLongMetric);
     }
 }
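
For reference, a simplified sketch of the reporting pattern this change introduces: the inlong metric value is expected to arrive as "groupId&streamId&nodeId" (matching the split in open above), and per-document counters are bumped when a row is serialized, as in processUpsert. This sketch uses plain Flink Counters in place of the SinkMetricData class from sort-connector-base, and the group names are illustrative, not the committed ones.

    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.metrics.Counter;

    public class SinkMetricSketch {

        private Counter numRecordsOut;
        private Counter numBytesOut;

        // Called from open(RuntimeContext); counters are only registered when the
        // metric label string is present and contains all three parts.
        void register(RuntimeContext ctx, String inLongMetric) {
            if (inLongMetric == null || inLongMetric.isEmpty()) {
                return;
            }
            String[] labels = inLongMetric.split("&");
            String groupId = labels[0];
            String streamId = labels[1];
            String nodeId = labels[2];
            numRecordsOut = ctx.getMetricGroup()
                    .addGroup("groupId", groupId)
                    .addGroup("streamId", streamId)
                    .addGroup("nodeId", nodeId)
                    .counter("numRecordsOut");
            numBytesOut = ctx.getMetricGroup()
                    .addGroup("groupId", groupId)
                    .addGroup("streamId", streamId)
                    .addGroup("nodeId", nodeId)
                    .counter("numBytesOut");
        }

        // Called once per serialized document, mirroring the increment in processUpsert.
        void onDocument(byte[] document) {
            if (numRecordsOut != null) {
                numRecordsOut.inc();
            }
            if (numBytesOut != null) {
                numBytesOut.inc(document.length);
            }
        }
    }
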