You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/03/08 06:29:00 UTC

[GitHub] [rocketmq-connect] 2011shenlin commented on a change in pull request #5: [ISSUE #8] Add fc sink task

2011shenlin commented on a change in pull request #5:
URL: https://github.com/apache/rocketmq-connect/pull/5#discussion_r821289924



##########
File path: connectors/aliyun/rocketmq-connect-fc/src/main/java/com/aliyun/rocketmq/connect/fc/sink/FcSinkConnector.java
##########
@@ -0,0 +1,82 @@
+package com.aliyun.rocketmq.connect.fc.sink;
+
+import com.aliyun.rocketmq.connect.fc.sink.constant.FcConstant;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.internal.DefaultKeyValue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class FcSinkConnector extends SinkConnector {
+
+    private String region;
+
+    private String accessKey;
+
+    private String accessSecretKey;
+
+    private String accountId;
+
+    private String serviceName;
+
+    private String functionName;
+
+    private String invocationType;
+
+    private String qualifier;
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        List<KeyValue> keyValueList = new ArrayList<>(11);
+        KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(FcConstant.REGION_CONSTANT, region);
+        keyValue.put(FcConstant.ACCESS_KEY_CONSTANT, accessKey);
+        keyValue.put(FcConstant.ACCESS_SECRET_KEY_CONSTANT, accessSecretKey);
+        keyValue.put(FcConstant.ACCOUNT_ID_CONSTANT, accountId);
+        keyValue.put(FcConstant.SERVICE_NAME_CONSTANT, serviceName);
+        keyValue.put(FcConstant.FUNCTION_NAME_CONSTANT, functionName);
+        keyValue.put(FcConstant.INVOCATION_TYPE_CONSTANT, invocationType);
+        keyValue.put(FcConstant.QUALIFIER_CONSTANT, qualifier);
+        keyValueList.add(keyValue);
+        return keyValueList;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return FcSinkTask.class;
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+
+    }
+
+    @Override
+    public void init(KeyValue config) {
+        region = config.getString(FcConstant.REGION_CONSTANT);
+        accessKey = config.getString(FcConstant.ACCESS_KEY_CONSTANT);
+        accessSecretKey = config.getString(FcConstant.ACCESS_SECRET_KEY_CONSTANT);
+        accountId = config.getString(FcConstant.ACCOUNT_ID_CONSTANT);
+        serviceName = config.getString(FcConstant.SERVICE_NAME_CONSTANT);
+        functionName = config.getString(FcConstant.FUNCTION_NAME_CONSTANT);
+        invocationType = config.getString(FcConstant.INVOCATION_TYPE_CONSTANT, null);

Review comment:
       If the configuration items of SinkConnector and SinkTask are the same, taskConfigs could return the SinkConnector config directly, which avoids duplicating the configuration-extraction code between SinkConnector and SinkTask.

##########
File path: connectors/aliyun/rocketmq-connect-fc/src/main/java/com/aliyun/rocketmq/connect/fc/sink/FcSinkTask.java
##########
@@ -0,0 +1,131 @@
+package com.aliyun.rocketmq.connect.fc.sink;
+
+import com.alibaba.fastjson.JSON;
+import com.aliyun.rocketmq.connect.fc.sink.constant.FcConstant;
+import com.aliyuncs.fc.client.FunctionComputeClient;
+import com.aliyuncs.fc.constants.Const;
+import com.aliyuncs.fc.exceptions.ClientException;
+import com.aliyuncs.fc.request.GetFunctionRequest;
+import com.aliyuncs.fc.request.GetServiceRequest;
+import com.aliyuncs.fc.request.InvokeFunctionRequest;
+import com.aliyuncs.fc.response.InvokeFunctionResponse;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.HttpURLConnection;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+public class FcSinkTask extends SinkTask {
+
+    private static final Logger log = LoggerFactory.getLogger(FcSinkTask.class);
+
+    private String region;
+
+    private String accessKey;
+
+    private String accessSecretKey;
+
+    private String accountId;
+
+    private String serviceName;
+
+    private String functionName;
+
+    private String invocationType;
+
+    private String qualifier;
+
+    private FunctionComputeClient functionComputeClient;
+
+    @Override
+    public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
+        try {
+            sinkRecords.forEach(connectRecord -> {
+                InvokeFunctionRequest invokeFunctionRequest = new InvokeFunctionRequest(serviceName, functionName);
+                invokeFunctionRequest.setPayload(JSON.toJSONString(connectRecord.getData()).getBytes(StandardCharsets.UTF_8));
+                if (!StringUtils.isBlank(invocationType)) {
+                    invokeFunctionRequest.setInvocationType(Const.INVOCATION_TYPE_ASYNC);
+                }
+                invokeFunctionRequest.setQualifier(qualifier);
+                InvokeFunctionResponse invokeFunctionResponse = functionComputeClient.invokeFunction(invokeFunctionRequest);
+                if (Const.INVOCATION_TYPE_ASYNC.equals(invocationType)) {
+                    if (HttpURLConnection.HTTP_ACCEPTED == invokeFunctionResponse.getStatus()) {
+                        log.info("Async invocation has been queued for execution, request ID: {}", invokeFunctionResponse.getRequestId());
+                    }else {
+                        log.info("Async invocation was not accepted");
+                    }
+                }
+            });
+        } catch (Exception e) {
+            log.error("FcSinkTask | put | error => ", e);
+        }
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+        if (StringUtils.isBlank(config.getString(FcConstant.REGION_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.ACCESS_KEY_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.ACCESS_SECRET_KEY_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.ACCOUNT_ID_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.SERVICE_NAME_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.FUNCTION_NAME_CONSTANT))) {
+            throw new RuntimeException("fc required parameter is null !");
+        }
+        try {

Review comment:
       Should check that the access key and the secret key are valid.

##########
File path: connectors/aliyun/rocketmq-connect-fc/src/main/java/com/aliyun/rocketmq/connect/fc/sink/FcSinkTask.java
##########
@@ -0,0 +1,131 @@
+package com.aliyun.rocketmq.connect.fc.sink;
+
+import com.alibaba.fastjson.JSON;
+import com.aliyun.rocketmq.connect.fc.sink.constant.FcConstant;
+import com.aliyuncs.fc.client.FunctionComputeClient;
+import com.aliyuncs.fc.constants.Const;
+import com.aliyuncs.fc.exceptions.ClientException;
+import com.aliyuncs.fc.request.GetFunctionRequest;
+import com.aliyuncs.fc.request.GetServiceRequest;
+import com.aliyuncs.fc.request.InvokeFunctionRequest;
+import com.aliyuncs.fc.response.InvokeFunctionResponse;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.HttpURLConnection;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+public class FcSinkTask extends SinkTask {
+
+    private static final Logger log = LoggerFactory.getLogger(FcSinkTask.class);
+
+    private String region;
+
+    private String accessKey;
+
+    private String accessSecretKey;
+
+    private String accountId;
+
+    private String serviceName;
+
+    private String functionName;
+
+    private String invocationType;
+
+    private String qualifier;
+
+    private FunctionComputeClient functionComputeClient;
+
+    @Override
+    public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
+        try {
+            sinkRecords.forEach(connectRecord -> {
+                InvokeFunctionRequest invokeFunctionRequest = new InvokeFunctionRequest(serviceName, functionName);
+                invokeFunctionRequest.setPayload(JSON.toJSONString(connectRecord.getData()).getBytes(StandardCharsets.UTF_8));
+                if (!StringUtils.isBlank(invocationType)) {
+                    invokeFunctionRequest.setInvocationType(Const.INVOCATION_TYPE_ASYNC);
+                }
+                invokeFunctionRequest.setQualifier(qualifier);
+                InvokeFunctionResponse invokeFunctionResponse = functionComputeClient.invokeFunction(invokeFunctionRequest);
+                if (Const.INVOCATION_TYPE_ASYNC.equals(invocationType)) {
+                    if (HttpURLConnection.HTTP_ACCEPTED == invokeFunctionResponse.getStatus()) {
+                        log.info("Async invocation has been queued for execution, request ID: {}", invokeFunctionResponse.getRequestId());
+                    }else {
+                        log.info("Async invocation was not accepted");
+                    }
+                }
+            });
+        } catch (Exception e) {
+            log.error("FcSinkTask | put | error => ", e);
+        }
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+        if (StringUtils.isBlank(config.getString(FcConstant.REGION_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.ACCESS_KEY_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.ACCESS_SECRET_KEY_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.ACCOUNT_ID_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.SERVICE_NAME_CONSTANT))
+            || StringUtils.isBlank(config.getString(FcConstant.FUNCTION_NAME_CONSTANT))) {
+            throw new RuntimeException("fc required parameter is null !");
+        }
+        try {
+            GetServiceRequest getServiceRequest = new GetServiceRequest(config.getString(FcConstant.SERVICE_NAME_CONSTANT));
+            getServiceRequest.setQualifier(config.getString(FcConstant.QUALIFIER_CONSTANT));
+            functionComputeClient.getService(getServiceRequest);
+            GetFunctionRequest getFunctionRequest = new GetFunctionRequest(config.getString(FcConstant.SERVICE_NAME_CONSTANT), config.getString(FcConstant.FUNCTION_NAME_CONSTANT));
+            getFunctionRequest.setQualifier(config.getString(FcConstant.QUALIFIER_CONSTANT));
+            functionComputeClient.getFunction(getFunctionRequest);
+        } catch (ClientException e) {
+            log.error("FcSinkTask | validate | error => ", e);

Review comment:
       Should check that the service name exists and that the function name exists.

##########
File path: connectors/aliyun/rocketmq-connect-fc/README.md
##########
@@ -0,0 +1,48 @@
+# rocketmq-connect-fc
+* **rocketmq-connect-fc** 说明
+```
Responsible for consuming messages from the producer and writing the data to Function Compute (FC).
+```
+
+## rocketmq-connect-fc 打包
+```
+mvn clean install -Dmaven.test.skip=true
+```
+
+## rocketmq-connect-fc 启动
+
+* **fc-sink-connector** 启动
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-fc-sink-connector-name}
?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"com.aliyun.rocketmq.connect.fc.sink.FcSinkConnector","region":"${region}","accessKey":"${accessKey}","accessSecretKey":"${accessSecretKey}","accountId":"${accountId}","serviceName":"${serviceName}","functionName":"${functionName}","invocationType":"${invocationType}", "qualifier":"${qualifier}"}
+```
+
+例子 
+```
http://localhost:8081/connectors/fcConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
"connector-class":"com.aliyun.rocketmq.connect.fc.sink.FcSinkConnector","region":"cn-hangzhou","accessKey":"xxxx","accessSecretKey":"xxxx","accountId":"xxxx","serviceName":"xxxx","functionName":"xxxx","invocationType":"", "qualifier":"LATEST"}
+```
+
+>**注:** `rocketmq-fc-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
+
+## rocketmq-connect-fc 停止
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-fc-connector-name}/stop
+```
+
+## rocketmq-connect-fc 参数说明
+* **fc-sink-connector 参数说明**
+
|         KEY            |  TYPE   | Must be filled | Description                      | Example |
+|------------------------|---------|----------------|----------------------------------|--|
+|region                  | String  | YES            | 地域                               | cn-hangzhou|
+|accessKey               | String  | YES            | 阿里云身份验证,在阿里云用户信息管理控制台获取                    | xxxx |
+|accessSecretKey         | String  | YES            | 阿里云身份验证,在阿里云用户信息管理控制台获取                     | xxx |
+|accountId               | String  | YES            | 阿里云yourAccountId                      | xxxx |
+|serviceName             | String  | YES            | 服务名称 | xxxx |
+|functionName            | String  | YES            | 函数名称 | xxxx |
+|invocationType    | String | NO             | 同步或者异步                           | null |
+|qualifier        | String | NO             | 服务版本和别名                          | LATEST |
+

Review comment:
       FC's payload needs to support transform configuration.

##########
File path: connectors/aliyun/rocketmq-connect-fc/src/main/resources/logback.xml
##########
@@ -0,0 +1,90 @@
+<configuration>

Review comment:
       Why is a logback config needed?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org