You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/08/31 05:17:17 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector-V2]Support datahub sink (#2558)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 43600a704 [Feature][Connector-V2]Support datahub sink  (#2558)
43600a704 is described below

commit 43600a70499cf220485cc8a32604c89e464e8c04
Author: chessplay <27...@qq.com>
AuthorDate: Wed Aug 31 13:17:13 2022 +0800

    [Feature][Connector-V2]Support datahub sink  (#2558)
    
    Co-authored-by: zhoulw11 <zh...@chinatelecom.cn>
---
 docs/en/connector-v2/sink/Datahub.md               |  63 +++++++++++
 plugin-mapping.properties                          |   1 +
 pom.xml                                            |   7 ++
 seatunnel-connectors-v2-dist/pom.xml               |   5 +
 seatunnel-connectors-v2/connector-datahub/pom.xml  |  44 ++++++++
 .../connectors/seatunnel/sink/DataHubSink.java     |  91 ++++++++++++++++
 .../connectors/seatunnel/sink/DataHubWriter.java   | 116 +++++++++++++++++++++
 seatunnel-connectors-v2/pom.xml                    |   1 +
 .../bin/start-seatunnel-flink-new-connector.sh     |   2 +-
 .../seatunnel-flink-connector-v2-e2e/pom.xml       |   1 +
 .../flink/v2/datahub/FakeSourceToDatahubIT.java    |  37 +++++++
 .../resources/datahub/fakesource_to_datahub.conf   |  66 ++++++++++++
 .../spark/v2/datahub/FakeSourceToDatahubIT.java    |  37 +++++++
 .../resources/datahub/fakesource_to_datahub.conf   |  66 ++++++++++++
 .../seatunnel-flink-connector-v2-example/pom.xml   |   5 +
 15 files changed, 541 insertions(+), 1 deletion(-)

diff --git a/docs/en/connector-v2/sink/Datahub.md b/docs/en/connector-v2/sink/Datahub.md
new file mode 100644
index 000000000..292944cd5
--- /dev/null
+++ b/docs/en/connector-v2/sink/Datahub.md
@@ -0,0 +1,63 @@
+# Datahub
+
+> Datahub sink connector
+
+## Description
+
+A sink plugin which sends messages to DataHub.
+
+## Options
+
+| name       | type   | required | default value |
+|------------|--------|----------|---------------|
+| endpoint   | string | yes      | -             |
+| accessId   | string | yes      | -             |
+| accessKey  | string | yes      | -             |
+| project    | string | yes      | -             |
+| topic      | string | yes      | -             |
+| timeout    | int    | yes      | -             |
+| retryTimes | int    | yes      | -             |
+
+### endpoint [string]
+
+your DataHub endpoint, starting with http (string)
+
+### accessId [string]
+
+your DataHub accessId, which can be obtained from Alibaba Cloud (string)
+
+### accessKey [string]
+
+your DataHub accessKey, which can be obtained from Alibaba Cloud (string)
+
+### project [string]
+
+your DataHub project, created in Alibaba Cloud (string)
+
+### topic [string]
+
+your datahub topic  (string)
+
+### timeout [int]
+
+the max connection timeout (int)
+
+### retryTimes [int]
+
+the maximum number of retries when the client fails to put records (int)
+
+## Example
+
+```hocon
+sink {
+ DataHub {
+  endpoint="yourendpoint"
+  accessId="xxx"
+  accessKey="xxx"
+  project="projectname"
+  topic="topicname"
+  timeout=3000
+  retryTimes=3
+ }
+}
+```
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 7d92053ad..7f06da42d 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -121,4 +121,5 @@ seatunnel.sink.IoTDB = connector-iotdb
 seatunnel.sink.Neo4j = connector-neo4j
 seatunnel.sink.FtpFile = connector-file-ftp
 seatunnel.sink.Socket = connector-socket
+seatunnel.sink.DataHub = connector-datahub
 
diff --git a/pom.xml b/pom.xml
index 6369654fe..6982068de 100644
--- a/pom.xml
+++ b/pom.xml
@@ -231,6 +231,7 @@
         <phoenix.version>5.2.5-HBase-2.x</phoenix.version>
         <awaitility.version>4.2.0</awaitility.version>
         <neo4j-java-driver.version>4.4.9</neo4j-java-driver.version>
+        <datahub.version>2.19.0-public</datahub.version>
     </properties>
 
     <dependencyManagement>
@@ -983,6 +984,12 @@
                 <artifactId>commons-net</artifactId>
                 <version>${commons-net.version}</version>
             </dependency>
+
+            <dependency>
+                <groupId>com.aliyun.datahub</groupId>
+                <artifactId>aliyun-sdk-datahub</artifactId>
+                <version>${datahub.version}</version>
+            </dependency>
         </dependencies>
 
     </dependencyManagement>
diff --git a/seatunnel-connectors-v2-dist/pom.xml b/seatunnel-connectors-v2-dist/pom.xml
index 0b90e13cd..ad63551c2 100644
--- a/seatunnel-connectors-v2-dist/pom.xml
+++ b/seatunnel-connectors-v2-dist/pom.xml
@@ -146,6 +146,11 @@
             <artifactId>connector-neo4j</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-datahub</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/seatunnel-connectors-v2/connector-datahub/pom.xml b/seatunnel-connectors-v2/connector-datahub/pom.xml
new file mode 100644
index 000000000..f928b216c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-datahub/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>seatunnel-connectors-v2</artifactId>
+    <groupId>org.apache.seatunnel</groupId>
+    <version>${revision}</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>connector-datahub</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.seatunnel</groupId>
+      <artifactId>connector-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.aliyun.datahub</groupId>
+      <artifactId>aliyun-sdk-datahub</artifactId>
+    </dependency>
+
+  </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DataHubSink.java b/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DataHubSink.java
new file mode 100644
index 000000000..bed2a47db
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DataHubSink.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter.Context;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+
+/**
+ * Datahub sink: writes {@link SeaTunnelRow}s to a DataHub topic via {@link DataHubWriter}.
+ */
+@AutoService(SeaTunnelSink.class)
+public class DataHubSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+
+    // Plugin option keys.
+    private static final String ENDPOINT = "endpoint";
+    private static final String ACCESS_ID = "accessId";
+    private static final String ACCESS_KEY = "accessKey";
+    private static final String PROJECT = "project";
+    private static final String TOPIC = "topic";
+    private static final String TIMEOUT = "timeout";
+    private static final String RETRY_TIMES = "retryTimes";
+
+    private Config pluginConfig;
+    private SeaTunnelRowType seaTunnelRowType;
+
+    @Override
+    public String getPluginName() {
+        return "DataHub";
+    }
+
+    /**
+     * Checks that every option this sink reads is configured, then keeps the config.
+     *
+     * <p>{@code timeout} and {@code retryTimes} are validated as well: {@link #createWriter}
+     * reads them unconditionally, so a missing value would otherwise fail only there.
+     *
+     * @param pluginConfig the sink's plugin configuration
+     * @throws PrepareFailException if a required option is missing
+     */
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
+                ENDPOINT, ACCESS_ID, ACCESS_KEY, PROJECT, TOPIC, TIMEOUT, RETRY_TIMES);
+        if (!result.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg());
+        }
+        this.pluginConfig = pluginConfig;
+    }
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return this.seaTunnelRowType;
+    }
+
+    /**
+     * Creates a writer from the options validated in {@link #prepare}.
+     */
+    @Override
+    public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(Context context) throws IOException {
+        return new DataHubWriter(seaTunnelRowType,
+                pluginConfig.getString(ENDPOINT),
+                pluginConfig.getString(ACCESS_ID),
+                pluginConfig.getString(ACCESS_KEY),
+                pluginConfig.getString(PROJECT),
+                pluginConfig.getString(TOPIC),
+                pluginConfig.getInt(TIMEOUT),
+                pluginConfig.getInt(RETRY_TIMES));
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DataHubWriter.java b/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DataHubWriter.java
new file mode 100644
index 000000000..93c323b51
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DataHubWriter.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import com.aliyun.datahub.client.DatahubClient;
+import com.aliyun.datahub.client.DatahubClientBuilder;
+import com.aliyun.datahub.client.auth.AliyunAccount;
+import com.aliyun.datahub.client.common.DatahubConfig;
+import com.aliyun.datahub.client.exception.DatahubClientException;
+import com.aliyun.datahub.client.http.HttpConfig;
+import com.aliyun.datahub.client.model.PutRecordsResult;
+import com.aliyun.datahub.client.model.RecordEntry;
+import com.aliyun.datahub.client.model.RecordSchema;
+import com.aliyun.datahub.client.model.TupleRecordData;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Datahub writer: sends every {@link SeaTunnelRow} to a DataHub topic as one tuple record.
+ *
+ * <p>Each field of a row is written to the DataHub column with the same name, so the topic
+ * schema is expected to contain a column for every field of the upstream row type.
+ */
+@Slf4j
+public class DataHubWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private final DatahubClient dataHubClient;
+    private final String project;
+    private final String topic;
+    private final Integer retryTimes;
+    private final SeaTunnelRowType seaTunnelRowType;
+    // Topic schema, fetched once in the constructor rather than with a getTopic call per row.
+    private final RecordSchema recordSchema;
+
+    /**
+     * Builds the DataHub client and loads the schema of the target topic.
+     *
+     * @param seaTunnelRowType type of the incoming rows; its field names select DataHub columns
+     * @param endpoint         DataHub endpoint
+     * @param accessId         Alibaba Cloud access id
+     * @param accessKey        Alibaba Cloud access key
+     * @param project          DataHub project name
+     * @param topic            DataHub topic name
+     * @param timeout          connection timeout passed to {@code HttpConfig#setConnTimeout}
+     * @param retryTimes       maximum number of re-sends for records rejected by DataHub
+     */
+    public DataHubWriter(SeaTunnelRowType seaTunnelRowType, String endpoint, String accessId, String accessKey, String project, String topic, Integer timeout, Integer retryTimes) {
+        this.dataHubClient = DatahubClientBuilder.newBuilder()
+                .setDatahubConfig(new DatahubConfig(endpoint,
+                        new AliyunAccount(accessId, accessKey), true))
+                .setHttpConfig(new HttpConfig().setCompressType(HttpConfig.CompressType.LZ4)
+                        .setConnTimeout(timeout))
+                .build();
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.project = project;
+        this.topic = topic;
+        this.retryTimes = retryTimes;
+        this.recordSchema = dataHubClient.getTopic(project, topic).getRecordSchema();
+    }
+
+    /**
+     * Writes one row to DataHub as a single tuple record.
+     *
+     * @param element row whose fields are positionally aligned with the row type's field names
+     * @throws DatahubClientException if a DataHub request fails (logged, then rethrown)
+     * @throws RuntimeException if DataHub still rejects the record after {@code retryTimes} retries
+     */
+    @Override
+    public void write(SeaTunnelRow element) {
+        String[] fieldNames = seaTunnelRowType.getFieldNames();
+        Object[] fields = element.getFields();
+        // All fields of the row go into ONE tuple record, i.e. one RecordEntry per row.
+        TupleRecordData data = new TupleRecordData(recordSchema);
+        for (int i = 0; i < fieldNames.length; i++) {
+            data.setField(fieldNames[i], fields[i]);
+        }
+        RecordEntry recordEntry = new RecordEntry();
+        recordEntry.setRecordData(data);
+        List<RecordEntry> recordEntries = new ArrayList<>();
+        recordEntries.add(recordEntry);
+        try {
+            putRecordsWithRetry(recordEntries);
+        } catch (DatahubClientException e) {
+            log.error("requestId:" + e.getRequestId() + "\tmessage:" + e.getErrorMessage());
+            // Rethrow rather than swallow, so a failed request does not silently drop the row.
+            throw e;
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        //the client does not need to be closed
+    }
+
+    /**
+     * Puts {@code records}; while DataHub reports failed records, re-puts only those,
+     * at most {@code retryTimes} times.
+     *
+     * @param records records to put
+     * @throws RuntimeException if some records still fail after the last retry
+     */
+    private void putRecordsWithRetry(List<RecordEntry> records) {
+        List<RecordEntry> pending = records;
+        int retriesLeft = retryTimes;
+        while (true) {
+            PutRecordsResult result = dataHubClient.putRecords(project, topic, pending);
+            int failedRecordCount = result.getFailedRecordCount();
+            if (failedRecordCount <= 0) {
+                log.debug("put record success");
+                return;
+            }
+            if (retriesLeft <= 0) {
+                throw new RuntimeException(String.format(
+                        "Failed to put %d record(s) to DataHub project [%s] topic [%s] after %d retries",
+                        failedRecordCount, project, topic, retryTimes));
+            }
+            retriesLeft--;
+            log.info("begin to retry for putting failed record, retries left: {}", retriesLeft);
+            pending = result.getFailedRecords();
+        }
+    }
+}
+
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index ae54d5232..3d2f3ea7a 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -50,6 +50,7 @@
         <module>connector-elasticsearch</module>
         <module>connector-iotdb</module>
         <module>connector-neo4j</module>
+        <module>connector-datahub</module>
     </modules>
 
     <dependencies>
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-new-connector.sh b/seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-new-connector.sh
index c530e364a..9dc9187d8 100755
--- a/seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-new-connector.sh
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-new-connector.sh
@@ -61,4 +61,4 @@ elif [ ${EXIT_CODE} -eq 0 ]; then
 else
     echo "${CMD}"
     exit ${EXIT_CODE}
-fi
+fi
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index 36df6d0d0..38399d10c 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -53,6 +53,7 @@
             <groupId>org.awaitility</groupId>
             <artifactId>awaitility</artifactId>
         </dependency>
+
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/datahub/FakeSourceToDatahubIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/datahub/FakeSourceToDatahubIT.java
new file mode 100644
index 000000000..4ee66c96b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/datahub/FakeSourceToDatahubIT.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.datahub;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+/**
+ * End-to-end check that a FakeSource to DataHub job exits with code 0 on the Flink container.
+ * It requires a real DataHub account, so it is disabled by default.
+ */
+@Disabled("Disabled because it needs user's personal datahub account to run this test")
+public class FakeSourceToDatahubIT extends FlinkContainer {
+
+    private static final String JOB_CONFIG = "/datahub/fakesource_to_datahub.conf";
+
+    @Test
+    public void testFakeSourceToDatahub() throws IOException, InterruptedException {
+        Container.ExecResult jobResult = executeSeaTunnelFlinkJob(JOB_CONFIG);
+        Assertions.assertEquals(0, jobResult.getExitCode());
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/datahub/fakesource_to_datahub.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/datahub/fakesource_to_datahub.conf
new file mode 100644
index 000000000..87eb52fdf
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/datahub/fakesource_to_datahub.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is an example source plugin **only for test and demonstrate the feature source plugin**
+  FakeSource {
+    result_table_name = "fake"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
+}
+
+transform {
+    sql {
+      sql = "select name,age from fake"
+    }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+  DataHub {
+      endpoint="xxx"
+      accessId="xxx"
+      accessKey="xxx"
+      project="xxx"
+      topic="xxx"
+      timeout=3000
+      retryTimes=3
+    }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/datahub/FakeSourceToDatahubIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/datahub/FakeSourceToDatahubIT.java
new file mode 100644
index 000000000..3f1518b72
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/datahub/FakeSourceToDatahubIT.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.datahub;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+/**
+ * End-to-end check that a FakeSource to DataHub job exits with code 0 on the Spark container.
+ * It requires a real DataHub account, so it is disabled by default.
+ */
+@Disabled("Disabled because it needs user's personal datahub account to run this test")
+public class FakeSourceToDatahubIT extends SparkContainer {
+
+    private static final String JOB_CONFIG = "/datahub/fakesource_to_datahub.conf";
+
+    @Test
+    public void testFakeSourceToDatahub() throws IOException, InterruptedException {
+        Container.ExecResult jobResult = executeSeaTunnelSparkJob(JOB_CONFIG);
+        Assertions.assertEquals(0, jobResult.getExitCode());
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/datahub/fakesource_to_datahub.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/datahub/fakesource_to_datahub.conf
new file mode 100644
index 000000000..edc66d6a2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/datahub/fakesource_to_datahub.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  # You can set spark configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is an example source plugin **only for test and demonstrate the feature source plugin**
+  FakeSource {
+    result_table_name = "fake"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
+}
+
+transform {
+    sql {
+      sql = "select name,age from fake"
+    }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+  DataHub {
+    endpoint="xxx"
+    accessId="xxx"
+    accessKey="xxx"
+    project="xxx"
+    topic="xxx"
+    timeout=3000
+    retryTimes=3
+   }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink
+}
\ No newline at end of file
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
index d9eb08874..212646b3d 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
@@ -76,6 +76,11 @@
             <artifactId>connector-dingtalk</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-datahub</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <!--   seatunnel connectors   -->
 
         <!--flink-->