You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/03/01 02:37:42 UTC

[GitHub] [rocketmq-connect] zhaohai1299002788 opened a new pull request #2: add ding talk sink

zhaohai1299002788 opened a new pull request #2:
URL: https://github.com/apache/rocketmq-connect/pull/2


   add ding talk sink


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-connect] zhaohai1299002788 commented on a change in pull request #2: [ISSUE #10] Add ding talk sink

Posted by GitBox <gi...@apache.org>.
zhaohai1299002788 commented on a change in pull request #2:
URL: https://github.com/apache/rocketmq-connect/pull/2#discussion_r834883646



##########
File path: connectors/aliyun/rocketmq-connect-dingtalk/src/main/java/org/apache/rocketmq/connect/dingtalk/sink/common/OkHttpUtils.java
##########
@@ -0,0 +1,281 @@
+package org.apache.rocketmq.connect.dingtalk.sink.common;
+
+import com.alibaba.fastjson.JSON;
+import okhttp3.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import java.io.IOException;
+import java.net.URLEncoder;
+import java.security.SecureRandom;
+import java.security.cert.X509Certificate;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+public class OkHttpUtils {
+    private static final Logger log = LoggerFactory.getLogger(OkHttpUtils.class);
+
+    private static volatile OkHttpClient okHttpClient = null;
+    private static volatile Semaphore semaphore = null;
+    private Map<String, String> headerMap;
+    private Map<String, String> paramMap;
+    private String url;
+    private Request.Builder request;
+
+    private OkHttpUtils() {
+        if (okHttpClient == null) {
+            synchronized (OkHttpUtils.class) {
+                if (okHttpClient == null) {
+                    TrustManager[] trustManagers = buildTrustManagers();
+                    okHttpClient = new OkHttpClient.Builder()
+                            .connectTimeout(15, TimeUnit.SECONDS)
+                            .writeTimeout(20, TimeUnit.SECONDS)
+                            .readTimeout(20, TimeUnit.SECONDS)
+                            .sslSocketFactory(createSSLSocketFactory(trustManagers), (X509TrustManager) trustManagers[0])
+                            .hostnameVerifier((hostName, session) -> true)
+                            .retryOnConnectionFailure(true)
+                            .build();
+                    addHeader("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
+                }
+            }
+        }
+    }
+
+    private static Semaphore getSemaphoreInstance() {
+        synchronized (OkHttpUtils.class) {
+            if (semaphore == null) {
+                semaphore = new Semaphore(0);
+            }
+        }
+        return semaphore;
+    }
+
+    public static OkHttpUtils builder() {
+        return new OkHttpUtils();
+    }
+
+    public OkHttpUtils url(String url) {
+        this.url = url;
+        return this;
+    }
+
+    /**
+     * 添加参数
+     *
+     * @param key   参数名
+     * @param value 参数值
+     * @return
+     */
+    public OkHttpUtils addParam(String key, String value) {
+        if (paramMap == null) {
+            paramMap = new LinkedHashMap<>(16);
+        }
+        paramMap.put(key, value);
+        return this;
+    }
+
+    /**
+     * 添加请求头
+     *
+     * @param key   参数名
+     * @param value 参数值
+     * @return
+     */
+    public OkHttpUtils addHeader(String key, String value) {
+        if (headerMap == null) {
+            headerMap = new LinkedHashMap<>(16);
+        }
+        headerMap.put(key, value);
+        return this;
+    }
+
+    public OkHttpUtils get() {
+        request = new Request.Builder().get();
+        StringBuilder urlBuilder = new StringBuilder(url);
+        if (paramMap != null) {
+            urlBuilder.append("?");
+            try {
+                for (Map.Entry<String, String> entry : paramMap.entrySet()) {
+                    urlBuilder.append(URLEncoder.encode(entry.getKey(), "utf-8")).
+                            append("=").
+                            append(URLEncoder.encode(entry.getValue(), "utf-8")).
+                            append("&");
+                }
+            } catch (Exception e) {
+                log.error("OkHttpUtils | get | error => ", e);
+            }
+            urlBuilder.deleteCharAt(urlBuilder.length() - 1);
+        }
+        request.url(urlBuilder.toString());
+        return this;
+    }
+
+    /**
+     * 初始化post方法
+     *
+     * @param isJsonPost true等于json的方式提交数据,类似postman里post方法的raw
+     *                   false等于普通的表单提交
+     * @return
+     */
+    public OkHttpUtils post(boolean isJsonPost) {
+        RequestBody requestBody;
+        if (isJsonPost) {
+            String json = "";
+            if (paramMap != null) {
+                json = JSON.toJSONString(paramMap);
+            }
+            requestBody = RequestBody.create(MediaType.parse("application/json; charset=utf-8"), json);
+        } else {
+            FormBody.Builder formBody = new FormBody.Builder();
+            if (paramMap != null) {
+                paramMap.forEach(formBody::add);
+            }
+            requestBody = formBody.build();
+        }
+        request = new Request.Builder().post(requestBody).url(url);
+        return this;
+    }
+
+    public OkHttpUtils postForStringBody(Object data) {
+        String json = JSON.toJSONString(data);

Review comment:
       已修改

##########
File path: connectors/aliyun/rocketmq-connect-dingtalk/src/main/java/org/apache/rocketmq/connect/dingtalk/sink/DingTalkSinkConnector.java
##########
@@ -0,0 +1,71 @@
+package org.apache.rocketmq.connect.dingtalk.sink;
+
+import org.apache.rocketmq.connect.dingtalk.sink.constant.DingTalkConstant;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.commons.lang3.StringUtils;
+
+import java.net.URL;
+import java.net.URLConnection;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DingTalkSinkConnector extends SinkConnector {
+
+    private String webHook;
+
+    private String secretKey;
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        List<KeyValue> taskConfigList = new ArrayList<>(11);
+        KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(DingTalkConstant.WEB_HOOK, webHook);
+        keyValue.put(DingTalkConstant.SECRET_KEY, secretKey);
+        taskConfigList.add(keyValue);
+        return taskConfigList;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return DingTalkSinkTask.class;
+    }
+
+    @Override
+    public void validate(KeyValue config) {

Review comment:
       已添加




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-connect] 2011shenlin commented on a change in pull request #2: [ISSUE #10] Add ding talk sink

Posted by GitBox <gi...@apache.org>.
2011shenlin commented on a change in pull request #2:
URL: https://github.com/apache/rocketmq-connect/pull/2#discussion_r821296507



##########
File path: connectors/aliyun/rocketmq-connect-dingtalk/src/main/java/com/aliyun/rocketmq/connect/dingtalk/sink/DingTalkSinkConnector.java
##########
@@ -0,0 +1,61 @@
+package com.aliyun.rocketmq.connect.dingtalk.sink;
+
+import com.aliyun.rocketmq.connect.dingtalk.sink.constant.DingTalkConstant;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DingTalkSinkConnector extends SinkConnector {
+
+    private String webHook;
+
+    private String msgType;
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        List<KeyValue> taskConfigList = new ArrayList<>(11);
+        KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(DingTalkConstant.WEB_HOOK, webHook);
+        keyValue.put(DingTalkConstant.MSG_TYPE_CONSTANT, msgType);
+        taskConfigList.add(keyValue);

Review comment:
       If the configuration items of SinkConnector and SinkTask are the same, taskConfigs could return the SinkConnector config directly, avoiding duplicate configuration-extraction code between SinkConnector and SinkTask.

##########
File path: connectors/aliyun/rocketmq-connect-dingtalk/src/main/java/com/aliyun/rocketmq/connect/dingtalk/sink/DingTalkSinkTask.java
##########
@@ -0,0 +1,77 @@
+package com.aliyun.rocketmq.connect.dingtalk.sink;
+
+import com.alibaba.fastjson.JSON;
+import com.aliyun.rocketmq.connect.dingtalk.sink.common.OkHttpUtils;
+import com.aliyun.rocketmq.connect.dingtalk.sink.constant.DingTalkConstant;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DingTalkSinkTask extends SinkTask {
+    private static final Logger log = LoggerFactory.getLogger(DingTalkSinkTask.class);
+
+    private String webHook;
+
+    private String msgType;
+
+    @Override
+    public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
+        try {
+            sinkRecords.forEach(sinkRecord -> {
+                Map<String, Object> objectMap = new HashMap<>();
+                objectMap.put(DingTalkConstant.CONTENT_CONSTANT, sinkRecord.getData());

Review comment:
       Connect data needs to support transform configuration.

##########
File path: connectors/aliyun/rocketmq-connect-dingtalk/src/main/java/com/aliyun/rocketmq/connect/dingtalk/sink/DingTalkSinkTask.java
##########
@@ -0,0 +1,77 @@
+package com.aliyun.rocketmq.connect.dingtalk.sink;
+
+import com.alibaba.fastjson.JSON;
+import com.aliyun.rocketmq.connect.dingtalk.sink.common.OkHttpUtils;
+import com.aliyun.rocketmq.connect.dingtalk.sink.constant.DingTalkConstant;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DingTalkSinkTask extends SinkTask {
+    private static final Logger log = LoggerFactory.getLogger(DingTalkSinkTask.class);
+
+    private String webHook;
+
+    private String msgType;
+
+    @Override
+    public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
+        try {
+            sinkRecords.forEach(sinkRecord -> {
+                Map<String, Object> objectMap = new HashMap<>();
+                objectMap.put(DingTalkConstant.CONTENT_CONSTANT, sinkRecord.getData());
+                OkHttpUtils.builder()
+                        .url(webHook)
+                        .addParam(DingTalkConstant.MSG_TYPE_CONSTANT, msgType)
+                        .addParam(msgType, JSON.toJSONString(objectMap))
+                        .addHeader(DingTalkConstant.CONTENT_TYPE, DingTalkConstant.APPLICATION_JSON_UTF_8_TYPE)
+                        .post(true)
+                        .sync();
+            });
+        } catch (Exception e) {
+            log.error("DingTalkSinkTask | put | error => ", e);
+        }
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+        if (StringUtils.isBlank(config.getString(DingTalkConstant.WEB_HOOK))) {
+            throw new RuntimeException("ding talk required parameter is null !");
+        }

Review comment:
       Should check the endpoint and params of the webhook.

##########
File path: connectors/aliyun/rocketmq-connect-dingtalk/src/main/resources/log4j.properties
##########
@@ -0,0 +1,22 @@
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+log4j.logger.org.apache.spark.repl.Main=WARN
+
+# Settings to quiet third party logs that are too verbose
+log4j.logger.org.sparkproject.jetty=WARN
+log4j.logger.org.sparkproject.jetty.util.component.AbstractLifeCycle=ERROR
+log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO

Review comment:
       Remove logback config if not necessary.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-connect] zhaohai1299002788 commented on a change in pull request #2: [ISSUE #10] Add ding talk sink

Posted by GitBox <gi...@apache.org>.
zhaohai1299002788 commented on a change in pull request #2:
URL: https://github.com/apache/rocketmq-connect/pull/2#discussion_r821388948



##########
File path: connectors/aliyun/rocketmq-connect-dingtalk/src/main/java/com/aliyun/rocketmq/connect/dingtalk/sink/DingTalkSinkConnector.java
##########
@@ -0,0 +1,61 @@
+package com.aliyun.rocketmq.connect.dingtalk.sink;
+
+import com.aliyun.rocketmq.connect.dingtalk.sink.constant.DingTalkConstant;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DingTalkSinkConnector extends SinkConnector {
+
+    private String webHook;
+
+    private String msgType;
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        List<KeyValue> taskConfigList = new ArrayList<>(11);
+        KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(DingTalkConstant.WEB_HOOK, webHook);
+        keyValue.put(DingTalkConstant.MSG_TYPE_CONSTANT, msgType);
+        taskConfigList.add(keyValue);

Review comment:
       delete duplicate code




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-connect] 2011shenlin commented on a change in pull request #2: [ISSUE #10] Add ding talk sink

Posted by GitBox <gi...@apache.org>.
2011shenlin commented on a change in pull request #2:
URL: https://github.com/apache/rocketmq-connect/pull/2#discussion_r833175980



##########
File path: connectors/aliyun/rocketmq-connect-dingtalk/src/main/java/org/apache/rocketmq/connect/dingtalk/sink/common/OkHttpUtils.java
##########
@@ -0,0 +1,281 @@
+package org.apache.rocketmq.connect.dingtalk.sink.common;
+
+import com.alibaba.fastjson.JSON;
+import okhttp3.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import java.io.IOException;
+import java.net.URLEncoder;
+import java.security.SecureRandom;
+import java.security.cert.X509Certificate;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+public class OkHttpUtils {
+    private static final Logger log = LoggerFactory.getLogger(OkHttpUtils.class);
+
+    private static volatile OkHttpClient okHttpClient = null;
+    private static volatile Semaphore semaphore = null;
+    private Map<String, String> headerMap;
+    private Map<String, String> paramMap;
+    private String url;
+    private Request.Builder request;
+
+    private OkHttpUtils() {
+        if (okHttpClient == null) {
+            synchronized (OkHttpUtils.class) {
+                if (okHttpClient == null) {
+                    TrustManager[] trustManagers = buildTrustManagers();
+                    okHttpClient = new OkHttpClient.Builder()
+                            .connectTimeout(15, TimeUnit.SECONDS)
+                            .writeTimeout(20, TimeUnit.SECONDS)
+                            .readTimeout(20, TimeUnit.SECONDS)
+                            .sslSocketFactory(createSSLSocketFactory(trustManagers), (X509TrustManager) trustManagers[0])
+                            .hostnameVerifier((hostName, session) -> true)
+                            .retryOnConnectionFailure(true)
+                            .build();
+                    addHeader("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
+                }
+            }
+        }
+    }
+
+    private static Semaphore getSemaphoreInstance() {
+        synchronized (OkHttpUtils.class) {
+            if (semaphore == null) {
+                semaphore = new Semaphore(0);
+            }
+        }
+        return semaphore;
+    }
+
+    public static OkHttpUtils builder() {
+        return new OkHttpUtils();
+    }
+
+    public OkHttpUtils url(String url) {
+        this.url = url;
+        return this;
+    }
+
+    /**
+     * 添加参数
+     *
+     * @param key   参数名
+     * @param value 参数值
+     * @return
+     */
+    public OkHttpUtils addParam(String key, String value) {
+        if (paramMap == null) {
+            paramMap = new LinkedHashMap<>(16);
+        }
+        paramMap.put(key, value);
+        return this;
+    }
+
+    /**
+     * 添加请求头
+     *
+     * @param key   参数名
+     * @param value 参数值
+     * @return
+     */
+    public OkHttpUtils addHeader(String key, String value) {
+        if (headerMap == null) {
+            headerMap = new LinkedHashMap<>(16);
+        }
+        headerMap.put(key, value);
+        return this;
+    }
+
+    public OkHttpUtils get() {
+        request = new Request.Builder().get();
+        StringBuilder urlBuilder = new StringBuilder(url);
+        if (paramMap != null) {
+            urlBuilder.append("?");
+            try {
+                for (Map.Entry<String, String> entry : paramMap.entrySet()) {
+                    urlBuilder.append(URLEncoder.encode(entry.getKey(), "utf-8")).
+                            append("=").
+                            append(URLEncoder.encode(entry.getValue(), "utf-8")).
+                            append("&");
+                }
+            } catch (Exception e) {
+                log.error("OkHttpUtils | get | error => ", e);
+            }
+            urlBuilder.deleteCharAt(urlBuilder.length() - 1);
+        }
+        request.url(urlBuilder.toString());
+        return this;
+    }
+
+    /**
+     * 初始化post方法
+     *
+     * @param isJsonPost true等于json的方式提交数据,类似postman里post方法的raw
+     *                   false等于普通的表单提交
+     * @return
+     */
+    public OkHttpUtils post(boolean isJsonPost) {
+        RequestBody requestBody;
+        if (isJsonPost) {
+            String json = "";
+            if (paramMap != null) {
+                json = JSON.toJSONString(paramMap);
+            }
+            requestBody = RequestBody.create(MediaType.parse("application/json; charset=utf-8"), json);
+        } else {
+            FormBody.Builder formBody = new FormBody.Builder();
+            if (paramMap != null) {
+                paramMap.forEach(formBody::add);
+            }
+            requestBody = formBody.build();
+        }
+        request = new Request.Builder().post(requestBody).url(url);
+        return this;
+    }
+
+    public OkHttpUtils postForStringBody(Object data) {
+        String json = JSON.toJSONString(data);

Review comment:
       这里只需要原样将数据传递即可,不需要再JSON序列化:
    RequestBody requestBody = RequestBody.create(MediaType.parse("application/json; charset=utf-8"), data);




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-connect] 2011shenlin commented on a change in pull request #2: [ISSUE #10] Add ding talk sink

Posted by GitBox <gi...@apache.org>.
2011shenlin commented on a change in pull request #2:
URL: https://github.com/apache/rocketmq-connect/pull/2#discussion_r833178858



##########
File path: connectors/aliyun/rocketmq-connect-dingtalk/src/main/java/org/apache/rocketmq/connect/dingtalk/sink/DingTalkSinkConnector.java
##########
@@ -0,0 +1,71 @@
+package org.apache.rocketmq.connect.dingtalk.sink;
+
+import org.apache.rocketmq.connect.dingtalk.sink.constant.DingTalkConstant;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.commons.lang3.StringUtils;
+
+import java.net.URL;
+import java.net.URLConnection;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DingTalkSinkConnector extends SinkConnector {
+
+    private String webHook;
+
+    private String secretKey;
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        List<KeyValue> taskConfigList = new ArrayList<>(11);
+        KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(DingTalkConstant.WEB_HOOK, webHook);
+        keyValue.put(DingTalkConstant.SECRET_KEY, secretKey);
+        taskConfigList.add(keyValue);
+        return taskConfigList;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return DingTalkSinkTask.class;
+    }
+
+    @Override
+    public void validate(KeyValue config) {

Review comment:
       这个方法加个单测




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-connect] zhaohai1299002788 commented on a change in pull request #2: [ISSUE #10] Add ding talk sink

Posted by GitBox <gi...@apache.org>.
zhaohai1299002788 commented on a change in pull request #2:
URL: https://github.com/apache/rocketmq-connect/pull/2#discussion_r823218272



##########
File path: connectors/aliyun/rocketmq-connect-dingtalk/src/main/java/com/aliyun/rocketmq/connect/dingtalk/sink/DingTalkSinkTask.java
##########
@@ -0,0 +1,77 @@
+package com.aliyun.rocketmq.connect.dingtalk.sink;
+
+import com.alibaba.fastjson.JSON;
+import com.aliyun.rocketmq.connect.dingtalk.sink.common.OkHttpUtils;
+import com.aliyun.rocketmq.connect.dingtalk.sink.constant.DingTalkConstant;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DingTalkSinkTask extends SinkTask {
+    private static final Logger log = LoggerFactory.getLogger(DingTalkSinkTask.class);
+
+    private String webHook;
+
+    private String msgType;
+
+    @Override
+    public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
+        try {
+            sinkRecords.forEach(sinkRecord -> {
+                Map<String, Object> objectMap = new HashMap<>();
+                objectMap.put(DingTalkConstant.CONTENT_CONSTANT, sinkRecord.getData());
+                OkHttpUtils.builder()
+                        .url(webHook)
+                        .addParam(DingTalkConstant.MSG_TYPE_CONSTANT, msgType)
+                        .addParam(msgType, JSON.toJSONString(objectMap))
+                        .addHeader(DingTalkConstant.CONTENT_TYPE, DingTalkConstant.APPLICATION_JSON_UTF_8_TYPE)
+                        .post(true)
+                        .sync();
+            });
+        } catch (Exception e) {
+            log.error("DingTalkSinkTask | put | error => ", e);
+        }
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+        if (StringUtils.isBlank(config.getString(DingTalkConstant.WEB_HOOK))) {
+            throw new RuntimeException("ding talk required parameter is null !");
+        }

Review comment:
       Added a webhook check.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-connect] zhaohai1299002788 commented on a change in pull request #2: [ISSUE #10] Add ding talk sink

Posted by GitBox <gi...@apache.org>.
zhaohai1299002788 commented on a change in pull request #2:
URL: https://github.com/apache/rocketmq-connect/pull/2#discussion_r821387252



##########
File path: connectors/aliyun/rocketmq-connect-dingtalk/src/main/resources/log4j.properties
##########
@@ -0,0 +1,22 @@
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+log4j.logger.org.apache.spark.repl.Main=WARN
+
+# Settings to quiet third party logs that are too verbose
+log4j.logger.org.sparkproject.jetty=WARN
+log4j.logger.org.sparkproject.jetty.util.component.AbstractLifeCycle=ERROR
+log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO

Review comment:
       delete logback




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-connect] 2011shenlin commented on a change in pull request #2: [ISSUE #10] Add ding talk sink

Posted by GitBox <gi...@apache.org>.
2011shenlin commented on a change in pull request #2:
URL: https://github.com/apache/rocketmq-connect/pull/2#discussion_r833177116



##########
File path: connectors/aliyun/rocketmq-connect-dingtalk/src/main/java/org/apache/rocketmq/connect/dingtalk/sink/common/OkHttpUtils.java
##########
@@ -0,0 +1,281 @@
+package org.apache.rocketmq.connect.dingtalk.sink.common;
+
+import com.alibaba.fastjson.JSON;
+import okhttp3.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import java.io.IOException;
+import java.net.URLEncoder;
+import java.security.SecureRandom;
+import java.security.cert.X509Certificate;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+public class OkHttpUtils {
+    private static final Logger log = LoggerFactory.getLogger(OkHttpUtils.class);
+
+    private static volatile OkHttpClient okHttpClient = null;
+    private static volatile Semaphore semaphore = null;
+    private Map<String, String> headerMap;
+    private Map<String, String> paramMap;
+    private String url;
+    private Request.Builder request;
+
+    private OkHttpUtils() {
+        if (okHttpClient == null) {
+            synchronized (OkHttpUtils.class) {
+                if (okHttpClient == null) {
+                    TrustManager[] trustManagers = buildTrustManagers();
+                    okHttpClient = new OkHttpClient.Builder()
+                            .connectTimeout(15, TimeUnit.SECONDS)
+                            .writeTimeout(20, TimeUnit.SECONDS)
+                            .readTimeout(20, TimeUnit.SECONDS)
+                            .sslSocketFactory(createSSLSocketFactory(trustManagers), (X509TrustManager) trustManagers[0])
+                            .hostnameVerifier((hostName, session) -> true)
+                            .retryOnConnectionFailure(true)
+                            .build();
+                    addHeader("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
+                }
+            }
+        }
+    }
+
+    private static Semaphore getSemaphoreInstance() {
+        synchronized (OkHttpUtils.class) {
+            if (semaphore == null) {
+                semaphore = new Semaphore(0);
+            }
+        }
+        return semaphore;
+    }
+
+    public static OkHttpUtils builder() {
+        return new OkHttpUtils();
+    }
+
+    public OkHttpUtils url(String url) {
+        this.url = url;
+        return this;
+    }
+
+    /**
+     * 添加参数
+     *
+     * @param key   参数名
+     * @param value 参数值
+     * @return
+     */
+    public OkHttpUtils addParam(String key, String value) {
+        if (paramMap == null) {
+            paramMap = new LinkedHashMap<>(16);
+        }
+        paramMap.put(key, value);
+        return this;
+    }
+
+    /**
+     * 添加请求头
+     *
+     * @param key   参数名
+     * @param value 参数值
+     * @return
+     */
+    public OkHttpUtils addHeader(String key, String value) {
+        if (headerMap == null) {
+            headerMap = new LinkedHashMap<>(16);
+        }
+        headerMap.put(key, value);
+        return this;
+    }
+
+    public OkHttpUtils get() {
+        request = new Request.Builder().get();
+        StringBuilder urlBuilder = new StringBuilder(url);
+        if (paramMap != null) {
+            urlBuilder.append("?");
+            try {
+                for (Map.Entry<String, String> entry : paramMap.entrySet()) {
+                    urlBuilder.append(URLEncoder.encode(entry.getKey(), "utf-8")).
+                            append("=").
+                            append(URLEncoder.encode(entry.getValue(), "utf-8")).
+                            append("&");
+                }
+            } catch (Exception e) {
+                log.error("OkHttpUtils | get | error => ", e);
+            }
+            urlBuilder.deleteCharAt(urlBuilder.length() - 1);
+        }
+        request.url(urlBuilder.toString());
+        return this;
+    }
+
+    /**
+     * 初始化post方法
+     *
+     * @param isJsonPost true等于json的方式提交数据,类似postman里post方法的raw
+     *                   false等于普通的表单提交
+     * @return
+     */
+    public OkHttpUtils post(boolean isJsonPost) {
+        RequestBody requestBody;
+        if (isJsonPost) {
+            String json = "";
+            if (paramMap != null) {
+                json = JSON.toJSONString(paramMap);
+            }
+            requestBody = RequestBody.create(MediaType.parse("application/json; charset=utf-8"), json);
+        } else {
+            FormBody.Builder formBody = new FormBody.Builder();
+            if (paramMap != null) {
+                paramMap.forEach(formBody::add);
+            }
+            requestBody = formBody.build();
+        }
+        request = new Request.Builder().post(requestBody).url(url);
+        return this;
+    }
+
+    public OkHttpUtils postForStringBody(Object data) {
+        String json = JSON.toJSONString(data);

Review comment:
       Object 使用它默认的 toString 方法




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-connect] zhaohai1299002788 commented on a change in pull request #2: [ISSUE #10] Add ding talk sink

Posted by GitBox <gi...@apache.org>.
zhaohai1299002788 commented on a change in pull request #2:
URL: https://github.com/apache/rocketmq-connect/pull/2#discussion_r823218357



##########
File path: connectors/aliyun/rocketmq-connect-dingtalk/src/main/java/com/aliyun/rocketmq/connect/dingtalk/sink/DingTalkSinkTask.java
##########
@@ -0,0 +1,77 @@
+package com.aliyun.rocketmq.connect.dingtalk.sink;
+
+import com.alibaba.fastjson.JSON;
+import com.aliyun.rocketmq.connect.dingtalk.sink.common.OkHttpUtils;
+import com.aliyun.rocketmq.connect.dingtalk.sink.constant.DingTalkConstant;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DingTalkSinkTask extends SinkTask {
+    private static final Logger log = LoggerFactory.getLogger(DingTalkSinkTask.class);
+
+    private String webHook;
+
+    private String msgType;
+
+    @Override
+    public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
+        try {
+            sinkRecords.forEach(sinkRecord -> {
+                Map<String, Object> objectMap = new HashMap<>();
+                objectMap.put(DingTalkConstant.CONTENT_CONSTANT, sinkRecord.getData());

Review comment:
       Added transform configuration.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org