Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/03/02 07:15:17 UTC

[GitHub] [rocketmq-connect] zhaohai1299002788 opened a new pull request #5: add fc sink task

zhaohai1299002788 opened a new pull request #5:
URL: https://github.com/apache/rocketmq-connect/pull/5


   add fc sink task


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-connect] zhaohai1299002788 commented on a change in pull request #5: [ISSUE #8] Add fc sink task

Posted by GitBox <gi...@apache.org>.
zhaohai1299002788 commented on a change in pull request #5:
URL: https://github.com/apache/rocketmq-connect/pull/5#discussion_r821375011



##########
File path: connectors/aliyun/rocketmq-connect-fc/src/main/java/com/aliyun/rocketmq/connect/fc/sink/FcSinkConnector.java
##########
@@ -0,0 +1,82 @@
+package com.aliyun.rocketmq.connect.fc.sink;
+
+import com.aliyun.rocketmq.connect.fc.sink.constant.FcConstant;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.internal.DefaultKeyValue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class FcSinkConnector extends SinkConnector {
+
+    private String region;
+
+    private String accessKey;
+
+    private String accessSecretKey;
+
+    private String accountId;
+
+    private String serviceName;
+
+    private String functionName;
+
+    private String invocationType;
+
+    private String qualifier;
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        List<KeyValue> keyValueList = new ArrayList<>(11);
+        KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(FcConstant.REGION_CONSTANT, region);
+        keyValue.put(FcConstant.ACCESS_KEY_CONSTANT, accessKey);
+        keyValue.put(FcConstant.ACCESS_SECRET_KEY_CONSTANT, accessSecretKey);
+        keyValue.put(FcConstant.ACCOUNT_ID_CONSTANT, accountId);
+        keyValue.put(FcConstant.SERVICE_NAME_CONSTANT, serviceName);
+        keyValue.put(FcConstant.FUNCTION_NAME_CONSTANT, functionName);
+        keyValue.put(FcConstant.INVOCATION_TYPE_CONSTANT, invocationType);
+        keyValue.put(FcConstant.QUALIFIER_CONSTANT, qualifier);
+        keyValueList.add(keyValue);
+        return keyValueList;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return FcSinkTask.class;
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+
+    }
+
+    @Override
+    public void init(KeyValue config) {
+        region = config.getString(FcConstant.REGION_CONSTANT);
+        accessKey = config.getString(FcConstant.ACCESS_KEY_CONSTANT);
+        accessSecretKey = config.getString(FcConstant.ACCESS_SECRET_KEY_CONSTANT);
+        accountId = config.getString(FcConstant.ACCOUNT_ID_CONSTANT);
+        serviceName = config.getString(FcConstant.SERVICE_NAME_CONSTANT);
+        functionName = config.getString(FcConstant.FUNCTION_NAME_CONSTANT);
+        invocationType = config.getString(FcConstant.INVOCATION_TYPE_CONSTANT, null);

Review comment:
       Deleted the duplicate code.







[GitHub] [rocketmq-connect] 2011shenlin commented on a change in pull request #5: [ISSUE #8] Add fc sink task

Posted by GitBox <gi...@apache.org>.
2011shenlin commented on a change in pull request #5:
URL: https://github.com/apache/rocketmq-connect/pull/5#discussion_r833199043



##########
File path: connectors/aliyun/rocketmq-connect-fc/src/main/java/org/apache/rocketmq/connect/fc/sink/FcSinkConnector.java
##########
@@ -0,0 +1,82 @@
+package org.apache.rocketmq.connect.fc.sink;
+
+import org.apache.rocketmq.connect.fc.sink.constant.FcConstant;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.internal.DefaultKeyValue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class FcSinkConnector extends SinkConnector {
+
+    private String region;
+
+    private String accessKey;

Review comment:
       Use the official names: accessKeyId and accessKeySecret.







[GitHub] [rocketmq-connect] 2011shenlin commented on a change in pull request #5: [ISSUE #8] Add fc sink task

Posted by GitBox <gi...@apache.org>.
2011shenlin commented on a change in pull request #5:
URL: https://github.com/apache/rocketmq-connect/pull/5#discussion_r821289924



##########
File path: connectors/aliyun/rocketmq-connect-fc/src/main/java/com/aliyun/rocketmq/connect/fc/sink/FcSinkConnector.java
##########
@@ -0,0 +1,82 @@
+package com.aliyun.rocketmq.connect.fc.sink;
+
+import com.aliyun.rocketmq.connect.fc.sink.constant.FcConstant;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.internal.DefaultKeyValue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class FcSinkConnector extends SinkConnector {
+
+    private String region;
+
+    private String accessKey;
+
+    private String accessSecretKey;
+
+    private String accountId;
+
+    private String serviceName;
+
+    private String functionName;
+
+    private String invocationType;
+
+    private String qualifier;
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        List<KeyValue> keyValueList = new ArrayList<>(11);
+        KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(FcConstant.REGION_CONSTANT, region);
+        keyValue.put(FcConstant.ACCESS_KEY_CONSTANT, accessKey);
+        keyValue.put(FcConstant.ACCESS_SECRET_KEY_CONSTANT, accessSecretKey);
+        keyValue.put(FcConstant.ACCOUNT_ID_CONSTANT, accountId);
+        keyValue.put(FcConstant.SERVICE_NAME_CONSTANT, serviceName);
+        keyValue.put(FcConstant.FUNCTION_NAME_CONSTANT, functionName);
+        keyValue.put(FcConstant.INVOCATION_TYPE_CONSTANT, invocationType);
+        keyValue.put(FcConstant.QUALIFIER_CONSTANT, qualifier);
+        keyValueList.add(keyValue);
+        return keyValueList;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return FcSinkTask.class;
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+
+    }
+
+    @Override
+    public void init(KeyValue config) {
+        region = config.getString(FcConstant.REGION_CONSTANT);
+        accessKey = config.getString(FcConstant.ACCESS_KEY_CONSTANT);
+        accessSecretKey = config.getString(FcConstant.ACCESS_SECRET_KEY_CONSTANT);
+        accountId = config.getString(FcConstant.ACCOUNT_ID_CONSTANT);
+        serviceName = config.getString(FcConstant.SERVICE_NAME_CONSTANT);
+        functionName = config.getString(FcConstant.FUNCTION_NAME_CONSTANT);
+        invocationType = config.getString(FcConstant.INVOCATION_TYPE_CONSTANT, null);

Review comment:
       If the configuration items of SinkConnector and SinkTask are the same, taskConfigs could return the SinkConnector config directly. This avoids duplicating the configuration-item extraction code between SinkConnector and SinkTask.
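
One way to read this suggestion, as a minimal sketch and not the code that was eventually merged: the connector keeps the configuration it receives in `init` and hands a copy to the task from `taskConfigs`, so the per-field extraction lives only in the task. `PassThroughConnectorSketch` is a hypothetical name, and `Map<String, String>` stands in for `io.openmessaging.KeyValue` so the example runs without the connector API on the classpath.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for FcSinkConnector. Map<String, String> replaces
// io.openmessaging.KeyValue (an assumption made only to keep this runnable).
final class PassThroughConnectorSketch {

    private Map<String, String> connectorConfig = Collections.emptyMap();

    // Keep the whole config instead of copying each entry into its own field.
    void init(Map<String, String> config) {
        this.connectorConfig = new LinkedHashMap<>(config);
    }

    // Hand the same settings to the task. maxTasks is ignored here, as in the
    // reviewed code, which always returns a single task config.
    List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>(1);
        // Defensive copy so the task cannot mutate the connector's state.
        configs.add(new LinkedHashMap<>(connectorConfig));
        return configs;
    }

    public static void main(String[] args) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("region", "cn-hangzhou");
        config.put("serviceName", "demo-service");

        PassThroughConnectorSketch connector = new PassThroughConnectorSketch();
        connector.init(config);
        System.out.println(connector.taskConfigs(1));
    }
}
```

With the real API the same idea would presumably store the `KeyValue` passed to `init` and return it in the list, leaving `FcSinkTask` as the only place that reads individual keys.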

##########
File path: connectors/aliyun/rocketmq-connect-fc/src/main/java/com/aliyun/rocketmq/connect/fc/sink/FcSinkTask.java
##########
@@ -0,0 +1,131 @@
+package com.aliyun.rocketmq.connect.fc.sink;
+
+import com.alibaba.fastjson.JSON;
+import com.aliyun.rocketmq.connect.fc.sink.constant.FcConstant;
+import com.aliyuncs.fc.client.FunctionComputeClient;
+import com.aliyuncs.fc.constants.Const;
+import com.aliyuncs.fc.exceptions.ClientException;
+import com.aliyuncs.fc.request.GetFunctionRequest;
+import com.aliyuncs.fc.request.GetServiceRequest;
+import com.aliyuncs.fc.request.InvokeFunctionRequest;
+import com.aliyuncs.fc.response.InvokeFunctionResponse;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.HttpURLConnection;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+public class FcSinkTask extends SinkTask {
+
+    private static final Logger log = LoggerFactory.getLogger(FcSinkTask.class);
+
+    private String region;
+
+    private String accessKey;
+
+    private String accessSecretKey;
+
+    private String accountId;
+
+    private String serviceName;
+
+    private String functionName;
+
+    private String invocationType;
+
+    private String qualifier;
+
+    private FunctionComputeClient functionComputeClient;
+
+    @Override
+    public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
+        try {
+            sinkRecords.forEach(connectRecord -> {
+                InvokeFunctionRequest invokeFunctionRequest = new InvokeFunctionRequest(serviceName, functionName);
+                invokeFunctionRequest.setPayload(JSON.toJSONString(connectRecord.getData()).getBytes(StandardCharsets.UTF_8));
+                if (!StringUtils.isBlank(invocationType)) {
+                    invokeFunctionRequest.setInvocationType(Const.INVOCATION_TYPE_ASYNC);
+                }
+                invokeFunctionRequest.setQualifier(qualifier);
+                InvokeFunctionResponse invokeFunctionResponse = functionComputeClient.invokeFunction(invokeFunctionRequest);
+                if (Const.INVOCATION_TYPE_ASYNC.equals(invocationType)) {
+                    if (HttpURLConnection.HTTP_ACCEPTED == invokeFunctionResponse.getStatus()) {
+                        log.info("Async invocation has been queued for execution, request ID: {}", invokeFunctionResponse.getRequestId());
+                    }else {
+                        log.info("Async invocation was not accepted");
+                    }
+                }
+            });
+        } catch (Exception e) {
+            log.error("FcSinkTask | put | error => ", e);
+        }
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+        if (StringUtils.isBlank(config.getString(FcConstant.REGION_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.ACCESS_KEY_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.ACCESS_SECRET_KEY_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.ACCOUNT_ID_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.SERVICE_NAME_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.FUNCTION_NAME_CONSTANT))) {
+            throw new RuntimeException("fc required parameter is null !");
+        }
+        try {

Review comment:
       Should check that the access key and the secret key are valid.

##########
File path: connectors/aliyun/rocketmq-connect-fc/src/main/java/com/aliyun/rocketmq/connect/fc/sink/FcSinkTask.java
##########
@@ -0,0 +1,131 @@
+package com.aliyun.rocketmq.connect.fc.sink;
+
+import com.alibaba.fastjson.JSON;
+import com.aliyun.rocketmq.connect.fc.sink.constant.FcConstant;
+import com.aliyuncs.fc.client.FunctionComputeClient;
+import com.aliyuncs.fc.constants.Const;
+import com.aliyuncs.fc.exceptions.ClientException;
+import com.aliyuncs.fc.request.GetFunctionRequest;
+import com.aliyuncs.fc.request.GetServiceRequest;
+import com.aliyuncs.fc.request.InvokeFunctionRequest;
+import com.aliyuncs.fc.response.InvokeFunctionResponse;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.HttpURLConnection;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+public class FcSinkTask extends SinkTask {
+
+    private static final Logger log = LoggerFactory.getLogger(FcSinkTask.class);
+
+    private String region;
+
+    private String accessKey;
+
+    private String accessSecretKey;
+
+    private String accountId;
+
+    private String serviceName;
+
+    private String functionName;
+
+    private String invocationType;
+
+    private String qualifier;
+
+    private FunctionComputeClient functionComputeClient;
+
+    @Override
+    public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
+        try {
+            sinkRecords.forEach(connectRecord -> {
+                InvokeFunctionRequest invokeFunctionRequest = new InvokeFunctionRequest(serviceName, functionName);
+                invokeFunctionRequest.setPayload(JSON.toJSONString(connectRecord.getData()).getBytes(StandardCharsets.UTF_8));
+                if (!StringUtils.isBlank(invocationType)) {
+                    invokeFunctionRequest.setInvocationType(Const.INVOCATION_TYPE_ASYNC);
+                }
+                invokeFunctionRequest.setQualifier(qualifier);
+                InvokeFunctionResponse invokeFunctionResponse = functionComputeClient.invokeFunction(invokeFunctionRequest);
+                if (Const.INVOCATION_TYPE_ASYNC.equals(invocationType)) {
+                    if (HttpURLConnection.HTTP_ACCEPTED == invokeFunctionResponse.getStatus()) {
+                        log.info("Async invocation has been queued for execution, request ID: {}", invokeFunctionResponse.getRequestId());
+                    }else {
+                        log.info("Async invocation was not accepted");
+                    }
+                }
+            });
+        } catch (Exception e) {
+            log.error("FcSinkTask | put | error => ", e);
+        }
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+        if (StringUtils.isBlank(config.getString(FcConstant.REGION_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.ACCESS_KEY_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.ACCESS_SECRET_KEY_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.ACCOUNT_ID_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.SERVICE_NAME_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.FUNCTION_NAME_CONSTANT))) {
+            throw new RuntimeException("fc required parameter is null !");
+        }
+        try {
+            GetServiceRequest getServiceRequest = new GetServiceRequest(config.getString(FcConstant.SERVICE_NAME_CONSTANT));
+            getServiceRequest.setQualifier(config.getString(FcConstant.QUALIFIER_CONSTANT));
+            functionComputeClient.getService(getServiceRequest);
+            GetFunctionRequest getFunctionRequest = new GetFunctionRequest(config.getString(FcConstant.SERVICE_NAME_CONSTANT), config.getString(FcConstant.FUNCTION_NAME_CONSTANT));
+            getFunctionRequest.setQualifier(config.getString(FcConstant.QUALIFIER_CONSTANT));
+            functionComputeClient.getFunction(getFunctionRequest);
+        } catch (ClientException e) {
+            log.error("FcSinkTask | validate | error => ", e);

Review comment:
       Should check that the service name and the function name exist.
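
A minimal sketch of what a fail-fast `validate` could look like, assuming the intent is that a missing service or function should stop the task and not merely be logged. Everything here is illustrative: `FailFastValidationSketch` and `ExistenceCheck` are hypothetical names, `ExistenceCheck` stands in for the `getService`/`getFunction` calls on the FC client, `Map<String, String>` stands in for `KeyValue`, and the key strings are taken from the README table on the assumption that they match the `FcConstant` values.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class FailFastValidationSketch {

    // Hypothetical stand-in for the remote getService/getFunction lookups.
    interface ExistenceCheck {
        void check(String serviceName, String functionName) throws Exception;
    }

    // Required keys as listed in the README parameter table (assumed names).
    static final List<String> REQUIRED_KEYS = Arrays.asList(
        "region", "accessKey", "accessSecretKey", "accountId", "serviceName", "functionName");

    static void validate(Map<String, String> config, ExistenceCheck remote) {
        // Step 1: every required parameter must be present and non-blank.
        for (String key : REQUIRED_KEYS) {
            String value = config.get(key);
            if (value == null || value.trim().isEmpty()) {
                throw new IllegalArgumentException("Missing required FC parameter: " + key);
            }
        }
        // Step 2: the lookup failure is rethrown instead of only being logged,
        // so an unknown service or function fails validation.
        try {
            remote.check(config.get("serviceName"), config.get("functionName"));
        } catch (Exception e) {
            throw new IllegalStateException("FC service or function lookup failed", e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        for (String key : REQUIRED_KEYS) {
            config.put(key, "xxxx");
        }
        // A lookup that always succeeds; a real task would call the FC client here.
        validate(config, (service, function) -> { });
        System.out.println("validation passed");
    }
}
```

Rethrowing keeps the original exception as the cause, so the log still shows why the lookup failed while the runtime sees the validation error.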

##########
File path: connectors/aliyun/rocketmq-connect-fc/README.md
##########
@@ -0,0 +1,48 @@
+# rocketmq-connect-fc
+* **rocketmq-connect-fc** description
+```
+Consumes messages sent by producers and writes the data to Function Compute (FC).
+```
+
+## Building rocketmq-connect-fc
+```
+mvn clean install -Dmaven.test.skip=true
+```
+
+## Starting rocketmq-connect-fc
+
+* Starting **fc-sink-connector**
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-fc-sink-connector-name}
+?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"com.aliyun.rocketmq.connect.fc.sink.FcSinkConnector","region":"${region}","accessKey":"${accessKey}","accessSecretKey":"${accessSecretKey}","accountId":"${accountId}","serviceName":"${serviceName}","functionName":"${functionName}","invocationType":"${invocationType}", "qualifier":"${qualifier}"}
+```
+
+Example
+```
+http://localhost:8081/connectors/fcConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
+"connector-class":"com.aliyun.rocketmq.connect.fc.sink.FcSinkConnector","region":"cn-hangzhou","accessKey":"xxxx","accessSecretKey":"xxxx","accountId":"xxxx","serviceName":"xxxx","functionName":"xxxx","invocationType":"", "qualifier":"LATEST"}
+```
+
+>**Note:** Starting `rocketmq-fc-connect` depends on the `rocketmq-connect-runtime` project being started. Place all the built `jar` files in the path configured by `pluginPaths` in the `runtime` project before issuing the start request above; this value is set in the `connect.conf` file of the `runtime` project.
+
+## Stopping rocketmq-connect-fc
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-fc-connector-name}/stop
+```
+
+## rocketmq-connect-fc parameters
+* **fc-sink-connector parameters**
+
+| KEY             | TYPE   | Required | Description | Example |
+|-----------------|--------|----------|-------------|---------|
+| region          | String | YES      | Region | cn-hangzhou |
+| accessKey       | String | YES      | Alibaba Cloud credential, obtained from the Alibaba Cloud user information management console | xxxx |
+| accessSecretKey | String | YES      | Alibaba Cloud credential, obtained from the Alibaba Cloud user information management console | xxx |
+| accountId       | String | YES      | Alibaba Cloud account ID | xxxx |
+| serviceName     | String | YES      | Service name | xxxx |
+| functionName    | String | YES      | Function name | xxxx |
+| invocationType  | String | NO       | Synchronous or asynchronous invocation | null |
+| qualifier       | String | NO       | Service version or alias | LATEST |
+

Review comment:
       FC's payload needs to support transform configuration.

##########
File path: connectors/aliyun/rocketmq-connect-fc/src/main/resources/logback.xml
##########
@@ -0,0 +1,90 @@
+<configuration>

Review comment:
       Why is a logback config needed?







[GitHub] [rocketmq-connect] zhaohai1299002788 commented on a change in pull request #5: add fc sink task

Posted by GitBox <gi...@apache.org>.
zhaohai1299002788 commented on a change in pull request #5:
URL: https://github.com/apache/rocketmq-connect/pull/5#discussion_r818280660



##########
File path: aliyun/rocketmq-connect-fc/src/main/resources/log4j.properties
##########
@@ -0,0 +1,28 @@
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+# Set the default spark-shell log level to WARN. When running the spark-shell, the
+# log level for this class is used to overwrite the root logger's log level, so that

Review comment:
       Irrelevant information deleted







[GitHub] [rocketmq-connect] zhaohai1299002788 commented on a change in pull request #5: [ISSUE #8] Add fc sink task

Posted by GitBox <gi...@apache.org>.
zhaohai1299002788 commented on a change in pull request #5:
URL: https://github.com/apache/rocketmq-connect/pull/5#discussion_r834900746



##########
File path: connectors/aliyun/rocketmq-connect-fc/src/main/java/org/apache/rocketmq/connect/fc/sink/FcSinkConnector.java
##########
@@ -0,0 +1,82 @@
+package org.apache.rocketmq.connect.fc.sink;
+
+import org.apache.rocketmq.connect.fc.sink.constant.FcConstant;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.internal.DefaultKeyValue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class FcSinkConnector extends SinkConnector {
+
+    private String region;
+
+    private String accessKey;

Review comment:
       Done.

##########
File path: connectors/aliyun/rocketmq-connect-fc/src/main/java/org/apache/rocketmq/connect/fc/sink/FcSinkConnector.java
##########
@@ -0,0 +1,82 @@
+package org.apache.rocketmq.connect.fc.sink;
+
+import org.apache.rocketmq.connect.fc.sink.constant.FcConstant;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.internal.DefaultKeyValue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class FcSinkConnector extends SinkConnector {
+
+    private String region;

Review comment:
       Done.







[GitHub] [rocketmq-connect] zhaohai1299002788 commented on a change in pull request #5: [ISSUE #8] Add fc sink task

Posted by GitBox <gi...@apache.org>.
zhaohai1299002788 commented on a change in pull request #5:
URL: https://github.com/apache/rocketmq-connect/pull/5#discussion_r821374106



##########
File path: connectors/aliyun/rocketmq-connect-fc/src/main/java/com/aliyun/rocketmq/connect/fc/sink/FcSinkTask.java
##########
@@ -0,0 +1,131 @@
+package com.aliyun.rocketmq.connect.fc.sink;
+
+import com.alibaba.fastjson.JSON;
+import com.aliyun.rocketmq.connect.fc.sink.constant.FcConstant;
+import com.aliyuncs.fc.client.FunctionComputeClient;
+import com.aliyuncs.fc.constants.Const;
+import com.aliyuncs.fc.exceptions.ClientException;
+import com.aliyuncs.fc.request.GetFunctionRequest;
+import com.aliyuncs.fc.request.GetServiceRequest;
+import com.aliyuncs.fc.request.InvokeFunctionRequest;
+import com.aliyuncs.fc.response.InvokeFunctionResponse;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.HttpURLConnection;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+public class FcSinkTask extends SinkTask {
+
+    private static final Logger log = LoggerFactory.getLogger(FcSinkTask.class);
+
+    private String region;
+
+    private String accessKey;
+
+    private String accessSecretKey;
+
+    private String accountId;
+
+    private String serviceName;
+
+    private String functionName;
+
+    private String invocationType;
+
+    private String qualifier;
+
+    private FunctionComputeClient functionComputeClient;
+
+    @Override
+    public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
+        try {
+            sinkRecords.forEach(connectRecord -> {
+                InvokeFunctionRequest invokeFunctionRequest = new InvokeFunctionRequest(serviceName, functionName);
+                invokeFunctionRequest.setPayload(JSON.toJSONString(connectRecord.getData()).getBytes(StandardCharsets.UTF_8));
+                if (!StringUtils.isBlank(invocationType)) {
+                    invokeFunctionRequest.setInvocationType(Const.INVOCATION_TYPE_ASYNC);
+                }
+                invokeFunctionRequest.setQualifier(qualifier);
+                InvokeFunctionResponse invokeFunctionResponse = functionComputeClient.invokeFunction(invokeFunctionRequest);
+                if (Const.INVOCATION_TYPE_ASYNC.equals(invocationType)) {
+                    if (HttpURLConnection.HTTP_ACCEPTED == invokeFunctionResponse.getStatus()) {
+                        log.info("Async invocation has been queued for execution, request ID: {}", invokeFunctionResponse.getRequestId());
+                    }else {
+                        log.info("Async invocation was not accepted");
+                    }
+                }
+            });
+        } catch (Exception e) {
+            log.error("FcSinkTask | put | error => ", e);
+        }
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+        if (StringUtils.isBlank(config.getString(FcConstant.REGION_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.ACCESS_KEY_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.ACCESS_SECRET_KEY_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.ACCOUNT_ID_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.SERVICE_NAME_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.FUNCTION_NAME_CONSTANT))) {
+            throw new RuntimeException("fc required parameter is null !");
+        }
+        try {
+            GetServiceRequest getServiceRequest = new GetServiceRequest(config.getString(FcConstant.SERVICE_NAME_CONSTANT));
+            getServiceRequest.setQualifier(config.getString(FcConstant.QUALIFIER_CONSTANT));
+            functionComputeClient.getService(getServiceRequest);
+            GetFunctionRequest getFunctionRequest = new GetFunctionRequest(config.getString(FcConstant.SERVICE_NAME_CONSTANT), config.getString(FcConstant.FUNCTION_NAME_CONSTANT));
+            getFunctionRequest.setQualifier(config.getString(FcConstant.QUALIFIER_CONSTANT));
+            functionComputeClient.getFunction(getFunctionRequest);
+        } catch (ClientException e) {
+            log.error("FcSinkTask | validate | error => ", e);

Review comment:
       This code checks whether the service name and function name exist: if they do, no exception is thrown.







[GitHub] [rocketmq-connect] 2011shenlin commented on a change in pull request #5: [ISSUE #8] Add fc sink task

Posted by GitBox <gi...@apache.org>.
2011shenlin commented on a change in pull request #5:
URL: https://github.com/apache/rocketmq-connect/pull/5#discussion_r821366880



##########
File path: connectors/aliyun/rocketmq-connect-fc/pom.xml
##########
@@ -0,0 +1,211 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.aliyun</groupId>

Review comment:
       Change the groupId to org.apache.rocketmq.







[GitHub] [rocketmq-connect] zhaohai1299002788 commented on a change in pull request #5: [ISSUE #8] Add fc sink task

Posted by GitBox <gi...@apache.org>.
zhaohai1299002788 commented on a change in pull request #5:
URL: https://github.com/apache/rocketmq-connect/pull/5#discussion_r821372301



##########
File path: connectors/aliyun/rocketmq-connect-fc/src/main/resources/logback.xml
##########
@@ -0,0 +1,90 @@
+<configuration>

Review comment:
       Deleted the logback config.







[GitHub] [rocketmq-connect] zhaohai1299002788 commented on a change in pull request #5: [ISSUE #8] Add fc sink task

Posted by GitBox <gi...@apache.org>.
zhaohai1299002788 commented on a change in pull request #5:
URL: https://github.com/apache/rocketmq-connect/pull/5#discussion_r823217353



##########
File path: connectors/aliyun/rocketmq-connect-fc/README.md
##########
@@ -0,0 +1,48 @@
+# rocketmq-connect-fc
+* **rocketmq-connect-fc** 说明
+```
+Be responsible for consuming messages from producer and writing data to function calculation FC.
+```
+
+## rocketmq-connect-fc 打包
+```
+mvn clean install -Dmaven.test.skip=true
+```
+
+## rocketmq-connect-fc 启动
+
+* **fc-sink-connector** 启动
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-fc-sink-connector-name}
+?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"com.aliyun.rocketmq.connect.fc.sink.FcSinkConnector",“region”:"${region}",accessKey”:"${accessKey}",accessSecretKey”:"${accessSecretKey}",accountId”:"${accountId}","serviceName":"${serviceName}","functionName":"${functionName}","invocationType":"${invocationType}", "qualifier":"${qualifier}"}
+```
+
+例子 
+```
+http://localhost:8081/connectors/fcConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
+"connector-class":"com.aliyun.rocketmq.connect.fc.sink.FcSinkConnector",“region”:"cn-hangzhou",accessKey”:"xxxx",accessSecretKey”:"xxxx",accountId”:"xxxx","serviceName":"xxxx","functionName":"xxxx","invocationType":"", "qualifier":"LATEST"}
+```
+
+>**注:** `rocketmq-fc-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
+
+## Stopping rocketmq-connect-fc
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-fc-connector-name}/stop
+```
+
+## rocketmq-connect-fc parameters
+* **fc-sink-connector parameters**
+
+| KEY             | TYPE   | Must be filled | Description                                                                                  | Example     |
+|-----------------|--------|----------------|----------------------------------------------------------------------------------------------|-------------|
+| region          | String | YES            | Region                                                                                       | cn-hangzhou |
+| accessKey       | String | YES            | Alibaba Cloud credential, obtained from the Alibaba Cloud user information management console | xxxx        |
+| accessSecretKey | String | YES            | Alibaba Cloud credential, obtained from the Alibaba Cloud user information management console | xxx         |
+| accountId       | String | YES            | Alibaba Cloud account ID                                                                     | xxxx        |
+| serviceName     | String | YES            | Service name                                                                                 | xxxx        |
+| functionName    | String | YES            | Function name                                                                                | xxxx        |
+| invocationType  | String | NO             | Synchronous or asynchronous                                                                  | null        |
+| qualifier       | String | NO             | Service version or alias                                                                     | LATEST      |
+

Review comment:
       Added transform configuration.
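
The start request in the README hunk above passes the connector configuration as a JSON object in the `config` query parameter. A minimal sketch of assembling such a URL follows. It assumes the runtime accepts a percent-encoded `config` value, which the README does not state. The class `FcConnectorStartUrl` and its methods are hypothetical names used only for illustration and are not part of the PR.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the PR.
final class FcConnectorStartUrl {

    /** Serializes a flat string map as a JSON object (no nesting, minimal escaping). */
    static String toJson(Map<String, String> config) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> e : config.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append('"').append(escape(e.getKey())).append("\":\"")
              .append(escape(e.getValue())).append('"');
            first = false;
        }
        return sb.append('}').toString();
    }

    /** Escapes backslashes and double quotes so the value stays valid inside a JSON string. */
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** Builds the start URL; percent-encoding the JSON is an assumption about the runtime. */
    static String build(String runtimeHostPort, String connectorName, Map<String, String> config) {
        try {
            return "http://" + runtimeHostPort + "/connectors/" + connectorName
                    + "?config=" + URLEncoder.encode(toJson(config), "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is required on every JVM, so this should not happen.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("connector-class", "com.aliyun.rocketmq.connect.fc.sink.FcSinkConnector");
        config.put("region", "cn-hangzhou");
        config.put("qualifier", "LATEST");
        System.out.println(build("localhost:8081", "fcConnectorSink", config));
    }
}
```

Generating the JSON from a map, rather than typing it by hand, avoids quoting mistakes in the request.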







[GitHub] [rocketmq-connect] zhouxinyu commented on pull request #5: add fc sink task

Posted by GitBox <gi...@apache.org>.
zhouxinyu commented on pull request #5:
URL: https://github.com/apache/rocketmq-connect/pull/5#issuecomment-1056563981


   It seems that we should move the directory to `connectors` instead of the root?





[GitHub] [rocketmq-connect] zhaohai1299002788 commented on pull request #5: add fc sink task

Posted by GitBox <gi...@apache.org>.
zhaohai1299002788 commented on pull request #5:
URL: https://github.com/apache/rocketmq-connect/pull/5#issuecomment-1057621222


   > It seems that we should move the directory to `connectors` instead of the root?
   
   Modified the project structure.





[GitHub] [rocketmq-connect] zhaohai1299002788 commented on a change in pull request #5: [ISSUE #8] Add fc sink task

Posted by GitBox <gi...@apache.org>.
zhaohai1299002788 commented on a change in pull request #5:
URL: https://github.com/apache/rocketmq-connect/pull/5#discussion_r821443527



##########
File path: connectors/aliyun/rocketmq-connect-fc/pom.xml
##########
@@ -0,0 +1,211 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.aliyun</groupId>
+    <artifactId>rocketmq-connect-fc</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+
+    <name>connect-fc</name>
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <slf4j.version>1.7.7</slf4j.version>
+        <logback.version>1.0.13</logback.version>
+        <openmessaging-connector.version>0.1.2-SNAPSHOT</openmessaging-connector.version>
+        <junit.version>4.11</junit.version>
+        <assertj.version>2.6.0</assertj.version>
+        <mockito.version>3.2.4</mockito.version>
+        <openmessaging-connector.version>0.1.2-SNAPSHOT</openmessaging-connector.version>

Review comment:
       Addressed.

##########
File path: connectors/aliyun/rocketmq-connect-fc/pom.xml
##########
@@ -0,0 +1,211 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.aliyun</groupId>

Review comment:
       Addressed.

##########
File path: connectors/aliyun/rocketmq-connect-fc/src/main/java/com/aliyun/rocketmq/connect/fc/sink/FcSinkConnector.java
##########
@@ -0,0 +1,82 @@
+package com.aliyun.rocketmq.connect.fc.sink;

Review comment:
       Addressed.







[GitHub] [rocketmq-connect] 2011shenlin commented on a change in pull request #5: [ISSUE #8] Add fc sink task

Posted by GitBox <gi...@apache.org>.
2011shenlin commented on a change in pull request #5:
URL: https://github.com/apache/rocketmq-connect/pull/5#discussion_r833200043



##########
File path: connectors/aliyun/rocketmq-connect-fc/src/main/java/org/apache/rocketmq/connect/fc/sink/FcSinkConnector.java
##########
@@ -0,0 +1,82 @@
+package org.apache.rocketmq.connect.fc.sink;
+
+import org.apache.rocketmq.connect.fc.sink.constant.FcConstant;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.internal.DefaultKeyValue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class FcSinkConnector extends SinkConnector {
+
+    private String region;

Review comment:
       Use RegionId instead.

##########
File path: connectors/aliyun/rocketmq-connect-fc/src/main/java/org/apache/rocketmq/connect/fc/sink/FcSinkConnector.java
##########
@@ -0,0 +1,82 @@
+package org.apache.rocketmq.connect.fc.sink;
+
+import org.apache.rocketmq.connect.fc.sink.constant.FcConstant;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.internal.DefaultKeyValue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class FcSinkConnector extends SinkConnector {
+
+    private String region;

Review comment:
       Use regionId instead.







[GitHub] [rocketmq-connect] 2011shenlin commented on a change in pull request #5: [ISSUE #8] Add fc sink task

Posted by GitBox <gi...@apache.org>.
2011shenlin commented on a change in pull request #5:
URL: https://github.com/apache/rocketmq-connect/pull/5#discussion_r821365252



##########
File path: connectors/aliyun/rocketmq-connect-fc/src/main/java/com/aliyun/rocketmq/connect/fc/sink/FcSinkConnector.java
##########
@@ -0,0 +1,82 @@
+package com.aliyun.rocketmq.connect.fc.sink;

Review comment:
       Change the package to org.apache.rocketmq.connect.xxx







[GitHub] [rocketmq-connect] 2011shenlin commented on a change in pull request #5: [ISSUE #8] Add fc sink task

Posted by GitBox <gi...@apache.org>.
2011shenlin commented on a change in pull request #5:
URL: https://github.com/apache/rocketmq-connect/pull/5#discussion_r821367283



##########
File path: connectors/aliyun/rocketmq-connect-fc/pom.xml
##########
@@ -0,0 +1,211 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.aliyun</groupId>
+    <artifactId>rocketmq-connect-fc</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+
+    <name>connect-fc</name>
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <slf4j.version>1.7.7</slf4j.version>
+        <logback.version>1.0.13</logback.version>
+        <openmessaging-connector.version>0.1.2-SNAPSHOT</openmessaging-connector.version>
+        <junit.version>4.11</junit.version>
+        <assertj.version>2.6.0</assertj.version>
+        <mockito.version>3.2.4</mockito.version>
+        <openmessaging-connector.version>0.1.2-SNAPSHOT</openmessaging-connector.version>

Review comment:
       <openmessaging-connector.version>0.1.2-SNAPSHOT</openmessaging-connector.version>
   
   duplicate







[GitHub] [rocketmq-connect] zhaohai1299002788 commented on a change in pull request #5: [ISSUE #8] Add fc sink task

Posted by GitBox <gi...@apache.org>.
zhaohai1299002788 commented on a change in pull request #5:
URL: https://github.com/apache/rocketmq-connect/pull/5#discussion_r821390922



##########
File path: connectors/aliyun/rocketmq-connect-fc/src/main/java/com/aliyun/rocketmq/connect/fc/sink/FcSinkTask.java
##########
@@ -0,0 +1,131 @@
+package com.aliyun.rocketmq.connect.fc.sink;
+
+import com.alibaba.fastjson.JSON;
+import com.aliyun.rocketmq.connect.fc.sink.constant.FcConstant;
+import com.aliyuncs.fc.client.FunctionComputeClient;
+import com.aliyuncs.fc.constants.Const;
+import com.aliyuncs.fc.exceptions.ClientException;
+import com.aliyuncs.fc.request.GetFunctionRequest;
+import com.aliyuncs.fc.request.GetServiceRequest;
+import com.aliyuncs.fc.request.InvokeFunctionRequest;
+import com.aliyuncs.fc.response.InvokeFunctionResponse;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.HttpURLConnection;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+public class FcSinkTask extends SinkTask {
+
+    private static final Logger log = LoggerFactory.getLogger(FcSinkTask.class);
+
+    private String region;
+
+    private String accessKey;
+
+    private String accessSecretKey;
+
+    private String accountId;
+
+    private String serviceName;
+
+    private String functionName;
+
+    private String invocationType;
+
+    private String qualifier;
+
+    private FunctionComputeClient functionComputeClient;
+
+    @Override
+    public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
+        try {
+            sinkRecords.forEach(connectRecord -> {
+                InvokeFunctionRequest invokeFunctionRequest = new InvokeFunctionRequest(serviceName, functionName);
+                invokeFunctionRequest.setPayload(JSON.toJSONString(connectRecord.getData()).getBytes(StandardCharsets.UTF_8));
+                if (!StringUtils.isBlank(invocationType)) {
+                    invokeFunctionRequest.setInvocationType(Const.INVOCATION_TYPE_ASYNC);
+                }
+                invokeFunctionRequest.setQualifier(qualifier);
+                InvokeFunctionResponse invokeFunctionResponse = functionComputeClient.invokeFunction(invokeFunctionRequest);
+                if (Const.INVOCATION_TYPE_ASYNC.equals(invocationType)) {
+                    if (HttpURLConnection.HTTP_ACCEPTED == invokeFunctionResponse.getStatus()) {
+                        log.info("Async invocation has been queued for execution, request ID: {}", invokeFunctionResponse.getRequestId());
+                    }else {
+                        log.info("Async invocation was not accepted");
+                    }
+                }
+            });
+        } catch (Exception e) {
+            log.error("FcSinkTask | put | error => ", e);
+        }
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+        if (StringUtils.isBlank(config.getString(FcConstant.REGION_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.ACCESS_KEY_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.ACCESS_SECRET_KEY_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.ACCOUNT_ID_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.SERVICE_NAME_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.FUNCTION_NAME_CONSTANT))) {
+            throw new RuntimeException("fc required parameter is null !");
+        }
+        try {

Review comment:
       Added a queue name check, and now verify that the access key and secret key are valid.
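
The `validate` hunk above rejects a config when any required parameter is blank, but its error message does not say which one. A minimal sketch of a variant that names every missing key follows. It assumes the key names from the README parameter table and uses a plain `Map` in place of the connector's `KeyValue` to stay self-contained. `FcConfigValidator` is a hypothetical class, not code from the PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the PR.
final class FcConfigValidator {

    // Assumed key names, taken from the README parameter table.
    static final String[] REQUIRED_KEYS = {
        "region", "accessKey", "accessSecretKey", "accountId", "serviceName", "functionName"
    };

    /** Returns the required keys whose values are null or blank, in declaration order. */
    static List<String> missingKeys(Map<String, String> config) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = config.get(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** Throws if any required key is missing, listing all of them in the message. */
    static void validate(Map<String, String> config) {
        List<String> missing = missingKeys(config);
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("Missing required FC parameters: " + missing);
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("region", "cn-hangzhou");
        config.put("accessKey", "xxxx");
        // prints [accessSecretKey, accountId, serviceName, functionName]
        System.out.println(missingKeys(config));
    }
}
```

Listing all missing keys at once saves the operator from fixing the config one parameter per attempt.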







[GitHub] [rocketmq-connect] zhouxinyu commented on a change in pull request #5: add fc sink task

Posted by GitBox <gi...@apache.org>.
zhouxinyu commented on a change in pull request #5:
URL: https://github.com/apache/rocketmq-connect/pull/5#discussion_r817453656



##########
File path: aliyun/rocketmq-connect-fc/src/main/resources/log4j.properties
##########
@@ -0,0 +1,28 @@
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+# Set the default spark-shell log level to WARN. When running the spark-shell, the
+# log level for this class is used to overwrite the root logger's log level, so that

Review comment:
       Why are there Spark-related configs?

##########
File path: aliyun/rocketmq-connect-fc/src/test/java/com/aliyun/rocketmq/connect/fc/sink/FcSinkConnectorTest.java
##########
@@ -0,0 +1,14 @@
+package com.aliyun.rocketmq.connect.fc.sink;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FcSinkConnectorTest {
+
+    private final FcSinkConnector fcSinkConnector = new FcSinkConnector();
+
+    @Test
+    public void testTaskConfigs() {
+        Assert.assertEquals(fcSinkConnector.taskConfigs(1).size(), 1);

Review comment:
       Could you please add more unit tests for the core features?
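
One way to make the core behaviour testable without a live FC endpoint might be to pull the decision logic out of `put` into pure functions. The sketch below does this for the invocation type only. `InvocationTypeResolver` and `Mode` are hypothetical names, and falling back to synchronous invocation for blank or unrecognized values is an assumption, not behaviour confirmed by the PR. Plain checks stand in for JUnit so the example runs on its own.

```java
import java.util.Locale;

// Hypothetical helper, not part of the PR.
final class InvocationTypeResolver {

    enum Mode { SYNC, ASYNC }

    /**
     * Maps the configured invocationType string to a mode.
     * Assumption: only "async" (case-insensitive) selects ASYNC;
     * null, blank, and any other value select SYNC.
     */
    static Mode resolve(String invocationType) {
        if (invocationType == null) {
            return Mode.SYNC;
        }
        String normalized = invocationType.trim().toLowerCase(Locale.ROOT);
        return "async".equals(normalized) ? Mode.ASYNC : Mode.SYNC;
    }

    private static void check(boolean ok, String label) {
        if (!ok) {
            throw new AssertionError(label);
        }
    }

    public static void main(String[] args) {
        check(resolve(null) == Mode.SYNC, "null -> SYNC");
        check(resolve("") == Mode.SYNC, "blank -> SYNC");
        check(resolve("Sync") == Mode.SYNC, "Sync -> SYNC");
        check(resolve("Async") == Mode.ASYNC, "Async -> ASYNC");
        check(resolve(" async ") == Mode.ASYNC, "padded async -> ASYNC");
        System.out.println("all checks passed");
    }
}
```

In the `put` hunk quoted earlier in this thread, any non-blank `invocationType` selects the asynchronous constant, so a test of the `"Sync"` case against the extracted function would exercise that branch.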



