You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/08/27 09:12:35 UTC

[GitHub] [rocketmq-connect] Oliverwqcwrw opened a new pull request, #289: [ISSUE #288] redis source connector adapt to new api

Oliverwqcwrw opened a new pull request, #289:
URL: https://github.com/apache/rocketmq-connect/pull/289

   ## What is the purpose of the change
   
   Close #288 
   
   ## Brief changelog
   
   XX
   
   ## Verifying this change
   
   XXXX
   
   Follow this checklist to help us incorporate your contribution quickly and easily. Notice, `it would be helpful if you could finish the following 5 checklist(the last one is not necessary)before request the community to review your PR`.
   
   - [x] Make sure there is a [Github issue](https://github.com/apache/rocketmq-connect/issues) filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue. 
   - [x] Format the pull request title like `[ISSUE #123] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body.
   - [x] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
   - [x] Write necessary unit-test(over 80% coverage) to verify your logic correction, more mock a little better when cross module dependency exist. If the new feature or significant change is committed, please remember to add integration-test in [test module](https://github.com/apache/rocketmq/tree/master/test).
   - [x] Run `mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle` to make sure basic checks pass. Run `mvn clean install -DskipITs` to make sure unit-test pass. Run `mvn clean test-compile failsafe:integration-test`  to make sure integration-test pass.
   - [ ] If this contribution is large, please file an [Apache Individual Contributor License Agreement](http://www.apache.org/licenses/#clas).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq-connect] Oliverwqcwrw commented on a diff in pull request #289: [ISSUE #288] redis source connector adapt to new api

Posted by GitBox <gi...@apache.org>.
Oliverwqcwrw commented on code in PR #289:
URL: https://github.com/apache/rocketmq-connect/pull/289#discussion_r966487465


##########
connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/converter/RedisEntryConverter.java:
##########
@@ -17,53 +17,89 @@
 
 package org.apache.rocketmq.connect.redis.converter;
 
-import io.openmessaging.connector.api.data.DataEntryBuilder;
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.Field;
 import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
 import io.openmessaging.connector.api.data.Schema;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import java.nio.ByteBuffer;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.connect.redis.common.Options;
-import org.apache.rocketmq.connect.redis.converter.KVEntryConverter;
 import org.apache.rocketmq.connect.redis.pojo.KVEntry;
 
 public class RedisEntryConverter implements KVEntryConverter {
     private final int maxValueSize = 500;
 
-    @Override public List<SourceDataEntry> kVEntryToDataEntries(KVEntry kvEntry) {
-        Schema schema = getRedisSchema(kvEntry.getValueType());
-
+    @Override public List<ConnectRecord> kVEntryToDataEntries(KVEntry kvEntry) {
         String partition = kvEntry.getPartition();
         if (partition == null) {
             throw new IllegalStateException("partition info error.");
         }
-        List<SourceDataEntry> res = new ArrayList<>();
+        List<ConnectRecord> res = new ArrayList<>();
         List<Object> values = splitValue(kvEntry.getValueType(), kvEntry.getValue(), this.maxValueSize);
         for (int i = 0; i < values.size(); i++) {
-            DataEntryBuilder builder = newDataEntryBuilderWithoutValue(schema, kvEntry);
-
-            builder.putFiled(Options.REDIS_VALUE.name(), values.get(i));
-            builder.timestamp(System.currentTimeMillis());
+            Schema schema = SchemaBuilder.struct().name(Options.REDIS_QEUEUE.name()).build();
+            final List<Field> fields = buildFields();
+            schema.setFields(fields);
 
-            SourceDataEntry entry = builder.buildSourceDataEntry(
-                ByteBuffer.wrap(kvEntry.getPartition().getBytes()),
-                ByteBuffer.wrap(RedisPositionConverter.longToJson(kvEntry.getOffset()).toJSONString().getBytes())
-            );
-            res.add(entry);
+            final Object data = values.get(i);
+            if (data == null  || data.toString().equals("")) {
+                continue;
+            }
+            res.add(new ConnectRecord(buildRecordPartition(partition),
+                buildRecordOffset(kvEntry.getOffset()),
+                System.currentTimeMillis(),
+                schema,
+                buildPayLoad(fields, kvEntry, schema)));
         }
         return res;
     }
 
+    private RecordOffset buildRecordOffset(Long offset)  {
+        Map<String, Long> offsetMap = new HashMap<>();
+        offsetMap.put("queueOffset", offset);
+        RecordOffset recordOffset = new RecordOffset(offsetMap);
+        return recordOffset;
+    }
+
+    private RecordPartition buildRecordPartition(String partition) {
+        Map<String, String> partitionMap = new HashMap<>();
+        partitionMap.put("partition", partition);
+        RecordPartition  recordPartition = new RecordPartition(partitionMap);
+        return recordPartition;
+    }
+
+    private List<Field> buildFields() {
+        final Schema stringSchema = SchemaBuilder.string().build();
+        List<Field> fields = new ArrayList<>();
+        fields.add(new Field(0, Options.REDIS_COMMAND.name(), stringSchema));
+        fields.add(new Field(1, Options.REDIS_KEY.name(), stringSchema));

Review Comment:
   Ok, i will fix it 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq-connect] Oliverwqcwrw commented on a diff in pull request #289: [ISSUE #288] redis source connector adapt to new api

Posted by GitBox <gi...@apache.org>.
Oliverwqcwrw commented on code in PR #289:
URL: https://github.com/apache/rocketmq-connect/pull/289#discussion_r966487071


##########
connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSourceTask.java:
##########
@@ -94,11 +89,7 @@ public Config getConfig() {
         this.config.load(keyValue);
         LOGGER.info("task config msg: {}", this.config.toString());
 
-        // get position info
-        ByteBuffer byteBuffer = this.context.positionStorageReader().getPosition(
-            this.config.getPositionPartitionKey()
-        );
-        Long position = RedisPositionConverter.jsonToLong(byteBuffer);
+        final Long position = this.sourceTaskContext.configs().getLong("offset");

Review Comment:
   Thanks for your review,i will fix it 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq-connect] Oliverwqcwrw commented on a diff in pull request #289: [ISSUE #288] redis source connector adapt to new api

Posted by GitBox <gi...@apache.org>.
Oliverwqcwrw commented on code in PR #289:
URL: https://github.com/apache/rocketmq-connect/pull/289#discussion_r971975775


##########
connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSourceTask.java:
##########
@@ -94,11 +107,7 @@ public Config getConfig() {
         this.config.load(keyValue);
         LOGGER.info("task config msg: {}", this.config.toString());
 
-        // get position info
-        ByteBuffer byteBuffer = this.context.positionStorageReader().getPosition(
-            this.config.getPositionPartitionKey()
-        );
-        Long position = RedisPositionConverter.jsonToLong(byteBuffer);
+        final Long position = this.sourceTaskContext.configs().getLong("offset");

Review Comment:
   Hello @odbozhou ,
   Agree with your opinion, But there's no problem with position here,
   In `org.apache.rocketmq.connect.redis.processor.DefaultRedisEventProcessor#start`
   
   > 
        // 如果是LAST_OFFSET,则将offset设置为当前Redis最新的offset值。
               // LAST_OFFSET、CUSTOM_OFFSET,优先使用connector runtime中的存储位点信息。
               if (SyncMod.LAST_OFFSET.equals(this.config.getSyncMod())) {
                   if (this.config.getPosition() != null) {
                       this.config.setOffset(this.config.getPosition());
                   } else if (StringUtils.isNotBlank(offset)) {
                       this.config.setOffset(Long.parseLong(offset));
                   }
               } else if(SyncMod.CUSTOM_OFFSET.equals(this.config.getSyncMod())){
                   if (this.config.getPosition() != null) {
                       this.config.setOffset(this.config.getPosition());
                   }
               }
   
               startReplicatorAsync(this.config.getReplId(), this.config.getOffset());
   
   The synchronization is done from position at the end



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq-connect] odbozhou commented on a diff in pull request #289: [ISSUE #288] redis source connector adapt to new api

