You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/02 10:25:21 UTC
[incubator-seatunnel] branch dev updated: [Feature][Connector-V2] Add Dingtalk Sink #2257 (#2285)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 88a26d5a2 [Feature][Connector-V2] Add Dingtalk Sink #2257 (#2285)
88a26d5a2 is described below
commit 88a26d5a29091c891e920ee5eac704423ac4b62e
Author: Coen <pp...@163.com>
AuthorDate: Tue Aug 2 18:25:14 2022 +0800
[Feature][Connector-V2] Add Dingtalk Sink #2257 (#2285)
* [Feature][Connector2] Add Dingtalk Sink #2257
* [Feature][Connector2] Add Dingtalk Sink #2257
---
docs/en/connector-v2/sink/dingtalk.md | 31 ++++++
plugin-mapping.properties | 1 +
seatunnel-connectors-v2-dist/pom.xml | 5 +
seatunnel-connectors-v2/connector-dingtalk/pom.xml | 44 +++++++++
.../connectors/seatunnel/sink/DingTalkSink.java | 80 +++++++++++++++
.../connectors/seatunnel/sink/DingTalkWriter.java | 107 +++++++++++++++++++++
seatunnel-connectors-v2/pom.xml | 1 +
.../seatunnel-flink-connector-v2-example/pom.xml | 5 +
.../flink/v2/SeaTunnelDingTalkApiExample.java | 51 ++++++++++
.../main/resources/examples/fake_to_dingtalk.conf | 57 +++++++++++
10 files changed, 382 insertions(+)
diff --git a/docs/en/connector-v2/sink/dingtalk.md b/docs/en/connector-v2/sink/dingtalk.md
new file mode 100644
index 000000000..eb3ebef9d
--- /dev/null
+++ b/docs/en/connector-v2/sink/dingtalk.md
@@ -0,0 +1,31 @@
+# DingTalk
+
+## Description
+
+A sink plugin that sends messages through a DingTalk robot.
+
+## Options
+
+| name | type | required | default value |
+|------------------------------| ---------- | -------- | ------------- |
+| url | string | yes | - |
+| secret | string | yes | - |
+
+### url [string]
+
+The DingTalk robot webhook address, in the format `https://oapi.dingtalk.com/robot/send?access_token=XXXXXX` (string)
+
+### secret [string]
+
+DingTalk robot secret (string)
+
+## Example
+
+```hocon
+sink {
+ DingTalk {
+ url="https://oapi.dingtalk.com/robot/send?access_token=ec646cccd028d978a7156ceeac5b625ebd94f586ea0743fa501c100007890"
+ secret="SEC093249eef7aa57d4388aa635f678930c63db3d28b2829d5b2903fc1e5c10000"
+ }
+}
+```
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 51b7b9c3e..16b9a0882 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -106,3 +106,4 @@ seatunnel.sink.HdfsFile = connector-file-hadoop
seatunnel.sink.LocalFile = connector-file-local
seatunnel.source.Pulsar = connector-pulsar
seatunnel.source.Hudi = connector-hudi
+seatunnel.sink.DingTalk = connector-dingtalk
diff --git a/seatunnel-connectors-v2-dist/pom.xml b/seatunnel-connectors-v2-dist/pom.xml
index fe37965a1..e8edcd953 100644
--- a/seatunnel-connectors-v2-dist/pom.xml
+++ b/seatunnel-connectors-v2-dist/pom.xml
@@ -96,6 +96,11 @@
<artifactId>connector-hudi</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-dingtalk</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
diff --git a/seatunnel-connectors-v2/connector-dingtalk/pom.xml b/seatunnel-connectors-v2/connector-dingtalk/pom.xml
new file mode 100644
index 000000000..f15aa9aee
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-dingtalk/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>seatunnel-connectors-v2</artifactId>
+ <groupId>org.apache.seatunnel</groupId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>connector-dingtalk</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.aliyun</groupId>
+ <artifactId>alibaba-dingtalk-service-sdk</artifactId>
+ <version>2.0.0</version>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java
new file mode 100644
index 000000000..4a8d3cefa
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter.Context;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+
+/**
+ * SeaTunnel sink that hands every consumed row to a {@link DingTalkWriter}.
+ *
+ * <p>Two options are mandatory in the plugin config: {@code url} and {@code secret}.
+ * Both are checked in {@link #prepare(Config)} and passed to the writer unchanged.
+ */
+@AutoService(SeaTunnelSink.class)
+public class DingTalkSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+
+    /** Config key holding the DingTalk robot address. */
+    private static final String URL_KEY = "url";
+    /** Config key holding the DingTalk robot secret. */
+    private static final String SECRET_KEY = "secret";
+
+    private Config pluginConfig;
+    private SeaTunnelRowType seaTunnelRowType;
+
+    @Override
+    public String getPluginName() {
+        return "DingTalk";
+    }
+
+    /**
+     * Validates that the mandatory options are present and stores the config.
+     *
+     * @param pluginConfig the sink section of the job config
+     * @throws PrepareFailException if {@code url} or {@code secret} is absent or null
+     */
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        requireOption(pluginConfig, URL_KEY);
+        requireOption(pluginConfig, SECRET_KEY);
+        this.pluginConfig = pluginConfig;
+    }
+
+    /**
+     * Fails preparation when {@code key} is not usable in {@code config}.
+     *
+     * <p>{@code hasPath} is used instead of {@code getIsNull}: the latter throws a
+     * {@code ConfigException.Missing} when the key is entirely absent, which would bypass
+     * the {@link PrepareFailException} reported here. {@code hasPath} returns false both
+     * for an absent key and for a key explicitly set to null.
+     */
+    private void requireOption(Config config, String key) throws PrepareFailException {
+        if (!config.hasPath(key)) {
+            throw new PrepareFailException(getPluginName(), PluginType.SINK,
+                    String.format("Config must include column : %s", key));
+        }
+    }
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return this.seaTunnelRowType;
+    }
+
+    /**
+     * Creates the writer from the {@code url} and {@code secret} options validated in
+     * {@link #prepare(Config)}.
+     */
+    @Override
+    public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(Context context) throws IOException {
+        return new DingTalkWriter(pluginConfig.getString(URL_KEY), pluginConfig.getString(SECRET_KEY));
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkWriter.java b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkWriter.java
new file mode 100644
index 000000000..ab2b3468c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkWriter.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import com.dingtalk.api.DefaultDingTalkClient;
+import com.dingtalk.api.request.OapiRobotSendRequest;
+import com.dingtalk.api.response.OapiRobotSendResponse;
+import com.taobao.api.ApiException;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+/**
+ * Sink writer that posts each row, rendered with {@code toString()}, as a text message
+ * to a DingTalk robot.
+ */
+public class DingTalkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private final RobotClient robotClient;
+
+    /**
+     * @param url    robot address; the signature parameters are appended to it with
+     *               {@code &}, so it must already contain a query string
+     * @param secret robot secret used to sign every request
+     */
+    public DingTalkWriter(String url, String secret) {
+        this.robotClient = new RobotClient(url, secret);
+    }
+
+    /**
+     * Sends one row to the robot as a text message.
+     *
+     * @throws IOException if signing the request fails or the SDK reports an {@code ApiException}
+     */
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        // NOTE(review): the response returned by send() is not inspected here; confirm
+        // whether a non-success response body should also fail the write.
+        robotClient.send(element.toString());
+    }
+
+    @Override
+    public void close() throws IOException {
+        // Nothing to release: RobotClient keeps no client instance between sends.
+    }
+
+    /**
+     * Builds signed robot requests. Holds only the address and the secret.
+     */
+    private static class RobotClient implements Serializable {
+
+        private static final String SIGN_ALGORITHM = "HmacSHA256";
+
+        private final String url;
+
+        private final String secret;
+
+        public RobotClient(String url, String secret) {
+            this.url = url;
+            this.secret = secret;
+        }
+
+        /**
+         * Sends {@code message} as a {@code text} robot message.
+         *
+         * @return the SDK response for the request
+         * @throws IOException if signing fails or the SDK throws an {@code ApiException}
+         */
+        public OapiRobotSendResponse send(String message) throws IOException {
+            OapiRobotSendRequest request = new OapiRobotSendRequest();
+            request.setMsgtype("text");
+            OapiRobotSendRequest.Text text = new OapiRobotSendRequest.Text();
+            text.setContent(message);
+            request.setText(text);
+            try {
+                // The signed URL embeds the current timestamp, so it is rebuilt for every
+                // request instead of being frozen at the time of the first send; a cached
+                // URL would carry an ever-older timestamp in a long-running job.
+                return new DefaultDingTalkClient(getUrl()).execute(request);
+            } catch (ApiException e) {
+                throw new IOException(e);
+            }
+        }
+
+        /**
+         * Returns the robot address with {@code timestamp} and {@code sign} query
+         * parameters computed for the current time.
+         */
+        public String getUrl() throws IOException {
+            long timestamp = System.currentTimeMillis();
+            String sign = getSign(timestamp);
+            return url + "&timestamp=" + timestamp + "&sign=" + sign;
+        }
+
+        /**
+         * Computes the request signature: HMAC-SHA256 over {@code timestamp + "\n" + secret},
+         * keyed with the secret, Base64-encoded and then URL-encoded.
+         *
+         * @param timestamp timestamp in milliseconds that is also sent in the URL
+         * @throws IOException if the MAC algorithm is unavailable or the key is rejected
+         */
+        public String getSign(long timestamp) throws IOException {
+            try {
+                String stringToSign = timestamp + "\n" + secret;
+                Mac mac = Mac.getInstance(SIGN_ALGORITHM);
+                mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), SIGN_ALGORITHM));
+                byte[] signData = mac.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8));
+                return URLEncoder.encode(Base64.getEncoder().encodeToString(signData), "UTF-8");
+            } catch (java.security.GeneralSecurityException e) {
+                throw new IOException(e);
+            }
+        }
+    }
+
+}
+
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 42f67df2c..325325cf0 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -44,6 +44,7 @@
<module>connector-file</module>
<module>connector-hudi</module>
<module>connector-assert</module>
+ <module>connector-dingtalk</module>
</modules>
<dependencies>
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
index 215a586f8..28756ccbf 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
@@ -66,6 +66,11 @@
<artifactId>connector-socket</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-dingtalk</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- seatunnel connectors -->
<!--flink-->
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelDingTalkApiExample.java b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelDingTalkApiExample.java
new file mode 100644
index 000000000..d72d70b2a
--- /dev/null
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelDingTalkApiExample.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.example.flink.v2;
+
+import org.apache.seatunnel.core.starter.Seatunnel;
+import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.exception.CommandException;
+import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.starter.flink.command.FlinkCommandBuilder;
+
+import java.io.FileNotFoundException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+
+/**
+ * Runnable example that starts the Flink job described by
+ * {@code /examples/fake_to_dingtalk.conf} on the classpath.
+ */
+public class SeaTunnelDingTalkApiExample {
+
+    public static void main(String[] args) throws FileNotFoundException, URISyntaxException, CommandException {
+        // Resolve the config first so a missing resource fails before anything else is built.
+        final String confPath = getTestConfigFile("/examples/fake_to_dingtalk.conf");
+
+        final FlinkCommandArgs commandArgs = new FlinkCommandArgs();
+        commandArgs.setConfigFile(confPath);
+        commandArgs.setCheckConfig(false);
+        commandArgs.setVariables(null);
+
+        final FlinkCommandBuilder builder = new FlinkCommandBuilder();
+        final Command<FlinkCommandArgs> command = builder.buildCommand(commandArgs);
+        Seatunnel.run(command);
+    }
+
+    /**
+     * Resolves a classpath resource to a file-system path string.
+     *
+     * @param configFile resource name, looked up relative to this class
+     * @return the path of the resource as a string
+     * @throws FileNotFoundException if the resource is not on the classpath
+     * @throws URISyntaxException    if the resource URL cannot be converted to a URI
+     */
+    public static String getTestConfigFile(String configFile) throws FileNotFoundException, URISyntaxException {
+        final URL located = SeaTunnelDingTalkApiExample.class.getResource(configFile);
+        if (located != null) {
+            return Paths.get(located.toURI()).toString();
+        }
+        throw new FileNotFoundException("Can't find config file: " + configFile);
+    }
+}
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_dingtalk.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_dingtalk.conf
new file mode 100644
index 000000000..aed134290
--- /dev/null
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_dingtalk.conf
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ #job.mode = "BATCH"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+ FakeSource {
+ result_table_name = "fake"
+ field_name = "name,age"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+ sql {
+ sql = "select name,age from fake"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+ DingTalk {
+ url="https://oapi.dingtalk.com/robot/send?access_token=ec646cccd028d978a7156ceeac5b625ebd94f586ea0743fa501c100007890"
+ secret="SEC093249eef7aa57d4388aa635f678930c63db3d28b2829d5b2903fc1e5c10000"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file