You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/02 10:25:21 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector-V2] Add Dingtalk Sink #2257 (#2285)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 88a26d5a2 [Feature][Connector-V2] Add Dingtalk Sink #2257 (#2285)
88a26d5a2 is described below

commit 88a26d5a29091c891e920ee5eac704423ac4b62e
Author: Coen <pp...@163.com>
AuthorDate: Tue Aug 2 18:25:14 2022 +0800

    [Feature][Connector-V2] Add Dingtalk Sink #2257 (#2285)
    
    * [Feature][Connector2] Add Dingtalk Sink #2257
    
    * [Feature][Connector2] Add Dingtalk Sink #2257
---
 docs/en/connector-v2/sink/dingtalk.md              |  31 ++++++
 plugin-mapping.properties                          |   1 +
 seatunnel-connectors-v2-dist/pom.xml               |   5 +
 seatunnel-connectors-v2/connector-dingtalk/pom.xml |  44 +++++++++
 .../connectors/seatunnel/sink/DingTalkSink.java    |  80 +++++++++++++++
 .../connectors/seatunnel/sink/DingTalkWriter.java  | 107 +++++++++++++++++++++
 seatunnel-connectors-v2/pom.xml                    |   1 +
 .../seatunnel-flink-connector-v2-example/pom.xml   |   5 +
 .../flink/v2/SeaTunnelDingTalkApiExample.java      |  51 ++++++++++
 .../main/resources/examples/fake_to_dingtalk.conf  |  57 +++++++++++
 10 files changed, 382 insertions(+)

diff --git a/docs/en/connector-v2/sink/dingtalk.md b/docs/en/connector-v2/sink/dingtalk.md
new file mode 100644
index 000000000..eb3ebef9d
--- /dev/null
+++ b/docs/en/connector-v2/sink/dingtalk.md
@@ -0,0 +1,31 @@
+# DingTalk
+
+## Description
+
+A sink plugin which use DingTalk robot send message
+
+## Options
+
+| name                         | type        | required | default value |
+|------------------------------| ----------  | -------- | ------------- |
+| url                            | string      | yes      | -             |
+| secret             | string      | yes       | -             |
+
+### url [string]
+
+DingTalk robot address format is https://oapi.dingtalk.com/robot/send?access_token=XXXXXX(string)
+
+### secret [string]
+
+DingTalk robot secret (string)
+
+## Example
+
+```hocon
+sink {
+ DingTalk {
+  url="https://oapi.dingtalk.com/robot/send?access_token=ec646cccd028d978a7156ceeac5b625ebd94f586ea0743fa501c100007890"
+  secret="SEC093249eef7aa57d4388aa635f678930c63db3d28b2829d5b2903fc1e5c10000"
+ }
+}
+```
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 51b7b9c3e..16b9a0882 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -106,3 +106,4 @@ seatunnel.sink.HdfsFile = connector-file-hadoop
 seatunnel.sink.LocalFile = connector-file-local
 seatunnel.source.Pulsar = connector-pulsar
 seatunnel.source.Hudi = connector-hudi
+seatunnel.sink.DingTalk = connector-dingtalk
diff --git a/seatunnel-connectors-v2-dist/pom.xml b/seatunnel-connectors-v2-dist/pom.xml
index fe37965a1..e8edcd953 100644
--- a/seatunnel-connectors-v2-dist/pom.xml
+++ b/seatunnel-connectors-v2-dist/pom.xml
@@ -96,6 +96,11 @@
             <artifactId>connector-hudi</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-dingtalk</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/seatunnel-connectors-v2/connector-dingtalk/pom.xml b/seatunnel-connectors-v2/connector-dingtalk/pom.xml