Posted by GitBox <gi...@apache.org>.
odbozhou commented on code in PR #289:
URL: https://github.com/apache/rocketmq-connect/pull/289#discussion_r965525651


##########
connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSourceTask.java:
##########
@@ -94,11 +89,7 @@ public Config getConfig() {
         this.config.load(keyValue);
         LOGGER.info("task config msg: {}", this.config.toString());
 
-        // get position info
-        ByteBuffer byteBuffer = this.context.positionStorageReader().getPosition(
-            this.config.getPositionPartitionKey()
-        );
-        Long position = RedisPositionConverter.jsonToLong(byteBuffer);
+        final Long position = this.sourceTaskContext.configs().getLong("offset");

Review Comment:
   To get the position, you can refer to FileSourceTask



##########
connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/converter/RedisEntryConverter.java:
##########
@@ -17,53 +17,89 @@
 
 package org.apache.rocketmq.connect.redis.converter;
 
-import io.openmessaging.connector.api.data.DataEntryBuilder;
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.Field;
 import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
 import io.openmessaging.connector.api.data.Schema;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import java.nio.ByteBuffer;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.connect.redis.common.Options;
-import org.apache.rocketmq.connect.redis.converter.KVEntryConverter;
 import org.apache.rocketmq.connect.redis.pojo.KVEntry;
 
 public class RedisEntryConverter implements KVEntryConverter {
     private final int maxValueSize = 500;
 
-    @Override public List<SourceDataEntry> kVEntryToDataEntries(KVEntry kvEntry) {
-        Schema schema = getRedisSchema(kvEntry.getValueType());
-
+    @Override public List<ConnectRecord> kVEntryToDataEntries(KVEntry kvEntry) {
         String partition = kvEntry.getPartition();
         if (partition == null) {
             throw new IllegalStateException("partition info error.");
         }
-        List<SourceDataEntry> res = new ArrayList<>();
+        List<ConnectRecord> res = new ArrayList<>();
         List<Object> values = splitValue(kvEntry.getValueType(), kvEntry.getValue(), this.maxValueSize);
         for (int i = 0; i < values.size(); i++) {
-            DataEntryBuilder builder = newDataEntryBuilderWithoutValue(schema, kvEntry);
-
-            builder.putFiled(Options.REDIS_VALUE.name(), values.get(i));
-            builder.timestamp(System.currentTimeMillis());
+            Schema schema = SchemaBuilder.struct().name(Options.REDIS_QEUEUE.name()).build();
+            final List<Field> fields = buildFields();
+            schema.setFields(fields);
 
-            SourceDataEntry entry = builder.buildSourceDataEntry(
-                ByteBuffer.wrap(kvEntry.getPartition().getBytes()),
-                ByteBuffer.wrap(RedisPositionConverter.longToJson(kvEntry.getOffset()).toJSONString().getBytes())
-            );
-            res.add(entry);
+            final Object data = values.get(i);
+            if (data == null  || data.toString().equals("")) {
+                continue;
+            }
+            res.add(new ConnectRecord(buildRecordPartition(partition),
+                buildRecordOffset(kvEntry.getOffset()),
+                System.currentTimeMillis(),
+                schema,
+                buildPayLoad(fields, kvEntry, schema)));
         }
         return res;
     }
 
+    private RecordOffset buildRecordOffset(Long offset)  {
+        Map<String, Long> offsetMap = new HashMap<>();
+        offsetMap.put("queueOffset", offset);
+        RecordOffset recordOffset = new RecordOffset(offsetMap);
+        return recordOffset;
+    }
+
+    private RecordPartition buildRecordPartition(String partition) {
+        Map<String, String> partitionMap = new HashMap<>();
+        partitionMap.put("partition", partition);
+        RecordPartition  recordPartition = new RecordPartition(partitionMap);
+        return recordPartition;
+    }
+
+    private List<Field> buildFields() {
+        final Schema stringSchema = SchemaBuilder.string().build();
+        List<Field> fields = new ArrayList<>();
+        fields.add(new Field(0, Options.REDIS_COMMAND.name(), stringSchema));
+        fields.add(new Field(1, Options.REDIS_KEY.name(), stringSchema));

Review Comment:
   There is already a key attribute in ConnectRecord. Should the redis key be placed in the key attribute in ConnectRecord, and the value should be assigned to the data in ConnectRecord?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq-connect] odbozhou commented on a diff in pull request #289: [ISSUE #288] redis source connector adapt to new api

Posted by GitBox <gi...@apache.org>.
odbozhou commented on code in PR #289:
URL: https://github.com/apache/rocketmq-connect/pull/289#discussion_r971570840


##########
connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/converter/RedisEntryConverter.java:
##########
@@ -17,53 +17,81 @@
 
 package org.apache.rocketmq.connect.redis.converter;
 
-import io.openmessaging.connector.api.data.DataEntryBuilder;
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.Field;
 import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
 import io.openmessaging.connector.api.data.Schema;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import java.nio.ByteBuffer;
+import io.openmessaging.connector.api.data.SchemaBuilder;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.connect.redis.common.Options;
-import org.apache.rocketmq.connect.redis.converter.KVEntryConverter;
 import org.apache.rocketmq.connect.redis.pojo.KVEntry;
 
 public class RedisEntryConverter implements KVEntryConverter {
     private final int maxValueSize = 500;
 
-    @Override public List<SourceDataEntry> kVEntryToDataEntries(KVEntry kvEntry) {
-        Schema schema = getRedisSchema(kvEntry.getValueType());
-
+    @Override
+    public List<ConnectRecord> kVEntryToConnectRecord(KVEntry kvEntry) {
         String partition = kvEntry.getPartition();
         if (partition == null) {
             throw new IllegalStateException("partition info error.");
         }
-        List<SourceDataEntry> res = new ArrayList<>();
+
+        List<ConnectRecord> res = new ArrayList<>();
         List<Object> values = splitValue(kvEntry.getValueType(), kvEntry.getValue(), this.maxValueSize);
         for (int i = 0; i < values.size(); i++) {
-            DataEntryBuilder builder = newDataEntryBuilderWithoutValue(schema, kvEntry);
-
-            builder.putFiled(Options.REDIS_VALUE.name(), values.get(i));
-            builder.timestamp(System.currentTimeMillis());
-
-            SourceDataEntry entry = builder.buildSourceDataEntry(
-                ByteBuffer.wrap(kvEntry.getPartition().getBytes()),
-                ByteBuffer.wrap(RedisPositionConverter.longToJson(kvEntry.getOffset()).toJSONString().getBytes())
-            );
-            res.add(entry);
+            Schema keySchema = SchemaBuilder.string().name(Options.REDIS_KEY.name()).build();
+            keySchema.setFields(buildFields());
+            final Object data = values.get(i);
+            if (data == null  || data.toString().equals("")) {
+                continue;
+            }
+            res.add(new ConnectRecord(
+                buildRecordPartition(partition),
+                buildRecordOffset(kvEntry.getOffset()),
+                System.currentTimeMillis(),
+                keySchema,
+                kvEntry.getKey(),
+                null,

Review Comment:
   Is the value schema missing?



##########
connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSourceTask.java:
##########
@@ -94,11 +107,7 @@ public Config getConfig() {
         this.config.load(keyValue);
         LOGGER.info("task config msg: {}", this.config.toString());
 
-        // get position info
-        ByteBuffer byteBuffer = this.context.positionStorageReader().getPosition(
-            this.config.getPositionPartitionKey()
-        );
-        Long position = RedisPositionConverter.jsonToLong(byteBuffer);
+        final Long position = this.sourceTaskContext.configs().getLong("offset");

Review Comment:
   The acquisition and use of position seems to be a bit problematic. The main function of position is to record the processing site of the connector, which ensures that when restarting or re-load balancing, it can continue processing where the previous processing site was successful.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq-connect] odbozhou merged pull request #289: [ISSUE #288] redis source connector adapt to new api

Posted by GitBox <gi...@apache.org>.
odbozhou merged PR #289:
URL: https://github.com/apache/rocketmq-connect/pull/289


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org