Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/09/08 16:56:24 UTC

[GitHub] [incubator-seatunnel] MRYOG opened a new pull request, #2693: [Feature][Connector-V2] Add DingTalk Source

MRYOG opened a new pull request, #2693:
URL: https://github.com/apache/incubator-seatunnel/pull/2693

   Description
   Adds a Connector-V2 source for DingTalk.
   
   Usage Scenario
   Reads data through the DingTalk API.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] MRYOG closed pull request #2693: [Feature][Connector-V2] Add DingTalk Source

Posted by GitBox <gi...@apache.org>.
MRYOG closed pull request #2693: [Feature][Connector-V2] Add DingTalk Source
URL: https://github.com/apache/incubator-seatunnel/pull/2693




[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #2693: [Feature][Connector-V2] Add DingTalk Source

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on code in PR #2693:
URL: https://github.com/apache/incubator-seatunnel/pull/2693#discussion_r971554121


##########
seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/DingTalkParameter.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common;
+
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * @program: incubator-seatunnel

Review Comment:
   Same as above.



##########
seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/DingTalkConstant.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common;
+
+/**
+ * @program: incubator-seatunnel

Review Comment:
   Can you remove `incubator-seatunnel`? In the future the project name may change to `seatunnel`, so I suggest removing this line.



##########
docs/en/connector-v2/source/dingtalk.md:
##########
@@ -0,0 +1,76 @@
+# DingTalk
+
+> DinkTalk source connector
+
+## Description
+
+A source plugin which use DingTalk API
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)

Review Comment:
   Can you describe how this connector implements exactly-once? I can't find the relevant code. If I missed something, please let me know.



##########
seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/DingTalkSourceReader.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.common.DingTalkConstant;
+import org.apache.seatunnel.connectors.seatunnel.common.DingTalkParameter;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+
+import com.dingtalk.api.DefaultDingTalkClient;
+import com.dingtalk.api.DingTalkClient;
+import com.dingtalk.api.request.OapiV2DepartmentListsubRequest;
+import com.dingtalk.api.response.OapiV2DepartmentListsubResponse;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class DingTalkSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(DingTalkSourceReader.class);
+    protected final SingleSplitReaderContext context;
+    protected final DingTalkParameter dtParameter;
+    protected DingTalkClient dtClient;
+    protected OapiV2DepartmentListsubRequest dtRequest;
+    protected final DeserializationSchema<SeaTunnelRow> deserializationSchema;
+
+    public DingTalkSourceReader(DingTalkParameter dtParameter, SingleSplitReaderContext context, DeserializationSchema<SeaTunnelRow> deserializationSchema) {
+        this.context = context;
+        this.dtParameter = dtParameter;
+        this.deserializationSchema = deserializationSchema;
+    }
+
+    @Override
+    public void open() {
+        dtClient = new DefaultDingTalkClient(dtParameter.getApiClient());
+        dtRequest = new OapiV2DepartmentListsubRequest();
+        LOGGER.info("Ding Talk Access Token is :" + dtParameter.getAccessToken());
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) {
+        try {
+            OapiV2DepartmentListsubResponse response = dtClient.execute(dtRequest, dtParameter.getAccessToken());
+            if (DingTalkConstant.STATUS_OK.equals(response.getErrmsg())) {
+                String tmpContent = response.getBody();
+                JsonNode bodyJson = JsonUtils.stringToJsonNode(tmpContent);
+                JsonNode resJson = bodyJson.get(DingTalkConstant.BODY_RESULT);
+                if (resJson.isArray()) {
+                    for (JsonNode tmpJson : resJson) {
+                        output.collect(new SeaTunnelRow(new Object[]{tmpJson.toString()}));
+                    }
+                }
+            }
+            LOGGER.error("Ding Talk client execute exception, response status code:[{}], content:[{}]", response.getErrorCode(), response.getBody());
+        } catch (Exception e) {
+            LOGGER.error(e.getMessage(), e);
+        } finally {
+            if (Boundedness.BOUNDED.equals(context.getBoundedness())) {

Review Comment:
   This code shows that the connector supports both batch and stream mode, but in the document `stream` is not checked. Please verify which one is correct.
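
To make the point concrete, here is a minimal sketch of the pattern the quoted code appears to follow: the source maps the job mode to a boundedness, and the reader branches on it after each poll. Every name below (`BoundednessSketch`, `JobMode`, `Boundedness`, `boundednessFor`, `countPolls`) is hypothetical and defined locally for illustration; none of it is the SeaTunnel API, and the real reader would signal completion through its reader context rather than a loop counter.

```java
// Hypothetical names throughout; this is not the SeaTunnel API.
class BoundednessSketch {

    enum JobMode { BATCH, STREAMING }

    enum Boundedness { BOUNDED, UNBOUNDED }

    // Same idea as getBoundedness() in the diff: batch jobs are bounded, others are not.
    static Boundedness boundednessFor(JobMode mode) {
        return mode == JobMode.BATCH ? Boundedness.BOUNDED : Boundedness.UNBOUNDED;
    }

    // Counts how many polls a reader performs. A bounded reader stops after its
    // first poll; an unbounded reader keeps polling until the caller's limit.
    static int countPolls(Boundedness boundedness, int maxPolls) {
        int polls = 0;
        while (polls < maxPolls) {
            polls++;
            if (boundedness == Boundedness.BOUNDED) {
                break; // batch mode: one pass over the API, then finish
            }
        }
        return polls;
    }

    public static void main(String[] args) {
        System.out.println("batch polls: " + countPolls(boundednessFor(JobMode.BATCH), 5));
        System.out.println("stream polls: " + countPolls(boundednessFor(JobMode.STREAMING), 5));
    }
}
```

A reader written this way behaves differently in each mode, so the feature list in the document should either mark `stream` as supported or the code should reject non-batch jobs.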



##########
seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/DingTalkSource.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.source;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.DingTalkConstant;
+import org.apache.seatunnel.connectors.seatunnel.common.DingTalkParameter;
+import org.apache.seatunnel.connectors.seatunnel.common.DingTalkUtil;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelSource.class)
+public class DingTalkSource extends AbstractSingleSplitSource<SeaTunnelRow> {
+
+    protected final DingTalkParameter dtParameter = new DingTalkParameter();
+    protected SeaTunnelRowType rowType;
+    protected SeaTunnelContext seaTunnelContext;
+    protected DeserializationSchema<SeaTunnelRow> deserializationSchema;
+
+    @Override
+    public String getPluginName() {
+        return "DingTalk";
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return JobMode.BATCH.equals(seaTunnelContext.getJobMode()) ? Boundedness.BOUNDED : Boundedness.UNBOUNDED;
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult hasClient = CheckConfigUtil.checkAllExists(pluginConfig, DingTalkConstant.API_CLIENT);
+        if (!hasClient.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, hasClient.getMsg());
+        }
+        CheckResult hasToken = CheckConfigUtil.checkAllExists(pluginConfig, DingTalkConstant.ACCESS_TOKEN);
+        if (!hasToken.isSuccess()) {
+            CheckResult hasKey = CheckConfigUtil.checkAllExists(pluginConfig, DingTalkConstant.APP_KEY);

Review Comment:
   `CheckResult hasKey = CheckConfigUtil.checkAllExists(pluginConfig, DingTalkConstant.APP_KEY, DingTalkConstant.APP_SECRET);` would be better.
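
The suggestion above can be sketched without the SeaTunnel classes. The following is a minimal, self-contained illustration that assumes a plain `Map` stands in for the plugin `Config`. `ConfigCheckSketch`, `missingKeys`, and the key strings `api_client`, `app_key` and `app_secret` are hypothetical stand-ins, not the project's `CheckConfigUtil` or the actual values behind `DingTalkConstant`. It shows why passing both keys to one check lets a single error name every missing option.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an "all keys exist" check; not SeaTunnel's CheckConfigUtil.
class ConfigCheckSketch {

    // Returns the required keys that are absent from the config, in the order given.
    static List<String> missingKeys(Map<String, String> config, String... requiredKeys) {
        List<String> missing = new ArrayList<>();
        for (String key : requiredKeys) {
            if (!config.containsKey(key)) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("api_client", "https://example.invalid/api"); // placeholder value

        // One call covers both credentials, so the report lists every missing option.
        List<String> missing = missingKeys(config, "app_key", "app_secret");
        System.out.println("missing options: " + missing);
    }
}
```

Checking the two keys separately would report only the first missing one, and the user would have to fix the configuration twice.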



##########
seatunnel-connectors-v2/connector-dingtalk/pom.xml:
##########
@@ -48,6 +49,19 @@
         </exclusion>
       </exclusions>
     </dependency>
+
+    <dependency>
+      <groupId>com.aliyun</groupId>
+      <artifactId>dingtalk</artifactId>
+      <version>1.4.26</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.seatunnel</groupId>
+      <artifactId>connector-http-base</artifactId>
+      <version>2.1.3-SNAPSHOT</version>
+      <scope>compile</scope>

Review Comment:
   `compile` is the default scope, so you don't need to add it.



##########
seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelDingTalkApiExample.java:
##########
@@ -41,6 +45,17 @@ public static void main(String[] args) throws FileNotFoundException, URISyntaxEx
         Seatunnel.run(flinkCommand);
     }
 
+    public static void testSourceDingTalk() throws FileNotFoundException, URISyntaxException, CommandException {
+        String configFile = getTestConfigFile("/examples/fake_source_to_dingtalk.conf");

Review Comment:
   Renaming `fake_source_to_dingtalk.conf` to `dingtalk_source_to_console.conf` would be better.





[GitHub] [incubator-seatunnel] MRYOG commented on a diff in pull request #2693: [Feature][Connector-V2] Add DingTalk Source

Posted by GitBox <gi...@apache.org>.
MRYOG commented on code in PR #2693:
URL: https://github.com/apache/incubator-seatunnel/pull/2693#discussion_r966638552


##########
docs/en/connector-v2/source/dingtalk.md:
##########
@@ -0,0 +1,54 @@
+# DingTalk
+
+> DinkTalk source connector
+
+## Description

Review Comment:
   Already added the Key features section.





[GitHub] [incubator-seatunnel] CalvinKirs commented on pull request #2693: [Feature][Connector-V2] Add DingTalk Source

Posted by GitBox <gi...@apache.org>.
CalvinKirs commented on PR #2693:
URL: https://github.com/apache/incubator-seatunnel/pull/2693#issuecomment-1241386693

   Hi, could you update your code from upstream?




[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2693: [Feature][Connector-V2] Add DingTalk Source

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2693:
URL: https://github.com/apache/incubator-seatunnel/pull/2693#discussion_r966560463


##########
docs/en/connector-v2/source/dingtalk.md:
##########
@@ -0,0 +1,54 @@
+# DingTalk
+
+> DinkTalk source connector
+
+## Description

Review Comment:
   Please update the document style; see #2625 for reference.


