You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/04/08 04:41:12 UTC
[incubator-seatunnel] branch dev updated: [Feature][Connector] Re-add support for elasticsearch 6.x in source code. (#1663)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3c10987e [Feature][Connector] Re-add support for elasticsearch 6.x in source code. (#1663)
3c10987e is described below
commit 3c10987e7759cf6cba553e4fbe6463118940b694
Author: TrickyZerg <32...@users.noreply.github.com>
AuthorDate: Fri Apr 8 12:41:06 2022 +0800
[Feature][Connector] Re-add support for elasticsearch 6.x in source code. (#1663)
* Re-add support for elasticsearch 6.x.
* fix check style
* Update version to 2.1.1-SNAPSHOT
* move elasticsearch properties to root pom
* modify Elasticsearch.mdx doc
---
docs/en/connector/sink/Elasticsearch.mdx | 3 +-
pom.xml | 14 ++++----
.../seatunnel-connectors-flink/pom.xml | 3 +-
.../pom.xml | 11 +++---
.../flink/elasticsearch6}/config/Config.java | 2 +-
.../flink/elasticsearch6/sink/Elasticsearch6.java} | 41 +++++++++-------------
.../sink/ElasticsearchOutputFormat.java | 8 ++---
.../org.apache.seatunnel.flink.BaseFlinkSink | 2 +-
.../pom.xml | 3 +-
.../flink/elasticsearch/config/Config.java | 5 ---
.../flink/elasticsearch/sink/Elasticsearch.java | 15 ++------
.../sink/ElasticsearchOutputFormat.java | 2 +-
.../org.apache.seatunnel.flink.BaseFlinkSink | 0
seatunnel-core/seatunnel-core-flink/pom.xml | 2 +-
14 files changed, 46 insertions(+), 65 deletions(-)
diff --git a/docs/en/connector/sink/Elasticsearch.mdx b/docs/en/connector/sink/Elasticsearch.mdx
index a6314619..759d8894 100644
--- a/docs/en/connector/sink/Elasticsearch.mdx
+++ b/docs/en/connector/sink/Elasticsearch.mdx
@@ -12,7 +12,8 @@ Output data to `Elasticsearch`.
Engine Supported and plugin name
* [x] Spark: Elasticsearch(supported `ElasticSearch version is >= 2.x and <7.0.0`)
-* [x] Flink: Elasticsearch(supported `ElasticSearch version is >= 2.x and <8.0.0`)
+* [x] Flink: Elasticsearch(supported `ElasticSearch version = 7.x`, if you want use Elasticsearch version is 6.x,
+please use the source code to modify the dependencies corresponding to `seatunnel/seatunnel-connectors/seatunnel-connectors-flink/pom.xml` and repackage)
:::
diff --git a/pom.xml b/pom.xml
index e6bcdc26..dc44bca6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -121,9 +121,9 @@
<play-mailer.version>7.0.2</play-mailer.version>
<phoenix-spark.version>5.0.0-HBase-2.0</phoenix-spark.version>
<zkclient.version>0.3</zkclient.version>
+ <elasticsearch7.version>7.5.1</elasticsearch7.version>
<flink-shaded-hadoop-2.version>2.7.5-7.0</flink-shaded-hadoop-2.version>
<parquet-avro.version>1.10.0</parquet-avro.version>
- <transport.version>7.5.1</transport.version>
<elasticsearch-spark.version>6.8.3</elasticsearch-spark.version>
<clickhouse-jdbc.version>0.2</clickhouse-jdbc.version>
<hbase-spark.version>1.0.0</hbase-spark.version>
@@ -320,12 +320,6 @@
<version>${parquet-avro.version}</version>
</dependency>
- <dependency>
- <groupId>org.elasticsearch.client</groupId>
- <artifactId>transport</artifactId>
- <version>${transport.version}</version>
- </dependency>
-
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_${scala.binary.version}</artifactId>
@@ -338,6 +332,12 @@
<version>${flink.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink/pom.xml
index d7744b90..e5f374b0 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-flink/pom.xml
@@ -33,7 +33,8 @@
<modules>
<module>seatunnel-connector-flink-console</module>
<module>seatunnel-connector-flink-druid</module>
- <module>seatunnel-connector-flink-elasticsearch</module>
+ <module>seatunnel-connector-flink-elasticsearch7</module>
+ <!-- <module>seatunnel-connector-flink-elasticsearch6</module>-->
<module>seatunnel-connector-flink-file</module>
<module>seatunnel-connector-flink-jdbc</module>
<module>seatunnel-connector-flink-kafka</module>
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/pom.xml
similarity index 86%
copy from seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/pom.xml
copy to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/pom.xml
index aa982905..e3b98913 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/pom.xml
@@ -23,12 +23,14 @@
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-connectors-flink</artifactId>
- <version>${revision}</version>
+ <version>2.1.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>seatunnel-connector-flink-elasticsearch</artifactId>
-
+ <artifactId>seatunnel-connector-flink-elasticsearch6</artifactId>
+ <properties>
+ <elasticsearch6.version>6.3.1</elasticsearch6.version>
+ </properties>
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
@@ -51,11 +53,12 @@
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
+ <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
+ <version>${elasticsearch6.version}</version>
</dependency>
</dependencies>
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/config/Config.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/config/Config.java
similarity index 96%
copy from seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/config/Config.java
copy to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/config/Config.java
index f19937d2..46723dbc 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/config/Config.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.flink.elasticsearch.config;
+package org.apache.seatunnel.flink.elasticsearch6.config;
/**
* ElasticSearch sink configuration options
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/sink/Elasticsearch6.java
similarity index 80%
copy from seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
copy to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/sink/Elasticsearch6.java
index 529a0860..a6aca24d 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/sink/Elasticsearch6.java
@@ -15,15 +15,16 @@
* limitations under the License.
*/
-package org.apache.seatunnel.flink.elasticsearch.sink;
+package org.apache.seatunnel.flink.elasticsearch6.sink;
-import static org.apache.seatunnel.flink.elasticsearch.config.Config.DEFAULT_INDEX;
-import static org.apache.seatunnel.flink.elasticsearch.config.Config.DEFAULT_INDEX_TIME_FORMAT;
-import static org.apache.seatunnel.flink.elasticsearch.config.Config.HOSTS;
-import static org.apache.seatunnel.flink.elasticsearch.config.Config.INDEX;
-import static org.apache.seatunnel.flink.elasticsearch.config.Config.INDEX_TIME_FORMAT;
-import static org.apache.seatunnel.flink.elasticsearch.config.Config.INDEX_TYPE;
-import static org.apache.seatunnel.flink.elasticsearch.config.Config.PARALLELISM;
+import static org.apache.seatunnel.flink.elasticsearch6.config.Config.DEFAULT_INDEX;
+import static org.apache.seatunnel.flink.elasticsearch6.config.Config.DEFAULT_INDEX_TIME_FORMAT;
+import static org.apache.seatunnel.flink.elasticsearch6.config.Config.DEFAULT_INDEX_TYPE;
+import static org.apache.seatunnel.flink.elasticsearch6.config.Config.HOSTS;
+import static org.apache.seatunnel.flink.elasticsearch6.config.Config.INDEX;
+import static org.apache.seatunnel.flink.elasticsearch6.config.Config.INDEX_TIME_FORMAT;
+import static org.apache.seatunnel.flink.elasticsearch6.config.Config.INDEX_TYPE;
+import static org.apache.seatunnel.flink.elasticsearch6.config.Config.PARALLELISM;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
@@ -41,7 +42,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.types.Row;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
@@ -52,7 +53,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class Elasticsearch implements FlinkStreamSink, FlinkBatchSink {
+public class Elasticsearch6 implements FlinkStreamSink, FlinkBatchSink {
private static final long serialVersionUID = 8445868321245456793L;
private static final int DEFAULT_CONFIG_SIZE = 3;
@@ -79,16 +80,12 @@ public class Elasticsearch implements FlinkStreamSink, FlinkBatchSink {
public void prepare(FlinkEnvironment env) {
Map<String, String> configMap = new HashMap<>(DEFAULT_CONFIG_SIZE);
configMap.put(INDEX, DEFAULT_INDEX);
+ configMap.put(INDEX_TYPE, DEFAULT_INDEX_TYPE);
configMap.put(INDEX_TIME_FORMAT, DEFAULT_INDEX_TIME_FORMAT);
Config defaultConfig = ConfigFactory.parseMap(configMap);
config = config.withFallback(defaultConfig);
}
- @Override
- public String getPluginName() {
- return "ElasticSearch";
- }
-
@Override
public DataStreamSink<Row> outputStream(FlinkEnvironment env, DataStream<Row> dataStream) {
@@ -140,15 +137,9 @@ public class Elasticsearch implements FlinkStreamSink, FlinkBatchSink {
json.put(fieldNames[i], element.getField(i));
}
- if (config.hasPath(INDEX_TYPE)) {
- return Requests.indexRequest()
- .index(indexName)
- .type(config.getString(INDEX_TYPE))
- .source(json);
- } else {
- return Requests.indexRequest()
- .index(indexName)
- .source(json);
- }
+ return Requests.indexRequest()
+ .index(indexName)
+ .type(config.getString(INDEX_TYPE))
+ .source(json);
}
}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/ElasticsearchOutputFormat.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/sink/ElasticsearchOutputFormat.java
similarity index 95%
copy from seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/ElasticsearchOutputFormat.java
copy to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/sink/ElasticsearchOutputFormat.java
index 45fdca3f..4687bd12 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/ElasticsearchOutputFormat.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/sink/ElasticsearchOutputFormat.java
@@ -15,9 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.flink.elasticsearch.sink;
-
-import static org.apache.seatunnel.flink.elasticsearch.config.Config.HOSTS;
+package org.apache.seatunnel.flink.elasticsearch6.sink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -46,7 +44,7 @@ public class ElasticsearchOutputFormat<T> extends RichOutputFormat<T> {
private static final long serialVersionUID = 2048590860723433896L;
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchOutputFormat.class);
- private Config config;
+ private final Config config;
private static final String PREFIX = "es.";
@@ -63,7 +61,7 @@ public class ElasticsearchOutputFormat<T> extends RichOutputFormat<T> {
@Override
public void configure(Configuration configuration) {
- List<String> hosts = config.getStringList(HOSTS);
+ List<String> hosts = config.getStringList(org.apache.seatunnel.flink.elasticsearch6.config.Config.HOSTS);
Settings.Builder settings = Settings.builder();
config.entrySet().forEach(entry -> {
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
similarity index 92%
copy from seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
copy to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
index e6d1cf22..3e5fe44d 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.seatunnel.flink.elasticsearch.sink.Elasticsearch
+org.apache.seatunnel.flink.elasticsearch6.sink.Elasticsearch6
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/pom.xml
similarity index 94%
rename from seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/pom.xml
rename to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/pom.xml
index aa982905..516acc73 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/pom.xml
@@ -27,7 +27,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>seatunnel-connector-flink-elasticsearch</artifactId>
+ <artifactId>seatunnel-connector-flink-elasticsearch7</artifactId>
<dependencies>
<dependency>
@@ -56,6 +56,7 @@
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
+ <version>${elasticsearch7.version}</version>
</dependency>
</dependencies>
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/config/Config.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/config/Config.java
similarity index 94%
rename from seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/config/Config.java
rename to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/config/Config.java
index f19937d2..a4977a57 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/config/Config.java
@@ -51,11 +51,6 @@ public final class Config {
*/
public static final String HOSTS = "hosts";
- /**
- * Default index type
- */
- public static final String DEFAULT_INDEX_TYPE = "log";
-
/**
* Default index name
*/
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
similarity index 93%
rename from seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
rename to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
index 529a0860..34304c7e 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
@@ -22,7 +22,6 @@ import static org.apache.seatunnel.flink.elasticsearch.config.Config.DEFAULT_IND
import static org.apache.seatunnel.flink.elasticsearch.config.Config.HOSTS;
import static org.apache.seatunnel.flink.elasticsearch.config.Config.INDEX;
import static org.apache.seatunnel.flink.elasticsearch.config.Config.INDEX_TIME_FORMAT;
-import static org.apache.seatunnel.flink.elasticsearch.config.Config.INDEX_TYPE;
import static org.apache.seatunnel.flink.elasticsearch.config.Config.PARALLELISM;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -139,16 +138,8 @@ public class Elasticsearch implements FlinkStreamSink, FlinkBatchSink {
for (int i = 0; i < elementLen; i++) {
json.put(fieldNames[i], element.getField(i));
}
-
- if (config.hasPath(INDEX_TYPE)) {
- return Requests.indexRequest()
- .index(indexName)
- .type(config.getString(INDEX_TYPE))
- .source(json);
- } else {
- return Requests.indexRequest()
- .index(indexName)
- .source(json);
- }
+ return Requests.indexRequest()
+ .index(indexName)
+ .source(json);
}
}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/ElasticsearchOutputFormat.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/ElasticsearchOutputFormat.java
similarity index 99%
rename from seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/ElasticsearchOutputFormat.java
rename to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/ElasticsearchOutputFormat.java
index 45fdca3f..a55e3842 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/ElasticsearchOutputFormat.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/ElasticsearchOutputFormat.java
@@ -46,7 +46,7 @@ public class ElasticsearchOutputFormat<T> extends RichOutputFormat<T> {
private static final long serialVersionUID = 2048590860723433896L;
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchOutputFormat.class);
- private Config config;
+ private final Config config;
private static final String PREFIX = "es.";
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
similarity index 100%
rename from seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
rename to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
diff --git a/seatunnel-core/seatunnel-core-flink/pom.xml b/seatunnel-core/seatunnel-core-flink/pom.xml
index eb50c30f..4d80e14c 100644
--- a/seatunnel-core/seatunnel-core-flink/pom.xml
+++ b/seatunnel-core/seatunnel-core-flink/pom.xml
@@ -86,7 +86,7 @@
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-connector-flink-elasticsearch</artifactId>
+ <artifactId>seatunnel-connector-flink-elasticsearch7</artifactId>
<version>${project.version}</version>
</dependency>