You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/04/08 04:41:12 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector] Re-add support for elasticsearch 6.x in source code. (#1663)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3c10987e [Feature][Connector] Re-add support for elasticsearch 6.x in source code. (#1663)
3c10987e is described below

commit 3c10987e7759cf6cba553e4fbe6463118940b694
Author: TrickyZerg <32...@users.noreply.github.com>
AuthorDate: Fri Apr 8 12:41:06 2022 +0800

    [Feature][Connector] Re-add support for elasticsearch 6.x in source code. (#1663)
    
    * Re-add support for elasticsearch 6.x.
    
    * fix check style
    
    * Update version to 2.1.1-SNAPSHOT
    
    * move elasticsearch properties to root pom
    
    * modify Elasticsearch.mdx doc
---
 docs/en/connector/sink/Elasticsearch.mdx           |  3 +-
 pom.xml                                            | 14 ++++----
 .../seatunnel-connectors-flink/pom.xml             |  3 +-
 .../pom.xml                                        | 11 +++---
 .../flink/elasticsearch6}/config/Config.java       |  2 +-
 .../flink/elasticsearch6/sink/Elasticsearch6.java} | 41 +++++++++-------------
 .../sink/ElasticsearchOutputFormat.java            |  8 ++---
 .../org.apache.seatunnel.flink.BaseFlinkSink       |  2 +-
 .../pom.xml                                        |  3 +-
 .../flink/elasticsearch/config/Config.java         |  5 ---
 .../flink/elasticsearch/sink/Elasticsearch.java    | 15 ++------
 .../sink/ElasticsearchOutputFormat.java            |  2 +-
 .../org.apache.seatunnel.flink.BaseFlinkSink       |  0
 seatunnel-core/seatunnel-core-flink/pom.xml        |  2 +-
 14 files changed, 46 insertions(+), 65 deletions(-)

diff --git a/docs/en/connector/sink/Elasticsearch.mdx b/docs/en/connector/sink/Elasticsearch.mdx
index a6314619..759d8894 100644
--- a/docs/en/connector/sink/Elasticsearch.mdx
+++ b/docs/en/connector/sink/Elasticsearch.mdx
@@ -12,7 +12,8 @@ Output data to `Elasticsearch`.
 Engine Supported and plugin name
 
 * [x] Spark: Elasticsearch(supported `ElasticSearch version is >= 2.x and <7.0.0`)
-* [x] Flink: Elasticsearch(supported `ElasticSearch version is >= 2.x and <8.0.0`)
+* [x] Flink: Elasticsearch(supported `ElasticSearch version = 7.x`, if you want use Elasticsearch version is 6.x,
+please use the source code to modify the dependencies corresponding to `seatunnel/seatunnel-connectors/seatunnel-connectors-flink/pom.xml` and repackage)
 
 :::
 
diff --git a/pom.xml b/pom.xml
index e6bcdc26..dc44bca6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -121,9 +121,9 @@
         <play-mailer.version>7.0.2</play-mailer.version>
         <phoenix-spark.version>5.0.0-HBase-2.0</phoenix-spark.version>
         <zkclient.version>0.3</zkclient.version>
+        <elasticsearch7.version>7.5.1</elasticsearch7.version>
         <flink-shaded-hadoop-2.version>2.7.5-7.0</flink-shaded-hadoop-2.version>
         <parquet-avro.version>1.10.0</parquet-avro.version>
-        <transport.version>7.5.1</transport.version>
         <elasticsearch-spark.version>6.8.3</elasticsearch-spark.version>
         <clickhouse-jdbc.version>0.2</clickhouse-jdbc.version>
         <hbase-spark.version>1.0.0</hbase-spark.version>
@@ -320,12 +320,6 @@
                 <version>${parquet-avro.version}</version>
             </dependency>
 
-            <dependency>
-                <groupId>org.elasticsearch.client</groupId>
-                <artifactId>transport</artifactId>
-                <version>${transport.version}</version>
-            </dependency>
-
             <dependency>
                 <groupId>org.elasticsearch</groupId>
                 <artifactId>elasticsearch-spark-20_${scala.binary.version}</artifactId>