new file mode 100644
index 000000000..f15aa9aee
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-dingtalk/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>seatunnel-connectors-v2</artifactId>
+    <groupId>org.apache.seatunnel</groupId>
+    <version>${revision}</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>connector-dingtalk</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.seatunnel</groupId>
+      <artifactId>connector-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.aliyun</groupId>
+      <artifactId>alibaba-dingtalk-service-sdk</artifactId>
+      <version>2.0.0</version>
+    </dependency>
+  </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java
new file mode 100644
index 000000000..4a8d3cefa
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter.Context;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+
+/**
+ * DingTalk sink class
+ */
+@AutoService(SeaTunnelSink.class)
+public class DingTalkSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+
+    private Config pluginConfig;
+    private SeaTunnelRowType seaTunnelRowType;
+    private final String dtURL = "url";
+    private final String dtSecret = "secret";
+
+    @Override
+    public String getPluginName() {
+        return "DingTalk";
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        if (pluginConfig.getIsNull(dtURL)) {
+            throw new PrepareFailException(getPluginName(), PluginType.SINK,
+                String.format("Config must include column : %s", dtURL));
+        }
+        if (pluginConfig.getIsNull(dtSecret)) {
+            throw new PrepareFailException(getPluginName(), PluginType.SINK,
+                String.format("Config must include column : %s", dtSecret));
+        }
+        this.pluginConfig = pluginConfig;
+    }
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return this.seaTunnelRowType;
+    }
+
+    @Override
+    public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(Context context) throws IOException {
+        return new DingTalkWriter(pluginConfig.getString(dtURL), pluginConfig.getString(dtSecret));
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkWriter.java b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkWriter.java
new file mode 100644
index 000000000..ab2b3468c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkWriter.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import com.dingtalk.api.DefaultDingTalkClient;
+import com.dingtalk.api.request.OapiRobotSendRequest;
+import com.dingtalk.api.response.OapiRobotSendResponse;
+import com.taobao.api.ApiException;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+/**
+ * DingTalk write class
+ */
+public class DingTalkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private RobotClient robotClient;
+
+    public DingTalkWriter(String url, String secret) {
+        this.robotClient = new RobotClient(url, secret);
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        robotClient.send(element.toString());
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    private static class RobotClient implements Serializable {
+
+        private String url;
+
+        private String secret;
+
+        private DefaultDingTalkClient client;
+
+        public RobotClient(String url, String secret) {
+            this.url = url;
+            this.secret = secret;
+        }
+
+        public OapiRobotSendResponse send(String message) throws IOException {
+            if (null == client) {
+                client = new DefaultDingTalkClient(getUrl());
+            }
+            OapiRobotSendRequest request = new OapiRobotSendRequest();
+            request.setMsgtype("text");
+            OapiRobotSendRequest.Text text = new OapiRobotSendRequest.Text();
+            text.setContent(message);
+            request.setText(text);
+            try {
+                return this.client.execute(request);
+            } catch (ApiException e) {
+                throw new IOException(e);
+            }
+        }
+
+        public String getUrl() throws IOException {
+            Long timestamp = System.currentTimeMillis();
+            String sign = getSign(timestamp);
+            return url + "&timestamp=" + timestamp + "&sign=" + sign;
+        }
+
+        public String getSign(Long timestamp) throws IOException {
+            try {
+                String stringToSign = timestamp + "\n" + secret;
+                Mac mac = Mac.getInstance("HmacSHA256");
+                mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
+                byte[] signData = mac.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8));
+                return URLEncoder.encode(Base64.getEncoder().encodeToString(signData), "UTF-8");
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
+        }
+    }
+
+}
+
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 42f67df2c..325325cf0 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -44,6 +44,7 @@
         <module>connector-file</module>
         <module>connector-hudi</module>
         <module>connector-assert</module>
+        <module>connector-dingtalk</module>
     </modules>
 
     <dependencies>
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
index 215a586f8..28756ccbf 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
@@ -66,6 +66,11 @@
             <artifactId>connector-socket</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-dingtalk</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <!--   seatunnel connectors   -->
 
         <!--flink-->
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelDingTalkApiExample.java b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelDingTalkApiExample.java
new file mode 100644
index 000000000..d72d70b2a
--- /dev/null
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelDingTalkApiExample.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.example.flink.v2;
+
+import org.apache.seatunnel.core.starter.Seatunnel;
+import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.exception.CommandException;
+import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.starter.flink.command.FlinkCommandBuilder;
+
+import java.io.FileNotFoundException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+
+public class SeaTunnelDingTalkApiExample {
+
+    public static void main(String[] args) throws FileNotFoundException, URISyntaxException, CommandException {
+        String configFile = getTestConfigFile("/examples/fake_to_dingtalk.conf");
+        FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
+        flinkCommandArgs.setConfigFile(configFile);
+        flinkCommandArgs.setCheckConfig(false);
+        flinkCommandArgs.setVariables(null);
+        Command<FlinkCommandArgs> flinkCommand =
+            new FlinkCommandBuilder().buildCommand(flinkCommandArgs);
+        Seatunnel.run(flinkCommand);
+    }
+
+    public static String getTestConfigFile(String configFile) throws FileNotFoundException, URISyntaxException {
+        URL resource = SeaTunnelDingTalkApiExample.class.getResource(configFile);
+        if (resource == null) {
+            throw new FileNotFoundException("Can't find config file: " + configFile);
+        }
+        return Paths.get(resource.toURI()).toString();
+    }
+}
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_dingtalk.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_dingtalk.conf
new file mode 100644
index 000000000..aed134290
--- /dev/null
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_dingtalk.conf
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  #job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the feature source plugin**
+    FakeSource {
+      result_table_name = "fake"
+      field_name = "name,age"
+    }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+    sql {
+      sql = "select name,age from fake"
+    }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+  DingTalk {
+  url="https://oapi.dingtalk.com/robot/send?access_token=ec646cccd028d978a7156ceeac5b625ebd94f586ea0743fa501c100007890"
+  secret="SEC093249eef7aa57d4388aa635f678930c63db3d28b2829d5b2903fc1e5c10000"
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file