Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/06/23 08:02:09 UTC

[GitHub] [rocketmq-connect] sunxiaojian commented on a diff in pull request #158: [ISSUE #155] Optimize the record converter parser

sunxiaojian commented on code in PR #158:
URL: https://github.com/apache/rocketmq-connect/pull/158#discussion_r904703189


##########
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java:
##########
@@ -583,28 +581,34 @@ private void receiveMessages(List<MessageExt> messages) {
 
     private ConnectRecord convertToSinkDataEntry(MessageExt message) {
         Map<String, String> properties = message.getProperties();
-        Schema schema;
-        Long timestamp;
-        ConnectRecord sinkDataEntry = null;
-        if (null == recordConverter || recordConverter instanceof RocketMQConverter) {
+        ConnectRecord sinkDataEntry;
+        
+        // start convert
+        if (recordConverter instanceof RecordConverter) {
+            // timestamp
             String connectTimestamp = properties.get(RuntimeConfigDefine.CONNECT_TIMESTAMP);
-            timestamp = StringUtils.isNotEmpty(connectTimestamp) ? Long.valueOf(connectTimestamp) : null;
-            String connectSchema = properties.get(RuntimeConfigDefine.CONNECT_SCHEMA);
-            schema = StringUtils.isNotEmpty(connectSchema) ? JSON.parseObject(connectSchema, Schema.class) : null;
-            byte[] body = message.getBody();
-            RecordPartition recordPartition = ConnectUtil.convertToRecordPartition(message.getTopic(), message.getBrokerName(), message.getQueueId());
+            Long timestamp = StringUtils.isNotEmpty(connectTimestamp) ? Long.valueOf(connectTimestamp) : null;
 
+            // partition and offset
+            RecordPartition recordPartition = ConnectUtil.convertToRecordPartition(message.getTopic(), message.getBrokerName(), message.getQueueId());
             RecordOffset recordOffset = ConnectUtil.convertToRecordOffset(message.getQueueOffset());
 
-            String bodyStr = new String(body, StandardCharsets.UTF_8);

Review Comment:
   > If the converter is null, could the previous logic be kept as the default processing, i.e. the default StringConverter?
   
   Handled.
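
One way to address the question above, shown only as a minimal sketch: fall back to a default string-based converter whenever none is configured. The `BodyConverter` interface, `Utf8StringConverter`, and `orDefault` are hypothetical names used for illustration; they are not the rocketmq-connect API, and this is not necessarily how the PR handles the case.

```java
// Hypothetical sketch: none of these types come from rocketmq-connect.
class ConverterFallbackSketch {

    /** Hypothetical stand-in for a record converter. */
    interface BodyConverter {
        Object toConnectData(byte[] body);
    }

    /** Default behavior: decode the message body as a UTF-8 string. */
    static final class Utf8StringConverter implements BodyConverter {
        @Override
        public Object toConnectData(byte[] body) {
            return body == null
                ? null
                : new String(body, java.nio.charset.StandardCharsets.UTF_8);
        }
    }

    /** Returns the configured converter, or the string default when it is null. */
    static BodyConverter orDefault(BodyConverter configured) {
        return configured != null ? configured : new Utf8StringConverter();
    }

    public static void main(String[] args) {
        // Nothing configured, so the string default is used.
        BodyConverter converter = orDefault(null);
        byte[] body = "hello".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        System.out.println(converter.toConnectData(body));
    }
}
```

Resolving the default once, rather than checking for null at every call site, keeps the conversion path free of repeated null checks.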



##########
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java:
##########
@@ -321,25 +321,27 @@ private void sendRecord() throws InterruptedException, RemotingException, MQClie
                 throw new ConnectException("source connect lack of topic config");
             }
             sourceMessage.setTopic(topic);
-            putExtendMsgProperty(sourceDataEntry, sourceMessage, topic);
-            if (null == recordConverter || recordConverter instanceof RocketMQConverter) {
-                Object payload = sourceDataEntry.getData();
-                if (null != payload) {
-                    final byte[] messageBody = (String.valueOf(payload)).getBytes();
-                    if (messageBody.length > RuntimeConfigDefine.MAX_MESSAGE_SIZE) {
-                        log.error("Send record, message size is greater than {} bytes, sourceDataEntry: {}", RuntimeConfigDefine.MAX_MESSAGE_SIZE, JSON.toJSONString(sourceDataEntry));
-                        continue;
-                    }
-                    sourceMessage.setBody(messageBody);
+            // converter
+            if (recordConverter instanceof RecordConverter) {

Review Comment:
   > If recordConverter is null, will it throw a null pointer exception?
   
   Handled.
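
On the null question: in Java, `instanceof` evaluates to `false` when its left operand is `null`, so a guard like the one in the diff does not itself throw. The null case simply skips the branch, which is why it still needs an explicit fallback. A minimal, self-contained illustration follows; the `Converter` interface and `describe` method are hypothetical stand-ins, not rocketmq-connect code.

```java
// Hypothetical demo of instanceof semantics with a null operand.
class NullInstanceofDemo {

    /** Hypothetical marker interface standing in for a converter type. */
    interface Converter { }

    /** Reports which branch an instanceof guard takes for the given value. */
    static String describe(Object candidate) {
        if (candidate instanceof Converter) {
            return "converter branch";
        }
        // Reached for null as well as for unrelated types; no exception is thrown.
        return "fallback branch";
    }

    public static void main(String[] args) {
        System.out.println(describe(null));
        System.out.println(describe(new Converter() { }));
    }
}
```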


