You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/08/05 15:59:34 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector-V2] new connecotor of Elasticsearch sink(#2326) (#2330)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2a1fd5027 [Feature][Connector-V2] new connecotor of Elasticsearch sink(#2326) (#2330)
2a1fd5027 is described below

commit 2a1fd5027f0eea2c929d257f5a150bb83cb6df63
Author: iture123 <10...@qq.com>
AuthorDate: Fri Aug 5 23:59:28 2022 +0800

    [Feature][Connector-V2] new connecotor of Elasticsearch sink(#2326) (#2330)
---
 docs/en/new-connector/sink/Elasticsearch.md        |  56 +++++++++
 plugin-mapping.properties                          |   1 +
 pom.xml                                            |   1 +
 seatunnel-connectors-v2-dist/pom.xml               |   5 +
 .../{ => connector-elasticsearch}/pom.xml          |  52 +++------
 .../elasticsearch/client/EsRestClient.java         | 123 ++++++++++++++++++++
 .../seatunnel/elasticsearch/config/SinkConfig.java |  47 ++++++++
 .../elasticsearch/constant/BulkConfig.java         |  37 ++++++
 .../constant/ElasticsearchVersion.java             |  51 ++++++++
 .../seatunnel/elasticsearch/dto/BulkResponse.java  |  61 ++++++++++
 .../seatunnel/elasticsearch/dto/IndexInfo.java     |  53 +++++++++
 .../exception/BulkElasticsearchException.java      |  30 +++++
 .../GetElasticsearchVersionException.java          |  25 ++++
 .../serialize/ElasticsearchRowSerializer.java      |  80 +++++++++++++
 .../serialize/SeaTunnelRowSerializer.java          |  26 +++++
 .../serialize/index/IndexSerializer.java           |  28 +++++
 .../serialize/index/IndexSerializerFactory.java    |  37 ++++++
 .../index/impl/FixedValueIndexSerializer.java      |  38 ++++++
 .../index/impl/VariableIndexSerializer.java        |  69 +++++++++++
 .../serialize/type/IndexTypeSerializer.java        |  27 +++++
 .../serialize/type/IndexTypeSerializerFactory.java |  49 ++++++++
 .../type/impl/NotIndexTypeSerializer.java          |  35 ++++++
 .../type/impl/RequiredIndexTypeSerializer.java     |  39 +++++++
 .../elasticsearch/sink/ElasticsearchSink.java      |  76 ++++++++++++
 .../sink/ElasticsearchSinkWriter.java              | 129 +++++++++++++++++++++
 .../state/ElasticsearchAggregatedCommitInfo.java   |  27 +++++
 .../state/ElasticsearchCommitInfo.java             |  35 ++++++
 .../state/ElasticsearchSinkState.java              |  24 ++++
 .../seatunnel/elasticsearch/util/RegexUtils.java   |  39 +++++++
 seatunnel-connectors-v2/pom.xml                    |   1 +
 30 files changed, 1266 insertions(+), 35 deletions(-)

diff --git a/docs/en/new-connector/sink/Elasticsearch.md b/docs/en/new-connector/sink/Elasticsearch.md
new file mode 100644
index 000000000..c8fbb551e
--- /dev/null
+++ b/docs/en/new-connector/sink/Elasticsearch.md
@@ -0,0 +1,56 @@
+# Elasticsearch
+
+## Description
+
+Output data to `Elasticsearch`.
+
+:::tip
+
+Engine Supported
+
+* Supported `Elasticsearch` versions: `>= 2.x and < 8.x`
+
+:::
+
+## Options
+
+| name              | type   | required | default value | 
+|-------------------| ------ | -------- |---------------|
+| hosts             | array  | yes      | -             |
+| index             | string | yes      | -             |
+| index_type        | string | no       |               |
+| username          | string | no       |               |
+| password          | string | no       |               | 
+| max_retry_size    | int    | no       | 3             |
+| max_batch_size    | int    | no       | 10            |
+
+
+
+### hosts [array]
+`Elasticsearch` cluster http address, the format is `host:port` , allowing multiple hosts to be specified. Such as `["host1:9200", "host2:9200"]`.
+
+### index [string]
+`Elasticsearch` `index` name. The index name may contain field-name variables, such as `seatunnel_${age}`; every referenced field must exist in the SeaTunnel row.
+
+### index_type [string]
+`Elasticsearch` index type. It is recommended not to specify it for Elasticsearch 6 and above.
+
+### username [string]
+x-pack username
+
+### password [string]
+x-pack password
+
+### max_retry_size [int]
+Maximum retry count for a single bulk request.
+
+### max_batch_size [int]
+Maximum number of documents sent in one bulk request.
+
+## Examples
+```bash
+Elasticsearch {
+    hosts = ["localhost:9200"]
+    index = "seatunnel-${age}"
+}
+```
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index c6c3f16f5..ac7497907 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -111,3 +111,4 @@ seatunnel.sink.LocalFile = connector-file-local
 seatunnel.source.Pulsar = connector-pulsar
 seatunnel.source.Hudi = connector-hudi
 seatunnel.sink.DingTalk = connector-dingtalk
+seatunnel.sink.elasticsearch = connector-elasticsearch
diff --git a/pom.xml b/pom.xml
index 188a1ba57..e9e5ff448 100644
--- a/pom.xml
+++ b/pom.xml
@@ -223,6 +223,7 @@
         <hibernate.validator.version>6.2.2.Final</hibernate.validator.version>
         <jsoup.version>1.14.3</jsoup.version>
         <javax.annotation-api.version>1.3.2</javax.annotation-api.version>
+        <elasticsearch-rest-client.version>7.5.1</elasticsearch-rest-client.version>
     </properties>
 
     <dependencyManagement>
diff --git a/seatunnel-connectors-v2-dist/pom.xml b/seatunnel-connectors-v2-dist/pom.xml
index 336c92b74..059158a7c 100644
--- a/seatunnel-connectors-v2-dist/pom.xml
+++ b/seatunnel-connectors-v2-dist/pom.xml
@@ -111,6 +111,11 @@
             <artifactId>connector-email</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-elasticsearch</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/connector-elasticsearch/pom.xml
similarity index 52%
copy from seatunnel-connectors-v2/pom.xml
copy to seatunnel-connectors-v2/connector-elasticsearch/pom.xml
index 60fd7db0e..7a6b6c8dc 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/connector-elasticsearch/pom.xml
@@ -17,51 +17,33 @@
     limitations under the License.
 
 -->
-
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>seatunnel</artifactId>
         <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connectors-v2</artifactId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <packaging>pom</packaging>
-    <artifactId>seatunnel-connectors-v2</artifactId>
 
-    <modules>
-        <module>connector-common</module>
-        <module>connector-clickhouse</module>
-        <module>connector-console</module>
-        <module>connector-fake</module>
-        <module>connector-http</module>
-        <module>connector-jdbc</module>
-        <module>connector-kafka</module>
-        <module>connector-pulsar</module>
-        <module>connector-socket</module>
-        <module>connector-hive</module>
-        <module>connector-file</module>
-        <module>connector-hudi</module>
-        <module>connector-assert</module>
-        <module>connector-kudu</module>
-        <module>connector-email</module>
-        <module>connector-dingtalk</module>
-    </modules>
+    <artifactId>connector-elasticsearch</artifactId>
 
     <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch.client</groupId>
+            <artifactId>elasticsearch-rest-client</artifactId>
+            <version>${elasticsearch-rest-client.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
     </dependencies>
 
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-dependency-plugin</artifactId>
-                <configuration>
-                    <skip>true</skip>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-
-</project>
+</project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
new file mode 100644
index 000000000..5a2a3df09
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpStatus;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.util.EntityUtils;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.BulkElasticsearchException;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.GetElasticsearchVersionException;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Process-wide singleton wrapper around the low-level Elasticsearch {@link RestClient}. */
+public class EsRestClient {
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static EsRestClient esRestClient;
+    private static RestClient restClient;
+
+    private EsRestClient() {
+    }
+
+    /** Builds a client builder for {@code host:port} entries; basic auth is added when a username is given. */
+    private static RestClientBuilder getRestClientBuilder(List<String> hosts, String username, String password) {
+        HttpHost[] httpHosts = new HttpHost[hosts.size()];
+        for (int i = 0; i < hosts.size(); i++) {
+            String[] hostInfo = hosts.get(i).replace("http://", "").split(":");
+            httpHosts[i] = new HttpHost(hostInfo[0], Integer.parseInt(hostInfo[1]));
+        }
+        RestClientBuilder builder = RestClient.builder(httpHosts)
+                .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
+                        .setConnectionRequestTimeout(10 * 1000)
+                        .setSocketTimeout(5 * 60 * 1000));
+        if (StringUtils.isNotEmpty(username)) {
+            CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
+            builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
+        }
+        return builder;
+    }
+
+    /** Returns the shared instance; the underlying client is built on first use or after {@link #close()}. */
+    public static EsRestClient getInstance(List<String> hosts, String username, String password) {
+        if (restClient == null) {
+            restClient = getRestClientBuilder(hosts, username, password).build();
+            esRestClient = new EsRestClient();
+        }
+        return esRestClient;
+    }
+
+    /**
+     * Posts {@code requestBody} to the {@code _bulk} endpoint.
+     *
+     * @throws BulkElasticsearchException on a non-200 status or an I/O failure
+     */
+    public BulkResponse bulk(String requestBody) {
+        Request request = new Request("POST", "_bulk");
+        request.setJsonEntity(requestBody);
+        try {
+            Response response = restClient.performRequest(request);
+            if (response == null) {
+                throw new BulkElasticsearchException("bulk es Response is null");
+            }
+            int statusCode = response.getStatusLine().getStatusCode();
+            if (statusCode != HttpStatus.SC_OK) {
+                throw new BulkElasticsearchException(String.format("bulk es response status code=%d,request body=%s", statusCode, requestBody));
+            }
+            String entity = EntityUtils.toString(response.getEntity());
+            JsonNode json = OBJECT_MAPPER.readTree(entity);
+            return new BulkResponse(json.get("errors").asBoolean(), json.get("took").asInt(), entity);
+        } catch (IOException e) {
+            throw new BulkElasticsearchException(String.format("bulk es error,request body=%s", requestBody), e);
+        }
+    }
+
+    /** @return version.number read from the response of {@code GET /}, example:2.0.0 */
+    public static String getClusterVersion() {
+        try {
+            Response response = restClient.performRequest(new Request("GET", "/"));
+            String result = EntityUtils.toString(response.getEntity());
+            return OBJECT_MAPPER.readTree(result).get("version").get("number").asText();
+        } catch (IOException e) {
+            throw new GetElasticsearchVersionException("fail to get elasticsearch version.", e);
+        }
+    }
+
+    /** Closes the shared client and clears it so that a later {@link #getInstance} builds a fresh one. */
+    public void close() throws IOException {
+        RestClient client = restClient;
+        restClient = null;
+        esRestClient = null;
+        if (client != null) {
+            client.close();
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
new file mode 100644
index 000000000..f747fad85
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;
+
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.BulkConfig;
+
+/** Option keys of the Elasticsearch sink plus the hook that applies the bulk tuning options. */
+public class SinkConfig {
+    public static final String INDEX = "index";
+    public static final String INDEX_TYPE = "index_type";
+    public static final String USERNAME = "username";
+    public static final String PASSWORD = "password";
+    public static final String HOSTS = "hosts";
+    public static final String MAX_BATCH_SIZE = "max_batch_size";
+    public static final String MAX_RETRY_SIZE = "max_retry_size";
+
+    /**
+     * Copies the optional bulk settings from {@code pluginConfig} into {@link BulkConfig};
+     * a key that is absent leaves the corresponding {@link BulkConfig} value untouched.
+     * @param pluginConfig sink configuration to read from
+     */
+    public static void setValue(org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig) {
+        final boolean hasBatchSize = pluginConfig.hasPath(MAX_BATCH_SIZE);
+        if (hasBatchSize) {
+            BulkConfig.MAX_BATCH_SIZE = pluginConfig.getInt(MAX_BATCH_SIZE);
+        }
+        final boolean hasRetrySize = pluginConfig.hasPath(MAX_RETRY_SIZE);
+        if (hasRetrySize) {
+            BulkConfig.MAX_RETRY_SIZE = pluginConfig.getInt(MAX_RETRY_SIZE);
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/BulkConfig.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/BulkConfig.java
new file mode 100644
index 000000000..b6108dc47
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/BulkConfig.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant;
+
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
+
+/**
+ * Tunable settings for Elasticsearch bulk requests.
+ */
+public class BulkConfig {
+    /**
+     * Maximum number of documents included in one bulk request (default 10).
+     * Overridden from the {@link SinkConfig#MAX_BATCH_SIZE} option when it is set.
+     */
+    public static int MAX_BATCH_SIZE = 10;
+
+    /**
+     * Maximum retry size for a bulk request (default 3).
+     * Overridden from the {@link SinkConfig#MAX_RETRY_SIZE} option when it is set.
+     */
+    public static int MAX_RETRY_SIZE = 3;
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/ElasticsearchVersion.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/ElasticsearchVersion.java
new file mode 100644
index 000000000..747ba7d2a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/ElasticsearchVersion.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant;
+
+/** Major Elasticsearch versions that the connector distinguishes. */
+public enum ElasticsearchVersion {
+    ES2(2), ES5(5), ES6(6), ES7(7), ES8(8);
+    private int version;
+
+    ElasticsearchVersion(int version) {
+        this.version = version;
+    }
+
+    public int getVersion() {
+        return version;
+    }
+
+    public void setVersion(int version) {
+        this.version = version;
+    }
+
+    /** Looks up the constant for a major version; throws {@link IllegalArgumentException} if none matches. */
+    public static ElasticsearchVersion get(int version) {
+        for (ElasticsearchVersion elasticsearchVersion : ElasticsearchVersion.values()) {
+            if (elasticsearchVersion.getVersion() == version) {
+                return elasticsearchVersion;
+            }
+        }
+        throw new IllegalArgumentException(String.format("version=%d,fail to find ElasticsearchVersion.", version));
+    }
+
+    /** Resolves a dotted version string such as {@code 7.5.1} by its leading major component. */
+    public static ElasticsearchVersion get(String clusterVersion) {
+        return get(Integer.parseInt(clusterVersion.split("\\.")[0]));
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/BulkResponse.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/BulkResponse.java
new file mode 100644
index 000000000..348107952
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/BulkResponse.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto;
+
+/**
+ * the response of bulk ES by http request
+ */
+public class BulkResponse {
+    private boolean errors;
+    private int took;
+    private String response;
+
+    public BulkResponse() {
+    }
+
+    /** Stores the bulk {@code errors} flag, the {@code took} value and the raw response text. */
+    public BulkResponse(boolean hasErrors, int tookValue, String rawResponse) {
+        errors = hasErrors;
+        took = tookValue;
+        response = rawResponse;
+    }
+
+    public boolean isErrors() {
+        return this.errors;
+    }
+
+    public int getTook() {
+        return this.took;
+    }
+
+    public String getResponse() {
+        return this.response;
+    }
+
+    public void setErrors(boolean hasErrors) {
+        errors = hasErrors;
+    }
+
+    public void setTook(int tookValue) {
+        took = tookValue;
+    }
+
+    public void setResponse(String rawResponse) {
+        response = rawResponse;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
new file mode 100644
index 000000000..0c6907a4b
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto;
+
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
+
+/**
+ * index config by seatunnel
+ */
+public class IndexInfo {
+    private String index;
+    // null unless configured through index_type or assigned through setType
+    private String type;
+
+    /** Reads the index option and, when it is present, the index_type option from the plugin config. */
+    public IndexInfo(org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig) {
+        this.index = pluginConfig.getString(SinkConfig.INDEX);
+        this.type = pluginConfig.hasPath(SinkConfig.INDEX_TYPE)
+                ? pluginConfig.getString(SinkConfig.INDEX_TYPE)
+                : null;
+    }
+
+    public String getIndex() {
+        return index;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setIndex(String index) {
+        this.index = index;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/BulkElasticsearchException.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/BulkElasticsearchException.java
new file mode 100644
index 000000000..14dbfb7ae
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/BulkElasticsearchException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception;
+
+/** Unchecked exception raised when a bulk request to Elasticsearch fails. */
+public class BulkElasticsearchException extends RuntimeException {
+
+    public BulkElasticsearchException(String msg) {
+        super(msg);
+    }
+
+    public BulkElasticsearchException(String msg, Throwable rootCause) {
+        super(msg, rootCause);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetElasticsearchVersionException.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetElasticsearchVersionException.java
new file mode 100644
index 000000000..c146de07a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetElasticsearchVersionException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception;
+
+/** Unchecked exception raised when the Elasticsearch cluster version cannot be fetched. */
+public class GetElasticsearchVersionException extends RuntimeException {
+    public GetElasticsearchVersionException(String msg, Throwable rootCause) {
+        super(msg, rootCause);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
new file mode 100644
index 000000000..06c5581bb
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.IndexSerializer;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.IndexSerializerFactory;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.IndexTypeSerializer;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.IndexTypeSerializerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Serializes a {@link SeaTunnelRow} into the two lines of one bulk "index" action:
+ * an action/metadata JSON line followed by the document JSON line.
+ * The "_index" value comes from an {@link IndexSerializer}; the optional "_type" value is
+ * filled in by an {@link IndexTypeSerializer} selected from the Elasticsearch version.
+ */
+public class ElasticsearchRowSerializer implements SeaTunnelRowSerializer {
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    private final IndexSerializer indexSerializer;
+
+    private final IndexTypeSerializer indexTypeSerializer;
+
+    /**
+     * @param elasticsearchVersion version of the target cluster, used to pick the index type handling
+     * @param indexInfo            configured index expression and index type
+     * @param seaTunnelRowType     schema of the rows that will be serialized
+     */
+    public ElasticsearchRowSerializer(ElasticsearchVersion elasticsearchVersion, IndexInfo indexInfo, SeaTunnelRowType seaTunnelRowType) {
+        this.indexTypeSerializer = IndexTypeSerializerFactory.getIndexTypeSerializer(elasticsearchVersion, indexInfo.getType());
+        this.indexSerializer = IndexSerializerFactory.getIndexSerializer(indexInfo.getIndex(), seaTunnelRowType);
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    /**
+     * Builds the action line and the document line for {@code row}, separated by a newline
+     * (no trailing newline is appended).
+     *
+     * @param row the row to serialize
+     * @return action/metadata JSON, a newline, then the document JSON
+     * @throws RuntimeException if Jackson fails to write either JSON object
+     */
+    @Override
+    public String serializeRow(SeaTunnelRow row) {
+        Map<String, Object> doc = buildDocument(row);
+        Map<String, Map<String, String>> action = buildIndexAction(row);
+        try {
+            return objectMapper.writeValueAsString(action) + "\n" + objectMapper.writeValueAsString(doc);
+        } catch (JsonProcessingException e) {
+            // This path writes JSON, so the failure is a serialization problem.
+            throw new RuntimeException("Object json serialization exception.", e);
+        }
+    }
+
+    /** Maps every field name of the row type to the value at the same position in the row. */
+    private Map<String, Object> buildDocument(SeaTunnelRow row) {
+        String[] fieldNames = seaTunnelRowType.getFieldNames();
+        Object[] fields = row.getFields();
+        Map<String, Object> doc = new HashMap<>(fieldNames.length);
+        for (int i = 0; i < fieldNames.length; i++) {
+            doc.put(fieldNames[i], fields[i]);
+        }
+        return doc;
+    }
+
+    /** Creates the {"index": {"_index": ..., ["_type": ...]}} metadata object for the row. */
+    private Map<String, Map<String, String>> buildIndexAction(SeaTunnelRow row) {
+        Map<String, String> indexInner = new HashMap<>();
+        indexInner.put("_index", indexSerializer.serialize(row));
+        indexTypeSerializer.fillType(indexInner);
+
+        Map<String, Map<String, String>> indexParam = new HashMap<>();
+        indexParam.put("index", indexInner);
+        return indexParam;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/SeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/SeaTunnelRowSerializer.java
new file mode 100644
index 000000000..d1fbae8a4
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/SeaTunnelRowSerializer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize;
+
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+/**
+ * Turns a {@link SeaTunnelRow} into the string form that the Elasticsearch sink sends.
+ */
+public interface SeaTunnelRowSerializer {
+
+    /**
+     * @param row the row to convert
+     * @return the serialized representation of {@code row}
+     */
+    String serializeRow(SeaTunnelRow row);
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/IndexSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/IndexSerializer.java
new file mode 100644
index 000000000..67dd89455
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/IndexSerializer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+/**
+ * Produces the index name to use for a given row.
+ * Implementations may return a constant name or derive it from the row's field values.
+ */
+public interface IndexSerializer {
+
+    /**
+     * @param row the row that is about to be written
+     * @return the index name for {@code row}
+     */
+    String serialize(SeaTunnelRow row);
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/IndexSerializerFactory.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/IndexSerializerFactory.java
new file mode 100644
index 000000000..152181c3e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/IndexSerializerFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.impl.FixedValueIndexSerializer;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.impl.VariableIndexSerializer;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.util.RegexUtils;
+
+import java.util.List;
+
+public class IndexSerializerFactory {
+
+    public static IndexSerializer getIndexSerializer(String index, SeaTunnelRowType seaTunnelRowType) {
+        List<String> fieldNames = RegexUtils.extractDatas(index, "\\$\\{(.*?)\\}");
+        if (fieldNames != null && fieldNames.size() > 0) {
+            return new VariableIndexSerializer(seaTunnelRowType, index, fieldNames);
+        } else {
+            return new FixedValueIndexSerializer(index);
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/impl/FixedValueIndexSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/impl/FixedValueIndexSerializer.java
new file mode 100644
index 000000000..07d5dd5b6
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/impl/FixedValueIndexSerializer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.impl;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.IndexSerializer;
+
+/**
+ * {@link IndexSerializer} for an index name without placeholders: every row gets the same name.
+ */
+public class FixedValueIndexSerializer implements IndexSerializer {
+
+    /** The constant index name handed back for every row. */
+    private final String fixedIndex;
+
+    /**
+     * @param index the index name to return unchanged
+     */
+    public FixedValueIndexSerializer(String index) {
+        fixedIndex = index;
+    }
+
+    /**
+     * @param row ignored; the result does not depend on the row
+     * @return the index name given at construction time
+     */
+    @Override
+    public String serialize(SeaTunnelRow row) {
+        return fixedIndex;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/impl/VariableIndexSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/impl/VariableIndexSerializer.java
new file mode 100644
index 000000000..799763235
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/impl/VariableIndexSerializer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.impl;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.IndexSerializer;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link IndexSerializer} for an index expression containing <code>${field}</code> placeholders.
+ * Each placeholder that names a field of the row type is replaced by that field's value,
+ * and the resulting name is lower-cased.
+ */
+public class VariableIndexSerializer implements IndexSerializer {
+
+    /** Text substituted for a placeholder whose field value is {@code null}. */
+    private static final String NULL_DEFAULT = "null";
+
+    private final String index;
+
+    /** Placeholder field name mapped to the position of that field in the row. */
+    private final Map<String, Integer> fieldIndexMap;
+
+    /**
+     * @param seaTunnelRowType schema used to locate the placeholder fields
+     * @param index            index expression containing the placeholders
+     * @param fieldNames       field names referenced by the placeholders; names that are not
+     *                         part of the row type are ignored and their placeholders stay as-is
+     */
+    public VariableIndexSerializer(SeaTunnelRowType seaTunnelRowType, String index, List<String> fieldNames) {
+        this.index = index;
+        String[] rowFieldNames = seaTunnelRowType.getFieldNames();
+        fieldIndexMap = new HashMap<>(rowFieldNames.length);
+        for (int i = 0; i < rowFieldNames.length; i++) {
+            if (fieldNames.contains(rowFieldNames[i])) {
+                fieldIndexMap.put(rowFieldNames[i], i);
+            }
+        }
+    }
+
+    /**
+     * @param row the row supplying the placeholder values
+     * @return the index expression with known placeholders substituted, in lower case
+     */
+    @Override
+    public String serialize(SeaTunnelRow row) {
+        String indexName = index;
+        for (Map.Entry<String, Integer> fieldIndexEntry : fieldIndexMap.entrySet()) {
+            String placeholder = "${" + fieldIndexEntry.getKey() + "}";
+            indexName = indexName.replace(placeholder, getValue(fieldIndexEntry.getValue(), row));
+        }
+        // Locale.ROOT keeps the result independent of the JVM default locale
+        // (e.g. under a Turkish locale "I" would otherwise not map to "i").
+        return indexName.toLowerCase(java.util.Locale.ROOT);
+    }
+
+    /** Returns the string form of the field at {@code fieldIndex}, or "null" when it is null. */
+    private String getValue(int fieldIndex, SeaTunnelRow row) {
+        Object valueObj = row.getField(fieldIndex);
+        return valueObj == null ? NULL_DEFAULT : valueObj.toString();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializer.java
new file mode 100644
index 000000000..3e528058e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializer.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type;
+
+
+import java.util.Map;
+
+/**
+ * Contributes the index type entry, if any, to the metadata map of an index action.
+ */
+public interface IndexTypeSerializer {
+
+    /**
+     * @param indexInner metadata map of the index action; implementations may add entries to it
+     */
+    void fillType(Map<String, String> indexInner);
+
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializerFactory.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializerFactory.java
new file mode 100644
index 000000000..878257cb6
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializerFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type;
+
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.impl.NotIndexTypeSerializer;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.impl.RequiredIndexTypeSerializer;
+
+import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion.*;
+
+public class IndexTypeSerializerFactory {
+
+    private static final String DEFAULT_TYPE = "st";
+
+    private IndexTypeSerializerFactory() {
+
+    }
+
+    public static IndexTypeSerializer getIndexTypeSerializer(ElasticsearchVersion elasticsearchVersion, String type) {
+        if (elasticsearchVersion == ES2 || elasticsearchVersion == ES5) {
+            if (type == null || "".equals(type)) {
+                type = DEFAULT_TYPE;
+            }
+            return new RequiredIndexTypeSerializer(type);
+        }
+        if (elasticsearchVersion == ES6) {
+            if (type != null && !"".equals(type)) {
+                return new RequiredIndexTypeSerializer(type);
+            }
+        }
+        return new NotIndexTypeSerializer();
+    }
+
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/impl/NotIndexTypeSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/impl/NotIndexTypeSerializer.java
new file mode 100644
index 000000000..fa5afb5b8
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/impl/NotIndexTypeSerializer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.impl;
+
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.IndexTypeSerializer;
+
+import java.util.Map;
+
+/**
+ * {@link IndexTypeSerializer} that writes no index type; used when the index action
+ * metadata should not carry a type entry.
+ */
+public class NotIndexTypeSerializer implements IndexTypeSerializer {
+
+    /**
+     * Leaves {@code indexInner} untouched.
+     *
+     * @param indexInner metadata map of the index action
+     */
+    @Override
+    public void fillType(Map<String, String> indexInner) {
+        // intentionally empty: nothing to add
+    }
+
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/impl/RequiredIndexTypeSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/impl/RequiredIndexTypeSerializer.java
new file mode 100644
index 000000000..11e4f1d51
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/impl/RequiredIndexTypeSerializer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.impl;
+
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.IndexTypeSerializer;
+
+import java.util.Map;
+
+/**
+ * {@link IndexTypeSerializer} that always writes a fixed "_type" entry into the index action metadata.
+ */
+public class RequiredIndexTypeSerializer implements IndexTypeSerializer {
+
+    /** Value stored under the "_type" key. */
+    private final String indexType;
+
+    /**
+     * @param type the index type to write for every action
+     */
+    public RequiredIndexTypeSerializer(String type) {
+        indexType = type;
+    }
+
+    /**
+     * @param indexInner metadata map of the index action; receives the "_type" entry
+     */
+    @Override
+    public void fillType(Map<String, String> indexInner) {
+        indexInner.put("_type", indexType);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
new file mode 100644
index 000000000..a5eac83ac
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
+
+import com.google.auto.service.AutoService;
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchSinkState;
+
+import java.util.Collections;
+
+
+/**
+ * Sink plugin named "Elasticsearch". It keeps the plugin configuration and row type it is
+ * given and hands them to a new {@link ElasticsearchSinkWriter}.
+ */
+@AutoService(SeaTunnelSink.class)
+public class ElasticsearchSink implements SeaTunnelSink<SeaTunnelRow, ElasticsearchSinkState, ElasticsearchCommitInfo, ElasticsearchAggregatedCommitInfo> {
+
+    private org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig;
+    private SeaTunnelContext seaTunnelContext;
+    private SeaTunnelRowType seaTunnelRowType;
+
+    /** @return the plugin name, "Elasticsearch" */
+    @Override
+    public String getPluginName() {
+        return "Elasticsearch";
+    }
+
+    /** Stores the context supplied to this sink. */
+    @Override
+    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+        this.seaTunnelContext = seaTunnelContext;
+    }
+
+    /**
+     * Remembers the plugin configuration and passes it to {@link SinkConfig#setValue}.
+     *
+     * @param pluginConfig configuration of this sink
+     */
+    @Override
+    public void prepare(org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig) throws PrepareFailException {
+        SinkConfig.setValue(pluginConfig);
+        this.pluginConfig = pluginConfig;
+    }
+
+    /** Stores the schema of the rows this sink will consume. */
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    /** @return the row type previously set through {@link #setTypeInfo} */
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return seaTunnelRowType;
+    }
+
+    /**
+     * Creates a writer with the stored row type and configuration and an empty state list.
+     *
+     * @param context writer context passed through to the writer
+     * @return a new {@link ElasticsearchSinkWriter}
+     */
+    @Override
+    public SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo, ElasticsearchSinkState> createWriter(SinkWriter.Context context) {
+        return new ElasticsearchSinkWriter(context, seaTunnelRowType, pluginConfig, Collections.emptyList());
+    }
+
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
new file mode 100644
index 000000000..048f48f44
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.BulkConfig;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.BulkElasticsearchException;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.ElasticsearchRowSerializer;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.SeaTunnelRowSerializer;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * ElasticsearchSinkWriter is a sink writer that will write {@link SeaTunnelRow} to Elasticsearch.
+ */
+public class ElasticsearchSinkWriter<ElasticsearchSinkState> implements SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo, ElasticsearchSinkState> {
+
+    private final Context context;
+
+    private final SeaTunnelRowSerializer seaTunnelRowSerializer;
+    private final List<String> requestEsList;
+    private EsRestClient esRestClient;
+
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchSinkWriter.class);
+
+
+    public ElasticsearchSinkWriter(
+            Context context,
+            SeaTunnelRowType seaTunnelRowType,
+            Config pluginConfig,
+            List<ElasticsearchSinkState> elasticsearchStates) {
+        this.context = context;
+
+        IndexInfo indexInfo = new IndexInfo(pluginConfig);
+        initRestClient(pluginConfig);
+        ElasticsearchVersion elasticsearchVersion = ElasticsearchVersion.get(EsRestClient.getClusterVersion());
+        this.seaTunnelRowSerializer = new ElasticsearchRowSerializer(elasticsearchVersion, indexInfo, seaTunnelRowType);
+
+        this.requestEsList = new ArrayList<>(BulkConfig.MAX_BATCH_SIZE);
+    }
+
+    private void initRestClient(org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig) {
+        List<String> hosts = pluginConfig.getStringList(SinkConfig.HOSTS);
+        String username = null;
+        String password = null;
+        if (pluginConfig.hasPath(SinkConfig.USERNAME)) {
+            username = pluginConfig.getString(SinkConfig.USERNAME);
+            if (pluginConfig.hasPath(SinkConfig.PASSWORD)) {
+                password = pluginConfig.getString(SinkConfig.PASSWORD);
+            }
+        }
+        esRestClient = EsRestClient.getInstance(hosts, username, password);
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) {
+        String indexRequestRow = seaTunnelRowSerializer.serializeRow(element);
+        requestEsList.add(indexRequestRow);
+        if (requestEsList.size() >= BulkConfig.MAX_BATCH_SIZE) {
+            bulkEsWithRetry(this.esRestClient, this.requestEsList, BulkConfig.MAX_RETRY_SIZE);
+            requestEsList.clear();
+        }
+    }
+
+    @Override
+    public Optional<ElasticsearchCommitInfo> prepareCommit() {
+        return Optional.empty();
+    }
+
+    @Override
+    public void abortPrepare() {
+    }
+
+    public void bulkEsWithRetry(EsRestClient esRestClient, List<String> requestEsList, int maxRetry) {
+        for (int tryCnt = 1; tryCnt <= maxRetry; tryCnt++) {
+            if (requestEsList.size() > 0) {
+                String requestBody = String.join("\n", requestEsList) + "\n";
+                try {
+                    BulkResponse bulkResponse = esRestClient.bulk(requestBody);
+                    if (!bulkResponse.isErrors()) {
+                        break;
+                    }
+                } catch (Exception ex) {
+                    if (tryCnt == maxRetry) {
+                        throw new BulkElasticsearchException("bulk es error,try count=%d", ex);
+                    }
+                    LOGGER.warn(String.format("bulk es error,try count=%d", tryCnt), ex);
+                }
+
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        bulkEsWithRetry(this.esRestClient, this.requestEsList, BulkConfig.MAX_RETRY_SIZE);
+        esRestClient.close();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchAggregatedCommitInfo.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchAggregatedCommitInfo.java
new file mode 100644
index 000000000..6a0057b2b
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchAggregatedCommitInfo.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.state;
+
+import java.io.Serializable;
+
+/**
+ * Placeholder aggregated commit info for the Elasticsearch sink; it carries no fields
+ * because no aggregated commit is needed for Elasticsearch right now.
+ * TODO: add a default implementation of this state.
+ */
+public class ElasticsearchAggregatedCommitInfo implements Serializable {
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchCommitInfo.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchCommitInfo.java
new file mode 100644
index 000000000..2fea90d31
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchCommitInfo.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.state;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Properties;
+
+/**
+ * Serializable commit-info value class of the Elasticsearch sink.
+ *
+ * <p>Lombok's {@code @Data} and {@code @AllArgsConstructor} generate the getters,
+ * {@code equals}/{@code hashCode}/{@code toString} and a constructor taking every field
+ * in declaration order.
+ *
+ * <p>NOTE(review): the field set (transaction id, Kafka properties, producer id, epoch)
+ * looks carried over from a Kafka connector, and the writer's {@code prepareCommit()} in
+ * this change returns {@code Optional.empty()} - confirm whether these fields are needed.
+ */
+@Data
+@AllArgsConstructor
+public class ElasticsearchCommitInfo implements Serializable {
+
+    // NOTE(review): presumably an identifier of the transaction being committed - confirm.
+    private final String transactionId;
+    // NOTE(review): Kafka-named property bag; purpose for Elasticsearch unclear - confirm.
+    private final Properties kafkaProperties;
+    // NOTE(review): producer id / epoch read like Kafka transaction concepts - confirm.
+    private final long producerId;
+    private final short epoch;
+
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchSinkState.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchSinkState.java
new file mode 100644
index 000000000..7cd01c65d
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchSinkState.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.state;
+
+import java.io.Serializable;
+
+/**
+ * Serializable state type of the Elasticsearch sink.
+ * It currently declares no fields, i.e. it carries no data.
+ */
+public class ElasticsearchSinkState implements Serializable {
+
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/util/RegexUtils.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/util/RegexUtils.java
new file mode 100644
index 000000000..9ccc413ff
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/util/RegexUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+
+import java.util.regex.Pattern;
+
+/**
+ * Regular-expression helpers used by the Elasticsearch connector.
+ */
+public class RegexUtils {
+
+    /**
+     * Collects the first capturing group of every match of {@code regex} in {@code content}.
+     *
+     * <p>The pattern is compiled with {@link Pattern#DOTALL}, so {@code .} also matches
+     * line terminators. Matches are returned in the order they occur. If the first group
+     * did not take part in a match, {@code null} is added for that match.
+     *
+     * @param content text to search
+     * @param regex   pattern that must declare at least one capturing group
+     * @return the captured values, empty when nothing matches
+     * @throws IndexOutOfBoundsException if {@code regex} has no capturing group and matches
+     */
+    public static List<String> extractDatas(String content, String regex) {
+        Matcher groupFinder = Pattern.compile(regex, Pattern.DOTALL).matcher(content);
+        List<String> captured = new ArrayList<>();
+        for (boolean found = groupFinder.find(); found; found = groupFinder.find()) {
+            captured.add(groupFinder.group(1));
+        }
+        return captured;
+    }
+
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 60fd7db0e..083bae888 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -47,6 +47,7 @@
         <module>connector-kudu</module>
         <module>connector-email</module>
         <module>connector-dingtalk</module>
+        <module>connector-elasticsearch</module>
     </modules>
 
     <dependencies>