You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/06/19 01:04:36 UTC
[rocketmq] branch develop updated: [ISSUE #598] Enhance transaction
by putting messages that exceed max check times to system topic (#633)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d66243c [ISSUE #598] Enhance transaction by putting messages that exceed max check times to system topic (#633)
d66243c is described below
commit d66243c03cb903588c0089b7152fe8c8f33bcc84
Author: 程向往 <ch...@cmss.chinamobile.com>
AuthorDate: Wed Jun 19 09:04:29 2019 +0800
[ISSUE #598] Enhance transaction by putting messages that exceed max check times to system topic (#633)
* add logic of putting message that exceeds max-check-times to system topic TRANS_CHECK_MAXTIME_TOPIC
* add test case:testResolveDiscardMsg
* add @After logic to test case
* comment brokerController.shutdown and use mock
* add logic of resuming half message check
* add test case:resumeCheckHalfMessage
* delete commented codes
---
.../broker/processor/AdminBrokerProcessor.java | 70 +++++++++++-
.../rocketmq/broker/topic/TopicConfigManager.java | 48 +++++++-
.../AbstractTransactionalMessageCheckListener.java | 5 +
.../DefaultTransactionalMessageCheckListener.java | 45 +++++++-
.../broker/processor/AdminBrokerProcessorTest.java | 125 +++++++++++++++++++++
...faultTransactionalMessageCheckListenerTest.java | 48 ++++++--
.../rocketmq/client/impl/MQClientAPIImpl.java | 21 ++++
.../rocketmq/client/impl/MQClientAPIImplTest.java | 46 +++++++-
.../java/org/apache/rocketmq/common/MixAll.java | 1 +
.../rocketmq/common/protocol/RequestCode.java | 5 +
.../ResumeCheckHalfMessageRequestHeader.java | 33 ++++--
pom.xml | 2 +-
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 13 +++
.../tools/admin/DefaultMQAdminExtImpl.java | 19 ++++
.../apache/rocketmq/tools/admin/MQAdminExt.java | 5 +
.../message/QueryMsgByUniqueKeySubCommandTest.java | 15 +++
16 files changed, 468 insertions(+), 33 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index f23cca6..76a051b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
+import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
@@ -53,7 +54,10 @@ import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHead
import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
@@ -100,6 +104,7 @@ import org.apache.rocketmq.common.protocol.header.QueryConsumeTimeSpanRequestHea
import org.apache.rocketmq.common.protocol.header.QueryCorrectionOffsetHeader;
import org.apache.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader;
import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ResumeCheckHalfMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader;
@@ -120,8 +125,11 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
public class AdminBrokerProcessor implements NettyRequestProcessor {
@@ -216,6 +224,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return getBrokerAclConfigVersion(ctx, request);
case RequestCode.UPDATE_GLOBAL_WHITE_ADDRS_CONFIG:
return updateGlobalWhiteAddrsConfig(ctx, request);
+ case RequestCode.RESUME_CHECK_HALF_MESSAGE:
+ return resumeCheckHalfMessage(ctx, request);
default:
break;
}
@@ -262,7 +272,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
- this.brokerController.registerIncrementBrokerData(topicConfig,this.brokerController.getTopicConfigManager().getDataVersion());
+ this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
return null;
}
@@ -1518,4 +1528,62 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
+
+ private RemotingCommand resumeCheckHalfMessage(ChannelHandlerContext ctx,
+ RemotingCommand request)
+ throws RemotingCommandException {
+ final ResumeCheckHalfMessageRequestHeader requestHeader = (ResumeCheckHalfMessageRequestHeader) request
+ .decodeCommandCustomHeader(ResumeCheckHalfMessageRequestHeader.class);
+ final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ SelectMappedBufferResult selectMappedBufferResult = null;
+ try {
+ MessageId messageId = MessageDecoder.decodeMessageId(requestHeader.getMsgId());
+ selectMappedBufferResult = this.brokerController.getMessageStore()
+ .selectOneMessageByOffset(messageId.getOffset());
+ MessageExt msg = MessageDecoder.decode(selectMappedBufferResult.getByteBuffer());
+ msg.putUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, String.valueOf(0));
+ PutMessageResult putMessageResult = this.brokerController.getMessageStore()
+ .putMessage(toMessageExtBrokerInner(msg));
+ if (putMessageResult != null
+ && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
+ log.info(
+ "Put message back to RMQ_SYS_TRANS_HALF_TOPIC. real topic={}",
+ msg.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ } else {
+ log.error("Put message back to RMQ_SYS_TRANS_HALF_TOPIC failed.");
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("Put message back to RMQ_SYS_TRANS_HALF_TOPIC failed.");
+ }
+ } catch (Exception e) {
+ log.error("Exception was thrown when putting message back to RMQ_SYS_TRANS_HALF_TOPIC.");
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("Exception was thrown when putting message back to RMQ_SYS_TRANS_HALF_TOPIC.");
+ } finally {
+ if (selectMappedBufferResult != null) {
+ selectMappedBufferResult.release();
+ }
+ }
+ return response;
+ }
+
+ private MessageExtBrokerInner toMessageExtBrokerInner(MessageExt msgExt) {
+ MessageExtBrokerInner inner = new MessageExtBrokerInner();
+ inner.setTopic(TransactionalMessageUtil.buildHalfTopic());
+ inner.setBody(msgExt.getBody());
+ inner.setFlag(msgExt.getFlag());
+ MessageAccessor.setProperties(inner, msgExt.getProperties());
+ inner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
+ inner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(msgExt.getTags()));
+ inner.setQueueId(0);
+ inner.setSysFlag(msgExt.getSysFlag());
+ inner.setBornHost(msgExt.getBornHost());
+ inner.setBornTimestamp(msgExt.getBornTimestamp());
+ inner.setStoreHost(msgExt.getStoreHost());
+ inner.setReconsumeTimes(msgExt.getReconsumeTimes());
+ inner.setMsgId(msgExt.getMsgId());
+ inner.setWaitStoreMsgOK(false);
+ return inner;
+ }
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index ed353da..8f215cd 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -220,7 +220,7 @@ public class TopicConfigManager extends ConfigManager {
}
if (createNew) {
- this.brokerController.registerBrokerAll(false, true,true);
+ this.brokerController.registerBrokerAll(false, true, true);
}
return topicConfig;
@@ -264,7 +264,47 @@ public class TopicConfigManager extends ConfigManager {
}
if (createNew) {
- this.brokerController.registerBrokerAll(false, true,true);
+ this.brokerController.registerBrokerAll(false, true, true);
+ }
+
+ return topicConfig;
+ }
+
+ public TopicConfig createTopicOfTranCheckMaxTime(final int clientDefaultTopicQueueNums, final int perm) {
+ TopicConfig topicConfig = this.topicConfigTable.get(MixAll.TRANS_CHECK_MAX_TIME_TOPIC);
+ if (topicConfig != null)
+ return topicConfig;
+
+ boolean createNew = false;
+
+ try {
+ if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ try {
+ topicConfig = this.topicConfigTable.get(MixAll.TRANS_CHECK_MAX_TIME_TOPIC);
+ if (topicConfig != null)
+ return topicConfig;
+
+ topicConfig = new TopicConfig(MixAll.TRANS_CHECK_MAX_TIME_TOPIC);
+ topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);
+ topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);
+ topicConfig.setPerm(perm);
+ topicConfig.setTopicSysFlag(0);
+
+ log.info("create new topic {}", topicConfig);
+ this.topicConfigTable.put(MixAll.TRANS_CHECK_MAX_TIME_TOPIC, topicConfig);
+ createNew = true;
+ this.dataVersion.nextVersion();
+ this.persist();
+ } finally {
+ this.lockTopicConfigTable.unlock();
+ }
+ }
+ } catch (InterruptedException e) {
+ log.error("create TRANS_CHECK_MAX_TIME_TOPIC exception", e);
+ }
+
+ if (createNew) {
+ this.brokerController.registerBrokerAll(false, true, true);
}
return topicConfig;
@@ -289,7 +329,7 @@ public class TopicConfigManager extends ConfigManager {
this.dataVersion.nextVersion();
this.persist();
- this.brokerController.registerBrokerAll(false, true,true);
+ this.brokerController.registerBrokerAll(false, true, true);
}
}
@@ -309,7 +349,7 @@ public class TopicConfigManager extends ConfigManager {
this.dataVersion.nextVersion();
this.persist();
- this.brokerController.registerBrokerAll(false, true,true);
+ this.brokerController.registerBrokerAll(false, true, true);
}
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
index 659c6af..62507cd 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.broker.transaction;
import io.netty.channel.Channel;
+import java.util.Random;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageConst;
@@ -36,6 +37,10 @@ public abstract class AbstractTransactionalMessageCheckListener {
private BrokerController brokerController;
+ //queue nums of topic TRANS_CHECK_MAX_TIME_TOPIC
+ protected final static int TCMT_QUEUE_NUMS = 1;
+ protected final Random random = new Random(System.currentTimeMillis());
+
private static ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
index 529bfe4..ee87bd3 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
@@ -17,10 +17,18 @@
package org.apache.rocketmq.broker.transaction.queue;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
+import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
public class DefaultTransactionalMessageCheckListener extends AbstractTransactionalMessageCheckListener {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
@@ -31,6 +39,41 @@ public class DefaultTransactionalMessageCheckListener extends AbstractTransactio
@Override
public void resolveDiscardMsg(MessageExt msgExt) {
- log.error("MsgExt:{} has been checked too many times, so discard it", msgExt);
+ log.error("MsgExt:{} has been checked too many times, so discard it by moving it to system topic TRANS_CHECK_MAXTIME_TOPIC", msgExt);
+
+ try {
+ MessageExtBrokerInner brokerInner = toMessageExtBrokerInner(msgExt);
+ PutMessageResult putMessageResult = this.getBrokerController().getMessageStore().putMessage(brokerInner);
+ if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
+ log.info("Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC OK. Restored in queueOffset={}, " +
+ "commitLogOffset={}, real topic={}", msgExt.getQueueOffset(), msgExt.getCommitLogOffset(), msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
+ } else {
+ log.error("Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC failed, real topic={}, msgId={}", msgExt.getTopic(), msgExt.getMsgId());
+ }
+ } catch (Exception e) {
+ log.warn("Put checked-too-many-time message to TRANS_CHECK_MAXTIME_TOPIC error. {}", e);
+ }
+
+ }
+
+ private MessageExtBrokerInner toMessageExtBrokerInner(MessageExt msgExt) {
+ TopicConfig topicConfig = this.getBrokerController().getTopicConfigManager().createTopicOfTranCheckMaxTime(TCMT_QUEUE_NUMS, PermName.PERM_READ | PermName.PERM_WRITE);
+ int queueId = Math.abs(random.nextInt() % 99999999) % TCMT_QUEUE_NUMS;
+ MessageExtBrokerInner inner = new MessageExtBrokerInner();
+ inner.setTopic(topicConfig.getTopicName());
+ inner.setBody(msgExt.getBody());
+ inner.setFlag(msgExt.getFlag());
+ MessageAccessor.setProperties(inner, msgExt.getProperties());
+ inner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
+ inner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(msgExt.getTags()));
+ inner.setQueueId(queueId);
+ inner.setSysFlag(msgExt.getSysFlag());
+ inner.setBornHost(msgExt.getBornHost());
+ inner.setBornTimestamp(msgExt.getBornTimestamp());
+ inner.setStoreHost(msgExt.getStoreHost());
+ inner.setReconsumeTimes(msgExt.getReconsumeTimes());
+ inner.setMsgId(msgExt.getMsgId());
+ inner.setWaitStoreMsgOK(false);
+ return inner;
}
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
new file mode 100644
index 0000000..ec0a879
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.ResumeCheckHalfMessageRequestHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.AppendMessageResult;
+import org.apache.rocketmq.store.AppendMessageStatus;
+import org.apache.rocketmq.store.MappedFile;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class AdminBrokerProcessorTest {
+
+ private AdminBrokerProcessor adminBrokerProcessor;
+
+ @Mock
+ private ChannelHandlerContext handlerContext;
+
+ @Spy
+ private BrokerController
+ brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(),
+ new MessageStoreConfig());
+
+ @Mock
+ private MessageStore messageStore;
+
+
+ @Before
+ public void init() {
+ brokerController.setMessageStore(messageStore);
+ adminBrokerProcessor = new AdminBrokerProcessor(brokerController);
+ }
+
+ @Test
+ public void testProcessRequest_success() throws RemotingCommandException, UnknownHostException {
+ RemotingCommand request = createResumeCheckHalfMessageCommand();
+ when(messageStore.selectOneMessageByOffset(any(Long.class))).thenReturn(createSelectMappedBufferResult());
+ when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult
+ (PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
+ RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testProcessRequest_fail() throws RemotingCommandException, UnknownHostException {
+ RemotingCommand request = createResumeCheckHalfMessageCommand();
+ when(messageStore.selectOneMessageByOffset(any(Long.class))).thenReturn(createSelectMappedBufferResult());
+ when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult
+ (PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
+ RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+ }
+
+ private MessageExt createDefaultMessageExt() {
+ MessageExt messageExt = new MessageExt();
+ messageExt.setMsgId("12345678");
+ messageExt.setQueueId(0);
+ messageExt.setCommitLogOffset(123456789L);
+ messageExt.setQueueOffset(1234);
+ MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_REAL_QUEUE_ID, "0");
+ MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_REAL_TOPIC, "testTopic");
+ MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, "15");
+ return messageExt;
+ }
+
+ private SelectMappedBufferResult createSelectMappedBufferResult(){
+ SelectMappedBufferResult result = new SelectMappedBufferResult(0, ByteBuffer.allocate(1024) ,0, new MappedFile());
+ return result;
+ }
+ private ResumeCheckHalfMessageRequestHeader createResumeCheckHalfMessageRequestHeader() {
+ ResumeCheckHalfMessageRequestHeader header = new ResumeCheckHalfMessageRequestHeader();
+ header.setMsgId("C0A803CA00002A9F0000000000031367");
+ return header;
+ }
+
+ private RemotingCommand createResumeCheckHalfMessageCommand() {
+ ResumeCheckHalfMessageRequestHeader header = createResumeCheckHalfMessageRequestHeader();
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.RESUME_CHECK_HALF_MESSAGE, header);
+ request.makeCustomHeaderToNet();
+ return request;
+ }
+}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java
index 17bf00b..653a969 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java
@@ -16,18 +16,23 @@
*/
package org.apache.rocketmq.broker.transaction.queue;
+import java.net.InetSocketAddress;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
@@ -35,16 +40,25 @@ import org.mockito.junit.MockitoJUnitRunner;
public class DefaultTransactionalMessageCheckListenerTest {
private DefaultTransactionalMessageCheckListener listener;
+ @Mock
+ private MessageStore messageStore;
@Spy
- private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(),
+ private BrokerController brokerController = new BrokerController(new BrokerConfig(),
+ new NettyServerConfig(),
new NettyClientConfig(), new MessageStoreConfig());
-
@Before
- public void init() {
+ public void init() throws Exception {
listener = new DefaultTransactionalMessageCheckListener();
listener.setBrokerController(brokerController);
+ brokerController.setMessageStore(messageStore);
+
+ }
+
+ @After
+ public void destroy() {
+// brokerController.shutdown();
}
@Test
@@ -53,21 +67,21 @@ public class DefaultTransactionalMessageCheckListenerTest {
}
@Test
- public void testSendCheckMessage() throws Exception{
+ public void testSendCheckMessage() throws Exception {
MessageExt messageExt = createMessageExt();
listener.sendCheckMessage(messageExt);
}
@Test
- public void sendCheckMessage(){
+ public void sendCheckMessage() {
listener.resolveDiscardMsg(createMessageExt());
}
private MessageExtBrokerInner createMessageExt() {
MessageExtBrokerInner inner = new MessageExtBrokerInner();
- MessageAccessor.putProperty(inner,MessageConst.PROPERTY_REAL_QUEUE_ID,"1");
- MessageAccessor.putProperty(inner,MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,"1234255");
- MessageAccessor.putProperty(inner,MessageConst.PROPERTY_REAL_TOPIC,"realTopic");
+ MessageAccessor.putProperty(inner, MessageConst.PROPERTY_REAL_QUEUE_ID, "1");
+ MessageAccessor.putProperty(inner, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "1234255");
+ MessageAccessor.putProperty(inner, MessageConst.PROPERTY_REAL_TOPIC, "realTopic");
inner.setTransactionId(inner.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
inner.setBody("check".getBytes());
inner.setMsgId("12344567890");
@@ -75,4 +89,22 @@ public class DefaultTransactionalMessageCheckListenerTest {
return inner;
}
+ @Test
+ public void testResolveDiscardMsg() {
+ MessageExt messageExt = new MessageExt();
+ messageExt.setTopic(MixAll.RMQ_SYS_TRANS_HALF_TOPIC);
+ messageExt.setQueueId(0);
+ messageExt.setBody("test resolve discard msg".getBytes());
+ messageExt.setStoreHost(new InetSocketAddress("127.0.0.1", 10911));
+ messageExt.setBornHost(new InetSocketAddress("127.0.0.1", 54270));
+ MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_REAL_TOPIC, "test_topic");
+ MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_PRODUCER_GROUP, "PID_TEST_DISCARD_MSG");
+ MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
+ MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, "15");
+ MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_REAL_QUEUE_ID, "2");
+ MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_TAGS, "test_discard_msg");
+ MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "AC14157E4F1C18B4AAC27EB1A0F30000");
+ listener.resolveDiscardMsg(messageExt);
+ }
+
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index c3382ca..b743af9 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -122,6 +122,7 @@ import org.apache.rocketmq.common.protocol.header.QueryCorrectionOffsetHeader;
import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader;
import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ResumeCheckHalfMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
@@ -2207,4 +2208,24 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
}
+
+ public boolean resumeCheckHalfMessage(final String addr, String msgId,
+ final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
+ ResumeCheckHalfMessageRequestHeader requestHeader = new ResumeCheckHalfMessageRequestHeader();
+ requestHeader.setMsgId(msgId);
+
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.RESUME_CHECK_HALF_MESSAGE, requestHeader);
+
+ RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+ request, timeoutMillis);
+ assert response != null;
+ switch (response.getCode()) {
+ case ResponseCode.SUCCESS: {
+ return true;
+ }
+ default:
+ log.error("Failed to resume half message check logic. Remark={}", response.getRemark());
+ return false;
+ }
+ }
}
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index 2855595..84af632 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -26,17 +26,12 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
-import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
-import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader;
-import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
-import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -294,6 +289,45 @@ public class MQClientAPIImplTest {
assertThat(ex.getErrorMessage()).isEqualTo("corresponding to accessConfig has been deleted failed");
}
}
+ @Test
+ public void testResumeCheckHalfMessage_WithException() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock mock) throws Throwable {
+ RemotingCommand request = mock.getArgument(1);
+ RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setOpaque(request.getOpaque());
+ response.setRemark("Put message back to RMQ_SYS_TRANS_HALF_TOPIC failed.");
+ return response;
+ }
+ }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+ boolean result = mqClientAPI.resumeCheckHalfMessage(brokerAddr, "test", 3000);
+ assertThat(result).isEqualTo(false);
+ }
+
+ @Test
+ public void testResumeCheckHalfMessage_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock mock) throws Throwable {
+ RemotingCommand request = mock.getArgument(1);
+ return createResumeSuccessResponse(request);
+ }
+ }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+ boolean result = mqClientAPI.resumeCheckHalfMessage(brokerAddr, "test", 3000);
+
+ assertThat(result).isEqualTo(true);
+ }
+
+ private RemotingCommand createResumeSuccessResponse(RemotingCommand request) {
+ RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setOpaque(request.getOpaque());
+ return response;
+ }
private RemotingCommand createSuccessResponse(RemotingCommand request) {
RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
@@ -329,7 +363,7 @@ public class MQClientAPIImplTest {
response.setOpaque(request.getOpaque());
response.markResponseType();
response.setRemark(null);
-
+
return response;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index 550b0b6..0af65df 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -92,6 +92,7 @@ public class MixAll {
public static final String RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
public static final String RMQ_SYS_TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC";
public static final String RMQ_SYS_TRANS_OP_HALF_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC";
+ public static final String TRANS_CHECK_MAX_TIME_TOPIC = "TRANS_CHECK_MAX_TIME_TOPIC";
public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS";
public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml";
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index b771b77..58c4b9f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -175,4 +175,9 @@ public class RequestCode {
public static final int QUERY_CONSUME_QUEUE = 321;
public static final int QUERY_DATA_VERSION = 322;
+
+ /**
+ * resume logic of checking half messages that have been put in TRANS_CHECK_MAXTIME_TOPIC before
+ */
+ public static final int RESUME_CHECK_HALF_MESSAGE = 323;
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ResumeCheckHalfMessageRequestHeader.java
similarity index 54%
copy from broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/ResumeCheckHalfMessageRequestHeader.java
index 529bfe4..14dacd5 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ResumeCheckHalfMessageRequestHeader.java
@@ -14,23 +14,32 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.broker.transaction.queue;
-import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
+package org.apache.rocketmq.common.protocol.header;
-public class DefaultTransactionalMessageCheckListener extends AbstractTransactionalMessageCheckListener {
- private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNullable;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
- public DefaultTransactionalMessageCheckListener() {
- super();
+public class ResumeCheckHalfMessageRequestHeader implements CommandCustomHeader {
+ @CFNullable
+ private String msgId;
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+
+ }
+
+ public String getMsgId() {
+ return msgId;
+ }
+
+ public void setMsgId(String msgId) {
+ this.msgId = msgId;
}
@Override
- public void resolveDiscardMsg(MessageExt msgExt) {
- log.error("MsgExt:{} has been checked too many times, so discard it", msgExt);
+ public String toString() {
+ return "ResumeCheckHalfMessageRequestHeader [msgId=" + msgId + "]";
}
}
diff --git a/pom.xml b/pom.xml
index 7ab8991..7d183e7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -455,7 +455,7 @@
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
- <version>2.6.3</version>
+ <version>2.23.0</version>
<scope>test</scope>
</dependency>
</dependencies>
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index f00dcef..92371f1 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -537,4 +537,17 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
brokerAddr, topic, queueId, index, count, consumerGroup
);
}
+
+ @Override
+ public boolean resumeCheckHalfMessage(String msgId)
+ throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+ return this.defaultMQAdminExtImpl.resumeCheckHalfMessage(msgId);
+ }
+
+ @Override
+ public boolean resumeCheckHalfMessage(String topic,
+ String msgId)
+ throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+ return this.defaultMQAdminExtImpl.resumeCheckHalfMessage(topic, msgId);
+ }
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 502e9da..210d5a9 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -1025,4 +1025,23 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
brokerAddr, topic, queueId, index, count, consumerGroup, timeoutMillis
);
}
+
+ @Override
+ public boolean resumeCheckHalfMessage(String msgId)
+ throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+ MessageExt msg = this.viewMessage(msgId);
+
+ return this.mqClientInstance.getMQClientAPIImpl().resumeCheckHalfMessage(RemotingUtil.socketAddress2String(msg.getStoreHost()), msgId, timeoutMillis);
+ }
+
+ @Override
+ public boolean resumeCheckHalfMessage(final String topic, final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+ MessageExt msg = this.viewMessage(topic, msgId);
+ if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
+ return this.mqClientInstance.getMQClientAPIImpl().resumeCheckHalfMessage(RemotingUtil.socketAddress2String(msg.getStoreHost()), msgId, timeoutMillis);
+ } else {
+ MessageClientExt msgClient = (MessageClientExt) msg;
+ return this.mqClientInstance.getMQClientAPIImpl().resumeCheckHalfMessage(RemotingUtil.socketAddress2String(msg.getStoreHost()), msgClient.getOffsetMsgId(), timeoutMillis);
+ }
+ }
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 930785e..d5c75f0 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -273,4 +273,9 @@ public interface MQAdminExt extends MQAdmin {
final String topic, final int queueId,
final long index, final int count, final String consumerGroup)
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException;
+
+ boolean resumeCheckHalfMessage(String msgId)
+ throws RemotingException, MQClientException, InterruptedException, MQBrokerException;
+
+ boolean resumeCheckHalfMessage(final String topic, final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException;
}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java
index be6b636..e757608 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java
@@ -226,5 +226,20 @@ public class QueryMsgByUniqueKeySubCommandTest {
}
+ @Test
+ public void testExecute() throws SubCommandException {
+
+ System.setProperty("rocketmq.namesrv.addr", "127.0.0.1:9876");
+ QueryMsgByUniqueKeySubCommand cmd = new QueryMsgByUniqueKeySubCommand();
+ String[] args = new String[]{"-t myTopicTest", "-i 0A3A54F7BF7D18B4AAC28A3FA2CF0000"};
+ Options options = ServerUtil.buildCommandlineOptions(new Options());
+ CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin ", args, cmd.buildCommandlineOptions(options), new PosixParser());
+ cmd.execute(commandLine, options, null);
+
+ args = new String[]{"-t myTopicTest", "-i 0A3A54F7BF7D18B4AAC28A3FA2CF0000", "-g producerGroupName", "-d clientId"};
+ commandLine = ServerUtil.parseCmdLine("mqadmin ", args, cmd.buildCommandlineOptions(options), new PosixParser());
+ cmd.execute(commandLine, options, null);
+
+ }
}