You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/08/05 15:59:34 UTC
[incubator-seatunnel] branch dev updated: [Feature][Connector-V2] new connecotor of Elasticsearch sink(#2326) (#2330)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2a1fd5027 [Feature][Connector-V2] new connecotor of Elasticsearch sink(#2326) (#2330)
2a1fd5027 is described below
commit 2a1fd5027f0eea2c929d257f5a150bb83cb6df63
Author: iture123 <10...@qq.com>
AuthorDate: Fri Aug 5 23:59:28 2022 +0800
[Feature][Connector-V2] new connecotor of Elasticsearch sink(#2326) (#2330)
---
docs/en/new-connector/sink/Elasticsearch.md | 56 +++++++++
plugin-mapping.properties | 1 +
pom.xml | 1 +
seatunnel-connectors-v2-dist/pom.xml | 5 +
.../{ => connector-elasticsearch}/pom.xml | 52 +++------
.../elasticsearch/client/EsRestClient.java | 123 ++++++++++++++++++++
.../seatunnel/elasticsearch/config/SinkConfig.java | 47 ++++++++
.../elasticsearch/constant/BulkConfig.java | 37 ++++++
.../constant/ElasticsearchVersion.java | 51 ++++++++
.../seatunnel/elasticsearch/dto/BulkResponse.java | 61 ++++++++++
.../seatunnel/elasticsearch/dto/IndexInfo.java | 53 +++++++++
.../exception/BulkElasticsearchException.java | 30 +++++
.../GetElasticsearchVersionException.java | 25 ++++
.../serialize/ElasticsearchRowSerializer.java | 80 +++++++++++++
.../serialize/SeaTunnelRowSerializer.java | 26 +++++
.../serialize/index/IndexSerializer.java | 28 +++++
.../serialize/index/IndexSerializerFactory.java | 37 ++++++
.../index/impl/FixedValueIndexSerializer.java | 38 ++++++
.../index/impl/VariableIndexSerializer.java | 69 +++++++++++
.../serialize/type/IndexTypeSerializer.java | 27 +++++
.../serialize/type/IndexTypeSerializerFactory.java | 49 ++++++++
.../type/impl/NotIndexTypeSerializer.java | 35 ++++++
.../type/impl/RequiredIndexTypeSerializer.java | 39 +++++++
.../elasticsearch/sink/ElasticsearchSink.java | 76 ++++++++++++
.../sink/ElasticsearchSinkWriter.java | 129 +++++++++++++++++++++
.../state/ElasticsearchAggregatedCommitInfo.java | 27 +++++
.../state/ElasticsearchCommitInfo.java | 35 ++++++
.../state/ElasticsearchSinkState.java | 24 ++++
.../seatunnel/elasticsearch/util/RegexUtils.java | 39 +++++++
seatunnel-connectors-v2/pom.xml | 1 +
30 files changed, 1266 insertions(+), 35 deletions(-)
diff --git a/docs/en/new-connector/sink/Elasticsearch.md b/docs/en/new-connector/sink/Elasticsearch.md
new file mode 100644
index 000000000..c8fbb551e
--- /dev/null
+++ b/docs/en/new-connector/sink/Elasticsearch.md
@@ -0,0 +1,56 @@
+# Elasticsearch
+
+## Description
+
+Output data to `Elasticsearch`.
+
+:::tip
+
+Engine Supported
+
+* Supported `Elasticsearch` versions: `>= 2.x` and `< 8.x`
+
+:::
+
+## Options
+
+| name | type | required | default value |
+|-------------------| ------ | -------- |---------------|
+| hosts | array | yes | - |
+| index | string | yes | - |
+| index_type | string | no | |
+| username | string | no | |
+| password | string | no | |
+| max_retry_size | int | no | 3 |
+| max_batch_size | int | no | 10 |
+
+
+
+### hosts [array]
+`Elasticsearch` cluster HTTP address in the format `host:port`. Multiple hosts may be specified, such as `["host1:9200", "host2:9200"]`.
+
+### index [string]
+`Elasticsearch` index name. The name may contain field-name variables, such as `seatunnel_${age}`; every referenced field must exist in the SeaTunnel row.
+
+### index_type [string]
+`Elasticsearch` index type. It is recommended not to specify it for Elasticsearch 6 and above.
+
+### username [string]
+x-pack username
+
+### password [string]
+x-pack password
+
+### max_retry_size [int]
+Maximum number of retries for a single bulk request.
+
+### max_batch_size [int]
+Maximum number of documents sent in one bulk request.
+
+## Examples
+```bash
+Elasticsearch {
+ hosts = ["localhost:9200"]
+ index = "seatunnel-${age}"
+}
+```
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index c6c3f16f5..ac7497907 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -111,3 +111,4 @@ seatunnel.sink.LocalFile = connector-file-local
seatunnel.source.Pulsar = connector-pulsar
seatunnel.source.Hudi = connector-hudi
seatunnel.sink.DingTalk = connector-dingtalk
+seatunnel.sink.elasticsearch = connector-elasticsearch
diff --git a/pom.xml b/pom.xml
index 188a1ba57..e9e5ff448 100644
--- a/pom.xml
+++ b/pom.xml
@@ -223,6 +223,7 @@
<hibernate.validator.version>6.2.2.Final</hibernate.validator.version>
<jsoup.version>1.14.3</jsoup.version>
<javax.annotation-api.version>1.3.2</javax.annotation-api.version>
+ <elasticsearch-rest-client.version>7.5.1</elasticsearch-rest-client.version>
</properties>
<dependencyManagement>
diff --git a/seatunnel-connectors-v2-dist/pom.xml b/seatunnel-connectors-v2-dist/pom.xml
index 336c92b74..059158a7c 100644
--- a/seatunnel-connectors-v2-dist/pom.xml
+++ b/seatunnel-connectors-v2-dist/pom.xml
@@ -111,6 +111,11 @@
<artifactId>connector-email</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-elasticsearch</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/connector-elasticsearch/pom.xml
similarity index 52%
copy from seatunnel-connectors-v2/pom.xml
copy to seatunnel-connectors-v2/connector-elasticsearch/pom.xml
index 60fd7db0e..7a6b6c8dc 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/connector-elasticsearch/pom.xml
@@ -17,51 +17,33 @@
limitations under the License.
-->
-
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>seatunnel</artifactId>
<groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connectors-v2</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <packaging>pom</packaging>
- <artifactId>seatunnel-connectors-v2</artifactId>
- <modules>
- <module>connector-common</module>
- <module>connector-clickhouse</module>
- <module>connector-console</module>
- <module>connector-fake</module>
- <module>connector-http</module>
- <module>connector-jdbc</module>
- <module>connector-kafka</module>
- <module>connector-pulsar</module>
- <module>connector-socket</module>
- <module>connector-hive</module>
- <module>connector-file</module>
- <module>connector-hudi</module>
- <module>connector-assert</module>
- <module>connector-kudu</module>
- <module>connector-email</module>
- <module>connector-dingtalk</module>
- </modules>
+ <artifactId>connector-elasticsearch</artifactId>
<dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.elasticsearch.client</groupId>
+ <artifactId>elasticsearch-rest-client</artifactId>
+ <version>${elasticsearch-rest-client.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
</dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <configuration>
- <skip>true</skip>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
-</project>
+</project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
new file mode 100644
index 000000000..5a2a3df09
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpStatus;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.util.EntityUtils;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.BulkElasticsearchException;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.GetElasticsearchVersionException;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Process-wide wrapper around the Elasticsearch low-level {@link RestClient}.
+ *
+ * <p>The underlying client is created lazily by the first call to
+ * {@link #getInstance(List, String, String)}; later calls return the same instance and
+ * ignore their arguments until {@link #close()} has been called.
+ */
+public class EsRestClient {
+
+    /** Shared mapper; avoids building a new one for every request. */
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private static volatile EsRestClient esRestClient;
+    private static volatile RestClient restClient;
+
+    private EsRestClient() {
+
+    }
+
+    /**
+     * Builds the client builder for the given hosts.
+     *
+     * @param hosts    addresses in the form {@code host:port}, optionally prefixed with
+     *                 {@code http://} or {@code https://}
+     * @param username basic-auth user name; authentication is skipped when empty
+     * @param password basic-auth password
+     * @throws IllegalArgumentException if no host is given or a host has no valid port
+     */
+    private static RestClientBuilder getRestClientBuilder(List<String> hosts, String username, String password) {
+        if (hosts == null || hosts.isEmpty()) {
+            throw new IllegalArgumentException("elasticsearch hosts must not be empty");
+        }
+        HttpHost[] httpHosts = new HttpHost[hosts.size()];
+        for (int i = 0; i < hosts.size(); i++) {
+            String host = hosts.get(i);
+            // HttpHost.create understands an optional scheme, so "https://host:9200" works too.
+            HttpHost httpHost = HttpHost.create(host.trim());
+            if (httpHost.getPort() < 0) {
+                throw new IllegalArgumentException(String.format("invalid elasticsearch host '%s', expected format host:port", host));
+            }
+            httpHosts[i] = httpHost;
+        }
+
+        RestClientBuilder builder = RestClient.builder(httpHosts)
+            .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
+                .setConnectionRequestTimeout(10 * 1000)
+                .setSocketTimeout(5 * 60 * 1000));
+
+        if (StringUtils.isNotEmpty(username)) {
+            CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
+            builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
+        }
+        return builder;
+    }
+
+    /**
+     * Returns the shared client, creating it on first use.
+     *
+     * <p>Synchronized so that concurrent first calls cannot build two clients.
+     */
+    public static synchronized EsRestClient getInstance(List<String> hosts, String username, String password) {
+        if (restClient == null) {
+            RestClientBuilder restClientBuilder = getRestClientBuilder(hosts, username, password);
+            restClient = restClientBuilder.build();
+            esRestClient = new EsRestClient();
+        }
+        return esRestClient;
+    }
+
+    /** Returns the live low-level client or fails with a clear message instead of an NPE. */
+    private static RestClient currentClient() {
+        RestClient client = restClient;
+        if (client == null) {
+            throw new IllegalStateException("elasticsearch rest client is not initialized or has been closed");
+        }
+        return client;
+    }
+
+    /**
+     * Sends one bulk request.
+     *
+     * @param requestBody newline-delimited bulk body
+     * @return the parsed {@code errors} flag and {@code took} value plus the raw response text
+     * @throws BulkElasticsearchException on I/O failure, a non-200 status or a malformed response
+     */
+    public BulkResponse bulk(String requestBody) {
+        // Absolute endpoint path, the same form as the "/" used by getClusterVersion().
+        Request request = new Request("POST", "/_bulk");
+        request.setJsonEntity(requestBody);
+        try {
+            Response response = currentClient().performRequest(request);
+            if (response == null) {
+                throw new BulkElasticsearchException("bulk es Response is null");
+            }
+            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+                String entity = EntityUtils.toString(response.getEntity());
+                JsonNode json = OBJECT_MAPPER.readTree(entity);
+                JsonNode tookNode = json == null ? null : json.get("took");
+                JsonNode errorsNode = json == null ? null : json.get("errors");
+                if (tookNode == null || errorsNode == null) {
+                    throw new BulkElasticsearchException(String.format("bulk es response is missing 'took' or 'errors',response=%s", entity));
+                }
+                return new BulkResponse(errorsNode.asBoolean(), tookNode.asInt(), entity);
+            } else {
+                throw new BulkElasticsearchException(String.format("bulk es response status code=%d,request body=%s", response.getStatusLine().getStatusCode(), requestBody));
+            }
+        } catch (IOException e) {
+            throw new BulkElasticsearchException(String.format("bulk es error,request body=%s", requestBody), e);
+        }
+    }
+
+    /**
+     * @return version.number, example:2.0.0
+     * @throws GetElasticsearchVersionException if the request fails or the response carries no version number
+     */
+    public static String getClusterVersion() {
+        Request request = new Request("GET", "/");
+        try {
+            Response response = currentClient().performRequest(request);
+            String result = EntityUtils.toString(response.getEntity());
+            JsonNode jsonNode = OBJECT_MAPPER.readTree(result);
+            JsonNode versionNode = jsonNode == null ? null : jsonNode.get("version");
+            JsonNode numberNode = versionNode == null ? null : versionNode.get("number");
+            if (numberNode == null) {
+                throw new GetElasticsearchVersionException(String.format("fail to get elasticsearch version,response=%s", result), null);
+            }
+            return numberNode.asText();
+        } catch (IOException e) {
+            throw new GetElasticsearchVersionException("fail to get elasticsearch version.", e);
+        }
+    }
+
+    /**
+     * Closes the underlying client and forgets the shared instance, so that a later
+     * {@link #getInstance(List, String, String)} call builds a fresh client instead of
+     * handing out a closed one.
+     */
+    public void close() throws IOException {
+        synchronized (EsRestClient.class) {
+            RestClient client = restClient;
+            restClient = null;
+            esRestClient = null;
+            if (client != null) {
+                client.close();
+            }
+        }
+    }
+
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
new file mode 100644
index 000000000..f747fad85
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;
+
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.BulkConfig;
+
+/**
+ * Option keys understood by the Elasticsearch sink, plus the hook that copies the
+ * optional bulk settings from the plugin configuration into {@link BulkConfig}.
+ */
+public class SinkConfig {
+
+    public static final String HOSTS = "hosts";
+
+    public static final String USERNAME = "username";
+
+    public static final String PASSWORD = "password";
+
+    public static final String INDEX = "index";
+
+    public static final String INDEX_TYPE = "index_type";
+
+    public static final String MAX_BATCH_SIZE = "max_batch_size";
+
+    public static final String MAX_RETRY_SIZE = "max_retry_size";
+
+    /**
+     * Overrides the {@link BulkConfig} values for every bulk option present in the
+     * given configuration; options that are absent keep their current value.
+     *
+     * @param pluginConfig the sink's plugin configuration
+     */
+    public static void setValue(org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig) {
+        final boolean hasBatchSize = pluginConfig.hasPath(MAX_BATCH_SIZE);
+        if (hasBatchSize) {
+            BulkConfig.MAX_BATCH_SIZE = pluginConfig.getInt(MAX_BATCH_SIZE);
+        }
+        final boolean hasRetrySize = pluginConfig.hasPath(MAX_RETRY_SIZE);
+        if (hasRetrySize) {
+            BulkConfig.MAX_RETRY_SIZE = pluginConfig.getInt(MAX_RETRY_SIZE);
+        }
+    }
+
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/BulkConfig.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/BulkConfig.java
new file mode 100644
index 000000000..b6108dc47
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/BulkConfig.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant;
+
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
+
+/**
+ * Mutable, process-wide settings for bulk requests to Elasticsearch.
+ * The fields hold the defaults until they are reassigned.
+ */
+public class BulkConfig {
+    /**
+     * Maximum number of retries for one bulk request; configured through
+     * {@link SinkConfig#MAX_RETRY_SIZE}.
+     */
+    public static int MAX_RETRY_SIZE = 3;
+
+    /**
+     * Maximum number of documents in one bulk request; configured through
+     * {@link SinkConfig#MAX_BATCH_SIZE}.
+     */
+    public static int MAX_BATCH_SIZE = 10;
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/ElasticsearchVersion.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/ElasticsearchVersion.java
new file mode 100644
index 000000000..747ba7d2a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/ElasticsearchVersion.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant;
+
+/**
+ * Major Elasticsearch versions the connector distinguishes between.
+ */
+public enum ElasticsearchVersion {
+    ES2(2), ES5(5), ES6(6), ES7(7), ES8(8);
+
+    private int version;
+
+    ElasticsearchVersion(int version) {
+        this.version = version;
+    }
+
+    /**
+     * @return the major version number of this constant
+     */
+    public int getVersion() {
+        return version;
+    }
+
+    /**
+     * Changes the major version number of this constant.
+     *
+     * <p>Enum constants are shared by the whole JVM, so calling this alters what
+     * {@link #get(int)} returns for every caller. Kept only for compatibility.
+     */
+    public void setVersion(int version) {
+        this.version = version;
+    }
+
+    /**
+     * Looks up the constant for a major version number.
+     *
+     * @param version major version, for example {@code 7}
+     * @return the matching constant
+     * @throws IllegalArgumentException if no constant has that major version
+     */
+    public static ElasticsearchVersion get(int version) {
+        for (ElasticsearchVersion elasticsearchVersion : ElasticsearchVersion.values()) {
+            if (elasticsearchVersion.getVersion() == version) {
+                return elasticsearchVersion;
+            }
+        }
+        throw new IllegalArgumentException(String.format("version=%d,fail to find ElasticsearchVersion.", version));
+    }
+
+    /**
+     * Looks up the constant for a full version string by its leading major number.
+     *
+     * @param clusterVersion version string such as {@code 7.5.1}; surrounding whitespace is ignored
+     * @return the matching constant
+     * @throws IllegalArgumentException if the string is null, blank, does not start with a
+     *                                  number, or names an unknown major version
+     */
+    public static ElasticsearchVersion get(String clusterVersion) {
+        if (clusterVersion == null || clusterVersion.trim().isEmpty()) {
+            throw new IllegalArgumentException("clusterVersion must not be null or empty.");
+        }
+        String[] versionArr = clusterVersion.trim().split("\\.");
+        final int version;
+        try {
+            version = Integer.parseInt(versionArr[0]);
+        } catch (NumberFormatException e) {
+            // Re-thrown with the offending input so the failure is diagnosable.
+            throw new IllegalArgumentException(String.format("clusterVersion=%s,fail to parse major version.", clusterVersion), e);
+        }
+        return get(version);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/BulkResponse.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/BulkResponse.java
new file mode 100644
index 000000000..348107952
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/BulkResponse.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto;
+
+/**
+ * Summary of one bulk HTTP call: the error flag and duration reported by
+ * Elasticsearch together with the raw response text.
+ */
+public class BulkResponse {
+
+    /** Raw response body. */
+    private String response;
+    /** Value of the response's {@code took} field. */
+    private int took;
+    /** Value of the response's {@code errors} field. */
+    private boolean errors;
+
+    public BulkResponse(boolean errors, int took, String response) {
+        this.response = response;
+        this.took = took;
+        this.errors = errors;
+    }
+
+    public BulkResponse() {
+    }
+
+    public String getResponse() {
+        return this.response;
+    }
+
+    public void setResponse(String response) {
+        this.response = response;
+    }
+
+    public int getTook() {
+        return this.took;
+    }
+
+    public void setTook(int took) {
+        this.took = took;
+    }
+
+    public boolean isErrors() {
+        return this.errors;
+    }
+
+    public void setErrors(boolean errors) {
+        this.errors = errors;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
new file mode 100644
index 000000000..0c6907a4b
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto;
+
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
+
+/**
+ * Target index settings read from the sink's plugin configuration.
+ */
+public class IndexInfo {
+
+    private String index;
+    private String type;
+
+    /**
+     * Reads the index name and, when present, the index type.
+     *
+     * @param pluginConfig the sink's plugin configuration
+     */
+    public IndexInfo(org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig) {
+        this.index = pluginConfig.getString(SinkConfig.INDEX);
+        // The type is optional and stays null when it is not configured.
+        this.type = pluginConfig.hasPath(SinkConfig.INDEX_TYPE)
+            ? pluginConfig.getString(SinkConfig.INDEX_TYPE)
+            : null;
+    }
+
+    public String getType() {
+        return this.type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public String getIndex() {
+        return this.index;
+    }
+
+    public void setIndex(String index) {
+        this.index = index;
+    }
+
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/BulkElasticsearchException.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/BulkElasticsearchException.java
new file mode 100644
index 000000000..14dbfb7ae
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/BulkElasticsearchException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception;
+
+/**
+ * Unchecked exception raised when a bulk request to Elasticsearch fails.
+ */
+public class BulkElasticsearchException extends RuntimeException {
+
+    /**
+     * @param message description of the failure
+     */
+    public BulkElasticsearchException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param message description of the failure
+     * @param cause   underlying error
+     */
+    public BulkElasticsearchException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetElasticsearchVersionException.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetElasticsearchVersionException.java
new file mode 100644
index 000000000..c146de07a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetElasticsearchVersionException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception;
+
+/**
+ * Unchecked exception raised when the Elasticsearch cluster version cannot be obtained.
+ */
+public class GetElasticsearchVersionException extends RuntimeException {
+
+    /**
+     * @param message description of the failure
+     * @param cause   underlying error
+     */
+    public GetElasticsearchVersionException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
new file mode 100644
index 000000000..06c5581bb
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.IndexSerializer;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.IndexSerializerFactory;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.IndexTypeSerializer;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.IndexTypeSerializerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Serializes a {@link SeaTunnelRow} into the body of one bulk "index" action: a JSON
+ * action line, a newline, and the JSON document built from the row's fields.
+ *
+ * <p>The index name comes from an {@link IndexSerializer} and any type information is
+ * added by an {@link IndexTypeSerializer} chosen for the given {@link ElasticsearchVersion}.
+ */
+public class ElasticsearchRowSerializer implements SeaTunnelRowSerializer {
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    private final IndexSerializer indexSerializer;
+
+    private final IndexTypeSerializer indexTypeSerializer;
+
+    /**
+     * @param elasticsearchVersion version of the target cluster, used to pick the type serializer
+     * @param indexInfo            configured index name and optional type
+     * @param seaTunnelRowType     row schema supplying the document's field names
+     */
+    public ElasticsearchRowSerializer(ElasticsearchVersion elasticsearchVersion, IndexInfo indexInfo, SeaTunnelRowType seaTunnelRowType) {
+        this.indexTypeSerializer = IndexTypeSerializerFactory.getIndexTypeSerializer(elasticsearchVersion, indexInfo.getType());
+        this.indexSerializer = IndexSerializerFactory.getIndexSerializer(indexInfo.getIndex(), seaTunnelRowType);
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    /**
+     * Builds the action line and the document line for the row.
+     *
+     * @param row the row to write
+     * @return the action JSON and the document JSON separated by a single newline,
+     *         with no trailing newline
+     * @throws RuntimeException if either part cannot be written as JSON
+     */
+    @Override
+    public String serializeRow(SeaTunnelRow row) {
+        String[] fieldNames = seaTunnelRowType.getFieldNames();
+        Map<String, Object> doc = new HashMap<>(fieldNames.length);
+        Object[] fields = row.getFields();
+        for (int i = 0; i < fieldNames.length; i++) {
+            doc.put(fieldNames[i], fields[i]);
+        }
+
+        Map<String, String> indexInner = new HashMap<>();
+        indexInner.put("_index", indexSerializer.serialize(row));
+        // Lets the version-specific serializer add its entries to the action metadata.
+        indexTypeSerializer.fillType(indexInner);
+
+        Map<String, Map<String, String>> indexParam = new HashMap<>();
+        indexParam.put("index", indexInner);
+        try {
+            return objectMapper.writeValueAsString(indexParam)
+                + "\n"
+                + objectMapper.writeValueAsString(doc);
+        } catch (JsonProcessingException e) {
+            // This path writes JSON, so the message says serialization rather than deserialization.
+            throw new RuntimeException("Object json serialization exception.", e);
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/SeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/SeaTunnelRowSerializer.java
new file mode 100644
index 000000000..d1fbae8a4
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/SeaTunnelRowSerializer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize;
+
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+/**
+ * Turns a {@link SeaTunnelRow} into the string form that the Elasticsearch sink sends to the cluster.
+ */
+public interface SeaTunnelRowSerializer {
+
+ /**
+ * Serializes a single row.
+ *
+ * @param row the row to serialize
+ * @return the serialized form of {@code row}
+ */
+ String serializeRow(SeaTunnelRow row);
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/IndexSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/IndexSerializer.java
new file mode 100644
index 000000000..67dd89455
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/IndexSerializer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+/**
+ * Computes the name of the Elasticsearch index that a row is written to.
+ * Implementations may return a constant name or derive the name from the row's field values.
+ */
+public interface IndexSerializer {
+
+ String serialize(SeaTunnelRow row);
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/IndexSerializerFactory.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/IndexSerializerFactory.java
new file mode 100644
index 000000000..152181c3e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/IndexSerializerFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.impl.FixedValueIndexSerializer;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.impl.VariableIndexSerializer;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.util.RegexUtils;
+
+import java.util.List;
+
+public class IndexSerializerFactory {
+
+ public static IndexSerializer getIndexSerializer(String index, SeaTunnelRowType seaTunnelRowType) {
+ List<String> fieldNames = RegexUtils.extractDatas(index, "\\$\\{(.*?)\\}");
+ if (fieldNames != null && fieldNames.size() > 0) {
+ return new VariableIndexSerializer(seaTunnelRowType, index, fieldNames);
+ } else {
+ return new FixedValueIndexSerializer(index);
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/impl/FixedValueIndexSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/impl/FixedValueIndexSerializer.java
new file mode 100644
index 000000000..07d5dd5b6
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/impl/FixedValueIndexSerializer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.impl;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.IndexSerializer;
+
+/**
+ * {@link IndexSerializer} for an index name that is a fixed value, not a template:
+ * every row resolves to the same index name.
+ */
+public class FixedValueIndexSerializer implements IndexSerializer {
+
+ private final String index;
+
+ public FixedValueIndexSerializer(String index) {
+ this.index = index;
+ }
+
+ @Override
+ public String serialize(SeaTunnelRow row) {
+ return index;
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/impl/VariableIndexSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/impl/VariableIndexSerializer.java
new file mode 100644
index 000000000..799763235
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/impl/VariableIndexSerializer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.impl;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.IndexSerializer;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link IndexSerializer} for an index name template that contains {@code ${field}} placeholders.
+ * For every row, each placeholder whose name matches a field of the row type is replaced with
+ * that field's value and the result is lower-cased.
+ *
+ * <p>Placeholders that do not match any field of the row type are not tracked and therefore stay
+ * in the resolved name unchanged.
+ */
+public class VariableIndexSerializer implements IndexSerializer {
+
+    /** Text substituted for a placeholder whose field value is {@code null}. */
+    private static final String NULL_DEFAULT = "null";
+
+    private final String index;
+    /** Placeholder field name -> position of that field in the row. */
+    private final Map<String, Integer> fieldIndexMap;
+
+    /**
+     * @param seaTunnelRowType schema used to map placeholder names to field positions
+     * @param index            index name template, e.g. {@code logs_${app}}
+     * @param fieldNames       placeholder names extracted from {@code index}
+     */
+    public VariableIndexSerializer(SeaTunnelRowType seaTunnelRowType, String index, List<String> fieldNames) {
+        this.index = index;
+        String[] rowFieldNames = seaTunnelRowType.getFieldNames();
+        fieldIndexMap = new HashMap<>(rowFieldNames.length);
+        for (int i = 0; i < rowFieldNames.length; i++) {
+            if (fieldNames.contains(rowFieldNames[i])) {
+                fieldIndexMap.put(rowFieldNames[i], i);
+            }
+        }
+    }
+
+    /**
+     * Resolves the index name for one row.
+     *
+     * @param row the row whose field values fill the placeholders
+     * @return the template with known placeholders substituted, lower-cased
+     */
+    @Override
+    public String serialize(SeaTunnelRow row) {
+        String indexName = this.index;
+        for (Map.Entry<String, Integer> fieldIndexEntry : fieldIndexMap.entrySet()) {
+            String placeholder = "${" + fieldIndexEntry.getKey() + "}";
+            indexName = indexName.replace(placeholder, getValue(fieldIndexEntry.getValue(), row));
+        }
+        // Locale.ROOT keeps the result independent of the JVM default locale
+        // (e.g. a Turkish locale would otherwise map 'I' to a dotless 'ı').
+        return indexName.toLowerCase(java.util.Locale.ROOT);
+    }
+
+    /** Returns the field at {@code fieldIndex} as text, or {@link #NULL_DEFAULT} when it is null. */
+    private String getValue(int fieldIndex, SeaTunnelRow row) {
+        Object valueObj = row.getField(fieldIndex);
+        return valueObj == null ? NULL_DEFAULT : valueObj.toString();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializer.java
new file mode 100644
index 000000000..3e528058e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializer.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type;
+
+
+import java.util.Map;
+
+/**
+ * Adds the index type, if any, to the metadata of a bulk index action.
+ */
+public interface IndexTypeSerializer {
+
+ /**
+ * Lets the implementation add its entries to the action metadata.
+ *
+ * @param indexInner mutable metadata map of the action; modified in place
+ */
+ void fillType(Map<String, String> indexInner);
+
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializerFactory.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializerFactory.java
new file mode 100644
index 000000000..878257cb6
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/IndexTypeSerializerFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type;
+
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.impl.NotIndexTypeSerializer;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.impl.RequiredIndexTypeSerializer;
+
+import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion.*;
+
+public class IndexTypeSerializerFactory {
+
+ private static final String DEFAULT_TYPE = "st";
+
+ private IndexTypeSerializerFactory() {
+
+ }
+
+ public static IndexTypeSerializer getIndexTypeSerializer(ElasticsearchVersion elasticsearchVersion, String type) {
+ if (elasticsearchVersion == ES2 || elasticsearchVersion == ES5) {
+ if (type == null || "".equals(type)) {
+ type = DEFAULT_TYPE;
+ }
+ return new RequiredIndexTypeSerializer(type);
+ }
+ if (elasticsearchVersion == ES6) {
+ if (type != null && !"".equals(type)) {
+ return new RequiredIndexTypeSerializer(type);
+ }
+ }
+ return new NotIndexTypeSerializer();
+ }
+
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/impl/NotIndexTypeSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/impl/NotIndexTypeSerializer.java
new file mode 100644
index 000000000..fa5afb5b8
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/impl/NotIndexTypeSerializer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.impl;
+
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.IndexTypeSerializer;
+
+import java.util.Map;
+
+/**
+ * {@link IndexTypeSerializer} that writes no index type.
+ * Intended for Elasticsearch versions that do not need one: 6.*, 7.*, 8.*
+ */
+public class NotIndexTypeSerializer implements IndexTypeSerializer {
+
+
+ @Override
+ public void fillType(Map<String, String> indexInner) {
+
+ }
+
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/impl/RequiredIndexTypeSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/impl/RequiredIndexTypeSerializer.java
new file mode 100644
index 000000000..11e4f1d51
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/type/impl/RequiredIndexTypeSerializer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.impl;
+
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.IndexTypeSerializer;
+
+import java.util.Map;
+
+/**
+ * {@link IndexTypeSerializer} that writes a fixed {@code _type} entry into the action metadata.
+ * Intended for Elasticsearch versions that use an index type: 2.*, 5.*, 6.*
+ */
+public class RequiredIndexTypeSerializer implements IndexTypeSerializer {
+
+ private final String type;
+
+ public RequiredIndexTypeSerializer(String type) {
+ this.type = type;
+ }
+
+ @Override
+ public void fillType(Map<String, String> indexInner) {
+ indexInner.put("_type", type);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
new file mode 100644
index 000000000..a5eac83ac
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
+
+import com.google.auto.service.AutoService;
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchSinkState;
+
+import java.util.Collections;
+
+
+/**
+ * SeaTunnel sink plugin named "Elasticsearch", registered as a {@link SeaTunnelSink} service
+ * through {@code @AutoService}. It keeps the plugin configuration and the row type it is given
+ * and hands both to a new {@link ElasticsearchSinkWriter} in {@link #createWriter}.
+ */
+@AutoService(SeaTunnelSink.class)
+public class ElasticsearchSink implements SeaTunnelSink<SeaTunnelRow, ElasticsearchSinkState, ElasticsearchCommitInfo, ElasticsearchAggregatedCommitInfo> {
+
+
+ private org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig;
+ private SeaTunnelContext seaTunnelContext;
+ private SeaTunnelRowType seaTunnelRowType;
+
+
+ @Override
+ public String getPluginName() {
+ return "Elasticsearch";
+ }
+
+ @Override
+ public void prepare(org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig) throws PrepareFailException {
+ this.pluginConfig = pluginConfig;
+ SinkConfig.setValue(pluginConfig);
+ }
+
+ @Override
+ public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ }
+
+ @Override
+ public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+ return this.seaTunnelRowType;
+ }
+
+ @Override
+ public SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo, ElasticsearchSinkState> createWriter(SinkWriter.Context context) {
+ // The writer is always created with an empty state list.
+ return new ElasticsearchSinkWriter(context, seaTunnelRowType, pluginConfig, Collections.emptyList());
+ }
+
+ @Override
+ public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+ this.seaTunnelContext = seaTunnelContext;
+ }
+
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
new file mode 100644
index 000000000..048f48f44
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.BulkConfig;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.BulkElasticsearchException;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.ElasticsearchRowSerializer;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.SeaTunnelRowSerializer;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * ElasticsearchSinkWriter is a sink writer that will write {@link SeaTunnelRow} to Elasticsearch.
+ *
+ * <p>Rows are serialized as they arrive and buffered in memory. The buffer is sent as one bulk
+ * request once it holds {@link BulkConfig#MAX_BATCH_SIZE} rows, and whatever is left is sent
+ * when the writer is closed.
+ *
+ * <p>NOTE(review): {@link #prepareCommit()} does not flush the buffer, so rows buffered at that
+ * point are only sent by a later {@link #write} or by {@link #close()} - confirm this is intended.
+ *
+ * <p>NOTE(review): the class type parameter is named {@code ElasticsearchSinkState}, which hides
+ * any class of the same name inside this file - confirm a type parameter is really wanted here.
+ */
+public class ElasticsearchSinkWriter<ElasticsearchSinkState> implements SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo, ElasticsearchSinkState> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchSinkWriter.class);
+
+    private final Context context;
+
+    private final SeaTunnelRowSerializer seaTunnelRowSerializer;
+    /** Serialized rows waiting to be sent in the next bulk request. */
+    private final List<String> requestEsList;
+    private EsRestClient esRestClient;
+
+    /**
+     * @param context             writer context supplied by the engine
+     * @param seaTunnelRowType    schema of the rows that will be written
+     * @param pluginConfig        sink configuration (hosts, optional credentials, index settings)
+     * @param elasticsearchStates restored states; not used by this writer
+     */
+    public ElasticsearchSinkWriter(
+            Context context,
+            SeaTunnelRowType seaTunnelRowType,
+            Config pluginConfig,
+            List<ElasticsearchSinkState> elasticsearchStates) {
+        this.context = context;
+
+        IndexInfo indexInfo = new IndexInfo(pluginConfig);
+        // The client must exist before the cluster version can be looked up.
+        initRestClient(pluginConfig);
+        ElasticsearchVersion elasticsearchVersion = ElasticsearchVersion.get(EsRestClient.getClusterVersion());
+        this.seaTunnelRowSerializer = new ElasticsearchRowSerializer(elasticsearchVersion, indexInfo, seaTunnelRowType);
+
+        this.requestEsList = new ArrayList<>(BulkConfig.MAX_BATCH_SIZE);
+    }
+
+    /** Creates the REST client from the configured hosts and, when present, the credentials. */
+    private void initRestClient(Config pluginConfig) {
+        List<String> hosts = pluginConfig.getStringList(SinkConfig.HOSTS);
+        String username = null;
+        String password = null;
+        // A password is only read when a username is configured as well.
+        if (pluginConfig.hasPath(SinkConfig.USERNAME)) {
+            username = pluginConfig.getString(SinkConfig.USERNAME);
+            if (pluginConfig.hasPath(SinkConfig.PASSWORD)) {
+                password = pluginConfig.getString(SinkConfig.PASSWORD);
+            }
+        }
+        esRestClient = EsRestClient.getInstance(hosts, username, password);
+    }
+
+    /**
+     * Buffers one row and sends the buffer as a bulk request once it is full.
+     *
+     * @param element the row to write
+     */
+    @Override
+    public void write(SeaTunnelRow element) {
+        String indexRequestRow = seaTunnelRowSerializer.serializeRow(element);
+        requestEsList.add(indexRequestRow);
+        if (requestEsList.size() >= BulkConfig.MAX_BATCH_SIZE) {
+            bulkEsWithRetry(this.esRestClient, this.requestEsList, BulkConfig.MAX_RETRY_SIZE);
+            requestEsList.clear();
+        }
+    }
+
+    @Override
+    public Optional<ElasticsearchCommitInfo> prepareCommit() {
+        return Optional.empty();
+    }
+
+    @Override
+    public void abortPrepare() {
+    }
+
+    /**
+     * Sends the given serialized rows as one bulk request, retrying up to {@code maxRetry} times.
+     * An attempt counts as failed when the client throws or when the response reports errors.
+     * Nothing is sent when the list is empty. The list itself is not modified.
+     *
+     * @param esRestClient  client used to send the request
+     * @param requestEsList serialized rows to send
+     * @param maxRetry      maximum number of attempts
+     * @throws BulkElasticsearchException if the last attempt fails with an exception
+     */
+    public void bulkEsWithRetry(EsRestClient esRestClient, List<String> requestEsList, int maxRetry) {
+        if (requestEsList.isEmpty()) {
+            return;
+        }
+        // The payload is the same for every attempt, so build it once.
+        String requestBody = String.join("\n", requestEsList) + "\n";
+        for (int tryCnt = 1; tryCnt <= maxRetry; tryCnt++) {
+            try {
+                BulkResponse bulkResponse = esRestClient.bulk(requestBody);
+                if (!bulkResponse.isErrors()) {
+                    return;
+                }
+                LOGGER.warn("bulk es response reported errors,try count={}", tryCnt);
+            } catch (Exception ex) {
+                if (tryCnt == maxRetry) {
+                    throw new BulkElasticsearchException(String.format("bulk es error,try count=%d", tryCnt), ex);
+                }
+                LOGGER.warn("bulk es error,try count={}", tryCnt, ex);
+            }
+        }
+        // Reached only when no attempt succeeded without throwing on the last try, i.e. the
+        // response still reported errors. Surface it instead of dropping the batch silently.
+        LOGGER.error("bulk es response still reported errors after {} tries, {} rows may not have been indexed",
+                maxRetry, requestEsList.size());
+    }
+
+    /**
+     * Sends any rows still buffered and closes the REST client. The client is closed even if the
+     * final bulk request fails.
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            bulkEsWithRetry(this.esRestClient, this.requestEsList, BulkConfig.MAX_RETRY_SIZE);
+            requestEsList.clear();
+        } finally {
+            esRestClient.close();
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchAggregatedCommitInfo.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchAggregatedCommitInfo.java
new file mode 100644
index 000000000..6a0057b2b
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchAggregatedCommitInfo.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.state;
+
+import java.io.Serializable;
+
+/**
+ * Aggregated commit info of the Elasticsearch sink.
+ * Right now, we don't need aggregated commit in Elasticsearch, so this class carries no data.
+ * Todo: we need to add a default implementation of this state.
+ */
+public class ElasticsearchAggregatedCommitInfo implements Serializable {
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchCommitInfo.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchCommitInfo.java
new file mode 100644
index 000000000..2fea90d31
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchCommitInfo.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.state;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Properties;
+
+/**
+ * Commit info type of the Elasticsearch sink.
+ *
+ * NOTE(review): the fields below (transactionId, kafkaProperties, producerId, epoch) look like
+ * they were carried over from a Kafka connector - confirm whether the Elasticsearch sink needs them.
+ */
+@Data
+@AllArgsConstructor
+public class ElasticsearchCommitInfo implements Serializable {
+
+ private final String transactionId;
+ private final Properties kafkaProperties;
+ private final long producerId;
+ private final short epoch;
+
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchSinkState.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchSinkState.java
new file mode 100644
index 000000000..7cd01c65d
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/state/ElasticsearchSinkState.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.state;
+
+import java.io.Serializable;
+
+/**
+ * Writer state type of the Elasticsearch sink. It currently carries no data.
+ */
+public class ElasticsearchSinkState implements Serializable {
+
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/util/RegexUtils.java b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/util/RegexUtils.java
new file mode 100644
index 000000000..9ccc413ff
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/util/RegexUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+
+import java.util.regex.Pattern;
+
+public class RegexUtils {
+
+    /**
+     * Collects the first capturing group of every match of {@code regex} in {@code content}.
+     * The pattern is compiled with {@link Pattern#DOTALL}, so {@code .} also matches line breaks.
+     *
+     * @param content text to search
+     * @param regex   regular expression with at least one capturing group
+     * @return the captured values in match order; empty when nothing matches
+     */
+    public static List<String> extractDatas(String content, String regex) {
+        List<String> captured = new ArrayList<>();
+        Matcher hits = Pattern.compile(regex, Pattern.DOTALL).matcher(content);
+        for (boolean found = hits.find(); found; found = hits.find()) {
+            captured.add(hits.group(1));
+        }
+        return captured;
+    }
+
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 60fd7db0e..083bae888 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -47,6 +47,7 @@
<module>connector-kudu</module>
<module>connector-email</module>
<module>connector-dingtalk</module>
+ <module>connector-elasticsearch</module>
</modules>
<dependencies>