You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/08/31 05:17:17 UTC
[incubator-seatunnel] branch dev updated: [Feature][Connector-V2]Support datahub sink (#2558)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 43600a704 [Feature][Connector-V2]Support datahub sink (#2558)
43600a704 is described below
commit 43600a70499cf220485cc8a32604c89e464e8c04
Author: chessplay <27...@qq.com>
AuthorDate: Wed Aug 31 13:17:13 2022 +0800
[Feature][Connector-V2]Support datahub sink (#2558)
Co-authored-by: zhoulw11 <zh...@chinatelecom.cn>
---
docs/en/connector-v2/sink/Datahub.md | 63 +++++++++++
plugin-mapping.properties | 1 +
pom.xml | 7 ++
seatunnel-connectors-v2-dist/pom.xml | 5 +
seatunnel-connectors-v2/connector-datahub/pom.xml | 44 ++++++++
.../connectors/seatunnel/sink/DataHubSink.java | 91 ++++++++++++++++
.../connectors/seatunnel/sink/DataHubWriter.java | 116 +++++++++++++++++++++
seatunnel-connectors-v2/pom.xml | 1 +
.../bin/start-seatunnel-flink-new-connector.sh | 2 +-
.../seatunnel-flink-connector-v2-e2e/pom.xml | 1 +
.../flink/v2/datahub/FakeSourceToDatahubIT.java | 37 +++++++
.../resources/datahub/fakesource_to_datahub.conf | 66 ++++++++++++
.../spark/v2/datahub/FakeSourceToDatahubIT.java | 37 +++++++
.../resources/datahub/fakesource_to_datahub.conf | 66 ++++++++++++
.../seatunnel-flink-connector-v2-example/pom.xml | 5 +
15 files changed, 541 insertions(+), 1 deletion(-)
diff --git a/docs/en/connector-v2/sink/Datahub.md b/docs/en/connector-v2/sink/Datahub.md
new file mode 100644
index 000000000..292944cd5
--- /dev/null
+++ b/docs/en/connector-v2/sink/Datahub.md
@@ -0,0 +1,63 @@
+# Datahub
+
+> Datahub sink connector
+
+## Description
+
+A sink plugin which sends messages to Alibaba Cloud DataHub.
+
+## Options
+
+| name | type | required | default value |
+|------------|--------|----------|---------------|
+| endpoint | string | yes | - |
+| accessId | string | yes | - |
+| accessKey | string | yes | - |
+| project | string | yes | - |
+| topic | string | yes | - |
+| timeout | int | yes | - |
+| retryTimes | int | yes | - |
+
+### endpoint [string]
+
+Your DataHub endpoint, starting with http (string)
+
+### accessId [string]
+
+Your DataHub accessId, which can be obtained from Alibaba Cloud (string)
+
+### accessKey [string]
+
+Your DataHub accessKey, which can be obtained from Alibaba Cloud (string)
+
+### project [string]
+
+Your DataHub project, which is created in Alibaba Cloud (string)
+
+### topic [string]
+
+Your DataHub topic (string)
+
+### timeout [int]
+
+The maximum connection timeout (int)
+
+### retryTimes [int]
+
+The maximum number of retries when the client fails to put records (int)
+
+## Example
+
+```hocon
+sink {
+ DataHub {
+ endpoint="yourendpoint"
+ accessId="xxx"
+ accessKey="xxx"
+ project="projectname"
+ topic="topicname"
+ timeout=3000
+ retryTimes=3
+ }
+}
+```
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 7d92053ad..7f06da42d 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -121,4 +121,5 @@ seatunnel.sink.IoTDB = connector-iotdb
seatunnel.sink.Neo4j = connector-neo4j
seatunnel.sink.FtpFile = connector-file-ftp
seatunnel.sink.Socket = connector-socket
+seatunnel.sink.DataHub = connector-datahub
diff --git a/pom.xml b/pom.xml
index 6369654fe..6982068de 100644
--- a/pom.xml
+++ b/pom.xml
@@ -231,6 +231,7 @@
<phoenix.version>5.2.5-HBase-2.x</phoenix.version>
<awaitility.version>4.2.0</awaitility.version>
<neo4j-java-driver.version>4.4.9</neo4j-java-driver.version>
+ <datahub.version>2.19.0-public</datahub.version>
</properties>
<dependencyManagement>
@@ -983,6 +984,12 @@
<artifactId>commons-net</artifactId>
<version>${commons-net.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>com.aliyun.datahub</groupId>
+ <artifactId>aliyun-sdk-datahub</artifactId>
+ <version>${datahub.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git a/seatunnel-connectors-v2-dist/pom.xml b/seatunnel-connectors-v2-dist/pom.xml
index 0b90e13cd..ad63551c2 100644
--- a/seatunnel-connectors-v2-dist/pom.xml
+++ b/seatunnel-connectors-v2-dist/pom.xml
@@ -146,6 +146,11 @@
<artifactId>connector-neo4j</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-datahub</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
diff --git a/seatunnel-connectors-v2/connector-datahub/pom.xml b/seatunnel-connectors-v2/connector-datahub/pom.xml
new file mode 100644
index 000000000..f928b216c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-datahub/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>seatunnel-connectors-v2</artifactId>
+ <groupId>org.apache.seatunnel</groupId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>connector-datahub</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.aliyun.datahub</groupId>
+ <artifactId>aliyun-sdk-datahub</artifactId>
+ </dependency>
+
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DataHubSink.java b/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DataHubSink.java
new file mode 100644
index 000000000..bed2a47db
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DataHubSink.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter.Context;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+
+/**
+ * DataHub sink: validates the plugin configuration and creates a {@link DataHubWriter}
+ * that puts rows into the configured DataHub project/topic.
+ */
+@AutoService(SeaTunnelSink.class)
+public class DataHubSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+
+    // Configuration keys. All of them are required: every one is read in createWriter().
+    private static final String ENDPOINT = "endpoint";
+    private static final String ACCESS_ID = "accessId";
+    private static final String ACCESS_KEY = "accessKey";
+    private static final String PROJECT = "project";
+    private static final String TOPIC = "topic";
+    private static final String TIMEOUT = "timeout";
+    private static final String RETRY_TIMES = "retryTimes";
+
+    private Config pluginConfig;
+    private SeaTunnelRowType seaTunnelRowType;
+
+    @Override
+    public String getPluginName() {
+        return "DataHub";
+    }
+
+    /**
+     * Checks that every required option is present and keeps the config for createWriter().
+     *
+     * @param pluginConfig the sink's plugin configuration
+     * @throws PrepareFailException if any required option is missing
+     */
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        // timeout and retryTimes are read unconditionally via getInt() in createWriter(),
+        // so they must be validated here as well; otherwise a missing key only surfaces
+        // later as a config exception when the writer is created.
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
+            ENDPOINT, ACCESS_ID, ACCESS_KEY, PROJECT, TOPIC, TIMEOUT, RETRY_TIMES);
+        if (!result.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg());
+        }
+        this.pluginConfig = pluginConfig;
+    }
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return this.seaTunnelRowType;
+    }
+
+    /**
+     * Creates a writer bound to the configured DataHub project/topic.
+     */
+    @Override
+    public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(Context context) throws IOException {
+        return new DataHubWriter(seaTunnelRowType,
+            pluginConfig.getString(ENDPOINT),
+            pluginConfig.getString(ACCESS_ID),
+            pluginConfig.getString(ACCESS_KEY),
+            pluginConfig.getString(PROJECT),
+            pluginConfig.getString(TOPIC),
+            pluginConfig.getInt(TIMEOUT),
+            pluginConfig.getInt(RETRY_TIMES));
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DataHubWriter.java b/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DataHubWriter.java
new file mode 100644
index 000000000..93c323b51
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DataHubWriter.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import com.aliyun.datahub.client.DatahubClient;
+import com.aliyun.datahub.client.DatahubClientBuilder;
+import com.aliyun.datahub.client.auth.AliyunAccount;
+import com.aliyun.datahub.client.common.DatahubConfig;
+import com.aliyun.datahub.client.exception.DatahubClientException;
+import com.aliyun.datahub.client.http.HttpConfig;
+import com.aliyun.datahub.client.model.PutRecordsResult;
+import com.aliyun.datahub.client.model.RecordEntry;
+import com.aliyun.datahub.client.model.RecordSchema;
+import com.aliyun.datahub.client.model.TupleRecordData;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * DataHub writer: converts each {@link SeaTunnelRow} into ONE DataHub tuple record and puts it
+ * into the configured project/topic, re-sending records the server reports as failed up to
+ * {@code retryTimes} times.
+ */
+@Slf4j
+public class DataHubWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private final DatahubClient dataHubClient;
+    private final String project;
+    private final String topic;
+    private final Integer retryTimes;
+    private final SeaTunnelRowType seaTunnelRowType;
+    // Topic schema, fetched once here instead of with a remote call for every written row.
+    private final RecordSchema recordSchema;
+
+    /**
+     * Builds the DataHub client and resolves the target topic's record schema.
+     *
+     * @param seaTunnelRowType row type whose field names are used as DataHub field names
+     * @param endpoint         DataHub endpoint
+     * @param accessId         Alibaba Cloud access id
+     * @param accessKey        Alibaba Cloud access key
+     * @param project          DataHub project name
+     * @param topic            DataHub topic name
+     * @param timeout          connection timeout passed to {@code HttpConfig#setConnTimeout}
+     * @param retryTimes       maximum number of re-send attempts for failed records
+     */
+    public DataHubWriter(SeaTunnelRowType seaTunnelRowType, String endpoint, String accessId, String accessKey,
+                         String project, String topic, Integer timeout, Integer retryTimes) {
+        this.dataHubClient = DatahubClientBuilder.newBuilder()
+            .setDatahubConfig(new DatahubConfig(endpoint,
+                new AliyunAccount(accessId, accessKey), true))
+            .setHttpConfig(new HttpConfig().setCompressType(HttpConfig.CompressType.LZ4)
+                .setConnTimeout(timeout))
+            .build();
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.project = project;
+        this.topic = topic;
+        this.retryTimes = retryTimes;
+        this.recordSchema = dataHubClient.getTopic(project, topic).getRecordSchema();
+    }
+
+    /**
+     * Writes one row as a single DataHub tuple record.
+     *
+     * @param element the row to write; its fields are matched to the topic schema by name
+     * @throws RuntimeException if records still fail after all retries, or the DataHub client
+     *                          reports an error (instead of silently dropping the row)
+     */
+    @Override
+    public void write(SeaTunnelRow element) {
+        String[] fieldNames = seaTunnelRowType.getFieldNames();
+        Object[] fields = element.getFields();
+        // All fields of the row go into ONE record; building a record per field would split
+        // a single row into several partially-filled records.
+        TupleRecordData data = new TupleRecordData(recordSchema);
+        for (int i = 0; i < fieldNames.length; i++) {
+            data.setField(fieldNames[i], fields[i]);
+        }
+        RecordEntry recordEntry = new RecordEntry();
+        recordEntry.setRecordData(data);
+        List<RecordEntry> recordEntries = new ArrayList<>(1);
+        recordEntries.add(recordEntry);
+        try {
+            PutRecordsResult result = dataHubClient.putRecords(project, topic, recordEntries);
+            if (result.getFailedRecordCount() > 0) {
+                log.warn("begin to retry for putting failed record");
+                if (!retry(result.getFailedRecords(), retryTimes, project, topic)) {
+                    throw new RuntimeException("retry putting record failed after " + retryTimes + " attempts");
+                }
+                log.info("retry putting record success");
+            } else {
+                log.debug("put record success");
+            }
+        } catch (DatahubClientException e) {
+            throw new RuntimeException("requestId:" + e.getRequestId() + "\tmessage:" + e.getErrorMessage(), e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        //the client does not need to be closed
+    }
+
+    /**
+     * Re-sends failed records, at most {@code retryNums} times. Each attempt only re-sends the
+     * records that failed in the previous attempt.
+     *
+     * @return {@code true} once no records are left failing, {@code false} if some records
+     *         still failed after the last attempt
+     */
+    private boolean retry(List<RecordEntry> records, int retryNums, String project, String topic) {
+        List<RecordEntry> pending = records;
+        for (int attempt = 0; attempt < retryNums; attempt++) {
+            if (pending == null || pending.isEmpty()) {
+                return true;
+            }
+            PutRecordsResult recordsResult = dataHubClient.putRecords(project, topic, pending);
+            if (recordsResult.getFailedRecordCount() <= 0) {
+                return true;
+            }
+            pending = recordsResult.getFailedRecords();
+        }
+        return pending == null || pending.isEmpty();
+    }
+}
+
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index ae54d5232..3d2f3ea7a 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -50,6 +50,7 @@
<module>connector-elasticsearch</module>
<module>connector-iotdb</module>
<module>connector-neo4j</module>
+ <module>connector-datahub</module>
</modules>
<dependencies>
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-new-connector.sh b/seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-new-connector.sh
index c530e364a..9dc9187d8 100755
--- a/seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-new-connector.sh
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-new-connector.sh
@@ -61,4 +61,4 @@ elif [ ${EXIT_CODE} -eq 0 ]; then
else
echo "${CMD}"
exit ${EXIT_CODE}
-fi
+fi
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index 36df6d0d0..38399d10c 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -53,6 +53,7 @@
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>
+
</dependencies>
</project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/datahub/FakeSourceToDatahubIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/datahub/FakeSourceToDatahubIT.java
new file mode 100644
index 000000000..4ee66c96b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/datahub/FakeSourceToDatahubIT.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.datahub;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+@Disabled("Disabled because it needs user's personal datahub account to run this test")
+public class FakeSourceToDatahubIT extends FlinkContainer {
+
+ @Test
+ public void testFakeSourceToDatahub() throws IOException, InterruptedException {
+ Container.ExecResult execResult = executeSeaTunnelFlinkJob("/datahub/fakesource_to_datahub.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/datahub/fakesource_to_datahub.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/datahub/fakesource_to_datahub.conf
new file mode 100644
index 000000000..87eb52fdf
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/datahub/fakesource_to_datahub.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the feature source plugin**
+ FakeSource {
+ result_table_name = "fake"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
+}
+
+transform {
+ sql {
+ sql = "select name,age from fake"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+ DataHub {
+ endpoint="xxx"
+ accessId="xxx"
+ accessKey="xxx"
+ project="xxx"
+ topic="xxx"
+ timeout=3000
+ retryTimes=3
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/datahub/FakeSourceToDatahubIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/datahub/FakeSourceToDatahubIT.java
new file mode 100644
index 000000000..3f1518b72
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/datahub/FakeSourceToDatahubIT.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.datahub;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+@Disabled("Disabled because it needs user's personal datahub account to run this test")
+public class FakeSourceToDatahubIT extends SparkContainer {
+
+ @Test
+ public void testFakeSourceToDatahub() throws IOException, InterruptedException {
+ Container.ExecResult execResult = executeSeaTunnelSparkJob("/datahub/fakesource_to_datahub.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/datahub/fakesource_to_datahub.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/datahub/fakesource_to_datahub.conf
new file mode 100644
index 000000000..edc66d6a2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/datahub/fakesource_to_datahub.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+ # You can set spark configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the feature source plugin**
+ FakeSource {
+ result_table_name = "fake"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
+}
+
+transform {
+ sql {
+ sql = "select name,age from fake"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+ DataHub {
+ endpoint="xxx"
+ accessId="xxx"
+ accessKey="xxx"
+ project="xxx"
+ topic="xxx"
+ timeout=3000
+ retryTimes=3
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink
+}
\ No newline at end of file
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
index d9eb08874..212646b3d 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
@@ -76,6 +76,11 @@
<artifactId>connector-dingtalk</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-datahub</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- seatunnel connectors -->
<!--flink-->