You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2019/03/07 12:32:02 UTC

[GitHub] [rocketmq] zongtanghu commented on a change in pull request #633: [ISSUE #598] Enhance transaction by putting messages that exceed max check times to system topic

zongtanghu commented on a change in pull request #633: [ISSUE #598] Enhance transaction by putting messages that exceed max check times to system topic
URL: https://github.com/apache/rocketmq/pull/633#discussion_r263358522
 
 

 ##########
 File path: broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
 ##########
 @@ -17,20 +17,79 @@
 package org.apache.rocketmq.broker.transaction.queue;
 
 import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
+import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
+
+public class DefaultTransactionalMessageCheckListener extends
+        AbstractTransactionalMessageCheckListener {
+
+    private static final InternalLogger log = InternalLoggerFactory
+            .getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
 
-public class DefaultTransactionalMessageCheckListener extends AbstractTransactionalMessageCheckListener {
-    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
 
     public DefaultTransactionalMessageCheckListener() {
         super();
     }
 
     @Override
     public void resolveDiscardMsg(MessageExt msgExt) {
-        log.error("MsgExt:{} has been checked too many times, so discard it", msgExt);
+        log.error(
+                "MsgExt:{} has been checked too many times, so discard it by moving it to system topic TRANS_CHECK_MAXTIME_TOPIC",
+                msgExt);
+
+        try {
+            MessageExtBrokerInner brokerInner = toMessageExtBrokerInner(msgExt);
+            PutMessageResult putMessageResult = this.getBrokerController().getMessageStore()
+                    .putMessage(brokerInner);
+            if (putMessageResult != null
+                    && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
+                log.info(
+                        "Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC OK. Restored in queueOffset={}, "
+                                + "commitLogOffset={}, real topic={}",
+                        msgExt.getQueueOffset(), msgExt.getCommitLogOffset(),
+                        msgExt.getUserProperty(
+                                MessageConst.PROPERTY_REAL_TOPIC));
+            } else {
+                log.error(
+                        "Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC failed, real topic={}, msgId={}",
+                        msgExt.getTopic(), msgExt.getMsgId());
+            }
+        } catch (Exception e) {
+            log.warn("Put checked-too-many-time message to TRANS_CHECK_MAXTIME_TOPIC error. {}", e);
+        }
+
+    }
+
+    private MessageExtBrokerInner toMessageExtBrokerInner(MessageExt msgExt) {
+        TopicConfig topicConfig = this.getBrokerController().getTopicConfigManager()
+                .createTopicOfTranCheckMaxTime(TCMT_QUEUE_NUMS,
+                        PermName.PERM_READ | PermName.PERM_WRITE);
+        int queueId = Math.abs(random.nextInt() % 99999999) % TCMT_QUEUE_NUMS;
 
 Review comment:
   The TRANS_CHECK_MAX_TIME_TOPIC's queue number is 1, so the variable queueId should be set to 1 directly.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services