Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/05/20 04:04:09 UTC

[GitHub] [rocketmq] yuz10 commented on a change in pull request #2854: [Issue #2853] Refactoring remove duplicated code in CommitLog putMessage/asyncPutMessage

yuz10 commented on a change in pull request #2854:
URL: https://github.com/apache/rocketmq/pull/2854#discussion_r635739093



##########
File path: store/src/main/java/org/apache/rocketmq/store/CommitLog.java
##########
@@ -785,41 +786,24 @@ public long getBeginTimeInLock() {
     }
 
     public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
-        // Set the storage time
-        msg.setStoreTimestamp(System.currentTimeMillis());
-        // Set the message body BODY CRC (consider the most appropriate setting
-        // on the client)
-        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
+        setIPV6Flags(msg);
+
         // Back to Results
         AppendMessageResult result = null;
+        try {
+            result = appendPutMessageAndTrackStats(msg);
+        } catch (Exceptions.PutMessageException ex) {
+            return ex.toPutMessageResult();
+        }
+        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
 
-        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
-
-        String topic = msg.getTopic();
-        int queueId = msg.getQueueId();
-
-        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
-        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
-            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
-            // Delay Delivery
-            if (msg.getDelayTimeLevel() > 0) {
-                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
-                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
-                }
-
-                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
-                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
-
-                // Backup real topic, queueId
-                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
-                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
-                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
+        handleDiskFlush(result, putMessageResult, msg);
+        handleHA(result, putMessageResult, msg);
 
-                msg.setTopic(topic);
-                msg.setQueueId(queueId);
-            }
-        }
+        return putMessageResult;
+    }
 
+    private void setIPV6Flags(MessageExtBrokerInner msg) {

Review comment:
       setIPV6Flags could also be used to extract the equivalent lines in putMessages, because MessageExtBatch and MessageExtBrokerInner both extend MessageExt (the parameter type would then need to be MessageExt).
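
       A minimal sketch of the idea, not the actual CommitLog code. The hunk above does not show the body of setIPV6Flags, so this assumes it checks the born and store host addresses and sets a flag bit for each IPv6 address. The classes, fields, flag constants, and the addr helper below are simplified stand-ins invented for illustration; only the class names and the inheritance relationship come from the comment above.

```java
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;

class Ipv6FlagSketch {
    // Stand-in flag bits for this sketch only (assumed values).
    static final int BORN_HOST_V6 = 1 << 4;
    static final int STORE_HOST_V6 = 1 << 5;

    // Simplified stand-in for the shared base class.
    static class MessageExt {
        SocketAddress bornHost;
        SocketAddress storeHost;
        int sysFlag;
    }

    // Both message types extend the same base, as the review comment notes.
    static class MessageExtBrokerInner extends MessageExt { }
    static class MessageExtBatch extends MessageExt { }

    // Declaring the parameter as the base type lets the single-message path
    // and the batch path share one helper.
    static void setIPV6Flags(MessageExt msg) {
        if (isV6(msg.bornHost)) {
            msg.sysFlag |= BORN_HOST_V6;
        }
        if (isV6(msg.storeHost)) {
            msg.sysFlag |= STORE_HOST_V6;
        }
    }

    static boolean isV6(SocketAddress address) {
        return address instanceof InetSocketAddress
            && ((InetSocketAddress) address).getAddress() instanceof Inet6Address;
    }

    // Hypothetical helper: builds an address from an IP literal (no DNS lookup).
    static InetSocketAddress addr(String ipLiteral, int port) {
        try {
            return new InetSocketAddress(InetAddress.getByName(ipLiteral), port);
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        MessageExtBrokerInner single = new MessageExtBrokerInner();
        single.bornHost = addr("::1", 10911);
        single.storeHost = addr("127.0.0.1", 10911);
        setIPV6Flags(single); // single-message path

        MessageExtBatch batch = new MessageExtBatch();
        batch.bornHost = addr("127.0.0.1", 10911);
        batch.storeHost = addr("::1", 10911);
        setIPV6Flags(batch); // batch path, same helper

        System.out.println("single born v6: " + ((single.sysFlag & BORN_HOST_V6) != 0));
        System.out.println("batch store v6: " + ((batch.sysFlag & STORE_HOST_V6) != 0));
    }
}
```

       With the parameter typed as MessageExt, neither call site needs a cast or an overload, which is what would let putMessages drop its own copy of these lines.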