@@ -338,6 +332,12 @@
                 <version>${flink.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
+                <version>${flink.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink/pom.xml
index d7744b90..e5f374b0 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-flink/pom.xml
@@ -33,7 +33,8 @@
     <modules>
         <module>seatunnel-connector-flink-console</module>
         <module>seatunnel-connector-flink-druid</module>
-        <module>seatunnel-connector-flink-elasticsearch</module>
+        <module>seatunnel-connector-flink-elasticsearch7</module>
+        <!--        <module>seatunnel-connector-flink-elasticsearch6</module>-->
         <module>seatunnel-connector-flink-file</module>
         <module>seatunnel-connector-flink-jdbc</module>
         <module>seatunnel-connector-flink-kafka</module>
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/pom.xml
similarity index 86%
copy from seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/pom.xml
copy to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/pom.xml
index aa982905..e3b98913 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/pom.xml
@@ -23,12 +23,14 @@
     <parent>
         <groupId>org.apache.seatunnel</groupId>
         <artifactId>seatunnel-connectors-flink</artifactId>
-        <version>${revision}</version>
+        <version>2.1.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>seatunnel-connector-flink-elasticsearch</artifactId>
-
+    <artifactId>seatunnel-connector-flink-elasticsearch6</artifactId>
+    <properties>
+        <elasticsearch6.version>6.3.1</elasticsearch6.version>
+    </properties>
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
@@ -51,11 +53,12 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
+            <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
         </dependency>
         <dependency>
             <groupId>org.elasticsearch.client</groupId>
             <artifactId>transport</artifactId>
+            <version>${elasticsearch6.version}</version>
         </dependency>
     </dependencies>
 
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/config/Config.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/config/Config.java
similarity index 96%
copy from seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/config/Config.java
copy to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/config/Config.java
index f19937d2..46723dbc 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/config/Config.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.flink.elasticsearch.config;
+package org.apache.seatunnel.flink.elasticsearch6.config;
 
 /**
  * ElasticSearch sink configuration options
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/sink/Elasticsearch6.java
similarity index 80%
copy from seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
copy to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/sink/Elasticsearch6.java
index 529a0860..a6aca24d 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/sink/Elasticsearch6.java
@@ -15,15 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.flink.elasticsearch.sink;
+package org.apache.seatunnel.flink.elasticsearch6.sink;
 
-import static org.apache.seatunnel.flink.elasticsearch.config.Config.DEFAULT_INDEX;
-import static org.apache.seatunnel.flink.elasticsearch.config.Config.DEFAULT_INDEX_TIME_FORMAT;
-import static org.apache.seatunnel.flink.elasticsearch.config.Config.HOSTS;
-import static org.apache.seatunnel.flink.elasticsearch.config.Config.INDEX;
-import static org.apache.seatunnel.flink.elasticsearch.config.Config.INDEX_TIME_FORMAT;
-import static org.apache.seatunnel.flink.elasticsearch.config.Config.INDEX_TYPE;
-import static org.apache.seatunnel.flink.elasticsearch.config.Config.PARALLELISM;
+import static org.apache.seatunnel.flink.elasticsearch6.config.Config.DEFAULT_INDEX;
+import static org.apache.seatunnel.flink.elasticsearch6.config.Config.DEFAULT_INDEX_TIME_FORMAT;
+import static org.apache.seatunnel.flink.elasticsearch6.config.Config.DEFAULT_INDEX_TYPE;
+import static org.apache.seatunnel.flink.elasticsearch6.config.Config.HOSTS;
+import static org.apache.seatunnel.flink.elasticsearch6.config.Config.INDEX;
+import static org.apache.seatunnel.flink.elasticsearch6.config.Config.INDEX_TIME_FORMAT;
+import static org.apache.seatunnel.flink.elasticsearch6.config.Config.INDEX_TYPE;
+import static org.apache.seatunnel.flink.elasticsearch6.config.Config.PARALLELISM;
 
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
@@ -41,7 +42,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
 import org.apache.flink.types.Row;
 import org.apache.http.HttpHost;
 import org.elasticsearch.action.index.IndexRequest;
@@ -52,7 +53,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class Elasticsearch implements FlinkStreamSink, FlinkBatchSink {
+public class Elasticsearch6 implements FlinkStreamSink, FlinkBatchSink {
 
     private static final long serialVersionUID = 8445868321245456793L;
     private static final int DEFAULT_CONFIG_SIZE = 3;
@@ -79,16 +80,12 @@ public class Elasticsearch implements FlinkStreamSink, FlinkBatchSink {
     public void prepare(FlinkEnvironment env) {
         Map<String, String> configMap = new HashMap<>(DEFAULT_CONFIG_SIZE);
         configMap.put(INDEX, DEFAULT_INDEX);
+        configMap.put(INDEX_TYPE, DEFAULT_INDEX_TYPE);
         configMap.put(INDEX_TIME_FORMAT, DEFAULT_INDEX_TIME_FORMAT);
         Config defaultConfig = ConfigFactory.parseMap(configMap);
         config = config.withFallback(defaultConfig);
     }
 
-    @Override
-    public String getPluginName() {
-        return "ElasticSearch";
-    }
-
     @Override
     public DataStreamSink<Row> outputStream(FlinkEnvironment env, DataStream<Row> dataStream) {
 
@@ -140,15 +137,9 @@ public class Elasticsearch implements FlinkStreamSink, FlinkBatchSink {
             json.put(fieldNames[i], element.getField(i));
         }
 
-        if (config.hasPath(INDEX_TYPE)) {
-            return Requests.indexRequest()
-                    .index(indexName)
-                    .type(config.getString(INDEX_TYPE))
-                    .source(json);
-        } else {
-            return Requests.indexRequest()
-                    .index(indexName)
-                    .source(json);
-        }
+        return Requests.indexRequest()
+                .index(indexName)
+                .type(config.getString(INDEX_TYPE))
+                .source(json);
     }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/ElasticsearchOutputFormat.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/sink/ElasticsearchOutputFormat.java
similarity index 95%
copy from seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/ElasticsearchOutputFormat.java
copy to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/sink/ElasticsearchOutputFormat.java
index 45fdca3f..4687bd12 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/ElasticsearchOutputFormat.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/sink/ElasticsearchOutputFormat.java
@@ -15,9 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.flink.elasticsearch.sink;
-
-import static org.apache.seatunnel.flink.elasticsearch.config.Config.HOSTS;
+package org.apache.seatunnel.flink.elasticsearch6.sink;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -46,7 +44,7 @@ public class ElasticsearchOutputFormat<T> extends RichOutputFormat<T> {
     private static final long serialVersionUID = 2048590860723433896L;
     private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchOutputFormat.class);
 
-    private Config config;
+    private final Config config;
 
     private static final String PREFIX = "es.";
 
@@ -63,7 +61,7 @@ public class ElasticsearchOutputFormat<T> extends RichOutputFormat<T> {
 
     @Override
     public void configure(Configuration configuration) {
-        List<String> hosts = config.getStringList(HOSTS);
+        List<String> hosts = config.getStringList(org.apache.seatunnel.flink.elasticsearch6.config.Config.HOSTS);
         Settings.Builder settings = Settings.builder();
 
         config.entrySet().forEach(entry -> {
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
similarity index 92%
copy from seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
copy to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
index e6d1cf22..3e5fe44d 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.seatunnel.flink.elasticsearch.sink.Elasticsearch
+org.apache.seatunnel.flink.elasticsearch6.sink.Elasticsearch6
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/pom.xml
similarity index 94%
rename from seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/pom.xml
rename to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/pom.xml
index aa982905..516acc73 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/pom.xml
@@ -27,7 +27,7 @@
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>seatunnel-connector-flink-elasticsearch</artifactId>
+    <artifactId>seatunnel-connector-flink-elasticsearch7</artifactId>
 
     <dependencies>
         <dependency>
@@ -56,6 +56,7 @@
         <dependency>
             <groupId>org.elasticsearch.client</groupId>
             <artifactId>transport</artifactId>
+            <version>${elasticsearch7.version}</version>
         </dependency>
     </dependencies>
 
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/config/Config.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/config/Config.java
similarity index 94%
rename from seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/config/Config.java
rename to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/config/Config.java
index f19937d2..a4977a57 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/config/Config.java
@@ -51,11 +51,6 @@ public final class Config {
      */
     public static final String HOSTS = "hosts";
 
-    /**
-     * Default index type
-     */
-    public static final String DEFAULT_INDEX_TYPE = "log";
-
     /**
      * Default index name
      */
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
similarity index 93%
rename from seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
rename to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
index 529a0860..34304c7e 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
@@ -22,7 +22,6 @@ import static org.apache.seatunnel.flink.elasticsearch.config.Config.DEFAULT_IND
 import static org.apache.seatunnel.flink.elasticsearch.config.Config.HOSTS;
 import static org.apache.seatunnel.flink.elasticsearch.config.Config.INDEX;
 import static org.apache.seatunnel.flink.elasticsearch.config.Config.INDEX_TIME_FORMAT;
-import static org.apache.seatunnel.flink.elasticsearch.config.Config.INDEX_TYPE;
 import static org.apache.seatunnel.flink.elasticsearch.config.Config.PARALLELISM;
 
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -139,16 +138,8 @@ public class Elasticsearch implements FlinkStreamSink, FlinkBatchSink {
         for (int i = 0; i < elementLen; i++) {
             json.put(fieldNames[i], element.getField(i));
         }
-
-        if (config.hasPath(INDEX_TYPE)) {
-            return Requests.indexRequest()
-                    .index(indexName)
-                    .type(config.getString(INDEX_TYPE))
-                    .source(json);
-        } else {
-            return Requests.indexRequest()
-                    .index(indexName)
-                    .source(json);
-        }
+        return Requests.indexRequest()
+                .index(indexName)
+                .source(json);
     }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/ElasticsearchOutputFormat.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/ElasticsearchOutputFormat.java
similarity index 99%
rename from seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/ElasticsearchOutputFormat.java
rename to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/ElasticsearchOutputFormat.java
index 45fdca3f..a55e3842 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/ElasticsearchOutputFormat.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/ElasticsearchOutputFormat.java
@@ -46,7 +46,7 @@ public class ElasticsearchOutputFormat<T> extends RichOutputFormat<T> {
     private static final long serialVersionUID = 2048590860723433896L;
     private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchOutputFormat.class);
 
-    private Config config;
+    private final Config config;
 
     private static final String PREFIX = "es.";
 
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
similarity index 100%
rename from seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
rename to seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
diff --git a/seatunnel-core/seatunnel-core-flink/pom.xml b/seatunnel-core/seatunnel-core-flink/pom.xml
index eb50c30f..4d80e14c 100644
--- a/seatunnel-core/seatunnel-core-flink/pom.xml
+++ b/seatunnel-core/seatunnel-core-flink/pom.xml
@@ -86,7 +86,7 @@
 
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-connector-flink-elasticsearch</artifactId>
+            <artifactId>seatunnel-connector-flink-elasticsearch7</artifactId>
             <version>${project.version}</version>
         </dependency>