You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/01/06 02:57:41 UTC
[rocketmq] branch 5.0.0-alpha updated: [ISSUE #3708] Both CQ and BCQ need to be supported in DefaultMessageStore. (#3712)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 5.0.0-alpha
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha by this push:
new e923a7e [ISSUE #3708] Both CQ and BCQ need to be supported in DefaultMessageStore. (#3712)
e923a7e is described below
commit e923a7ea328e5d62cf27cf96e446faa3bcfe25f0
Author: Hongjian Fei <er...@163.com>
AuthorDate: Thu Jan 6 10:56:05 2022 +0800
[ISSUE #3708] Both CQ and BCQ need to be supported in DefaultMessageStore. (#3712)
* [RIP-26] Both CQ and BCQ will be supported in DefaultMessageStore.
* [RIP-26] Both CQ and BCQ will be supported in DefaultMessageStore.
---
.../apache/rocketmq/broker/BrokerController.java | 11 +-
.../broker/processor/AdminBrokerProcessor.java | 1 -
.../broker/processor/ConsumerManageProcessor.java | 2 -
.../broker/processor/SendMessageProcessor.java | 7 +-
.../broker/topic/TopicConfigManagerTest.java | 9 +-
.../java/org/apache/rocketmq/client/MQAdmin.java | 14 +-
.../client/consumer/DefaultMQPullConsumer.java | 7 +-
.../client/consumer/DefaultMQPushConsumer.java | 6 +-
.../apache/rocketmq/client/impl/MQAdminImpl.java | 6 +-
.../impl/consumer/DefaultMQPullConsumerImpl.java | 2 +-
.../impl/consumer/DefaultMQPushConsumerImpl.java | 2 +-
.../impl/producer/DefaultMQProducerImpl.java | 2 +-
.../client/producer/DefaultMQProducer.java | 9 +-
.../apache/rocketmq/common/TopicAttributes.java | 4 +-
.../apache/rocketmq/common/attribute}/CQType.java | 2 +-
.../rocketmq/common/utils/QueueTypeUtils.java | 51 +
.../java/org/apache/rocketmq/store/CommitLog.java | 45 +-
.../org/apache/rocketmq/store/ConsumeQueue.java | 19 +-
.../apache/rocketmq/store/DefaultMessageStore.java | 223 +-
.../org/apache/rocketmq/store/MappedFileQueue.java | 16 +-
.../org/apache/rocketmq/store/MessageStore.java | 17 +-
.../apache/rocketmq/store/PutMessageContext.java | 67 +-
.../java/org/apache/rocketmq/store/StoreUtil.java | 4 -
.../apache/rocketmq/store/StreamMessageStore.java | 2573 --------------------
.../org/apache/rocketmq/store/TopicQueueLock.java | 11 +-
.../rocketmq/store/config/MessageStoreConfig.java | 11 -
.../rocketmq/store/dledger/DLedgerCommitLog.java | 3 +-
.../apache/rocketmq/store/logfile/MappedFile.java | 1 -
.../rocketmq/store/queue/BatchConsumeQueue.java | 28 +-
.../store/queue/ConsumeQueueInterface.java | 2 +
.../rocketmq/store/queue/ConsumeQueueStore.java | 290 ++-
.../rocketmq/store/queue/FileQueueLifeCycle.java | 2 +
.../rocketmq/store/queue/QueueOffsetAssigner.java | 60 +
.../apache/rocketmq/store/util/QueueTypeUtils.java | 55 -
.../store/DefaultMessageStoreShutDownTest.java | 3 +-
.../rocketmq/store/DefaultMessageStoreTest.java | 3 +-
.../store/queue/BatchConsumeMessageTest.java | 274 ++-
.../store/queue/BatchConsumeQueueTest.java | 27 +-
.../rocketmq/store/queue/ConsumeQueueTest.java | 3 +-
.../apache/rocketmq/store/queue/QueueTestBase.java | 28 +-
.../rocketmq/test/util/MQAdminTestUtils.java | 8 +-
.../org/apache/rocketmq/test/base/BaseConf.java | 35 +-
.../rocketmq/test/base/IntegrationTestBase.java | 49 +-
.../base/dledger/DLedgerProduceAndConsumeIT.java | 5 +-
.../test/client/producer/batch/BatchSendIT.java | 37 +-
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 8 +-
.../tools/admin/DefaultMQAdminExtImpl.java | 16 +-
.../tools/command/topic/UpdateTopicSubCommand.java | 1 -
48 files changed, 912 insertions(+), 3147 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 6f9563b..7cf48c9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -96,11 +96,9 @@ import org.apache.rocketmq.srvutil.FileWatchService;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageArrivingListener;
import org.apache.rocketmq.store.MessageStore;
-import org.apache.rocketmq.store.StreamMessageStore;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
-import org.apache.rocketmq.store.queue.CQType;
import org.apache.rocketmq.store.stats.BrokerStats;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
@@ -298,14 +296,9 @@ public class BrokerController {
if (result) {
try {
- MessageStore messageStore;
- if (Objects.equals(CQType.BatchCQ.toString(), this.messageStoreConfig.getDefaultCQType())) {
- messageStore = new StreamMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig);
- } else {
- messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig);
- }
+ this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig);
+ ((DefaultMessageStore) this.messageStore).setTopicConfigTable(topicConfigManager.getTopicConfigTable());
- this.messageStore = messageStore;
if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
((DLedgerCommitLog) messageStore.getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 5b8a19f..c1819a5 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -33,7 +33,6 @@ import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
-import org.apache.rocketmq.common.TopicAttributes;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index a266442..31a7993 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -32,12 +32,10 @@ import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHe
import org.apache.rocketmq.common.rpc.RpcClientUtils;
import org.apache.rocketmq.common.rpc.RpcRequest;
import org.apache.rocketmq.common.rpc.RpcResponse;
-import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
-import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 3e3173e..a3f5c4e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -18,9 +18,9 @@ package org.apache.rocketmq.broker.processor;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
@@ -42,7 +42,6 @@ import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
-import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
@@ -56,6 +55,7 @@ import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.utils.QueueTypeUtils;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RemotingResponseCallback;
@@ -63,7 +63,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBatch;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
-import org.apache.rocketmq.store.StoreUtil;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
@@ -684,7 +683,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
CompletableFuture<PutMessageResult> putMessageResult;
- if (StoreUtil.isStreamMode(this.brokerController.getMessageStore()) && MessageClientIDSetter.getUniqID(messageExtBatch) != null) {
+ if (QueueTypeUtils.isBatchCq(Optional.of(topicConfig)) && MessageClientIDSetter.getUniqID(messageExtBatch) != null) {
// newly introduced inner-batch message
messageExtBatch.setSysFlag(messageExtBatch.getSysFlag() | MessageSysFlag.NEED_UNWRAP_FLAG);
messageExtBatch.setSysFlag(messageExtBatch.getSysFlag() | MessageSysFlag.INNER_BATCH_FLAG);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
index 79cc01a..6b4f30d 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
@@ -24,9 +24,9 @@ import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.attribute.BooleanAttribute;
import org.apache.rocketmq.common.attribute.EnumAttribute;
import org.apache.rocketmq.common.attribute.LongRangeAttribute;
+import org.apache.rocketmq.common.utils.QueueTypeUtils;
import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.store.queue.CQType;
-import org.apache.rocketmq.store.util.QueueTypeUtils;
+import org.apache.rocketmq.common.attribute.CQType;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -37,6 +37,7 @@ import org.mockito.junit.MockitoJUnitRunner;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static com.google.common.collect.Sets.newHashSet;
import static java.util.Arrays.asList;
@@ -123,7 +124,7 @@ public class TopicConfigManagerTest {
@Test
public void testAddWrongValueOnCreating() {
Map<String, String> attributes = new HashMap<>();
- attributes.put("+" + TopicAttributes.QUEUE_TYPE.getName(), "wrong-value");
+ attributes.put("+" + TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), "wrong-value");
TopicConfig topicConfig = new TopicConfig();
topicConfig.setTopicName("new-topic");
@@ -300,7 +301,7 @@ public class TopicConfigManagerTest {
topicConfigManager.updateTopicConfig(topicConfig);
TopicConfig topicConfigUpdated = topicConfigManager.getTopicConfigTable().get(topic);
- Assert.assertEquals(CQType.SimpleCQ, QueueTypeUtils.getCQType(topicConfigUpdated));
+ Assert.assertEquals(CQType.SimpleCQ, QueueTypeUtils.getCQType(Optional.of(topicConfigUpdated)));
Assert.assertEquals("true", topicConfigUpdated.getAttributes().get(unchangeable));
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java b/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java
index 63b2d14..79386bd 100644
--- a/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java
+++ b/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java
@@ -22,29 +22,31 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
+import java.util.Map;
+
/**
* Base interface for MQ management
*/
public interface MQAdmin {
/**
* Creates an topic
- *
- * @param key accesskey
+ * @param key accesskey
* @param newTopic topic name
* @param queueNum topic's queue number
+ * @param attributes
*/
- void createTopic(final String key, final String newTopic, final int queueNum)
+ void createTopic(final String key, final String newTopic, final int queueNum, Map<String, String> attributes)
throws MQClientException;
/**
* Creates an topic
- *
- * @param key accesskey
+ * @param key accesskey
* @param newTopic topic name
* @param queueNum topic's queue number
* @param topicSysFlag topic system flag
+ * @param attributes
*/
- void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
+ void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map<String, String> attributes)
throws MQClientException;
/**
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index 5829f77..3b893bb 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.client.consumer;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
@@ -124,8 +125,8 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
*/
@Deprecated
@Override
- public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
- createTopic(key, withNamespace(newTopic), queueNum, 0);
+ public void createTopic(String key, String newTopic, int queueNum, Map<String, String> attributes) throws MQClientException {
+ createTopic(key, withNamespace(newTopic), queueNum, 0, null);
}
/**
@@ -133,7 +134,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
*/
@Deprecated
@Override
- public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
+ public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map<String, String> attributes) throws MQClientException {
this.defaultMQPullConsumerImpl.createTopic(key, withNamespace(newTopic), queueNum, topicSysFlag);
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index fce6b64..5ef2b63 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -431,8 +431,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
@Deprecated
@Override
- public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
- createTopic(key, withNamespace(newTopic), queueNum, 0);
+ public void createTopic(String key, String newTopic, int queueNum, Map<String, String> attributes) throws MQClientException {
+ createTopic(key, withNamespace(newTopic), queueNum, 0, null);
}
@Override
@@ -448,7 +448,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
@Deprecated
@Override
- public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
+ public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map<String, String> attributes) throws MQClientException {
this.defaultMQPushConsumerImpl.createTopic(key, withNamespace(newTopic), queueNum, topicSysFlag);
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index 0a6e005..73e35c5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -76,10 +77,10 @@ public class MQAdminImpl {
}
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
- createTopic(key, newTopic, queueNum, 0);
+ createTopic(key, newTopic, queueNum, 0, null);
}
- public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
+ public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map<String, String> attributes) throws MQClientException {
try {
Validators.checkTopic(newTopic);
Validators.isSystemTopic(newTopic);
@@ -100,6 +101,7 @@ public class MQAdminImpl {
topicConfig.setReadQueueNums(queueNum);
topicConfig.setWriteQueueNums(queueNum);
topicConfig.setTopicSysFlag(topicSysFlag);
+ topicConfig.setAttributes(attributes);
boolean createOK = false;
for (int i = 0; i < 5; i++) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 05d3d48..497b274 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -100,7 +100,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
this.isRunning();
- this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag);
+ this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag, null);
}
private void isRunning() throws MQClientException {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 2fa3830..abe3ca7 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -184,7 +184,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
- this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag);
+ this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag, null);
}
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index bf2ca28..c1ec308 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -447,7 +447,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
Validators.checkTopic(newTopic);
Validators.isSystemTopic(newTopic);
- this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag);
+ this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag, null);
}
private void makeSureStateOK() throws MQClientException {
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 230785c..8c046aa 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.client.producer;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
@@ -757,12 +758,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* @param key accesskey
* @param newTopic topic name
* @param queueNum topic's queue number
+ * @param attributes
* @throws MQClientException if there is any client error.
*/
@Deprecated
@Override
- public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
- createTopic(key, withNamespace(newTopic), queueNum, 0);
+ public void createTopic(String key, String newTopic, int queueNum, Map<String, String> attributes) throws MQClientException {
+ createTopic(key, withNamespace(newTopic), queueNum, 0, null);
}
/**
@@ -773,11 +775,12 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* @param newTopic topic name
* @param queueNum topic's queue number
* @param topicSysFlag topic system flag
+ * @param attributes
* @throws MQClientException if there is any client error.
*/
@Deprecated
@Override
- public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
+ public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map<String, String> attributes) throws MQClientException {
this.defaultMQProducerImpl.createTopic(key, withNamespace(newTopic), queueNum, topicSysFlag);
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicAttributes.java b/common/src/main/java/org/apache/rocketmq/common/TopicAttributes.java
index 7bed217..add383f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicAttributes.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicAttributes.java
@@ -25,7 +25,7 @@ import java.util.Map;
import static com.google.common.collect.Sets.newHashSet;
public class TopicAttributes {
- public static final EnumAttribute QUEUE_TYPE = new EnumAttribute(
+ public static final EnumAttribute QUEUE_TYPE_ATTRIBUTE = new EnumAttribute(
"queue.type",
false,
newHashSet("BatchCQ", "SimpleCQ"),
@@ -35,6 +35,6 @@ public class TopicAttributes {
static {
ALL = new HashMap<>();
- ALL.put(QUEUE_TYPE.getName(), QUEUE_TYPE);
+ ALL.put(QUEUE_TYPE_ATTRIBUTE.getName(), QUEUE_TYPE_ATTRIBUTE);
}
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/CQType.java b/common/src/main/java/org/apache/rocketmq/common/attribute/CQType.java
similarity index 94%
rename from store/src/main/java/org/apache/rocketmq/store/queue/CQType.java
rename to common/src/main/java/org/apache/rocketmq/common/attribute/CQType.java
index efd65af..6bd6ad2 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/CQType.java
+++ b/common/src/main/java/org/apache/rocketmq/common/attribute/CQType.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.rocketmq.store.queue;
+package org.apache.rocketmq.common.attribute;
public enum CQType {
SimpleCQ,
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/QueueTypeUtils.java b/common/src/main/java/org/apache/rocketmq/common/utils/QueueTypeUtils.java
new file mode 100644
index 0000000..e2f006e
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/QueueTypeUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.utils;
+
+import org.apache.rocketmq.common.TopicAttributes;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.attribute.CQType;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+public class QueueTypeUtils {
+
+ public static boolean isBatchCq(Optional<TopicConfig> topicConfig) {
+ return Objects.equals(CQType.BatchCQ, getCQType(topicConfig));
+ }
+
+ public static CQType getCQType(Optional<TopicConfig> topicConfig) {
+ if (!topicConfig.isPresent()) {
+ return CQType.valueOf(TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getDefaultValue());
+ }
+
+ String attributeName = TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName();
+
+ Map<String, String> attributes = topicConfig.get().getAttributes();
+ if (attributes == null || attributes.size() == 0) {
+ return CQType.valueOf(TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getDefaultValue());
+ }
+
+ if (attributes.containsKey(attributeName)) {
+ return CQType.valueOf(attributes.get(attributeName));
+ } else {
+ return CQType.valueOf(TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getDefaultValue());
+ }
+ }
+}
\ No newline at end of file
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 062c269..2ae84eb 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -24,10 +24,10 @@ import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageAccessor;
@@ -44,6 +45,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.utils.QueueTypeUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.config.BrokerRole;
@@ -51,9 +53,8 @@ import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.ha.HAService;
import org.apache.rocketmq.store.logfile.MappedFile;
-import org.apache.rocketmq.store.queue.CQType;
+import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.store.schedule.ScheduleMessageService;
-import org.apache.rocketmq.store.util.QueueTypeUtils;
/**
* Store all metadata downtime for recovery, data protection reliability
@@ -433,8 +434,8 @@ public class CommitLog implements Swappable {
private void setBatchSizeIfNeeded(Map<String, String> propertiesMap, DispatchRequest dispatchRequest) {
if (null != propertiesMap && propertiesMap.containsKey(MessageConst.PROPERTY_INNER_NUM) && propertiesMap.containsKey(MessageConst.PROPERTY_INNER_BASE)) {
- dispatchRequest.setMsgBaseOffset(Long.valueOf(propertiesMap.get(MessageConst.PROPERTY_INNER_BASE)));
- dispatchRequest.setBatchSize(Short.valueOf(propertiesMap.get(MessageConst.PROPERTY_INNER_NUM)));
+ dispatchRequest.setMsgBaseOffset(Long.parseLong(propertiesMap.get(MessageConst.PROPERTY_INNER_BASE)));
+ dispatchRequest.setBatchSize(Short.parseShort(propertiesMap.get(MessageConst.PROPERTY_INNER_NUM)));
}
}
@@ -672,7 +673,7 @@ public class CommitLog implements Swappable {
topicQueueLock.lock(topicQueueKey);
try {
- defaultMessageStore.assignOffset(topicQueueKey, msg, getBatchNum(msg));
+ defaultMessageStore.assignOffset(topicQueueKey, msg, getMessageNum(msg));
PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
if (encodeResult != null) {
@@ -1031,15 +1032,6 @@ public class CommitLog implements Swappable {
return this.mappedFileQueue.retryDeleteFirstFile(intervalForcibly);
}
- public void removeQueueFromTopicQueueTable(final String topic, final int queueId) {
- String key = topic + "-" + queueId;
- synchronized (this) {
- this.defaultMessageStore.removeOffsetTable(key);
- }
-
- log.info("removeQueueFromTopicQueueTable OK Topic: {} QueueId: {}", topic, queueId);
- }
-
public void checkSelf() {
mappedFileQueue.checkSelf();
}
@@ -1058,19 +1050,24 @@ public class CommitLog implements Swappable {
return diff;
}
- protected short getBatchNum(MessageExtBrokerInner msgInner) {
- short batchNum = 1;
+ protected short getMessageNum(MessageExtBrokerInner msgInner) {
+ short messageNum = 1;
// IF inner batch, build batchQueueOffset and batchNum property.
- CQType cqType = QueueTypeUtils.getCQType(defaultMessageStore);
- if (MessageSysFlag.check(msgInner.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG) || CQType.BatchCQ.equals(cqType)) {
+ CQType cqType = getCqType(msgInner);
+ if (MessageSysFlag.check(msgInner.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG) || CQType.BatchCQ.equals(cqType)) {
if (msgInner.getProperty(MessageConst.PROPERTY_INNER_NUM) != null) {
- batchNum = Short.parseShort(msgInner.getProperty(MessageConst.PROPERTY_INNER_NUM));
- batchNum = batchNum >= 1 ? batchNum : 1;
+ messageNum = Short.parseShort(msgInner.getProperty(MessageConst.PROPERTY_INNER_NUM));
+ messageNum = messageNum >= 1 ? messageNum : 1;
}
}
- return batchNum;
+ return messageNum;
+ }
+
+ private CQType getCqType(MessageExtBrokerInner msgInner) {
+ Optional<TopicConfig> topicConfig = this.defaultMessageStore.getTopicConfig(msgInner.getTopic());
+ return QueueTypeUtils.getCQType(topicConfig);
}
abstract class FlushCommitLogService extends ServiceThread {
@@ -1488,7 +1485,7 @@ public class CommitLog implements Swappable {
Long queueOffset = msgInner.getQueueOffset();
// this msg maybe a inner-batch msg.
- short batchNum = getBatchNum(msgInner);
+ short messageNum = getMessageNum(msgInner);
// Transaction messages that require special handling
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
@@ -1545,7 +1542,7 @@ public class CommitLog implements Swappable {
CommitLog.this.getMessageStore().getPerfCounter().endTick("WRITE_MEMORY_TIME_MS");
msgInner.setEncodedBuff(null);
return new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
- msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills, batchNum);
+ msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills, messageNum);
}
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 0efe74c..7763a0f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.store;
import java.io.File;
import java.nio.ByteBuffer;
+import java.util.HashMap;
import java.util.List;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
@@ -25,10 +26,11 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.logfile.MappedFile;
-import org.apache.rocketmq.store.queue.CQType;
+import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.store.queue.FileQueueLifeCycle;
+import org.apache.rocketmq.store.queue.QueueOffsetAssigner;
import org.apache.rocketmq.store.queue.ReferredIterator;
public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
@@ -37,7 +39,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
public static final int CQ_STORE_UNIT_SIZE = 20;
private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
- private final DefaultMessageStore defaultMessageStore;
+ private final MessageStore defaultMessageStore;
private final MappedFileQueue mappedFileQueue;
private final String topic;
@@ -55,7 +57,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
final int queueId,
final String storePath,
final int mappedFileSize,
- final DefaultMessageStore defaultMessageStore) {
+ final MessageStore defaultMessageStore) {
this.storePath = storePath;
this.mappedFileSize = mappedFileSize;
this.defaultMessageStore = defaultMessageStore;
@@ -438,6 +440,17 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}
+ @Override
+ public void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, MessageExtBrokerInner msg, short messageNum) {
+ String topicQueueKey = getTopic() + "-" + getQueueId();
+ HashMap<String, Long> topicQueueTable = queueOffsetAssigner.getTopicQueueTable();
+
+ long topicOffset = topicQueueTable.computeIfAbsent(topicQueueKey, k -> 0L);
+ topicQueueTable.put(topicQueueKey, topicOffset + messageNum);
+
+ msg.setQueueOffset(topicOffset);
+ }
+
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
final long cqOffset) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index d25b9eb..da47be6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -32,9 +32,9 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -47,13 +47,16 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.running.RunningStats;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.utils.QueueTypeUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.config.BrokerRole;
@@ -74,8 +77,6 @@ import org.apache.rocketmq.store.schedule.ScheduleMessageService;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.store.util.PerfCounter;
-import static java.lang.String.format;
-
public class DefaultMessageStore implements MessageStore {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
@@ -87,10 +88,6 @@ public class DefaultMessageStore implements MessageStore {
private final ConsumeQueueStore consumeQueueStore;
- private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueueInterface>> consumeQueueTable;
-
- protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<>(1024);
-
private final FlushConsumeQueueService flushConsumeQueueService;
private final CleanCommitLogService cleanCommitLogService;
@@ -154,8 +151,7 @@ public class DefaultMessageStore implements MessageStore {
} else {
this.commitLog = new CommitLog(this);
}
- this.consumeQueueTable = new ConcurrentHashMap<>(32);
- this.consumeQueueStore = new ConsumeQueueStore(this, this.messageStoreConfig, this.consumeQueueTable);
+ this.consumeQueueStore = new ConsumeQueueStore(this, this.messageStoreConfig);
this.flushConsumeQueueService = new FlushConsumeQueueService();
this.cleanCommitLogService = new CleanCommitLogService();
@@ -194,13 +190,7 @@ public class DefaultMessageStore implements MessageStore {
@Override
public void truncateDirtyLogicFiles(long phyOffset) {
- ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> tables = DefaultMessageStore.this.consumeQueueTable;
-
- for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : tables.values()) {
- for (ConsumeQueueInterface logic : maps.values()) {
- this.consumeQueueStore.truncateDirtyLogicFiles(logic, phyOffset);
- }
- }
+ this.consumeQueueStore.truncateDirty(phyOffset);
}
/**
@@ -211,7 +201,6 @@ public class DefaultMessageStore implements MessageStore {
boolean result = true;
try {
- long start = System.currentTimeMillis();
boolean lastExitOK = !this.isTempFileExist();
log.info("last shutdown {}, root dir: {}", lastExitOK ? "normally" : "abnormally", messageStoreConfig.getStorePathRootDir());
@@ -219,7 +208,7 @@ public class DefaultMessageStore implements MessageStore {
result = result && this.commitLog.load();
// load Consume Queue
- result = result && this.loadConsumeQueue();
+ result = result && this.consumeQueueStore.load();
if (result) {
this.storeCheckpoint =
@@ -269,7 +258,7 @@ public class DefaultMessageStore implements MessageStore {
* 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed.
*/
long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
- for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
+ for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.getConsumeQueueTable().values()) {
for (ConsumeQueueInterface logic : maps.values()) {
if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
@@ -387,11 +376,7 @@ public class DefaultMessageStore implements MessageStore {
@Override
public void destroyLogics() {
- for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
- for (ConsumeQueueInterface logic : maps.values()) {
- this.consumeQueueStore.destroy(logic);
- }
- }
+ this.consumeQueueStore.destroy();
}
private PutMessageStatus checkMessage(MessageExtBrokerInner msg) {
@@ -464,6 +449,20 @@ public class DefaultMessageStore implements MessageStore {
return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
}
+ if (msg.getProperties().containsKey(MessageConst.PROPERTY_INNER_NUM)
+ && !MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG)) {
+ log.warn("[BUG]The message had property {} but is not an inner batch", MessageConst.PROPERTY_INNER_NUM);
+ return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+ }
+
+ if (MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG)) {
+ Optional<TopicConfig> topicConfig = this.getTopicConfig(msg.getTopic());
+ if (!QueueTypeUtils.isBatchCq(topicConfig)) {
+ log.error("[BUG]The message is an inner batch but cq type is not batch cq");
+ return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+ }
+ }
+
long beginTime = this.getSystemClock().now();
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
@@ -649,8 +648,7 @@ public class DefaultMessageStore implements MessageStore {
break;
}
- if (this.isTheBatchFull(sizePy, maxMsgNums, maxPullSize, getResult.getBufferTotalSize(), getResult.getMessageCount(),
- isInDisk)) {
+ if (this.isTheBatchFull(sizePy, cqUnit.getBatchNum(), maxMsgNums, maxPullSize, getResult.getBufferTotalSize(), getResult.getMessageCount(), isInDisk)) {
break;
}
@@ -756,7 +754,7 @@ public class DefaultMessageStore implements MessageStore {
return logic.getMaxOffsetInQueue();
}
} else {
- Long offset = this.topicQueueTable.get(topic + "-" + queueId);
+ Long offset = this.consumeQueueStore.getMaxOffset(topic, queueId);
if (offset != null) {
return offset;
}
@@ -800,7 +798,11 @@ public class DefaultMessageStore implements MessageStore {
public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) {
ConsumeQueueInterface logic = this.findConsumeQueue(topic, queueId);
if (logic != null) {
- return logic.getOffsetInQueueByTime(timestamp);
+ long resultOffset = logic.getOffsetInQueueByTime(timestamp);
+ // Make sure the result offset is in a valid range.
+ resultOffset = Math.max(resultOffset, logic.getMinOffsetInQueue());
+ resultOffset = Math.min(resultOffset, logic.getMaxOffsetInQueue());
+ return resultOffset;
}
return 0;
@@ -1071,7 +1073,7 @@ public class DefaultMessageStore implements MessageStore {
@Override
public int cleanUnusedTopic(Set<String> topics) {
- Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>>> it = this.consumeQueueTable.entrySet().iterator();
+ Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>>> it = this.getConsumeQueueTable().entrySet().iterator();
while (it.hasNext()) {
Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> next = it.next();
String topic = next.getKey();
@@ -1086,7 +1088,7 @@ public class DefaultMessageStore implements MessageStore {
cq.getQueueId()
);
- this.commitLog.removeQueueFromTopicQueueTable(cq.getTopic(), cq.getQueueId());
+ this.consumeQueueStore.removeTopicQueueTable(cq.getTopic(), cq.getQueueId());
}
it.remove();
@@ -1105,45 +1107,7 @@ public class DefaultMessageStore implements MessageStore {
public void cleanExpiredConsumerQueue() {
long minCommitLogOffset = this.commitLog.getMinOffset();
- Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>>> it = this.consumeQueueTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> next = it.next();
- String topic = next.getKey();
- if (!topic.equals(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC)) {
- ConcurrentMap<Integer, ConsumeQueueInterface> queueTable = next.getValue();
- Iterator<Entry<Integer, ConsumeQueueInterface>> itQT = queueTable.entrySet().iterator();
- while (itQT.hasNext()) {
- Entry<Integer, ConsumeQueueInterface> nextQT = itQT.next();
- long maxCLOffsetInConsumeQueue = nextQT.getValue().getLastOffset();
-
- if (maxCLOffsetInConsumeQueue == -1) {
- log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.",
- nextQT.getValue().getTopic(),
- nextQT.getValue().getQueueId(),
- nextQT.getValue().getMaxPhysicOffset(),
- nextQT.getValue().getMinLogicOffset());
- } else if (maxCLOffsetInConsumeQueue < minCommitLogOffset) {
- log.info(
- "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}",
- topic,
- nextQT.getKey(),
- minCommitLogOffset,
- maxCLOffsetInConsumeQueue);
-
- DefaultMessageStore.this.commitLog.removeQueueFromTopicQueueTable(nextQT.getValue().getTopic(),
- nextQT.getValue().getQueueId());
-
- this.consumeQueueStore.destroy(nextQT.getValue());
- itQT.remove();
- }
- }
-
- if (queueTable.isEmpty()) {
- log.info("cleanExpiredConsumerQueue: {},topic destroyed", topic);
- it.remove();
- }
- }
- }
+ this.consumeQueueStore.cleanExpired(minCommitLogOffset);
}
public Map<String, Long> getMessageIds(final String topic, final int queueId, long minOffset, long maxOffset,
@@ -1269,13 +1233,13 @@ public class DefaultMessageStore implements MessageStore {
return (maxOffsetPy - offsetPy) > memory;
}
- private boolean isTheBatchFull(int sizePy, int maxMsgNums, long maxMsgSize, int bufferTotal, int messageTotal, boolean isInDisk) {
+ private boolean isTheBatchFull(int sizePy, int unitBatchNum, int maxMsgNums, long maxMsgSize, int bufferTotal, int messageTotal, boolean isInDisk) {
if (0 == bufferTotal || 0 == messageTotal) {
return false;
}
- if (maxMsgNums <= messageTotal) {
+ if (messageTotal + unitBatchNum > maxMsgNums) {
return true;
}
@@ -1392,16 +1356,7 @@ public class DefaultMessageStore implements MessageStore {
private void checkSelf() {
this.commitLog.checkSelf();
-
- Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>>> it = this.consumeQueueTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> next = it.next();
- Iterator<Entry<Integer, ConsumeQueueInterface>> itNext = next.getValue().entrySet().iterator();
- while (itNext.hasNext()) {
- Entry<Integer, ConsumeQueueInterface> cq = itNext.next();
- this.consumeQueueStore.checkSelf(cq.getValue());
- }
- }
+ this.consumeQueueStore.checkSelf();
}
private boolean isTempFileExist() {
@@ -1410,53 +1365,6 @@ public class DefaultMessageStore implements MessageStore {
return file.exists();
}
- private boolean loadConsumeQueue() {
- checkOtherConsumeQueue();
-
- File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
- File[] fileTopicList = dirLogic.listFiles();
- if (fileTopicList != null) {
-
- for (File fileTopic : fileTopicList) {
- String topic = fileTopic.getName();
-
- File[] fileQueueIdList = fileTopic.listFiles();
- if (fileQueueIdList != null) {
- for (File fileQueueId : fileQueueIdList) {
- int queueId;
- try {
- queueId = Integer.parseInt(fileQueueId.getName());
- } catch (NumberFormatException e) {
- continue;
- }
- ConsumeQueueInterface logic = new ConsumeQueue(
- topic,
- queueId,
- StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
- this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
- this);
- this.putConsumeQueue(topic, queueId, logic);
- if (!this.consumeQueueStore.load(logic)) {
- return false;
- }
- }
- }
- }
- }
-
- log.info("load logics queue all over, OK");
-
- return true;
- }
-
- private void checkOtherConsumeQueue() {
- File dirLogic = new File(StorePathConfigHelper.getStorePathBatchConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
- if (dirLogic.exists()) {
- throw new RuntimeException(format("Batch consume queue directory: [%s] exist. Can not load consume queue while batch consume queue exists.",
- StorePathConfigHelper.getStorePathBatchConsumeQueue(this.messageStoreConfig.getStorePathRootDir())));
- }
- }
-
private void recover(final boolean lastExitOK) {
long recoverCqStart = System.currentTimeMillis();
long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
@@ -1485,43 +1393,13 @@ public class DefaultMessageStore implements MessageStore {
return transientStorePool;
}
- private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueueInterface consumeQueue) {
- ConcurrentMap<Integer/* queueId */, ConsumeQueueInterface> map = this.consumeQueueTable.get(topic);
- if (null == map) {
- map = new ConcurrentHashMap<Integer/* queueId */, ConsumeQueueInterface>();
- map.put(queueId, consumeQueue);
- this.consumeQueueTable.put(topic, map);
- } else {
- map.put(queueId, consumeQueue);
- }
- }
-
private long recoverConsumeQueue() {
- long maxPhysicOffset = -1;
- for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
- for (ConsumeQueueInterface logic : maps.values()) {
- this.consumeQueueStore.recover(logic);
- if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
- maxPhysicOffset = logic.getMaxPhysicOffset();
- }
- }
- }
-
- return maxPhysicOffset;
+ return this.consumeQueueStore.recover();
}
public void recoverTopicQueueTable() {
- HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
long minPhyOffset = this.commitLog.getMinOffset();
- for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
- for (ConsumeQueueInterface logic : maps.values()) {
- String key = logic.getTopic() + "-" + logic.getQueueId();
- table.put(key, logic.getMaxOffsetInQueue());
- this.consumeQueueStore.correctMinOffset(logic, minPhyOffset);
- }
- }
-
- this.topicQueueTable = table;
+ this.consumeQueueStore.recoverOffsetTable(minPhyOffset);
}
@Override
@@ -1539,7 +1417,7 @@ public class DefaultMessageStore implements MessageStore {
}
public ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> getConsumeQueueTable() {
- return consumeQueueTable;
+ return consumeQueueStore.getConsumeQueueTable();
}
@Override
@@ -1569,8 +1447,7 @@ public class DefaultMessageStore implements MessageStore {
}
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
- ConsumeQueueInterface cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
- this.consumeQueueStore.putMessagePositionInfoWrapper(cq, dispatchRequest);
+ this.consumeQueueStore.putMessagePositionInfoWrapper(dispatchRequest);
}
@Override
@@ -1606,7 +1483,7 @@ public class DefaultMessageStore implements MessageStore {
@Override
public ConsumeQueueInterface getConsumeQueue(String topic, int queueId) {
- ConcurrentMap<Integer, ConsumeQueueInterface> map = consumeQueueTable.get(topic);
+ ConcurrentMap<Integer, ConsumeQueueInterface> map = this.getConsumeQueueTable().get(topic);
if (map == null) {
return null;
}
@@ -1656,19 +1533,21 @@ public class DefaultMessageStore implements MessageStore {
}
@Override
- public void assignOffset(String topicQueueKey, MessageExtBrokerInner msg, short batchNum) {
+ public void assignOffset(String topicQueueKey, MessageExtBrokerInner msg, short messageNum) {
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
- long topicOffset = this.topicQueueTable.computeIfAbsent(topicQueueKey, k -> 0L);
- msg.setQueueOffset(topicOffset);
- this.topicQueueTable.put(topicQueueKey, topicOffset + batchNum);
+ this.consumeQueueStore.assignQueueOffset(msg, messageNum);
}
}
@Override
- public void removeOffsetTable(String topicQueueKey) {
- this.topicQueueTable.remove(topicQueueKey);
+ public Optional<TopicConfig> getTopicConfig(String topic) {
+ return this.consumeQueueStore.getTopicConfig(topic);
+ }
+
+ public void setTopicConfigTable(ConcurrentMap<String, TopicConfig> topicConfigTable) {
+ this.consumeQueueStore.setTopicConfigTable(topicConfigTable);
}
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
@@ -1975,7 +1854,7 @@ public class DefaultMessageStore implements MessageStore {
if (minOffset > this.lastPhysicalMinOffset) {
this.lastPhysicalMinOffset = minOffset;
- ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> tables = DefaultMessageStore.this.consumeQueueTable;
+ ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> tables = DefaultMessageStore.this.getConsumeQueueTable();
for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : tables.values()) {
for (ConsumeQueueInterface logic : maps.values()) {
@@ -2021,7 +1900,7 @@ public class DefaultMessageStore implements MessageStore {
logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
}
- ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> tables = DefaultMessageStore.this.consumeQueueTable;
+ ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> tables = DefaultMessageStore.this.getConsumeQueueTable();
for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : tables.values()) {
for (ConsumeQueueInterface cq : maps.values()) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
index a7d5083..2ad6ee4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
@@ -171,8 +171,8 @@ public class MappedFileQueue implements Swappable {
return true;
}
- try {
- MappedFile mappedFile = new DefaultMappedFile(file.getPath(), mappedFileSize);
+ try {
+ MappedFile mappedFile = new DefaultMappedFile(file.getPath(), mappedFileSize);
mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
@@ -234,13 +234,13 @@ public class MappedFileQueue implements Swappable {
if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
- } else {
- try {
- mappedFile = new DefaultMappedFile(nextFilePath, this.mappedFileSize);
- } catch (IOException e) {
- log.error("create mappedFile exception", e);
- }
+ } else {
+ try {
+ mappedFile = new DefaultMappedFile(nextFilePath, this.mappedFileSize);
+ } catch (IOException e) {
+ log.error("create mappedFile exception", e);
}
+ }
if (mappedFile != null) {
if (this.mappedFiles.isEmpty()) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 51d8a24..341a29f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -18,10 +18,12 @@ package org.apache.rocketmq.store;
import java.util.HashMap;
import java.util.LinkedList;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.SystemClock;
+import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
@@ -562,16 +564,19 @@ public interface MessageStore {
boolean isSyncMaster();
/**
- * assign an queue offset and increase it.
+ * Assign a queue offset and increase it.
+ * This method is not thread-safe; callers that may race must provide external synchronization.
+ *
* @param topicQueueKey topic-queue key
* @param msg message
- * @param batchNum batch num
+ * @param messageNum message num
*/
- void assignOffset(String topicQueueKey, MessageExtBrokerInner msg, short batchNum);
+ void assignOffset(String topicQueueKey, MessageExtBrokerInner msg, short messageNum);
/**
- * remove offset table
- * @param topicQueueKey topic-queue key
+ * get topic config
+ * @param topic topic name
+ * @return topic config info
*/
- void removeOffsetTable(String topicQueueKey);
+ Optional<TopicConfig> getTopicConfig(String topic);
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageContext.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageContext.java
index d4c160d..bf8832d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/PutMessageContext.java
+++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageContext.java
@@ -8,40 +8,41 @@
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
+
package org.apache.rocketmq.store;
public class PutMessageContext {
- private String topicQueueTableKey;
- private long[] phyPos;
- private int batchSize;
-
- public PutMessageContext(String topicQueueTableKey) {
- this.topicQueueTableKey = topicQueueTableKey;
- }
-
- public String getTopicQueueTableKey() {
- return topicQueueTableKey;
- }
-
- public long[] getPhyPos() {
- return phyPos;
- }
-
- public void setPhyPos(long[] phyPos) {
- this.phyPos = phyPos;
- }
-
- public int getBatchSize() {
- return batchSize;
- }
-
- public void setBatchSize(int batchSize) {
- this.batchSize = batchSize;
- }
- }
\ No newline at end of file
+ private String topicQueueTableKey;
+ private long[] phyPos;
+ private int batchSize;
+
+ public PutMessageContext(String topicQueueTableKey) {
+ this.topicQueueTableKey = topicQueueTableKey;
+ }
+
+ public String getTopicQueueTableKey() {
+ return topicQueueTableKey;
+ }
+
+ public long[] getPhyPos() {
+ return phyPos;
+ }
+
+ public void setPhyPos(long[] phyPos) {
+ this.phyPos = phyPos;
+ }
+
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ public void setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ }
+}
\ No newline at end of file
diff --git a/store/src/main/java/org/apache/rocketmq/store/StoreUtil.java b/store/src/main/java/org/apache/rocketmq/store/StoreUtil.java
index 81b407e..e910c2a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/StoreUtil.java
+++ b/store/src/main/java/org/apache/rocketmq/store/StoreUtil.java
@@ -44,10 +44,6 @@ public class StoreUtil {
return physicalTotal;
}
- public static boolean isStreamMode(MessageStore messageStore) {
- return messageStore instanceof StreamMessageStore;
- }
-
public static void fileAppend(MappedFile file, ByteBuffer data) {
boolean success = file.appendMessage(data);
if (!success) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/StreamMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/StreamMessageStore.java
deleted file mode 100644
index d9277ff..0000000
--- a/store/src/main/java/org/apache/rocketmq/store/StreamMessageStore.java
+++ /dev/null
@@ -1,2573 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.store;
-
-import org.apache.rocketmq.common.BrokerConfig;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.ServiceThread;
-import org.apache.rocketmq.common.SystemClock;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.message.MessageAccessor;
-import org.apache.rocketmq.common.message.MessageConst;
-import org.apache.rocketmq.common.message.MessageDecoder;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.running.RunningStats;
-import org.apache.rocketmq.common.sysflag.MessageSysFlag;
-import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.store.config.BrokerRole;
-import org.apache.rocketmq.store.config.FlushDiskType;
-import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.store.config.StorePathConfigHelper;
-import org.apache.rocketmq.store.ha.HAService;
-import org.apache.rocketmq.store.index.IndexService;
-import org.apache.rocketmq.store.index.QueryOffsetResult;
-import org.apache.rocketmq.store.logfile.DefaultMappedFile;
-import org.apache.rocketmq.store.logfile.MappedFile;
-import org.apache.rocketmq.store.queue.BatchConsumeQueue;
-import org.apache.rocketmq.store.queue.CQType;
-import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
-import org.apache.rocketmq.store.queue.ConsumeQueueStore;
-import org.apache.rocketmq.store.queue.CqUnit;
-import org.apache.rocketmq.store.queue.ReferredIterator;
-import org.apache.rocketmq.store.schedule.ScheduleMessageService;
-import org.apache.rocketmq.store.stats.BrokerStatsManager;
-import org.apache.rocketmq.store.util.PerfCounter;
-import org.apache.rocketmq.store.util.QueueTypeUtils;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileLock;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static java.lang.String.format;
-
-public class StreamMessageStore implements MessageStore {
- private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
-
- public final PerfCounter.Ticks perfs = new PerfCounter.Ticks(log);
-
- private final MessageStoreConfig messageStoreConfig;
- // CommitLog
- private final CommitLog commitLog;
-
- private final ConsumeQueueStore consumeQueueStore;
-
- private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueueInterface>> consumeQueueTable;
-
- protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
-
- protected HashMap<String/* topic-queueid */, Long/* offset */> batchTopicQueueTable = new HashMap<String, Long>(1024);
-
- private final FlushConsumeQueueService flushConsumeQueueService;
-
- private final CleanCommitLogService cleanCommitLogService;
-
- private final CleanConsumeQueueService cleanConsumeQueueService;
-
- private final CorrectLogicOffsetService correctLogicOffsetService;
-
- private final IndexService indexService;
-
- private final AllocateMappedFileService allocateMappedFileService;
-
- private final ReputMessageService reputMessageService;
-
- private final HAService haService;
-
- private final ScheduleMessageService scheduleMessageService;
-
- private final StoreStatsService storeStatsService;
-
- private final TransientStorePool transientStorePool;
-
- private final RunningFlags runningFlags = new RunningFlags();
- private final SystemClock systemClock = new SystemClock();
-
- private final ScheduledExecutorService scheduledExecutorService =
- Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread"));
- private final BrokerStatsManager brokerStatsManager;
- private final MessageArrivingListener messageArrivingListener;
- private final BrokerConfig brokerConfig;
-
- private volatile boolean shutdown = true;
-
- private StoreCheckpoint storeCheckpoint;
-
- private AtomicLong printTimes = new AtomicLong(0);
-
- private final LinkedList<CommitLogDispatcher> dispatcherList;
-
- private RandomAccessFile lockFile;
-
- private FileLock lock;
-
- boolean shutDownNormal = false;
-
- //polish for reput
- private ThreadPoolExecutor[] reputExecutors;
-
- private BlockingQueue<Runnable>[] reputQueues;
-
- private boolean isDispatchFromSenderThread;
-
- private static final Future EMPTY_FUTURE = new Future() {
- @Override
- public boolean cancel(final boolean mayInterruptIfRunning) {
- return false;
- }
-
- @Override
- public boolean isCancelled() {
- return false;
- }
-
- @Override
- public boolean isDone() {
- return true;
- }
-
- @Override
- public Object get() {
- return null;
- }
-
- @Override
- public Object get(final long timeout, final TimeUnit unit) {
- return null;
- }
- };
-
- // Max pull msg size
- private final static int MAX_PULL_MSG_SIZE = 128 * 1024 * 1024;
-
- public StreamMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
- final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
- this.messageArrivingListener = messageArrivingListener;
- this.brokerConfig = brokerConfig;
- this.messageStoreConfig = messageStoreConfig;
- this.brokerStatsManager = brokerStatsManager;
- this.allocateMappedFileService = new AllocateMappedFileService(this);
- if (messageStoreConfig.isEnableDLegerCommitLog()) {
- throw new RuntimeException("dleger is not supported in this message store.");
- }
- this.isDispatchFromSenderThread = messageStoreConfig.isDispatchFromSenderThread();
- this.commitLog = new CommitLog(this);
- this.consumeQueueTable = new ConcurrentHashMap<>(32);
- this.consumeQueueStore = new ConsumeQueueStore(this, this.messageStoreConfig, this.consumeQueueTable);
-
- this.flushConsumeQueueService = new FlushConsumeQueueService();
- this.cleanCommitLogService = new CleanCommitLogService();
- this.cleanConsumeQueueService = new CleanConsumeQueueService();
- this.correctLogicOffsetService = new CorrectLogicOffsetService();
- this.storeStatsService = new StoreStatsService();
- this.indexService = new IndexService(this);
- if (!messageStoreConfig.isEnableDLegerCommitLog()) {
- this.haService = new HAService(this);
- } else {
- this.haService = null;
- }
- if (isDispatchFromSenderThread) {
- this.reputMessageService = new SyncReputMessageService();
- } else {
- this.reputMessageService = new ReputMessageService();
- }
-
- this.scheduleMessageService = new ScheduleMessageService(this);
-
- this.transientStorePool = new TransientStorePool(messageStoreConfig);
-
- if (messageStoreConfig.isTransientStorePoolEnable()) {
- this.transientStorePool.init();
- }
-
- this.allocateMappedFileService.start();
-
- this.indexService.start();
-
- this.dispatcherList = new LinkedList<>();
- this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
- this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
-
- File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
- DefaultMappedFile.ensureDirOK(file.getParent());
- lockFile = new RandomAccessFile(file, "rw");
- initAsyncReputThreads(messageStoreConfig.getDispatchCqThreads(), messageStoreConfig.getDispatchCqCacheNum());
- }
-
- /**
- * @throws IOException
- */
- @Override
- public boolean load() {
- boolean result = true;
-
- try {
- long start = System.currentTimeMillis();
- boolean lastExitOK = !this.isTempFileExist();
- log.info("last shutdown {}, root dir: {}", lastExitOK ? "normally" : "abnormally", messageStoreConfig.getStorePathRootDir());
-
- if (null != scheduleMessageService) {
- result = result && this.scheduleMessageService.load();
- }
-
- // load Commit Log
- result = result && this.commitLog.load();
-
- // load Batch Consume Queue
- result = result && this.loadBatchConsumeQueue();
-
- if (result) {
- this.storeCheckpoint =
- new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
-
- this.indexService.load(lastExitOK);
-
- this.recover(lastExitOK);
-
- log.info("load over, and the max phy offset = {} cost = {}", this.getMaxPhyOffset(), System.currentTimeMillis() - start);
- }
- } catch (Exception e) {
- log.error("load exception", e);
- result = false;
- }
-
- if (!result) {
- this.allocateMappedFileService.shutdown();
- }
-
- return result;
- }
-
- /**
- * @throws Exception
- */
- @Override
- public void start() throws Exception {
-
- lock = lockFile.getChannel().tryLock(0, 1, false);
- if (lock == null || lock.isShared() || !lock.isValid()) {
- throw new RuntimeException("Lock failed,MQ already started");
- }
-
- lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
- lockFile.getChannel().force(true);
-
- if (this.getMessageStoreConfig().isDuplicationEnable()) {
- this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
- } else {
- this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
- }
- this.reputMessageService.start();
-
- if (!messageStoreConfig.isEnableDLegerCommitLog()) {
- this.haService.start();
- this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
- }
-
- this.flushConsumeQueueService.start();
- this.commitLog.start();
- this.storeStatsService.start();
-
- this.createTempFile();
- this.addScheduleTask();
- this.perfs.start();
- this.shutdown = false;
- }
-
- @Override
- public void shutdown() {
- if (!this.shutdown) {
- this.shutdown = true;
-
- this.scheduledExecutorService.shutdown();
-
- try {
-
- Thread.sleep(1000 * 3);
- } catch (InterruptedException e) {
- log.error("shutdown Exception, ", e);
- }
-
- if (this.scheduleMessageService != null) {
- this.scheduleMessageService.shutdown();
- }
- if (this.haService != null) {
- this.haService.shutdown();
- }
-
- this.storeStatsService.shutdown();
- this.indexService.shutdown();
- this.commitLog.shutdown();
- this.reputMessageService.shutdown();
- this.flushConsumeQueueService.shutdown();
- this.allocateMappedFileService.shutdown();
- this.storeCheckpoint.flush();
- this.storeCheckpoint.shutdown();
-
- this.perfs.shutdown();
-
- if (this.runningFlags.isWriteable() && dispatchBehindBytes() == 0) {
- this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
- shutDownNormal = true;
- } else {
- log.warn("the store may be wrong, so shutdown abnormally, and keep abort file. writable: {}, dispatchBehindBytes: {}, abort file: {}",
- this.runningFlags.isWriteable(), dispatchBehindBytes(), StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
- }
- }
-
- this.transientStorePool.destroy();
-
- if (lockFile != null && lock != null) {
- try {
- lock.release();
- lockFile.close();
- } catch (IOException e) {
- }
- }
- }
-
- @Override
- public void destroy() {
- this.destroyLogics();
- this.commitLog.destroy();
- this.indexService.destroy();
- this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
- this.deleteFile(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
- }
-
- @Override
- public void destroyLogics() {
- for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
- for (ConsumeQueueInterface logic : maps.values()) {
- this.consumeQueueStore.destroy(logic);
- }
- }
- }
-
- @Override
- public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
- if (this.shutdown) {
- log.warn("message store has shutdown, so putMessage is forbidden");
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
- }
-
- if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
- long value = this.printTimes.getAndIncrement();
- if ((value % 50000) == 0) {
- log.warn("message store is slave mode, so putMessage is forbidden ");
- }
-
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
- }
-
- if (msg.getProperties().containsKey(MessageConst.PROPERTY_INNER_NUM)
- && !MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG)) {
- log.warn("[BUG]The message had property {} but is not an inner batch", MessageConst.PROPERTY_INNER_NUM);
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
- }
-
- if (MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG)) {
- CQType cqType = QueueTypeUtils.getCQType(this);
-
- if (!CQType.BatchCQ.equals(cqType)) {
- log.warn("[BUG]The message is an inner batch but cq type is not batch consume queue");
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
- }
- }
-
- if (!this.runningFlags.isWriteable()) {
- long value = this.printTimes.getAndIncrement();
- if ((value % 50000) == 0) {
- log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
- }
-
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
- } else {
- this.printTimes.set(0);
- }
-
- int topicLen = msg.getTopic().length();
- if (topicLen > this.messageStoreConfig.getMaxTopicLength()) {
- log.warn("putMessage message topic[{}] length too long {}", msg.getTopic(), topicLen);
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
- }
-
- if (topicLen > Byte.MAX_VALUE) {
- log.warn("putMessage message topic[{}] length too long {}, but it is not supported by broker",
- msg.getTopic(), topicLen);
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
- }
-
- if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
- log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null));
- }
-
- if (this.isOSPageCacheBusy()) {
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null));
- }
-
- long beginTime = this.getSystemClock().now();
- perfs.startTick("PUT_MESSAGE_TIME_MS");
- CompletableFuture<PutMessageResult> result = this.commitLog.asyncPutMessage(msg);
- perfs.endTick("PUT_MESSAGE_TIME_MS");
-
- long eclipseTime = this.getSystemClock().now() - beginTime;
- if (eclipseTime > 500) {
- log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
- }
-
- return result;
- }
-
- @Override
- public PutMessageResult putMessage(MessageExtBrokerInner msg) {
- CompletableFuture<PutMessageResult> future = asyncPutMessage(msg);
- try {
- return future.get(3, TimeUnit.SECONDS);
- } catch (Throwable t) {
- log.error("Get async put result failed", t);
- return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
- }
- }
-
- @Override
- public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
- CompletableFuture<PutMessageResult> future = asyncPutMessages(messageExtBatch);
- try {
- return future.get(3, TimeUnit.SECONDS);
- } catch (Throwable t) {
- log.error("Get async put result failed", t);
- return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
- }
- }
-
- @Override
- public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
- if (this.shutdown) {
- log.warn("StreamMessageStore has shutdown, so putMessages is forbidden");
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
- }
-
- if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
- long value = this.printTimes.getAndIncrement();
- if ((value % 50000) == 0) {
- log.warn("StreamMessageStore is in slave mode, so putMessages is forbidden ");
- }
-
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
- }
-
- if (!this.runningFlags.isWriteable()) {
- long value = this.printTimes.getAndIncrement();
- if ((value % 50000) == 0) {
- log.warn("StreamMessageStore is not writable, so putMessages is forbidden " + this.runningFlags.getFlagBits());
- }
-
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
- } else {
- this.printTimes.set(0);
- }
-
- int topicLen = messageExtBatch.getTopic().length();
- if (topicLen > this.messageStoreConfig.getMaxTopicLength()) {
- log.warn("putMessage batch message topic[{}] length too long {}", messageExtBatch.getTopic(), topicLen);
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
- }
-
- if (topicLen > Byte.MAX_VALUE) {
- log.warn("putMessage batch message topic[{}] length too long {}, but it is not supported by broker",
- messageExtBatch.getTopic(), topicLen);
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
- }
-
- if (messageExtBatch.getBody().length > messageStoreConfig.getMaxMessageSize()) {
- log.warn("PutMessages body length too long " + messageExtBatch.getBody().length);
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
- }
-
- if (this.isOSPageCacheBusy()) {
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null));
- }
-
- long beginTime = this.getSystemClock().now();
- CompletableFuture<PutMessageResult> result = this.commitLog.asyncPutMessages(messageExtBatch);
-
- long eclipseTime = this.getSystemClock().now() - beginTime;
- if (eclipseTime > 500) {
- log.warn("not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, messageExtBatch.getBody().length);
- }
- this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
-
- return result;
- }
-
- @Override
- public boolean isOSPageCacheBusy() {
- long begin = this.getCommitLog().getBeginTimeInLock();
- long diff = this.systemClock.now() - begin;
-
- return diff < 10000000
- && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
- }
-
- @Override
- public long lockTimeMills() {
- return this.commitLog.lockTimeMills();
- }
-
- @Override
- public SystemClock getSystemClock() {
- return systemClock;
- }
-
- @Override
- public CommitLog getCommitLog() {
- return commitLog;
- }
-
- public boolean isDispatchFromSenderThread() {
- return isDispatchFromSenderThread;
- }
-
- @Override
- public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
- final int maxMsgNums,
- final MessageFilter messageFilter) {
- return getMessage(group, topic, queueId, offset, maxMsgNums, MAX_PULL_MSG_SIZE, messageFilter);
- }
-
- @Override
- public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
- final int maxMsgNums,
- final int maxTotalMsgSize,
- final MessageFilter messageFilter) {
- if (this.shutdown) {
- log.warn("message store has shutdown, so getMessage is forbidden");
- return null;
- }
-
- if (!this.runningFlags.isReadable()) {
- log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
- return null;
- }
-
- long beginTime = this.getSystemClock().now();
-
- GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
- long nextBeginOffset = offset;
- long minOffset = 0;
- long maxOffset = 0;
-
- GetMessageResult getResult = new GetMessageResult();
-
- final long maxOffsetPy = this.commitLog.getMaxOffset();
-
- ConsumeQueueInterface consumeQueue = getConsumeQueue(topic, queueId);
- if (consumeQueue != null) {
- minOffset = consumeQueue.getMinOffsetInQueue();
- maxOffset = consumeQueue.getMaxOffsetInQueue();
-
- if (maxOffset == 0) {
- status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
- nextBeginOffset = nextOffsetCorrection(offset, 0);
- } else if (offset < minOffset) {
- status = GetMessageStatus.OFFSET_TOO_SMALL;
- nextBeginOffset = nextOffsetCorrection(offset, minOffset);
- } else if (offset == maxOffset) {
- status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
- nextBeginOffset = nextOffsetCorrection(offset, offset);
- } else if (offset > maxOffset) {
- status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
- if (0 == minOffset) {
- nextBeginOffset = nextOffsetCorrection(offset, minOffset);
- } else {
- nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
- }
- } else {
- final int maxFilterMessageCount = Math.max(messageStoreConfig.getPullBatchMaxMessageCount(), maxMsgNums);
- final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
-
- long maxPullSize = Math.max(maxTotalMsgSize, 100);
- if (maxPullSize > MAX_PULL_MSG_SIZE) {
- log.warn("The max pull size is too large maxPullSize={} topic={} queueId={}", maxPullSize, topic, queueId);
- maxPullSize = MAX_PULL_MSG_SIZE;
- }
- status = GetMessageStatus.NO_MATCHED_MESSAGE;
- long maxPhyOffsetPulling = 0;
- int cqFileNum = 0;
-
- while (getResult.getBufferTotalSize() <= 0
- && nextBeginOffset < maxOffset
- && cqFileNum++ < this.messageStoreConfig.getTravelCqFileNumWhenGetMessage()) {
- ReferredIterator<CqUnit> bufferConsumeQueue = consumeQueue.iterateFrom(nextBeginOffset);
-
- if (bufferConsumeQueue == null) {
- status = GetMessageStatus.OFFSET_FOUND_NULL;
- nextBeginOffset = nextOffsetCorrection(nextBeginOffset, this.consumeQueueStore.rollNextFile(consumeQueue, nextBeginOffset));
- log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
- + maxOffset + ", but access logic queue failed. Correct nextBeginOffset to " + nextBeginOffset);
- break;
- }
-
- try {
- long nextPhyFileStartOffset = Long.MIN_VALUE;
- while (bufferConsumeQueue.hasNext()
- && nextBeginOffset < maxOffset) {
- CqUnit cqUnit = bufferConsumeQueue.next();
- long offsetPy = cqUnit.getPos();
- int sizePy = cqUnit.getSize();
-
- boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
-
- if (cqUnit.getQueueOffset() - offset > maxFilterMessageCount) {
- break;
- }
-
- if (this.isTheBatchFull(sizePy, cqUnit.getBatchNum(), maxMsgNums, maxPullSize, getResult.getBufferTotalSize(), getResult.getMessageCount(),
- isInDisk)) {
- break;
- }
-
- if (getResult.getBufferTotalSize() >= maxPullSize) {
- break;
- }
-
- maxPhyOffsetPulling = offsetPy;
-
- //Be careful, here should before the isTheBatchFull
- nextBeginOffset = cqUnit.getQueueOffset() + cqUnit.getBatchNum();
-
- if (nextPhyFileStartOffset != Long.MIN_VALUE) {
- if (offsetPy < nextPhyFileStartOffset) {
- continue;
- }
- }
-
- if (messageFilter != null
- && !messageFilter.isMatchedByConsumeQueue(cqUnit.getValidTagsCodeAsLong(), cqUnit.getCqExtUnit())) {
- if (getResult.getBufferTotalSize() == 0) {
- status = GetMessageStatus.NO_MATCHED_MESSAGE;
- }
-
- continue;
- }
-
- SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
- if (null == selectResult) {
- if (getResult.getBufferTotalSize() == 0) {
- status = GetMessageStatus.MESSAGE_WAS_REMOVING;
- }
-
- nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
- continue;
- }
-
- if (messageFilter != null
- && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
- if (getResult.getBufferTotalSize() == 0) {
- status = GetMessageStatus.NO_MATCHED_MESSAGE;
- }
- // release...
- selectResult.release();
- continue;
- }
-
- this.storeStatsService.getGetMessageTransferedMsgCount().add(1);
- getResult.addMessage(selectResult, cqUnit.getQueueOffset(), cqUnit.getBatchNum());
- status = GetMessageStatus.FOUND;
- nextPhyFileStartOffset = Long.MIN_VALUE;
- }
- } finally {
- bufferConsumeQueue.release();
- }
- }
-
- if (diskFallRecorded) {
- long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
- brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
- }
-
- long diff = maxOffsetPy - maxPhyOffsetPulling;
- long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
- * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
- getResult.setSuggestPullingFromSlave(diff > memory);
- }
- } else {
- status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
- nextBeginOffset = nextOffsetCorrection(offset, 0);
- }
-
- if (GetMessageStatus.FOUND == status) {
- this.storeStatsService.getGetMessageTimesTotalFound().add(1);
- } else {
- this.storeStatsService.getGetMessageTimesTotalMiss().add(1);
- }
- long elapsedTime = this.getSystemClock().now() - beginTime;
- this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime);
-
- getResult.setStatus(status);
- getResult.setNextBeginOffset(nextBeginOffset);
- getResult.setMaxOffset(maxOffset);
- getResult.setMinOffset(minOffset);
- return getResult;
- }
-
- @Override
- public long getMaxOffsetInQueue(String topic, int queueId) {
- ConsumeQueueInterface logic = this.getConsumeQueue(topic, queueId);
- if (logic != null) {
- return logic.getMaxOffsetInQueue();
- }
-
- return 0;
- }
-
- @Override
- public long getMaxOffsetInQueue(String topic, int queueId, boolean committed) {
- if (committed) {
- ConsumeQueueInterface logic = this.getConsumeQueue(topic, queueId);
- if (logic != null) {
- return logic.getMaxOffsetInQueue();
- }
- } else {
- Long offset = this.batchTopicQueueTable.get(topic + "-" + queueId);
- if (offset != null) {
- return offset;
- }
- }
-
- return 0;
- }
-
- @Override
- public long getMinOffsetInQueue(String topic, int queueId) {
- ConsumeQueueInterface logic = this.getConsumeQueue(topic, queueId);
- if (logic != null) {
- return logic.getMinOffsetInQueue();
- }
-
- return -1;
- }
-
- @Override
- public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQueueOffset) {
- ConsumeQueueInterface consumeQueue = getConsumeQueue(topic, queueId);
- if (consumeQueue != null) {
-
- ReferredIterator<CqUnit> bufferConsumeQueue = consumeQueue.iterateFrom(consumeQueueOffset);
- if (bufferConsumeQueue != null) {
- try {
- if (bufferConsumeQueue.hasNext()) {
- long offsetPy = bufferConsumeQueue.next().getPos();
- return offsetPy;
- }
- } finally {
- bufferConsumeQueue.release();
- }
- }
- }
-
- return 0;
- }
-
- @Override
- public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) {
- ConsumeQueueInterface logic = getConsumeQueue(topic, queueId);
- if (logic != null) {
- long resultOffset = logic.getOffsetInQueueByTime(timestamp);
- // -1 means no msg found.
- if (resultOffset == -1) {
- return -1;
- }
- // Make sure the result offset should in valid range.
- resultOffset = Math.max(resultOffset, logic.getMinOffsetInQueue());
- resultOffset = Math.min(resultOffset, logic.getMaxOffsetInQueue());
- return resultOffset;
- }
-
- // logic is null means there is no message in this queue, return -1.
- return -1;
- }
-
- @Override
- public MessageExt lookMessageByOffset(long commitLogOffset) {
- SelectMappedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, 4);
- if (null != sbr) {
- try {
- // 1 TOTALSIZE
- int size = sbr.getByteBuffer().getInt();
- return lookMessageByOffset(commitLogOffset, size);
- } finally {
- sbr.release();
- }
- }
-
- return null;
- }
-
- @Override
- public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) {
- SelectMappedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, 4);
- if (null != sbr) {
- try {
- // 1 TOTALSIZE
- int size = sbr.getByteBuffer().getInt();
- return this.commitLog.getMessage(commitLogOffset, size);
- } finally {
- sbr.release();
- }
- }
-
- return null;
- }
-
- @Override
- public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset, int msgSize) {
- return this.commitLog.getMessage(commitLogOffset, msgSize);
- }
-
- @Override
- public String getRunningDataInfo() {
- return this.storeStatsService.toString();
- }
-
- private String getStorePathPhysic() {
- String storePathPhysic = StreamMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
- return storePathPhysic;
- }
-
- @Override
- public HashMap<String, String> getRuntimeInfo() {
- HashMap<String, String> result = this.storeStatsService.getRuntimeInfo();
-
- {
- String storePathPhysic = this.getMessageStoreConfig().getStorePathCommitLog();
- double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
- result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(physicRatio));
-
- }
-
- {
-
- String storePathLogics = StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
- double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
- result.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(logicsRatio));
- }
-
- {
- if (this.scheduleMessageService != null) {
- this.scheduleMessageService.buildRunningStats(result);
- }
- }
-
- result.put(RunningStats.commitLogMinOffset.name(), String.valueOf(this.getMinPhyOffset()));
- result.put(RunningStats.commitLogMaxOffset.name(), String.valueOf(this.getMaxPhyOffset()));
-
- return result;
- }
-
- @Override
- public long getMaxPhyOffset() {
- return this.commitLog.getMaxOffset();
- }
-
- @Override
- public long getMinPhyOffset() {
- return this.commitLog.getMinOffset();
- }
-
- @Override
- public long getEarliestMessageTime(String topic, int queueId) {
- ConsumeQueueInterface logicQueue = this.getConsumeQueue(topic, queueId);
- if (logicQueue != null) {
- return getStoreTime(logicQueue.getEarliestUnit());
- }
-
- return -1;
- }
-
- private long getStoreTime(CqUnit result) {
- if (result != null) {
- try {
- final long phyOffset = result.getPos();
- final int size = result.getSize();
- long storeTime = this.getCommitLog().pickupStoreTimestamp(phyOffset, size);
- return storeTime;
- } catch (Exception e) {
- }
- }
- return -1;
- }
-
- @Override
- public long getEarliestMessageTime() {
- final long minPhyOffset = this.getMinPhyOffset();
- final int size = this.messageStoreConfig.getMaxMessageSize() * 2;
- return this.getCommitLog().pickupStoreTimestamp(minPhyOffset, size);
- }
-
- @Override
- public long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueueOffset) {
- ConsumeQueueInterface logicQueue = this.getConsumeQueue(topic, queueId);
- if (logicQueue != null) {
- return getStoreTime(logicQueue.get(consumeQueueOffset));
- }
-
- return -1;
- }
-
- @Override
- public long getMessageTotalInQueue(String topic, int queueId) {
- ConsumeQueueInterface logicQueue = this.getConsumeQueue(topic, queueId);
- if (logicQueue != null) {
- return logicQueue.getMessageTotalInQueue();
- }
-
- return -1;
- }
-
- @Override
- public SelectMappedBufferResult getCommitLogData(final long offset) {
- if (this.shutdown) {
- log.warn("message store has shutdown, so getPhyQueueData is forbidden");
- return null;
- }
-
- return this.commitLog.getData(offset);
- }
-
- @Override
- public boolean appendToCommitLog(long startOffset, byte[] data, int dataStart, int dataLength) {
- if (this.shutdown) {
- log.warn("message store has shutdown, so appendToPhyQueue is forbidden");
- return false;
- }
-
- boolean result = this.commitLog.appendData(startOffset, data, dataStart, dataLength);
- if (result) {
- this.reputMessageService.wakeup();
- } else {
- log.error("appendToPhyQueue failed " + startOffset + " " + data.length);
- }
-
- return result;
- }
-
- @Override
- public void executeDeleteFilesManually() {
- this.cleanCommitLogService.excuteDeleteFilesManualy();
- }
-
- @Override
- public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {
- QueryMessageResult queryMessageResult = new QueryMessageResult();
-
- long lastQueryMsgTime = end;
-
- for (int i = 0; i < 3; i++) {
- QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);
- if (queryOffsetResult.getPhyOffsets().isEmpty()) {
- break;
- }
-
- Collections.sort(queryOffsetResult.getPhyOffsets());
-
- queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset());
- queryMessageResult.setIndexLastUpdateTimestamp(queryOffsetResult.getIndexLastUpdateTimestamp());
-
- for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) {
- long offset = queryOffsetResult.getPhyOffsets().get(m);
-
- try {
-
- boolean match = true;
- MessageExt msg = this.lookMessageByOffset(offset);
- if (0 == m) {
- lastQueryMsgTime = msg.getStoreTimestamp();
- }
-
-// String[] keyArray = msg.getKeys().split(MessageConst.KEY_SEPARATOR);
-// if (topic.equals(msg.getTopic())) {
-// for (String k : keyArray) {
-// if (k.equals(key)) {
-// match = true;
-// break;
-// }
-// }
-// }
-
- if (match) {
- SelectMappedBufferResult result = this.commitLog.getData(offset, false);
- if (result != null) {
- int size = result.getByteBuffer().getInt(0);
- result.getByteBuffer().limit(size);
- result.setSize(size);
- queryMessageResult.addMessage(result);
- }
- } else {
- log.warn("queryMessage hash duplicate, {} {}", topic, key);
- }
- } catch (Exception e) {
- log.error("queryMessage exception", e);
- }
- }
-
- if (queryMessageResult.getBufferTotalSize() > 0) {
- break;
- }
-
- if (lastQueryMsgTime < begin) {
- break;
- }
- }
-
- return queryMessageResult;
- }
-
- @Override
- public void updateHaMasterAddress(String newAddr) {
- this.haService.updateMasterAddress(newAddr);
- }
-
- @Override
- public long slaveFallBehindMuch() {
- return this.commitLog.getMaxOffset() - this.haService.getPush2SlaveMaxOffset().get();
- }
-
- @Override
- public long now() {
- return this.systemClock.now();
- }
-
- @Override
- public int cleanUnusedTopic(Set<String> topics) {
- Iterator<Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>>> it = consumeQueueTable.entrySet().iterator();
- while (it.hasNext()) {
- Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> next = it.next();
- String topic = next.getKey();
-
- if (!topics.contains(topic) && !topic.equals(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC)) {
- ConcurrentMap<Integer, ConsumeQueueInterface> queueTable = next.getValue();
- for (ConsumeQueueInterface cq : queueTable.values()) {
- this.consumeQueueStore.destroy(cq);
- log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned",
- cq.getTopic(),
- cq.getQueueId()
- );
-
- this.commitLog.removeQueueFromTopicQueueTable(cq.getTopic(), cq.getQueueId());
- }
- it.remove();
- log.info("cleanUnusedTopic: {},topic consumeQueue destroyed", topic);
- }
- }
- return 0;
- }
-
- @Override
- public void cleanExpiredConsumerQueue() {
- long minCommitLogOffset = this.commitLog.getMinOffset();
-
- Iterator<Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>>> it = this.consumeQueueTable.entrySet().iterator();
- while (it.hasNext()) {
- Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> next = it.next();
- String topic = next.getKey();
- if (!topic.equals(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC)) {
- ConcurrentMap<Integer, ConsumeQueueInterface> queueTable = next.getValue();
- Iterator<Map.Entry<Integer, ConsumeQueueInterface>> itQT = queueTable.entrySet().iterator();
- while (itQT.hasNext()) {
- Map.Entry<Integer, ConsumeQueueInterface> nextQT = itQT.next();
- long maxCLOffsetInConsumeQueue = nextQT.getValue().getMaxPhysicOffset();
-
- if (maxCLOffsetInConsumeQueue == -1) {
- log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.",
- nextQT.getValue().getTopic(),
- nextQT.getValue().getQueueId(),
- nextQT.getValue().getMaxPhysicOffset(),
- nextQT.getValue().getMinLogicOffset());
- } else if (maxCLOffsetInConsumeQueue < minCommitLogOffset) {
- log.info(
- "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}",
- topic,
- nextQT.getKey(),
- minCommitLogOffset,
- maxCLOffsetInConsumeQueue);
-
- this.commitLog.removeQueueFromTopicQueueTable(nextQT.getValue().getTopic(),
- nextQT.getValue().getQueueId());
-
- this.consumeQueueStore.destroy(nextQT.getValue());
- itQT.remove();
- }
- }
-
- if (queueTable.isEmpty()) {
- log.info("cleanExpiredConsumerQueue: {},topic destroyed", topic);
- it.remove();
- }
- }
- }
- }
-
- public double getDiskSpaceWarningLevelRatio() {
- return cleanCommitLogService.getDiskSpaceWarningLevelRatio();
- }
-
- @Override
- public boolean checkInDiskByConsumeOffset(final String topic, final int queueId, long consumeOffset) {
-
- final long maxOffsetPy = this.commitLog.getMaxOffset();
-
- ConsumeQueueInterface consumeQueue = getConsumeQueue(topic, queueId);
- if (consumeQueue != null) {
- CqUnit cqUnit = consumeQueue.get(consumeOffset);
-
- if (cqUnit != null) {
- long offsetPy = cqUnit.getPos();
- return checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
- } else {
- return false;
- }
- }
- return false;
- }
-
- @Override
- public long dispatchBehindBytes() {
- return this.reputMessageService.behind();
- }
-
- @Override
- public long flush() {
- return this.commitLog.flush();
- }
-
- @Override
- public boolean resetWriteOffset(long phyOffset) {
- return this.commitLog.resetOffset(phyOffset);
- }
-
- @Override
- public long getConfirmOffset() {
- return this.commitLog.getConfirmOffset();
- }
-
- @Override
- public void setConfirmOffset(long phyOffset) {
- this.commitLog.setConfirmOffset(phyOffset);
- }
-
- @Override
- public MessageExt lookMessageByOffset(long commitLogOffset, int size) {
- SelectMappedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, size);
- if (null != sbr) {
- try {
- return MessageDecoder.decode(sbr.getByteBuffer(), true, false);
- } finally {
- sbr.release();
- }
- }
-
- return null;
- }
-
- private long nextOffsetCorrection(long oldOffset, long newOffset) {
- long nextOffset = oldOffset;
- if (this.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE || this.getMessageStoreConfig().isOffsetCheckInSlave()) {
- nextOffset = newOffset;
- }
- return nextOffset;
- }
-
- private boolean checkInDiskByCommitOffset(long offsetPy, long maxOffsetPy) {
- long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
- return (maxOffsetPy - offsetPy) > memory;
- }
-
- private boolean isTheBatchFull(int sizePy, int unitBatchNum, int maxMsgNums, long maxMsgSize, int bufferTotal,
- int messageTotal, boolean isInDisk) {
-
- //At least has one message(batch)
- if (0 == bufferTotal || 0 == messageTotal) {
- return false;
- }
-
- if (messageTotal + unitBatchNum > maxMsgNums) {
- return true;
- }
-
- if (bufferTotal + sizePy > maxMsgSize) {
- return true;
- }
-
- if (isInDisk) {
- if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()) {
- return true;
- }
- if (messageTotal + unitBatchNum > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk()) {
- return true;
- }
- } else {
- if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) {
- return true;
- }
-
- if (messageTotal + unitBatchNum > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory()) {
- return true;
- }
- }
-
- return false;
- }
-
- private void deleteFile(final String fileName) {
- File file = new File(fileName);
- boolean result = file.delete();
- log.info(fileName + (result ? " delete OK" : " delete Failed"));
- }
-
- /**
- * @throws IOException
- */
- private void createTempFile() throws IOException {
- String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
- File file = new File(fileName);
- DefaultMappedFile.ensureDirOK(file.getParent());
- boolean result = file.createNewFile();
- log.info(fileName + (result ? " create OK" : " already exists"));
- }
-
- private void addScheduleTask() {
-
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- StreamMessageStore.this.cleanFilesPeriodically();
- }
- }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
-
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- StreamMessageStore.this.checkSelf();
- }
- }, 1, 10, TimeUnit.MINUTES);
-
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- if (!getMessageStoreConfig().isMappedFileSwapEnable()) {
- log.warn("Swap is not enabled.");
- return ;
- }
- StreamMessageStore.this.commitLog.swapMap(getMessageStoreConfig().getCommitLogSwapMapReserveFileNum(),
- getMessageStoreConfig().getCommitLogForceSwapMapInterval(), getMessageStoreConfig().getCommitLogSwapMapInterval());
- for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : StreamMessageStore.this.consumeQueueTable.values()) {
- for (ConsumeQueueInterface logic : maps.values()) {
- StreamMessageStore.this.consumeQueueStore.swapMap(logic, getMessageStoreConfig().getLogicQueueSwapMapReserveFileNum(),
- getMessageStoreConfig().getLogicQueueForceSwapMapInterval(), getMessageStoreConfig().getLogicQueueSwapMapInterval());
- }
- }
- } catch (Exception e) {
- log.error("swap map exception", e);
- }
- }
- }, 1, 5, TimeUnit.MINUTES);
-
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- StreamMessageStore.this.commitLog.cleanSwappedMap(getMessageStoreConfig().getCleanSwapedMapInterval());
- for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : StreamMessageStore.this.consumeQueueTable.values()) {
- for (ConsumeQueueInterface logic : maps.values()) {
- StreamMessageStore.this.consumeQueueStore.cleanSwappedMap(logic, getMessageStoreConfig().getCleanSwapedMapInterval());
- }
- }
- } catch (Exception e) {
- log.error("clean swap map exception", e);
- }
- }
- }, 1, 5, TimeUnit.MINUTES);
-
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- if (StreamMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) {
- try {
- if (StreamMessageStore.this.commitLog.getBeginTimeInLock() != 0) {
- long lockTime = System.currentTimeMillis() - StreamMessageStore.this.commitLog.getBeginTimeInLock();
- if (lockTime > 1000 && lockTime < 10000000) {
-
- String stack = UtilAll.jstack();
- final String fileName = System.getProperty("user.home") + File.separator + "debug/lock/stack-"
- + StreamMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime;
- MixAll.string2FileNotSafe(stack, fileName);
- }
- }
- } catch (Exception e) {
- }
- }
- }
- }, 1, 1, TimeUnit.SECONDS);
-
- // this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- // @Override
- // public void run() {
- // StreamMessageStore.this.cleanExpiredConsumerQueue();
- // }
- // }, 1, 1, TimeUnit.HOURS);
- }
-
- private void cleanFilesPeriodically() {
- this.cleanCommitLogService.run();
- this.cleanConsumeQueueService.run();
- this.correctLogicOffsetService.run();
- }
-
- private void checkSelf() {
- this.commitLog.checkSelf();
-
- Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>>> it = this.consumeQueueTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> next = it.next();
- Iterator<Entry<Integer, ConsumeQueueInterface>> itNext = next.getValue().entrySet().iterator();
- while (itNext.hasNext()) {
- Entry<Integer, ConsumeQueueInterface> cq = itNext.next();
- this.consumeQueueStore.checkSelf(cq.getValue());
- }
- }
- }
-
- private boolean isTempFileExist() {
- String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
- File file = new File(fileName);
- return file.exists();
- }
-
- protected boolean loadBatchConsumeQueue() {
- checkOtherConsumeQueue();
-
- File dirLogic = new File(StorePathConfigHelper.getStorePathBatchConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
- File[] fileTopicList = dirLogic.listFiles();
- if (fileTopicList != null) {
-
- for (File fileTopic : fileTopicList) {
- String topic = fileTopic.getName();
-
- File[] fileQueueIdList = fileTopic.listFiles();
- if (fileQueueIdList != null) {
- for (File fileQueueId : fileQueueIdList) {
- int queueId;
- try {
- queueId = Integer.parseInt(fileQueueId.getName());
- } catch (NumberFormatException e) {
- continue;
- }
- ConsumeQueueInterface logic = new BatchConsumeQueue(
- topic,
- queueId,
- StorePathConfigHelper.getStorePathBatchConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
- this.getMessageStoreConfig().getMapperFileSizeBatchConsumeQueue(),
- this);
- this.putConsumeQueue(topic, queueId, logic);
- if (!this.consumeQueueStore.load(logic)) {
- return false;
- }
- }
- }
- }
- }
-
- log.info("load logics queue all over, OK");
-
- return true;
- }
-
- private void checkOtherConsumeQueue() {
- File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
- if (dirLogic.exists()) {
- throw new RuntimeException(format("Consume queue directory: [%s] exist. Can not load batch consume queue while consume queue exists.",
- StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir())));
- }
- }
-
- private void recover(final boolean lastExitOK) {
- long recoverCqStart = System.currentTimeMillis();
- long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
- long recoverCqEnd = System.currentTimeMillis();
-
- if (lastExitOK) {
- this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
- } else {
- this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
- }
- long recoverClogEnd = System.currentTimeMillis();
- this.recoverTopicQueueTable();
- long recoverOffsetEnd = System.currentTimeMillis();
-
- log.info("Recover end total:{} recoverCq:{} recoverClog:{} recoverOffset:{}",
- recoverOffsetEnd - recoverCqStart, recoverCqEnd - recoverCqStart, recoverClogEnd - recoverCqEnd, recoverOffsetEnd - recoverClogEnd);
- }
-
- @Override
- public MessageStoreConfig getMessageStoreConfig() {
- return messageStoreConfig;
- }
-
- @Override
- public TransientStorePool getTransientStorePool() {
- return transientStorePool;
- }
-
- private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueueInterface consumeQueue) {
- ConcurrentMap<Integer/* queueId */, ConsumeQueueInterface> map = this.consumeQueueTable.get(topic);
- if (null == map) {
- map = new ConcurrentHashMap<Integer/* queueId */, ConsumeQueueInterface>();
- map.put(queueId, consumeQueue);
- this.consumeQueueTable.put(topic, map);
- } else {
- map.put(queueId, consumeQueue);
- }
- }
-
- private long recoverConsumeQueue() {
- long maxPhysicOffset = -1;
- for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
- for (ConsumeQueueInterface logic : maps.values()) {
- this.consumeQueueStore.recover(logic);
- if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
- maxPhysicOffset = logic.getMaxPhysicOffset();
- }
- }
- }
-
- return maxPhysicOffset;
- }
-
- public void recoverTopicQueueTable() {
- HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
- long minPhyOffset = this.commitLog.getMinOffset();
- for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
- for (ConsumeQueueInterface logic : maps.values()) {
- String key = logic.getTopic() + "-" + logic.getQueueId();
- table.put(key, logic.getMaxOffsetInQueue());
- this.consumeQueueStore.correctMinOffset(logic, minPhyOffset);
- }
- }
-
- this.batchTopicQueueTable = table;
- }
-
- @Override
- public AllocateMappedFileService getAllocateMappedFileService() {
- return allocateMappedFileService;
- }
-
- @Override
- public void truncateDirtyLogicFiles(long phyOffset) {
- ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> tables = StreamMessageStore.this.consumeQueueTable;
-
- for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : tables.values()) {
- for (ConsumeQueueInterface logic : maps.values()) {
- this.consumeQueueStore.truncateDirtyLogicFiles(logic, phyOffset);
- }
- }
- }
-
- @Override
- public StoreStatsService getStoreStatsService() {
- return storeStatsService;
- }
-
- public ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> getConsumeQueueTable() {
- return consumeQueueTable;
- }
-
- @Override
- public StoreCheckpoint getStoreCheckpoint() {
- return storeCheckpoint;
- }
-
- @Override
- public HAService getHaService() {
- return haService;
- }
-
- @Override
- public void registerCleanFileHook(CleanFilesHook logicalQueueCleanHook) {
-
- }
-
- @Override
- public ScheduleMessageService getScheduleMessageService() {
- return scheduleMessageService;
- }
-
- @Override
- public RunningFlags getRunningFlags() {
- return runningFlags;
- }
-
- public void initAsyncReputThreads(int tsNum, int cacheNum) {
- if (tsNum <= 0) {
- tsNum = 1;
- }
- if (cacheNum < 512) {
- cacheNum = 512;
- }
- reputExecutors = new ThreadPoolExecutor[tsNum];
- reputQueues = new BlockingQueue[tsNum];
-
- for (int i = 0; i < tsNum; i++) {
- final int tmp = i;
- reputQueues[i] = new LinkedBlockingDeque<>(cacheNum);
- //Each executor can only have one thread, otherwise the cq index will get wrong
- reputExecutors[i] = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
- reputQueues[i],
- new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "MQDispatchThread-" + tmp);
- }
- });
- }
- for (ThreadPoolExecutor executorService: reputExecutors) {
- if (executorService.getMaximumPoolSize() != 1 ||
- executorService.getCorePoolSize() != 1) {
- throw new RuntimeException("The MQDispatchThreadPoll can only have one thread");
- }
- }
-
- }
-
- public Future doDispatch(final DispatchRequest request) {
- return doDispatch(request, false);
- }
-
- public Future doDispatch(final DispatchRequest request, boolean async) {
- Runnable task = new Runnable() {
- @Override
- public void run() {
- for (CommitLogDispatcher dispatcher : StreamMessageStore.this.dispatcherList) {
- dispatcher.dispatch(request);
- }
- }
- };
- if (!async) {
- task.run();
- return EMPTY_FUTURE;
- }
- int hash = Math.abs((request.getTopic() + request.getQueueId()).hashCode());
- int slot = hash % reputExecutors.length;
- try {
- return reputExecutors[slot].submit(task);
- } catch (RejectedExecutionException ignored) {
- int tryNum = 0;
- while (tryNum++ < Integer.MAX_VALUE) {
- try {
- Thread.sleep(1);
- } catch (Throwable ignored2) {
-
- }
- try {
- return reputExecutors[slot].submit(task);
- } catch (RejectedExecutionException e) {
- log.warn("DispatchReject topic:{} queue:{} pyOffset:{} tryNum:{}", request.getTopic(), request.getQueueId(), request.getCommitLogOffset(), tryNum, e);
- }
- }
- }
- return EMPTY_FUTURE;
- }
-
- public void syncProcessDispatchRequest(DispatchRequest request, boolean isRecover) throws InterruptedException {
- if (!isDispatchFromSenderThread) {
- log.error("addDispatchRequestQueue operation not supported while isCreateDispatchRequestAsync is true");
- } else {
- if (isRecover) {
- ((SyncReputMessageService) this.reputMessageService).processDispatchRequestForRecover(request);
- } else {
- ((SyncReputMessageService) this.reputMessageService).processDispatchRequest(request);
- }
- }
- }
-
- public boolean dispatched(long physicalOffset) {
- return reputMessageService.dispatched(physicalOffset);
- }
-
- public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
- ConsumeQueueInterface cq = this.findOrCreateConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
- this.consumeQueueStore.putMessagePositionInfoWrapper(cq, dispatchRequest);
- }
-
- private ConsumeQueueInterface findOrCreateConsumeQueue(String topic, int queueId) {
- return this.consumeQueueStore.findOrCreateConsumeQueue(topic, queueId);
- }
-
- @Override
- public BrokerStatsManager getBrokerStatsManager() {
- return brokerStatsManager;
- }
-
- @Override
- public void handleScheduleMessageService(final BrokerRole brokerRole) {
- if (this.scheduleMessageService != null) {
- if (brokerRole == BrokerRole.SLAVE) {
- this.scheduleMessageService.shutdown();
- } else {
- this.scheduleMessageService.start();
- }
- }
-
- }
-
- public int remainTransientStoreBufferNumbs() {
- return this.transientStorePool.availableBufferNums();
- }
-
- @Override
- public boolean isTransientStorePoolDeficient() {
- return remainTransientStoreBufferNumbs() == 0;
- }
-
- @Override
- public LinkedList<CommitLogDispatcher> getDispatcherList() {
- return this.dispatcherList;
- }
-
- @Override
- public ConsumeQueueInterface getConsumeQueue(String topic, int queueId) {
- ConcurrentMap<Integer, ConsumeQueueInterface> map = consumeQueueTable.get(topic);
- if (map == null) {
- return null;
- }
- return map.get(queueId);
- }
-
- @Override
- public void unlockMappedFile(final MappedFile mappedFile) {
- this.scheduledExecutorService.schedule(new Runnable() {
- @Override
- public void run() {
- mappedFile.munlock();
- }
- }, 6, TimeUnit.SECONDS);
- }
-
- @Override
- public PerfCounter.Ticks getPerfCounter() {
- return perfs;
- }
-
- @Override
- public ConsumeQueueStore getQueueStore() {
- return consumeQueueStore;
- }
-
- @Override
- public boolean isSyncDiskFlush() {
- return FlushDiskType.SYNC_FLUSH == this.getMessageStoreConfig().getFlushDiskType();
- }
-
- @Override
- public boolean isSyncMaster() {
- return BrokerRole.SYNC_MASTER == this.getMessageStoreConfig().getBrokerRole();
- }
-
- @Override
- public void assignOffset(String topicQueueKey, MessageExtBrokerInner msg, short batchNum) {
- final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
-
- if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
- assignOffsetForCq(topicQueueKey, msg);
- assignOffsetForBcq(topicQueueKey, msg, batchNum);
- }
- }
-
- private void assignOffsetForCq(String topicQueueKey, MessageExtBrokerInner msg) {
- // not supported yet
- }
-
- private void assignOffsetForBcq(String topicQueueKey, MessageExtBrokerInner msg, short batchNum) {
- Long batchTopicOffset = this.batchTopicQueueTable.computeIfAbsent(topicQueueKey, k -> 0L);
- CQType cqType = QueueTypeUtils.getCQType(this);
- if (MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG) || CQType.BatchCQ.equals(cqType)) {
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_BASE, String.valueOf(batchTopicOffset));
- msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
- }
- msg.setQueueOffset(batchTopicOffset);
- this.batchTopicQueueTable.put(topicQueueKey, batchTopicOffset + batchNum);
- }
-
- @Override
- public void removeOffsetTable(String topicQueueKey) {
- this.topicQueueTable.remove(topicQueueKey);
- this.batchTopicQueueTable.remove(topicQueueKey);
- }
-
- @Override
- public void onCommitLogAppend(MessageExtBrokerInner msg, AppendMessageResult result, MappedFile commitLogFile) {
- DispatchRequest dispatchRequest;
- switch (result.getStatus()) {
- case PUT_OK:
- dispatchRequest = constructDispatchRequest(msg, result);
- onCommitLogDispatch(dispatchRequest, this.isDispatchFromSenderThread(), commitLogFile, false, false);
- break;
- case END_OF_FILE:
- dispatchRequest = new DispatchRequest(0, true);
- onCommitLogDispatch(dispatchRequest, this.isDispatchFromSenderThread(), commitLogFile, false, true);
- break;
- default:
- throw new RuntimeException("");
- }
- }
-
- @Override
- public void onCommitLogDispatch(DispatchRequest dispatchRequest, boolean doDispatch, MappedFile commitLogFile, boolean isRecover, boolean isFileEnd) {
- if (isFileEnd) {
- if (doDispatch) {
- long nextReputFromOffset = this.getCommitLog().rollNextFile(commitLogFile.getFileFromOffset());
- dispatchRequest.setNextReputFromOffset(nextReputFromOffset);
- syncDispatch(dispatchRequest, isRecover);
- }
- } else {
- if (doDispatch) {
- dispatchRequest.setNextReputFromOffset(dispatchRequest.getCommitLogOffset() + dispatchRequest.getMsgSize());
- syncDispatch(dispatchRequest, isRecover);
- }
- }
- }
-
- private DispatchRequest constructDispatchRequest(MessageExtBrokerInner msg, AppendMessageResult appendResult) {
- long tagsCode = 0;
- String keys = "";
- String uniqKey = null;
- int sysFlag = msg.getSysFlag();
- String topic = msg.getTopic();
- long storeTimestamp = msg.getStoreTimestamp();
- int queueId = msg.getQueueId();
- long preparedTransactionOffset = msg.getPreparedTransactionOffset();
- Map<String, String> propertiesMap = msg.getProperties();
-
- if (msg.getProperties() != null && msg.getProperties().size() > 0) {
-
- keys = propertiesMap.get(MessageConst.PROPERTY_KEYS);
-
- uniqKey = propertiesMap.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
-
- String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);
- if (tags != null && tags.length() > 0) {
- tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
- }
-
- // Timing message processing
- {
- String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
- if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) {
- int delayLevel = Integer.parseInt(t);
-
- if (delayLevel > this.getScheduleMessageService().getMaxDelayLevel()) {
- delayLevel = this.getScheduleMessageService().getMaxDelayLevel();
- }
-
- if (delayLevel > 0) {
- tagsCode = this.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
- storeTimestamp);
- }
- }
- }
- }
-
- DispatchRequest dispatchRequest = new DispatchRequest(
- topic,
- queueId,
- appendResult.getWroteOffset(),
- appendResult.getWroteBytes(),
- tagsCode,
- storeTimestamp,
- appendResult.getLogicsOffset(),
- keys,
- uniqKey,
- sysFlag,
- preparedTransactionOffset,
- propertiesMap
- );
-
- if (null != propertiesMap && propertiesMap.containsKey(MessageConst.PROPERTY_INNER_NUM) && propertiesMap.containsKey(MessageConst.PROPERTY_INNER_BASE)) {
- dispatchRequest.setMsgBaseOffset(Long.parseLong(propertiesMap.get(MessageConst.PROPERTY_INNER_BASE)));
- dispatchRequest.setBatchSize(Short.parseShort(propertiesMap.get(MessageConst.PROPERTY_INNER_NUM)));
- }
- return dispatchRequest;
- }
-
- private void syncDispatch(DispatchRequest dispatchRequest, boolean isRecover) {
- try {
- this.syncProcessDispatchRequest(dispatchRequest, isRecover);
- } catch (InterruptedException e) {
- log.error("OnCommitlogAppend sync dispatch failed, addDispatchRequestQueue interrupted. DispatchRequest:{}", dispatchRequest);
- }
- }
-
- class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
-
- @Override
- public void dispatch(DispatchRequest dispatchRequest) {
- final int tranType = MessageSysFlag.getTransactionValue(dispatchRequest.getSysFlag());
- switch (tranType) {
- case MessageSysFlag.TRANSACTION_NOT_TYPE:
- case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
- StreamMessageStore.this.putMessagePositionInfo(dispatchRequest);
- if (BrokerRole.SLAVE != StreamMessageStore.this.getMessageStoreConfig().getBrokerRole()
- && StreamMessageStore.this.brokerConfig.isLongPollingEnable()) {
- StreamMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
- dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
- dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
- dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
- }
- if (StreamMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
- StreamMessageStore.this.storeStatsService
- .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
- StreamMessageStore.this.storeStatsService
- .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
- .add(dispatchRequest.getMsgSize());
- }
- break;
- case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
- case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
- break;
- }
- }
- }
-
- class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
-
- @Override
- public void dispatch(DispatchRequest request) {
- if (StreamMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
- StreamMessageStore.this.indexService.buildIndex(request);
- }
- }
- }
-
- class CleanCommitLogService {
-
- private final static int MAX_MANUAL_DELETE_FILE_TIMES = 20;
- private final String diskSpaceWarningLevelRatio =
- System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "");
-
- private final String diskSpaceCleanForciblyRatio =
- System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "");
- private long lastRedeleteTimestamp = 0;
-
- private volatile int manualDeleteFileSeveralTimes = 0;
-
- private volatile boolean cleanImmediately = false;
-
- double getDiskSpaceWarningLevelRatio() {
- double finalDiskSpaceWarningLevelRatio;
- if ("".equals(diskSpaceWarningLevelRatio)) {
- finalDiskSpaceWarningLevelRatio = StreamMessageStore.this.getMessageStoreConfig().getDiskSpaceWarningLevelRatio() / 100.0;
- } else {
- finalDiskSpaceWarningLevelRatio = Double.parseDouble(diskSpaceWarningLevelRatio);
- }
-
- if (finalDiskSpaceWarningLevelRatio > 0.90) {
- finalDiskSpaceWarningLevelRatio = 0.90;
- }
- if (finalDiskSpaceWarningLevelRatio < 0.35) {
- finalDiskSpaceWarningLevelRatio = 0.35;
- }
-
- return finalDiskSpaceWarningLevelRatio;
- }
-
- double getDiskSpaceCleanForciblyRatio() {
- double finalDiskSpaceCleanForciblyRatio;
- if ("".equals(diskSpaceCleanForciblyRatio)) {
- finalDiskSpaceCleanForciblyRatio = StreamMessageStore.this.getMessageStoreConfig().getDiskSpaceCleanForciblyRatio() / 100.0;
- } else {
- finalDiskSpaceCleanForciblyRatio = Double.parseDouble(diskSpaceCleanForciblyRatio);
- }
-
- if (finalDiskSpaceCleanForciblyRatio > 0.85) {
- finalDiskSpaceCleanForciblyRatio = 0.85;
- }
- if (finalDiskSpaceCleanForciblyRatio < 0.30) {
- finalDiskSpaceCleanForciblyRatio = 0.30;
- }
-
- return finalDiskSpaceCleanForciblyRatio;
- }
-
- public void excuteDeleteFilesManualy() {
- this.manualDeleteFileSeveralTimes = MAX_MANUAL_DELETE_FILE_TIMES;
- StreamMessageStore.log.info("executeDeleteFilesManually was invoked");
- }
-
- public long run() {
- int deleteCount = 0;
- try {
- deleteCount = this.deleteExpiredFiles();
-
- this.redeleteHangedFile();
- } catch (Throwable e) {
- StreamMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
- }
- return deleteCount;
- }
-
- private int deleteExpiredFiles() {
- int deleteCount = 0;
- long fileReservedTime = StreamMessageStore.this.getMessageStoreConfig().getFileReservedTime();
- int deletePhysicFilesInterval = StreamMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
- int destroyMapedFileIntervalForcibly = StreamMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
- int maxBatchDeleteFilesNum = StreamMessageStore.this.getMessageStoreConfig().getMaxBatchDeleteFilesNum();
- if (maxBatchDeleteFilesNum < 10) {
- maxBatchDeleteFilesNum = 10;
- }
-
- boolean timeup = this.isTimeToDelete();
- boolean spacefull = this.isSpaceToDelete();
- boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
-
- boolean needDelete = timeup || spacefull || manualDelete;
-
- if (needDelete) {
-
- if (manualDelete)
- this.manualDeleteFileSeveralTimes--;
-
- boolean cleanAtOnce = StreamMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
-
- String storePathPhysic = StreamMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
- double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
- long totalSpace = UtilAll.getTotalSpace(storePathPhysic);
- double realRatio = (StreamMessageStore.this.commitLog.getMaxOffset() - StreamMessageStore.this.commitLog.getMinOffset()) / (totalSpace + 0.001);
-
- cleanAtOnce = cleanAtOnce && (realRatio > 0.3);
-
- log.info("begin to delete before {} hours file. timeup:{} spacefull:{} manualDeleteFileSeveralTimes:{} cleanAtOnce:{} maxBatchDeleteFilesNum:{} physicRatio:{}",
- fileReservedTime,
- timeup,
- spacefull,
- manualDeleteFileSeveralTimes,
- cleanAtOnce,
- maxBatchDeleteFilesNum,
- physicRatio);
-
- fileReservedTime *= 60 * 60 * 1000;
-
- deleteCount = StreamMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
- destroyMapedFileIntervalForcibly, cleanAtOnce);
- if (deleteCount > 0) {
- } else if (spacefull) {
- log.warn("disk space will be full soon, but delete file failed. timeup:{} manualDelete:{} cleanAtOnce:{} physicRatio:{}",
- timeup,
- manualDelete,
- cleanAtOnce,
- physicRatio);
- }
- }
- return deleteCount;
- }
-
- private void redeleteHangedFile() {
- int interval = StreamMessageStore.this.getMessageStoreConfig().getRedeleteHangedFileInterval();
- long currentTimestamp = System.currentTimeMillis();
- if ((currentTimestamp - this.lastRedeleteTimestamp) > interval) {
- this.lastRedeleteTimestamp = currentTimestamp;
- int destroyMapedFileIntervalForcibly =
- StreamMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
- if (StreamMessageStore.this.commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly)) {
- }
- }
- }
-
- public String getServiceName() {
- return CleanCommitLogService.class.getSimpleName();
- }
-
- private boolean isTimeToDelete() {
- String when = StreamMessageStore.this.getMessageStoreConfig().getDeleteWhen();
- if (UtilAll.isItTimeToDo(when)) {
- StreamMessageStore.log.info("it's time to reclaim disk space, " + when);
- return true;
- }
-
- return false;
- }
-
- private boolean isSpaceToDelete() {
- double ratio = StreamMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
-
- cleanImmediately = false;
-
- {
- double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic());
- if (physicRatio > getDiskSpaceWarningLevelRatio()) {
- boolean diskok = StreamMessageStore.this.runningFlags.getAndMakeDiskFull();
- if (diskok) {
- StreamMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
- }
-
- cleanImmediately = true;
- } else if (physicRatio > getDiskSpaceCleanForciblyRatio()) {
- cleanImmediately = true;
- } else {
- boolean diskok = StreamMessageStore.this.runningFlags.getAndMakeDiskOK();
- if (!diskok) {
- StreamMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
- }
- }
-
- if (physicRatio < 0 || physicRatio > ratio) {
- StreamMessageStore.log.info("physic disk maybe full soon, so reclaim space, {}, cleanImmediately {}", physicRatio, cleanImmediately);
- return true;
- }
- }
-
- {
- String storePathLogics = StorePathConfigHelper
- .getStorePathConsumeQueue(StreamMessageStore.this.getMessageStoreConfig().getStorePathRootDir());
- double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
- if (logicsRatio > getDiskSpaceWarningLevelRatio()) {
- boolean diskok = StreamMessageStore.this.runningFlags.getAndMakeDiskFull();
- if (diskok) {
- StreamMessageStore.log.error("logics disk maybe full soon " + logicsRatio + ", so mark disk full");
- }
-
- cleanImmediately = true;
- } else if (logicsRatio > getDiskSpaceCleanForciblyRatio()) {
- cleanImmediately = true;
- } else {
- boolean diskok = StreamMessageStore.this.runningFlags.getAndMakeDiskOK();
- if (!diskok) {
- StreamMessageStore.log.info("logics disk space OK " + logicsRatio + ", so mark disk ok");
- }
- }
-
- if (logicsRatio < 0 || logicsRatio > ratio) {
- StreamMessageStore.log.info("logics disk maybe full soon, so reclaim space, " + logicsRatio);
- return true;
- }
- }
-
- return false;
- }
-
- public int getManualDeleteFileSeveralTimes() {
- return manualDeleteFileSeveralTimes;
- }
-
- public void setManualDeleteFileSeveralTimes(int manualDeleteFileSeveralTimes) {
- this.manualDeleteFileSeveralTimes = manualDeleteFileSeveralTimes;
- }
- }
-
- class CleanConsumeQueueService {
- private long lastPhysicalMinOffset = 0;
-
- public void run() {
- try {
- this.deleteExpiredFiles();
- } catch (Throwable e) {
- StreamMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
- }
- }
-
- private void deleteExpiredFiles() {
- int deleteLogicsFilesInterval = StreamMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
-
- long minOffset = StreamMessageStore.this.commitLog.getMinOffset();
- if (minOffset > this.lastPhysicalMinOffset) {
- this.lastPhysicalMinOffset = minOffset;
-
- ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> tables = StreamMessageStore.this.consumeQueueTable;
-
- for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : tables.values()) {
- for (ConsumeQueueInterface logic : maps.values()) {
- int deleteCount = StreamMessageStore.this.consumeQueueStore.deleteExpiredFile(logic, minOffset);
-
- if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {
- try {
- Thread.sleep(deleteLogicsFilesInterval);
- } catch (InterruptedException ignored) {
- }
- }
- }
- }
-
- StreamMessageStore.this.indexService.deleteExpiredFile(minOffset);
- }
- }
-
- public String getServiceName() {
- return CleanConsumeQueueService.class.getSimpleName();
- }
- }
-
- class CorrectLogicOffsetService {
-
- private long lastForceCorrectTime = -1L;
-
- public void run() {
- try {
- this.correctLogicMinOffset();
- } catch (Throwable e) {
- StreamMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
- }
- }
-
- private boolean needCorrect(ConsumeQueueInterface logic, long minPhyOffset, long lastForeCorrectTimeCurRun) {
- if (logic == null) {
- return false;
- }
- // If first exist and not available, it means first file may destroy failed, delete it.
- if (StreamMessageStore.this.consumeQueueStore.isFirstFileExist(logic) && !StreamMessageStore.this.consumeQueueStore.isFirstFileAvailable(logic)) {
- log.error("CorrectLogicOffsetService.needCorrect. first file not available, trigger correct." +
- " topic:{}, queue:{}, maxPhyOffset in queue:{}, minPhyOffset " +
- "in commit log:{}, minOffset in queue:{}, maxOffset in queue:{}, cqType:{}"
- , logic.getTopic(), logic.getQueueId(), logic.getMaxPhysicOffset()
- , minPhyOffset, logic.getMinOffsetInQueue(), logic.getMaxOffsetInQueue(), logic.getCQType());
- return true;
- }
-
- // logic.getMaxPhysicOffset() or minPhyOffset = -1
- // means there is no message in current queue, so no need to correct.
- if (logic.getMaxPhysicOffset() == -1 || minPhyOffset == -1) {
- return false;
- }
-
- if (logic.getMaxPhysicOffset() < minPhyOffset) {
- if (logic.getMinOffsetInQueue() < logic.getMaxOffsetInQueue()) {
- log.error("CorrectLogicOffsetService.needCorrect. logic max phy offset: {} is less than min phy offset: {}, " +
- "but min offset: {} is less than max offset: {}. topic:{}, queue:{}, cqType:{}."
- , logic.getMaxPhysicOffset(), minPhyOffset, logic.getMinOffsetInQueue()
- , logic.getMaxOffsetInQueue(), logic.getTopic(), logic.getQueueId(), logic.getCQType());
- return true;
- } else if (logic.getMinOffsetInQueue() == logic.getMaxOffsetInQueue()) {
- return false;
- } else {
- log.error("CorrectLogicOffsetService.needCorrect. It should not happen, logic max phy offset: {} is less than min phy offset: {}," +
- " but min offset: {} is larger than max offset: {}. topic:{}, queue:{}, cqType:{}"
- , logic.getMaxPhysicOffset(), minPhyOffset, logic.getMinOffsetInQueue()
- , logic.getMaxOffsetInQueue(), logic.getTopic(), logic.getQueueId(), logic.getCQType());
- return false;
- }
- }
- //the logic.getMaxPhysicOffset() >= minPhyOffset
- int forceCorrectInterval = StreamMessageStore.this.getMessageStoreConfig().getCorrectLogicMinOffsetForceInterval();
- if ((System.currentTimeMillis() - lastForeCorrectTimeCurRun) > forceCorrectInterval) {
- lastForceCorrectTime = System.currentTimeMillis();
- CqUnit cqUnit = logic.getEarliestUnit();
- if (cqUnit == null) {
- if (logic.getMinOffsetInQueue() == logic.getMaxOffsetInQueue()) {
- return false;
- } else {
- log.error("CorrectLogicOffsetService.needCorrect. cqUnit is null, logic max phy offset: {} is greater than min phy offset: {}, " +
- "but min offset: {} is not equal to max offset: {}. topic:{}, queue:{}, cqType:{}."
- , logic.getMaxPhysicOffset(), minPhyOffset, logic.getMinOffsetInQueue()
- , logic.getMaxOffsetInQueue(), logic.getTopic(), logic.getQueueId(), logic.getCQType());
- return true;
- }
- }
-
- if (cqUnit.getPos() < minPhyOffset) {
- log.error("CorrectLogicOffsetService.needCorrect. logic max phy offset: {} is greater than min phy offset: {}, " +
- "but minPhyPos in cq is: {}. min offset in queue: {}, max offset in queue: {}, topic:{}, queue:{}, cqType:{}."
- , logic.getMaxPhysicOffset(), minPhyOffset, cqUnit.getPos(), logic.getMinOffsetInQueue()
- , logic.getMaxOffsetInQueue(), logic.getTopic(), logic.getQueueId(), logic.getCQType());
- return true;
- }
-
- if (cqUnit.getPos() >= minPhyOffset) {
-
- // Normal case, do not need correct.
- return false;
- }
- }
-
- return false;
- }
-
- private void correctLogicMinOffset() {
-
- long lastForeCorrectTimeCurRun = lastForceCorrectTime;
- long minPhyOffset = getMinPhyOffset();
- ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> tables = StreamMessageStore.this.consumeQueueTable;
- for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : tables.values()) {
- for (ConsumeQueueInterface logic : maps.values()) {
- if (needCorrect(logic, minPhyOffset, lastForeCorrectTimeCurRun)) {
- doCorrect(logic, minPhyOffset);
- }
- }
- }
- }
-
- private void doCorrect(ConsumeQueueInterface logic, long minPhyOffset) {
- StreamMessageStore.this.consumeQueueStore.deleteExpiredFile(logic, minPhyOffset);
- int sleepIntervalWhenCorrectMinOffset = StreamMessageStore.this.getMessageStoreConfig().getCorrectLogicMinOffsetSleepInterval();
- if (sleepIntervalWhenCorrectMinOffset > 0) {
- try {
- Thread.sleep(sleepIntervalWhenCorrectMinOffset);
- } catch (InterruptedException ignored) {
- }
- }
- }
-
- public String getServiceName() {
- return CorrectLogicOffsetService.class.getSimpleName();
- }
- }
-
- class FlushConsumeQueueService extends ServiceThread {
- private static final int RETRY_TIMES_OVER = 3;
- private long lastFlushTimestamp = 0;
-
- private void doFlush(int retryTimes) {
- int flushConsumeQueueLeastPages = StreamMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
-
- if (retryTimes == RETRY_TIMES_OVER) {
- flushConsumeQueueLeastPages = 0;
- }
-
- long logicsMsgTimestamp = 0;
-
- int flushConsumeQueueThoroughInterval = StreamMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
- long currentTimeMillis = System.currentTimeMillis();
- if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
- this.lastFlushTimestamp = currentTimeMillis;
- flushConsumeQueueLeastPages = 0;
- logicsMsgTimestamp = StreamMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
- }
-
- ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> tables = StreamMessageStore.this.consumeQueueTable;
-
- for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : tables.values()) {
- for (ConsumeQueueInterface cq : maps.values()) {
- boolean result = false;
- for (int i = 0; i < retryTimes && !result; i++) {
- result = StreamMessageStore.this.consumeQueueStore.flush(cq, flushConsumeQueueLeastPages);
- }
- }
- }
-
- if (0 == flushConsumeQueueLeastPages) {
- if (logicsMsgTimestamp > 0) {
- StreamMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
- }
- StreamMessageStore.this.getStoreCheckpoint().flush();
- }
- }
-
- @Override
- public void run() {
- StreamMessageStore.log.info(this.getServiceName() + " service started");
-
- while (!this.isStopped()) {
- try {
- int interval = StreamMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue();
- this.waitForRunning(interval);
- this.doFlush(1);
- } catch (Exception e) {
- StreamMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
- }
- }
-
- this.doFlush(RETRY_TIMES_OVER);
-
- StreamMessageStore.log.info(this.getServiceName() + " service end");
- }
-
- @Override
- public String getServiceName() {
- return FlushConsumeQueueService.class.getSimpleName();
- }
-
- @Override
- public long getJointime() {
- return 1000 * 60;
- }
- }
-
- private class SyncReputMessageServiceFutureItem {
- private Future future;
- private long nextReputFromOffset;
-
- public SyncReputMessageServiceFutureItem(Future future, long nextReputFromOffset) {
- this.future = future;
- this.nextReputFromOffset = nextReputFromOffset;
- }
-
- public Future getFuture() {
- return future;
- }
-
- public void setFuture(Future future) {
- this.future = future;
- }
-
- public long getNextReputFromOffset() {
- return nextReputFromOffset;
- }
-
- public void setNextReputFromOffset(long nextReputFromOffset) {
- this.nextReputFromOffset = nextReputFromOffset;
- }
- }
-
- class SyncReputMessageService extends ReputMessageService {
-
- private BlockingQueue<SyncReputMessageServiceFutureItem> reputResultQueue;
-
- SyncReputMessageService() {
- reputResultQueue = new LinkedBlockingDeque<>(messageStoreConfig.getDispatchCqThreads() * messageStoreConfig.getDispatchCqCacheNum());
- }
-
- void processDispatchRequestForRecover(DispatchRequest dispatchRequest) {
- int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
- if (dispatchRequest.isSuccess() && size > 0) {
- StreamMessageStore.this.doDispatch(dispatchRequest);
- this.reputFromOffset = Math.max(dispatchRequest.getNextReputFromOffset(), this.reputFromOffset);
- }
- }
-
- void processDispatchRequest(DispatchRequest dispatchRequest) throws InterruptedException {
- int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
- if (dispatchRequest.isSuccess() && size > 0) {
- Future future = StreamMessageStore.this.doDispatch(dispatchRequest, messageStoreConfig.isEnableAsyncReput());
- reputResultQueue.put(new SyncReputMessageServiceFutureItem(future, dispatchRequest.getNextReputFromOffset()));
- }
- }
-
- @Override
- public void shutdown() {
- for (int i = 0; i < 30 && this.reputResultQueue.size() > 0; i++) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignored) {
- }
- }
-
- if (this.reputResultQueue.size() > 0) {
- log.warn("shutdown SyncReputMessageService, but reputResultQueue not all processed, CLMaxOffset: {} reputFromOffset: {}",
- StreamMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset);
- }
-
- super.shutdown();
- }
- @Override
- public void run() {
- StreamMessageStore.log.info(this.getServiceName() + " service started");
- while (!this.isStopped()) {
- try {
- SyncReputMessageServiceFutureItem item = reputResultQueue.poll(1, TimeUnit.SECONDS);
- if (null == item) {
- continue;
- }
- item.getFuture().get();
- this.reputFromOffset = Math.max(item.getNextReputFromOffset(), this.reputFromOffset);
-
- } catch (Exception e) {
- StreamMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
- waitForRunning(100);
- }
- }
-
- StreamMessageStore.log.info(this.getServiceName() + " service end");
- }
-
- @Override
- public String getServiceName() {
- return SyncReputMessageService.class.getSimpleName();
- }
- }
-
- class ReputMessageService extends ServiceThread {
-
- protected volatile long reputFromOffset = 0;
-
- public long getReputFromOffset() {
- return reputFromOffset;
- }
-
- public void setReputFromOffset(long reputFromOffset) {
- this.reputFromOffset = reputFromOffset;
- }
-
- public boolean dispatched(long physicalOffset) {
- return this.reputFromOffset > physicalOffset;
- }
-
- @Override
- public void shutdown() {
- for (int i = 0; i < 30 && this.isCommitLogAvailable(); i++) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignored) {
- }
- }
-
- if (this.isCommitLogAvailable()) {
- log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}",
- StreamMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset);
- }
-
- super.shutdown();
- }
-
- public long behind() {
- return StreamMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset;
- }
-
- protected boolean isCommitLogAvailable() {
- return this.reputFromOffset < StreamMessageStore.this.commitLog.getMaxOffset();
- }
-
- private void doReput() throws Exception {
- if (this.reputFromOffset < StreamMessageStore.this.commitLog.getMinOffset()) {
- log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
- this.reputFromOffset, StreamMessageStore.this.commitLog.getMinOffset());
- this.reputFromOffset = StreamMessageStore.this.commitLog.getMinOffset();
- }
- for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
-
- if (StreamMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
- && this.reputFromOffset >= StreamMessageStore.this.getConfirmOffset()) {
- break;
- }
-
- SelectMappedBufferResult result = StreamMessageStore.this.commitLog.getData(reputFromOffset);
- boolean reputAsync = messageStoreConfig.isEnableAsyncReput();
- if (result != null) {
- long cacheReputFromOffset = this.reputFromOffset;
- List<Future> futures = new LinkedList<>();
- try {
- this.reputFromOffset = result.getStartOffset();
- cacheReputFromOffset = this.reputFromOffset;
-
- for (int readSize = 0; readSize < result.getSize() && doNext; ) {
- DispatchRequest dispatchRequest =
- StreamMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
- int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
-
- if (dispatchRequest.isSuccess()) {
- if (size > 0) {
- futures.add(StreamMessageStore.this.doDispatch(dispatchRequest, reputAsync));
-
- if (BrokerRole.SLAVE != StreamMessageStore.this.getMessageStoreConfig().getBrokerRole()
- && StreamMessageStore.this.brokerConfig.isLongPollingEnable()) {
- StreamMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
- dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
- dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
- dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
- }
-
- cacheReputFromOffset += size;
- readSize += size;
- if (StreamMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
- StreamMessageStore.this.storeStatsService
- .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
- StreamMessageStore.this.storeStatsService
- .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
- .add(dispatchRequest.getMsgSize());
- }
- } else if (size == 0) {
- cacheReputFromOffset = StreamMessageStore.this.commitLog.rollNextFile(cacheReputFromOffset);
- readSize = result.getSize();
- }
- } else if (!dispatchRequest.isSuccess()) {
-
- if (size > 0) {
- log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", cacheReputFromOffset);
- cacheReputFromOffset += size;
- } else {
- doNext = false;
- if (StreamMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
- log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
- cacheReputFromOffset);
- cacheReputFromOffset += result.getSize() - readSize;
- }
- }
- }
- }
- for (Future future: futures) {
- future.get();
- }
- futures.clear();
- this.reputFromOffset = cacheReputFromOffset;
- } finally {
- result.release();
- }
- } else {
- doNext = false;
- }
- }
- }
-
- @Override
- public void run() {
- StreamMessageStore.log.info(this.getServiceName() + " service started");
-
- while (!this.isStopped()) {
- try {
- Thread.sleep(1);
- this.doReput();
- } catch (Exception e) {
- StreamMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
- }
- }
-
- StreamMessageStore.log.info(this.getServiceName() + " service end");
- }
-
- @Override
- public String getServiceName() {
- return ReputMessageService.class.getSimpleName();
- }
-
- }
-}
diff --git a/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java b/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java
index 97c50b9..a78eeed 100644
--- a/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java
+++ b/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java
@@ -8,12 +8,13 @@
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
+
package org.apache.rocketmq.store;
import java.util.ArrayList;
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 7a41c27..e58e695 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.store.config;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.queue.BatchConsumeQueue;
-import org.apache.rocketmq.store.queue.CQType;
import java.io.File;
@@ -206,9 +205,6 @@ public class MessageStoreConfig {
@ImportantField
private boolean enableCleanExpiredOffset = false;
- @ImportantField
- private String defaultCQType = CQType.SimpleCQ.toString();
-
private int maxAsyncPutMessageRequests = 5000;
private int pullBatchMaxMessageCount = 160;
@@ -799,13 +795,6 @@ public class MessageStoreConfig {
this.enableCleanExpiredOffset = enableCleanExpiredOffset;
}
- public String getDefaultCQType() {
- return defaultCQType;
- }
-
- public void setDefaultCQType(String defaultCQType) {
- this.defaultCQType = defaultCQType;
- }
public String getReadOnlyCommitLogStorePaths() {
return readOnlyCommitLogStorePaths;
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
index 50078c9..cbe794e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
@@ -33,7 +33,6 @@ import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import java.net.Inet6Address;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
-import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -431,7 +430,7 @@ public class DLedgerCommitLog extends CommitLog {
String topicQueueKey = msg.getTopic() + "-" + msg.getQueueId();
topicQueueLock.lock(topicQueueKey);
try {
- defaultMessageStore.assignOffset(topicQueueKey, msg, getBatchNum(msg));
+ defaultMessageStore.assignOffset(topicQueueKey, msg, getMessageNum(msg));
encodeResult = this.messageSerializer.serialize(msg);
if (encodeResult.status != AppendMessageStatus.PUT_OK) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java
index 0b6a9dd..7d1feba 100644
--- a/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.store.logfile;
import org.apache.rocketmq.store.AppendMessageCallback;
import org.apache.rocketmq.store.AppendMessageResult;
-import org.apache.rocketmq.store.CommitLog;
import org.apache.rocketmq.store.MessageExtBatch;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageContext;
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
index de0e636..648a472 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
@@ -17,11 +17,17 @@
package org.apache.rocketmq.store.queue;
+import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.MappedFileQueue;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.BrokerRole;
@@ -29,6 +35,7 @@ import org.apache.rocketmq.store.logfile.MappedFile;
import java.io.File;
import java.nio.ByteBuffer;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -386,9 +393,9 @@ public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCy
}
@Override
- public int deleteExpiredFile(long offset) {
- int cnt = this.mappedFileQueue.deleteExpiredFileByOffset(offset, CQ_STORE_UNIT_SIZE);
- this.correctMinOffset(offset);
+ public int deleteExpiredFile(long minCommitLogPos) {
+ int cnt = this.mappedFileQueue.deleteExpiredFileByOffset(minCommitLogPos, CQ_STORE_UNIT_SIZE);
+ this.correctMinOffset(minCommitLogPos);
return cnt;
}
@@ -473,6 +480,21 @@ public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCy
this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}
+ @Override
+ public void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, MessageExtBrokerInner msg, short messageNum) {
+ HashMap<String, Long> batchTopicQueueTable = queueOffsetAssigner.getBatchTopicQueueTable();
+ String topicQueueKey = getTopic() + "-" + getQueueId();
+
+ Long topicOffset = batchTopicQueueTable.computeIfAbsent(topicQueueKey, k -> 0L);
+
+ if (MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG)) {
+ MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_BASE, String.valueOf(topicOffset));
+ msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
+ }
+ msg.setQueueOffset(topicOffset);
+ batchTopicQueueTable.put(topicQueueKey, topicOffset + messageNum);
+ }
+
boolean putBatchMessagePositionInfo(final long offset, final int size, final long tagsCode, final long storeTime,
final long msgBaseOffset, final short batchSize) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
index 48f717d..5232a74 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
@@ -17,6 +17,8 @@
package org.apache.rocketmq.store.queue;
+import org.apache.rocketmq.common.attribute.CQType;
+
public interface ConsumeQueueInterface {
/**
* Get the topic name
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index bf42e74..e8146ff 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -16,27 +16,49 @@
*/
package org.apache.rocketmq.store.queue;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.attribute.CQType;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.utils.QueueTypeUtils;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.ConsumeQueue;
-import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.DispatchRequest;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
-import org.apache.rocketmq.store.StoreUtil;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
+import java.io.File;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class ConsumeQueueStore {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
protected final MessageStore messageStore;
protected final MessageStoreConfig messageStoreConfig;
+ protected final QueueOffsetAssigner queueOffsetAssigner = new QueueOffsetAssigner();
protected final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueueInterface>> consumeQueueTable;
- public ConsumeQueueStore(MessageStore messageStore, MessageStoreConfig messageStoreConfig, ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> consumeQueueTable) {
+ // Should be careful, do not change the topic config
+ // TopicConfigManager is more suitable here.
+ private ConcurrentMap<String, TopicConfig> topicConfigTable;
+
+ public ConsumeQueueStore(MessageStore messageStore, MessageStoreConfig messageStoreConfig) {
this.messageStore = messageStore;
this.messageStoreConfig = messageStoreConfig;
- this.consumeQueueTable = consumeQueueTable;
+ this.consumeQueueTable = new ConcurrentHashMap<>(32);
+ }
+
+ public void setTopicConfigTable(ConcurrentMap<String, TopicConfig> topicConfigTable) {
+ this.topicConfigTable = topicConfigTable;
}
private FileQueueLifeCycle getLifeCycle(String topic, int queueId) {
@@ -64,21 +86,143 @@ public class ConsumeQueueStore {
fileQueueLifeCycle.putMessagePositionInfoWrapper(request);
}
+ public void putMessagePositionInfoWrapper(DispatchRequest dispatchRequest) {
+ ConsumeQueueInterface cq = this.findOrCreateConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
+ this.putMessagePositionInfoWrapper(cq, dispatchRequest);
+ }
+
public boolean load(ConsumeQueueInterface consumeQueue) {
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
return fileQueueLifeCycle.load();
}
+ public boolean load() {
+ return loadConsumeQueues() && loadBatchConsumeQueues();
+ }
+
+ private boolean loadBatchConsumeQueues() {
+ File dirLogic = new File(StorePathConfigHelper.getStorePathBatchConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
+ File[] fileTopicList = dirLogic.listFiles();
+ if (fileTopicList != null) {
+
+ for (File fileTopic : fileTopicList) {
+ String topic = fileTopic.getName();
+
+ File[] fileQueueIdList = fileTopic.listFiles();
+ if (fileQueueIdList != null) {
+ for (File fileQueueId : fileQueueIdList) {
+ int queueId;
+ try {
+ queueId = Integer.parseInt(fileQueueId.getName());
+ } catch (NumberFormatException e) {
+ continue;
+ }
+
+ TopicConfig topicConfig = this.topicConfigTable == null ? null : this.topicConfigTable.get(topic);
+
+ // For batch consume queue, the topic config must exist
+ if (topicConfig == null) {
+ log.warn("topic: {} has no topic config.", topic);
+ continue;
+ }
+
+ if (!Objects.equals(CQType.BatchCQ, QueueTypeUtils.getCQType(Optional.of(topicConfig)))) {
+ log.error("[BUG]topic: {} should be BCQ.", topic);
+ }
+
+ ConsumeQueueInterface logic = new BatchConsumeQueue(
+ topic,
+ queueId,
+ StorePathConfigHelper.getStorePathBatchConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
+ this.messageStoreConfig.getMappedFileSizeConsumeQueue(),
+ this.messageStore);
+ this.putConsumeQueue(topic, queueId, logic);
+ if (!this.load(logic)) {
+ return false;
+ }
+ }
+ }
+ }
+ }
+
+ log.info("load batch consume queue all over, OK");
+
+ return true;
+ }
+
+ private boolean loadConsumeQueues() {
+ File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
+ File[] fileTopicList = dirLogic.listFiles();
+ if (fileTopicList != null) {
+
+ for (File fileTopic : fileTopicList) {
+ String topic = fileTopic.getName();
+
+ File[] fileQueueIdList = fileTopic.listFiles();
+ if (fileQueueIdList != null) {
+ for (File fileQueueId : fileQueueIdList) {
+ int queueId;
+ try {
+ queueId = Integer.parseInt(fileQueueId.getName());
+ } catch (NumberFormatException e) {
+ continue;
+ }
+ ConsumeQueueInterface logic = new ConsumeQueue(
+ topic,
+ queueId,
+ StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
+ this.messageStoreConfig.getMappedFileSizeConsumeQueue(),
+ this.messageStore);
+ this.putConsumeQueue(topic, queueId, logic);
+ if (!this.load(logic)) {
+ return false;
+ }
+ }
+ }
+ }
+ }
+
+ log.info("load logics queue all over, OK");
+
+ return true;
+ }
+
public void recover(ConsumeQueueInterface consumeQueue) {
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
fileQueueLifeCycle.recover();
}
+ public long recover() {
+ long maxPhysicOffset = -1;
+ for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
+ for (ConsumeQueueInterface logic : maps.values()) {
+ this.recover(logic);
+ if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
+ maxPhysicOffset = logic.getMaxPhysicOffset();
+ }
+ }
+ }
+
+ return maxPhysicOffset;
+ }
+
public void checkSelf(ConsumeQueueInterface consumeQueue) {
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
fileQueueLifeCycle.checkSelf();
}
+ public void checkSelf() {
+ Iterator<Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>>> it = this.consumeQueueTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> next = it.next();
+ Iterator<Map.Entry<Integer, ConsumeQueueInterface>> itNext = next.getValue().entrySet().iterator();
+ while (itNext.hasNext()) {
+ Map.Entry<Integer, ConsumeQueueInterface> cq = itNext.next();
+ this.checkSelf(cq.getValue());
+ }
+ }
+ }
+
public boolean flush(ConsumeQueueInterface consumeQueue, int flushLeastPages) {
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
return fileQueueLifeCycle.flush(flushLeastPages);
@@ -89,9 +233,9 @@ public class ConsumeQueueStore {
fileQueueLifeCycle.destroy();
}
- public int deleteExpiredFile(ConsumeQueueInterface consumeQueue, long minOffset) {
+ public int deleteExpiredFile(ConsumeQueueInterface consumeQueue, long minCommitLogPos) {
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
- return fileQueueLifeCycle.deleteExpiredFile(minOffset);
+ return fileQueueLifeCycle.deleteExpiredFile(minCommitLogPos);
}
public void truncateDirtyLogicFiles(ConsumeQueueInterface consumeQueue, long phyOffset) {
@@ -124,10 +268,9 @@ public class ConsumeQueueStore {
}
private ConsumeQueueInterface doFindOrCreateConsumeQueue(String topic, int queueId) {
-
ConcurrentMap<Integer, ConsumeQueueInterface> map = consumeQueueTable.get(topic);
if (null == map) {
- ConcurrentMap<Integer, ConsumeQueueInterface> newMap = new ConcurrentHashMap<Integer, ConsumeQueueInterface>(128);
+ ConcurrentMap<Integer, ConsumeQueueInterface> newMap = new ConcurrentHashMap<>(128);
ConcurrentMap<Integer, ConsumeQueueInterface> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
if (oldMap != null) {
map = oldMap;
@@ -143,7 +286,9 @@ public class ConsumeQueueStore {
ConsumeQueueInterface newLogic;
- if (StoreUtil.isStreamMode(this.messageStore)) {
+ Optional<TopicConfig> topicConfig = this.getTopicConfig(topic);
+ // TODO maybe the topic has been deleted.
+ if (Objects.equals(CQType.BatchCQ, QueueTypeUtils.getCQType(topicConfig))) {
newLogic = new BatchConsumeQueue(
topic,
queueId,
@@ -162,7 +307,7 @@ public class ConsumeQueueStore {
queueId,
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
this.messageStoreConfig.getMappedFileSizeConsumeQueue(),
- (DefaultMessageStore) this.messageStore);
+ this.messageStore);
ConsumeQueueInterface oldLogic = map.putIfAbsent(queueId, newLogic);
if (oldLogic != null) {
logic = oldLogic;
@@ -173,4 +318,129 @@ public class ConsumeQueueStore {
return logic;
}
+
+ public Long getMaxOffset(String topic, int queueId) {
+ return this.queueOffsetAssigner.getTopicQueueTable().get(topic + "-" + queueId);
+ }
+
+ public void setTopicQueueTable(HashMap<String, Long> topicQueueTable) {
+ this.queueOffsetAssigner.setTopicQueueTable(topicQueueTable);
+ }
+
+ public void setBatchTopicQueueTable(HashMap<String, Long> batchTopicQueueTable) {
+ this.queueOffsetAssigner.setBatchTopicQueueTable(batchTopicQueueTable);
+ }
+
+ public void assignQueueOffset(MessageExtBrokerInner msg, short messageNum) {
+ FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(msg.getTopic(), msg.getQueueId());
+ fileQueueLifeCycle.assignQueueOffset(this.queueOffsetAssigner, msg, messageNum);
+ }
+
+ public void removeTopicQueueTable(String topic, Integer queueId) {
+ this.queueOffsetAssigner.remove(topic, queueId);
+ }
+
+ public ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> getConsumeQueueTable() {
+ return consumeQueueTable;
+ }
+
+ private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueueInterface consumeQueue) {
+ ConcurrentMap<Integer/* queueId */, ConsumeQueueInterface> map = this.consumeQueueTable.get(topic);
+ if (null == map) {
+ map = new ConcurrentHashMap<>();
+ map.put(queueId, consumeQueue);
+ this.consumeQueueTable.put(topic, map);
+ } else {
+ map.put(queueId, consumeQueue);
+ }
+ }
+
+ public void recoverOffsetTable(long minPhyOffset) {
+ HashMap<String, Long> cqOffsetTable = new HashMap<>(1024);
+ HashMap<String, Long> bcqOffsetTable = new HashMap<>(1024);
+
+ for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
+ for (ConsumeQueueInterface logic : maps.values()) {
+ String key = logic.getTopic() + "-" + logic.getQueueId();
+
+ long maxOffsetInQueue = logic.getMaxOffsetInQueue();
+ if (Objects.equals(CQType.BatchCQ, logic.getCQType())) {
+ bcqOffsetTable.put(key, maxOffsetInQueue);
+ } else {
+ cqOffsetTable.put(key, maxOffsetInQueue);
+ }
+
+ this.correctMinOffset(logic, minPhyOffset);
+ }
+ }
+
+ this.setTopicQueueTable(cqOffsetTable);
+ this.setBatchTopicQueueTable(bcqOffsetTable);
+ }
+
+ public void destroy() {
+ for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
+ for (ConsumeQueueInterface logic : maps.values()) {
+ this.destroy(logic);
+ }
+ }
+ }
+
+ public void cleanExpired(long minCommitLogOffset) {
+ Iterator<Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>>> it = this.consumeQueueTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> next = it.next();
+ String topic = next.getKey();
+ if (!topic.equals(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC)) {
+ ConcurrentMap<Integer, ConsumeQueueInterface> queueTable = next.getValue();
+ Iterator<Map.Entry<Integer, ConsumeQueueInterface>> itQT = queueTable.entrySet().iterator();
+ while (itQT.hasNext()) {
+ Map.Entry<Integer, ConsumeQueueInterface> nextQT = itQT.next();
+ long maxCLOffsetInConsumeQueue = nextQT.getValue().getLastOffset();
+
+ if (maxCLOffsetInConsumeQueue == -1) {
+ log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.",
+ nextQT.getValue().getTopic(),
+ nextQT.getValue().getQueueId(),
+ nextQT.getValue().getMaxPhysicOffset(),
+ nextQT.getValue().getMinLogicOffset());
+ } else if (maxCLOffsetInConsumeQueue < minCommitLogOffset) {
+ log.info(
+ "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}",
+ topic,
+ nextQT.getKey(),
+ minCommitLogOffset,
+ maxCLOffsetInConsumeQueue);
+
+ removeTopicQueueTable(nextQT.getValue().getTopic(),
+ nextQT.getValue().getQueueId());
+
+ this.destroy(nextQT.getValue());
+ itQT.remove();
+ }
+ }
+
+ if (queueTable.isEmpty()) {
+ log.info("cleanExpiredConsumerQueue: {},topic destroyed", topic);
+ it.remove();
+ }
+ }
+ }
+ }
+
+ public void truncateDirty(long phyOffset) {
+ for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
+ for (ConsumeQueueInterface logic : maps.values()) {
+ this.truncateDirtyLogicFiles(logic, phyOffset);
+ }
+ }
+ }
+
+ public Optional<TopicConfig> getTopicConfig(String topic) {
+ if (this.topicConfigTable == null) {
+ return Optional.empty();
+ }
+
+ return Optional.ofNullable(this.topicConfigTable.get(topic));
+ }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java b/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java
index dd8c86d..05c217f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.store.queue;
import org.apache.rocketmq.store.DispatchRequest;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.Swappable;
public interface FileQueueLifeCycle extends Swappable {
@@ -32,4 +33,5 @@ public interface FileQueueLifeCycle extends Swappable {
boolean isFirstFileExist();
void correctMinOffset(long minCommitLogOffset);
void putMessagePositionInfoWrapper(DispatchRequest request);
+ void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, MessageExtBrokerInner msg, short messageNum);
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
new file mode 100644
index 0000000..09e18ec
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.queue;
+
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+import java.util.HashMap;
+
+/**
+ * QueueOffsetAssigner is a component for assigning queue offsets.
+ *
+ */
+public class QueueOffsetAssigner {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+
+ private HashMap<String, Long> topicQueueTable = new HashMap<>(1024);
+ private HashMap<String, Long> batchTopicQueueTable = new HashMap<>(1024);
+
+ public HashMap<String, Long> getTopicQueueTable() {
+ return topicQueueTable;
+ }
+
+ public void setTopicQueueTable(HashMap<String, Long> topicQueueTable) {
+ this.topicQueueTable = topicQueueTable;
+ }
+
+ public HashMap<String, Long> getBatchTopicQueueTable() {
+ return batchTopicQueueTable;
+ }
+
+ public void setBatchTopicQueueTable(HashMap<String, Long> batchTopicQueueTable) {
+ this.batchTopicQueueTable = batchTopicQueueTable;
+ }
+
+ public synchronized void remove(String topic, Integer queueId) {
+ String topicQueueKey = topic + "-" + queueId;
+ // Beware of thread-safety
+ this.topicQueueTable.remove(topicQueueKey);
+ this.batchTopicQueueTable.remove(topicQueueKey);
+
+ log.info("removeQueueFromTopicQueueTable OK Topic: {} QueueId: {}", topic, queueId);
+ }
+}
\ No newline at end of file
diff --git a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java b/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
deleted file mode 100644
index 1067337..0000000
--- a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.store.util;
-
-import org.apache.rocketmq.common.TopicAttributes;
-import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.MessageStore;
-import org.apache.rocketmq.store.StreamMessageStore;
-import org.apache.rocketmq.store.queue.CQType;
-
-import java.util.Map;
-
-public class QueueTypeUtils {
-
- @Deprecated
- public static CQType getCQType(MessageStore messageStore) {
- if (messageStore instanceof DefaultMessageStore) {
- return CQType.SimpleCQ;
- } else if (messageStore instanceof StreamMessageStore) {
- return CQType.BatchCQ;
- } else {
- throw new RuntimeException("new cq type is not supported now.");
- }
- }
-
- public static CQType getCQType(TopicConfig topicConfig) {
- String attributeName = TopicAttributes.QUEUE_TYPE.getName();
-
- Map<String, String> attributes = topicConfig.getAttributes();
- if (attributes == null || attributes.size() == 0) {
- return CQType.valueOf(TopicAttributes.QUEUE_TYPE.getDefaultValue());
- }
-
- if (attributes.containsKey(attributeName)) {
- return CQType.valueOf(attributes.get(attributeName));
- } else {
- return CQType.valueOf(TopicAttributes.QUEUE_TYPE.getDefaultValue());
- }
- }
-}
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java
index 1ccb974..e27483e 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java
@@ -23,7 +23,7 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
-import org.apache.rocketmq.store.queue.CQType;
+import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.After;
import org.junit.Before;
@@ -72,7 +72,6 @@ public class DefaultMessageStoreShutDownTest {
messageStoreConfig.setMaxIndexNum(100 * 100);
messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
messageStoreConfig.setHaListenPort(StoreTestBase.nextPort());
- messageStoreConfig.setDefaultCQType(CQType.SimpleCQ.name());
return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), null, new BrokerConfig());
}
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 41cf92f..932fc2f 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -38,7 +38,7 @@ import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.logfile.DefaultMappedFile;
-import org.apache.rocketmq.store.queue.CQType;
+import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
@@ -115,7 +115,6 @@ public class DefaultMessageStoreTest {
messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
messageStoreConfig.setFlushIntervalConsumeQueue(1);
messageStoreConfig.setHaListenPort(StoreTestBase.nextPort());
- messageStoreConfig.setDefaultCQType(CQType.SimpleCQ.name());
return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), new MyMessageArrivingListener(), new BrokerConfig());
}
diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
index 3ebe148..500cd81 100644
--- a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
@@ -17,9 +17,17 @@
package org.apache.rocketmq.store.queue;
+import org.apache.rocketmq.common.TopicAttributes;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.attribute.CQType;
+import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
+import org.apache.rocketmq.common.utils.QueueTypeUtils;
+import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageExtBrokerInner;
@@ -27,21 +35,168 @@ import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import java.io.File;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
public class BatchConsumeMessageTest extends QueueTestBase {
+ private MessageStore messageStore;
- @Test
- public void testGetOffsetInQueueByTime() throws Exception {
- MessageStore messageStore = createMessageStore(null, true, CQType.BatchCQ);
+ @Before
+ public void init() throws Exception {
+ messageStore = createMessageStore(null, true);
messageStore.load();
messageStore.start();
+ }
+
+ @After
+ public void destroy() {
+ messageStore.shutdown();
+ messageStore.destroy();
+
+ File file = new File(messageStore.getMessageStoreConfig().getStorePathRootDir());
+ UtilAll.deleteFile(file);
+ }
+
+ @Test
+ public void testSendMessagesToCqTopic() {
+ String topic = UUID.randomUUID().toString();
+ createTopic(topic, CQType.SimpleCQ, messageStore);
+
+ int batchNum = 10;
+
+ // case 1 has PROPERTY_INNER_NUM but has no INNER_BATCH_FLAG
+ MessageExtBrokerInner messageExtBrokerInner = buildMessage(topic, batchNum);
+ messageExtBrokerInner.setSysFlag(0);
+ PutMessageResult putMessageResult = messageStore.putMessage(messageExtBrokerInner);
+ Assert.assertEquals(PutMessageStatus.MESSAGE_ILLEGAL, putMessageResult.getPutMessageStatus());
+
+ // case 2 has PROPERTY_INNER_NUM and has INNER_BATCH_FLAG, but is not a batchCq
+ messageExtBrokerInner = buildMessage(topic, 1);
+ putMessageResult = messageStore.putMessage(messageExtBrokerInner);
+ Assert.assertEquals(PutMessageStatus.MESSAGE_ILLEGAL, putMessageResult.getPutMessageStatus());
+
+ // case 3 has neither PROPERTY_INNER_NUM nor INNER_BATCH_FLAG.
+ messageExtBrokerInner = buildMessage(topic, -1);
+ putMessageResult = messageStore.putMessage(messageExtBrokerInner);
+ Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
+ }
+
+ @Test
+ public void testSendMessagesToBcqTopic() {
+ String topic = UUID.randomUUID().toString();
+ createTopic(topic, CQType.BatchCQ, messageStore);
+
+ // case 1 has PROPERTY_INNER_NUM but has no INNER_BATCH_FLAG
+ MessageExtBrokerInner messageExtBrokerInner = buildMessage(topic, 1);
+ PutMessageResult putMessageResult = messageStore.putMessage(messageExtBrokerInner);
+ Assert.assertEquals(PutMessageStatus.MESSAGE_ILLEGAL, putMessageResult.getPutMessageStatus());
+
+ // case 2 has neither PROPERTY_INNER_NUM nor INNER_BATCH_FLAG.
+ messageExtBrokerInner = buildMessage(topic, -1);
+ putMessageResult = messageStore.putMessage(messageExtBrokerInner);
+ Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
+
+ // case 3 has INNER_BATCH_FLAG but has no PROPERTY_INNER_NUM.
+ messageExtBrokerInner = buildMessage(topic, 1);
+ MessageAccessor.clearProperty(messageExtBrokerInner, MessageConst.PROPERTY_INNER_NUM);
+ messageExtBrokerInner.setSysFlag(MessageSysFlag.INNER_BATCH_FLAG);
+ putMessageResult = messageStore.putMessage(messageExtBrokerInner);
+ Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
+ }
+
+ @Test
+ public void testConsumeBatchMessage() {
+ String topic = UUID.randomUUID().toString();
+ createTopic(topic, CQType.BatchCQ, messageStore);
+ int batchNum = 10;
+
+ MessageExtBrokerInner messageExtBrokerInner = buildMessage(topic, batchNum);
+ PutMessageResult putMessageResult = messageStore.putMessage(messageExtBrokerInner);
+ Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
+
+ await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
+
+ List<GetMessageResult> results = new ArrayList<>();
+ for (int i = 0; i < batchNum; i++) {
+ GetMessageResult result = messageStore.getMessage("whatever", topic, 0, i, Integer.MAX_VALUE, Integer.MAX_VALUE, null);
+ try {
+ Assert.assertEquals(GetMessageStatus.FOUND, result.getStatus());
+ results.add(result);
+ } finally {
+ result.release();
+ }
+ }
+
+ for (GetMessageResult result : results) {
+ Assert.assertEquals(0, result.getMinOffset());
+ Assert.assertEquals(batchNum, result.getMaxOffset());
+ }
+
+ }
+
+ @Test
+ public void testNextBeginOffsetConsumeBatchMessage() {
+ String topic = UUID.randomUUID().toString();
+ createTopic(topic, CQType.BatchCQ, messageStore);
+ Random random = new Random();
+ int putMessageCount = 1000;
+
+ Queue<Integer> queue = new ArrayDeque<>();
+ for (int i = 0; i < putMessageCount; i++) {
+ int batchNum = random.nextInt(1000) + 2;
+ MessageExtBrokerInner messageExtBrokerInner = buildMessage(topic, batchNum);
+ PutMessageResult putMessageResult = messageStore.putMessage(messageExtBrokerInner);
+ Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
+ queue.add(batchNum);
+ }
+
+ await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
+
+ long pullOffset = 0L;
+ int getMessageCount = 0;
+ while (true) {
+ GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, pullOffset, 1, null);
+ if (Objects.equals(getMessageResult.getStatus(), GetMessageStatus.OFFSET_OVERFLOW_ONE)) {
+ break;
+ }
+ Assert.assertEquals(1, getMessageResult.getMessageQueueOffset().size());
+ Long baseOffset = getMessageResult.getMessageQueueOffset().get(0);
+ Integer batchNum = queue.poll();
+ Assert.assertNotNull(batchNum);
+ Assert.assertEquals(baseOffset + batchNum, getMessageResult.getNextBeginOffset());
+ pullOffset = getMessageResult.getNextBeginOffset();
+ getMessageCount++;
+ }
+ Assert.assertEquals(putMessageCount, getMessageCount);
+ }
+
+ @Test
+ public void testGetOffsetInQueueByTime() throws Exception {
String topic = "testGetOffsetInQueueByTime";
- //The initial min max offset, before and after the creation of consume queue
+ createTopic(topic, CQType.BatchCQ, messageStore);
+ Assert.assertTrue(QueueTypeUtils.isBatchCq(messageStore.getTopicConfig(topic)));
+
+ // The initial min max offset, before and after the creation of consume queue
Assert.assertEquals(0, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(-1, messageStore.getMinOffsetInQueue(topic, 0));
@@ -54,34 +209,30 @@ public class BatchConsumeMessageTest extends QueueTestBase {
if (i == 7)
timeMid = System.currentTimeMillis();
}
+
+ await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
+
Assert.assertEquals(80, messageStore.getOffsetInQueueByTime(topic, 0, timeMid));
+ Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
+ Assert.assertEquals(190, messageStore.getMaxOffsetInQueue(topic, 0));
+
Thread.sleep(5 * 1000);
int maxBatchDeleteFilesNum = messageStore.getMessageStoreConfig().getMaxBatchDeleteFilesNum();
messageStore.getCommitLog().deleteExpiredFile(1L, 100, 12000, true, maxBatchDeleteFilesNum);
+ Assert.assertEquals(80, messageStore.getOffsetInQueueByTime(topic, 0, timeMid));
Thread.sleep(70 * 1000);
Assert.assertEquals(180, messageStore.getOffsetInQueueByTime(topic, 0, timeMid));
-
}
@Test
public void testDispatchNormalConsumeQueue() throws Exception {
- MessageStore messageStore = createMessageStore(null, true, CQType.SimpleCQ);
- messageStore.load();
- messageStore.start();
String topic = "TestDispatchBuildConsumeQueue";
- int batchNum = 10;
+ createTopic(topic, CQType.SimpleCQ, messageStore);
+
long timeStart = System.currentTimeMillis();
long timeMid = -1;
- for (int i = 0; i < 100; i++) {
-// MessageExtBrokerInner messageExtBrokerInner = buildMessage(topic, batchNum);
-// messageExtBrokerInner.setSysFlag(0);
-// PutMessageResult putMessageResult = messageStore.putMessage(messageExtBrokerInner);
-// Assert.assertEquals(PutMessageStatus.MESSAGE_ILLEGAL, putMessageResult.getPutMessageStatus());
-//
-// messageExtBrokerInner = buildMessage(topic, 1);
-// putMessageResult = messageStore.putMessage(messageExtBrokerInner);
-// Assert.assertEquals(PutMessageStatus.MESSAGE_ILLEGAL, putMessageResult.getPutMessageStatus());
+ for (int i = 0; i < 100; i++) {
MessageExtBrokerInner messageExtBrokerInner = buildMessage(topic, -1);
PutMessageResult putMessageResult = messageStore.putMessage(messageExtBrokerInner);
Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
@@ -90,7 +241,9 @@ public class BatchConsumeMessageTest extends QueueTestBase {
if (i == 49)
timeMid = System.currentTimeMillis();
}
- Thread.sleep(500);
+
+ await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
+
ConsumeQueueInterface consumeQueue = messageStore.getConsumeQueue(topic, 0);
Assert.assertEquals(CQType.SimpleCQ, consumeQueue.getCQType());
//check the consume queue
@@ -106,43 +259,41 @@ public class BatchConsumeMessageTest extends QueueTestBase {
for (int i = -100; i < 100; i += 20) {
Assert.assertEquals(consumeQueue.getOffsetInQueueByTime(timeMid + i), messageStore.getOffsetInQueueByTime(topic, 0, timeMid + i));
}
+
//check the message time
+ long latencyAllowed = 20;
long earlistMessageTime = messageStore.getEarliestMessageTime(topic, 0);
- Assert.assertTrue(earlistMessageTime > timeStart - 10);
- Assert.assertTrue(earlistMessageTime < timeStart + 10);
+ Assert.assertTrue(earlistMessageTime > timeStart - latencyAllowed);
+ Assert.assertTrue(earlistMessageTime < timeStart + latencyAllowed);
long messageStoreTime = messageStore.getMessageStoreTimeStamp(topic, 0, 50);
- Assert.assertTrue(messageStoreTime > timeMid - 10);
- Assert.assertTrue(messageStoreTime < timeMid + 10);
+ Assert.assertTrue(messageStoreTime > timeMid - latencyAllowed);
+ Assert.assertTrue(messageStoreTime < timeMid + latencyAllowed);
long commitLogOffset = messageStore.getCommitLogOffsetInQueue(topic, 0, 50);
Assert.assertTrue(commitLogOffset >= messageStore.getMinPhyOffset());
Assert.assertTrue(commitLogOffset <= messageStore.getMaxPhyOffset());
Assert.assertFalse(messageStore.checkInDiskByConsumeOffset(topic, 0, 50));
- messageStore.shutdown();
- messageStore.destroy();
}
@Test
public void testDispatchBuildBatchConsumeQueue() throws Exception {
- MessageStore messageStore = createMessageStore(null, true, CQType.BatchCQ);
- messageStore.load();
- messageStore.start();
String topic = "testDispatchBuildBatchConsumeQueue";
int batchNum = 10;
long timeStart = System.currentTimeMillis();
long timeMid = -1;
+
+ createTopic(topic, CQType.BatchCQ, messageStore);
+
for (int i = 0; i < 100; i++) {
PutMessageResult putMessageResult = messageStore.putMessage(buildMessage(topic, batchNum));
Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
Thread.sleep(2);
if (i == 29)
timeMid = System.currentTimeMillis();
-
- MessageExtBrokerInner messageExtBrokerInner = buildMessage(topic, 1);
- putMessageResult = messageStore.putMessage(messageExtBrokerInner);
- Assert.assertEquals(PutMessageStatus.MESSAGE_ILLEGAL, putMessageResult.getPutMessageStatus());
}
- Thread.sleep(500);
+
+ await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
+
ConsumeQueueInterface consumeQueue = messageStore.getConsumeQueue(topic, 0);
Assert.assertEquals(CQType.BatchCQ, consumeQueue.getCQType());
@@ -176,21 +327,18 @@ public class BatchConsumeMessageTest extends QueueTestBase {
for (int i = 0; i < 10; i++) {
SelectMappedBufferResult sbr = getMessageResult.getMessageMapedList().get(i);
MessageExt messageExt = MessageDecoder.decode(sbr.getByteBuffer());
- short tmpBatchNum = Short.valueOf(messageExt.getProperty(MessageConst.PROPERTY_INNER_NUM));
+ short tmpBatchNum = Short.parseShort(messageExt.getProperty(MessageConst.PROPERTY_INNER_NUM));
Assert.assertEquals(i * batchNum, Long.parseLong(messageExt.getProperty(MessageConst.PROPERTY_INNER_BASE)));
Assert.assertEquals(batchNum, tmpBatchNum);
}
-
- messageStore.destroy();
- messageStore.shutdown();
}
@Test
- public void testGetBatchMessageWithinNumber() throws Exception {
- MessageStore messageStore = createMessageStore(null, true, CQType.BatchCQ);
- messageStore.load();
- messageStore.start();
+ public void testGetBatchMessageWithinNumber() {
String topic = UUID.randomUUID().toString();
+
+ createTopic(topic, CQType.BatchCQ, messageStore);
+
int batchNum = 20;
for (int i = 0; i < 200; i++) {
PutMessageResult putMessageResult = messageStore.putMessage(buildMessage(topic, batchNum));
@@ -198,7 +346,9 @@ public class BatchConsumeMessageTest extends QueueTestBase {
Assert.assertEquals(i * batchNum, putMessageResult.getAppendMessageResult().getLogicsOffset());
Assert.assertEquals(batchNum, putMessageResult.getAppendMessageResult().getMsgNum());
}
- Thread.sleep(500);
+
+ await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
+
ConsumeQueueInterface consumeQueue = messageStore.getConsumeQueue(topic, 0);
Assert.assertEquals(CQType.BatchCQ, consumeQueue.getCQType());
Assert.assertEquals(0, consumeQueue.getMinOffsetInQueue());
@@ -212,7 +362,7 @@ public class BatchConsumeMessageTest extends QueueTestBase {
Assert.assertEquals(batchNum, getMessageResult.getMessageCount());
SelectMappedBufferResult sbr = getMessageResult.getMessageMapedList().get(0);
MessageExt messageExt = MessageDecoder.decode(sbr.getByteBuffer());
- short tmpBatchNum = Short.valueOf(messageExt.getProperty(MessageConst.PROPERTY_INNER_NUM));
+ short tmpBatchNum = Short.parseShort(messageExt.getProperty(MessageConst.PROPERTY_INNER_NUM));
Assert.assertEquals(0, messageExt.getQueueOffset());
Assert.assertEquals(batchNum, tmpBatchNum);
}
@@ -236,22 +386,19 @@ public class BatchConsumeMessageTest extends QueueTestBase {
SelectMappedBufferResult sbr = getMessageResult.getMessageMapedList().get(i);
MessageExt messageExt = MessageDecoder.decode(sbr.getByteBuffer());
Assert.assertNotNull(messageExt);
- short innerBatchNum = Short.valueOf(messageExt.getProperty(MessageConst.PROPERTY_INNER_NUM));
+ short innerBatchNum = Short.parseShort(messageExt.getProperty(MessageConst.PROPERTY_INNER_NUM));
Assert.assertEquals(i * batchNum, Long.parseLong(messageExt.getProperty(MessageConst.PROPERTY_INNER_BASE)));
Assert.assertEquals(batchNum, innerBatchNum);
}
}
- messageStore.destroy();
- messageStore.shutdown();
}
@Test
- public void testGetBatchMessageWithinSize() throws Exception {
- MessageStore messageStore = createMessageStore(null, true, CQType.BatchCQ);
- messageStore.load();
- messageStore.start();
+ public void testGetBatchMessageWithinSize() {
String topic = UUID.randomUUID().toString();
+ createTopic(topic, CQType.BatchCQ, messageStore);
+
int batchNum = 10;
for (int i = 0; i < 100; i++) {
PutMessageResult putMessageResult = messageStore.putMessage(buildMessage(topic, batchNum));
@@ -259,7 +406,9 @@ public class BatchConsumeMessageTest extends QueueTestBase {
Assert.assertEquals(i * 10, putMessageResult.getAppendMessageResult().getLogicsOffset());
Assert.assertEquals(batchNum, putMessageResult.getAppendMessageResult().getMsgNum());
}
- Thread.sleep(500);
+
+ await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
+
ConsumeQueueInterface consumeQueue = messageStore.getConsumeQueue(topic, 0);
Assert.assertEquals(CQType.BatchCQ, consumeQueue.getCQType());
Assert.assertEquals(0, consumeQueue.getMinOffsetInQueue());
@@ -272,7 +421,7 @@ public class BatchConsumeMessageTest extends QueueTestBase {
Assert.assertEquals(1, getMessageResult.getMessageMapedList().size());
SelectMappedBufferResult sbr = getMessageResult.getMessageMapedList().get(0);
MessageExt messageExt = MessageDecoder.decode(sbr.getByteBuffer());
- short tmpBatchNum = Short.valueOf(messageExt.getProperty(MessageConst.PROPERTY_INNER_NUM));
+ short tmpBatchNum = Short.parseShort(messageExt.getProperty(MessageConst.PROPERTY_INNER_NUM));
Assert.assertEquals(0, messageExt.getQueueOffset());
Assert.assertEquals(batchNum, tmpBatchNum);
}
@@ -293,14 +442,29 @@ public class BatchConsumeMessageTest extends QueueTestBase {
Assert.assertFalse(getMessageResult.getMessageMapedList().get(i).hasReleased());
SelectMappedBufferResult sbr = getMessageResult.getMessageMapedList().get(i);
MessageExt messageExt = MessageDecoder.decode(sbr.getByteBuffer());
- short tmpBatchNum = Short.valueOf(messageExt.getProperty(MessageConst.PROPERTY_INNER_NUM));
+ short tmpBatchNum = Short.parseShort(messageExt.getProperty(MessageConst.PROPERTY_INNER_NUM));
Assert.assertEquals(i * batchNum, Long.parseLong(messageExt.getProperty(MessageConst.PROPERTY_INNER_BASE)));
Assert.assertEquals(batchNum, tmpBatchNum);
}
}
- messageStore.destroy();
- messageStore.shutdown();
+ }
+
+ private void createTopic(String topic, CQType cqType, MessageStore messageStore) {
+ ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
+ TopicConfig topicConfigToBeAdded = new TopicConfig();
+
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), cqType.toString());
+ topicConfigToBeAdded.setTopicName(topic);
+ topicConfigToBeAdded.setAttributes(attributes);
+
+ topicConfigTable.put(topic, topicConfigToBeAdded);
+ ((DefaultMessageStore)messageStore).setTopicConfigTable(topicConfigTable);
+ }
+
+ private Callable<Boolean> fullyDispatched(MessageStore messageStore) {
+ return () -> messageStore.dispatchBehindBytes() == 0;
}
}
diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java
index 8395d7d..1d9addc 100644
--- a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java
@@ -19,8 +19,8 @@ package org.apache.rocketmq.store.queue;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.store.ConsumeQueue;
-import org.apache.rocketmq.store.StreamMessageStore;
-import org.apache.rocketmq.store.MessageArrivingListener;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.StoreTestBase;
import org.apache.rocketmq.store.config.MessageStoreConfig;
@@ -32,7 +32,6 @@ import org.junit.Test;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Random;
import static java.lang.String.format;
@@ -45,13 +44,13 @@ public class BatchConsumeQueueTest extends StoreTestBase {
path = createBaseDir();
}
baseDirs.add(path);
- StreamMessageStore bcqMessageStore = null;
+ MessageStore messageStore = null;
try {
- bcqMessageStore = createBcqMessageStore(null);
+ messageStore = createMessageStore(null);
} catch (Exception e) {
Assert.fail();
}
- BatchConsumeQueue batchConsumeQueue = new BatchConsumeQueue("topic", 0, path,fileSize, bcqMessageStore);
+ BatchConsumeQueue batchConsumeQueue = new BatchConsumeQueue("topic", 0, path,fileSize, messageStore);
batchConsumeQueues.add(batchConsumeQueue);
return batchConsumeQueue;
}
@@ -280,7 +279,7 @@ public class BatchConsumeQueueTest extends StoreTestBase {
}
}
- protected StreamMessageStore createBcqMessageStore(String baseDir) throws Exception {
+ protected MessageStore createMessageStore(String baseDir) throws Exception {
if (baseDir == null) {
baseDir = createBaseDir();
}
@@ -299,17 +298,13 @@ public class BatchConsumeQueueTest extends StoreTestBase {
messageStoreConfig.setMaxTransferBytesOnMessageInMemory(1024 * 1024);
messageStoreConfig.setMaxTransferCountOnMessageInDisk(1024);
messageStoreConfig.setMaxTransferCountOnMessageInMemory(1024);
- messageStoreConfig.setDefaultCQType(CQType.BatchCQ.name());
messageStoreConfig.setSearchBcqByCacheEnable(true);
- StreamMessageStore defaultMessageStore = new StreamMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), new MessageArrivingListener() {
- @Override
- public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
- byte[] filterBitMap, Map<String, String> properties) {
-
- }
- }, new BrokerConfig());
- return defaultMessageStore;
+ return new DefaultMessageStore(
+ messageStoreConfig,
+ new BrokerStatsManager("simpleTest"),
+ (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> {},
+ new BrokerConfig());
}
}
diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java
index 4345f0e..f274201 100644
--- a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.store.queue;
+import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.MessageStore;
@@ -30,7 +31,7 @@ public class ConsumeQueueTest extends QueueTestBase {
public void testIterator() throws Exception {
final int msgNum = 100;
final int msgSize = 1000;
- MessageStore messageStore = createMessageStore(null, true, CQType.SimpleCQ);
+ MessageStore messageStore = createMessageStore(null, true);
messageStore.load();
String topic = UUID.randomUUID().toString();
//The initial min max offset, before and after the creation of consume queue
diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java b/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java
index a8e9f94..c16d7cb 100644
--- a/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java
+++ b/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java
@@ -23,7 +23,6 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.StreamMessageStore;
import org.apache.rocketmq.store.MessageArrivingListener;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
@@ -33,11 +32,10 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager;
import java.io.File;
import java.util.Map;
-import java.util.Objects;
public class QueueTestBase extends StoreTestBase {
- protected MessageStore createMessageStore(String baseDir, boolean extent, CQType cqType) throws Exception {
+ protected MessageStore createMessageStore(String baseDir, boolean extent) throws Exception {
if (baseDir == null) {
baseDir = createBaseDir();
}
@@ -56,28 +54,18 @@ public class QueueTestBase extends StoreTestBase {
messageStoreConfig.setMaxTransferBytesOnMessageInMemory(1024 * 1024);
messageStoreConfig.setMaxTransferCountOnMessageInDisk(1024);
messageStoreConfig.setMaxTransferCountOnMessageInMemory(1024);
- messageStoreConfig.setDefaultCQType(cqType.name());
messageStoreConfig.setFlushIntervalCommitLog(1);
messageStoreConfig.setFlushCommitLogThoroughInterval(2);
- if (Objects.equals(CQType.BatchCQ, cqType)) {
- return new StreamMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), new MessageArrivingListener() {
- @Override
- public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
- byte[] filterBitMap, Map<String, String> properties) {
+ DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), new MessageArrivingListener() {
+ @Override
+ public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
+ byte[] filterBitMap, Map<String, String> properties) {
- }
- }, new BrokerConfig());
- } else {
- return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), new MessageArrivingListener() {
- @Override
- public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
- byte[] filterBitMap, Map<String, String> properties) {
-
- }
- }, new BrokerConfig());
- }
+ }
+ }, new BrokerConfig());
+ return messageStore;
}
public MessageExtBrokerInner buildMessage(String topic, int batchNum) {
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
index 8287d81..20149d4 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
@@ -50,20 +50,20 @@ public class MQAdminTestUtils {
private static Logger log = Logger.getLogger(MQAdminTestUtils.class);
public static boolean createTopic(String nameSrvAddr, String clusterName, String topic,
- int queueNum) {
+ int queueNum, Map<String, String> attributes) {
int defaultWaitTime = 5;
- return createTopic(nameSrvAddr, clusterName, topic, queueNum, defaultWaitTime);
+ return createTopic(nameSrvAddr, clusterName, topic, queueNum, attributes, defaultWaitTime);
}
public static boolean createTopic(String nameSrvAddr, String clusterName, String topic,
- int queueNum, int waitTimeSec) {
+ int queueNum, Map<String, String> attributes, int waitTimeSec) {
boolean createResult = false;
DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
mqAdminExt.setInstanceName(UUID.randomUUID().toString());
mqAdminExt.setNamesrvAddr(nameSrvAddr);
try {
mqAdminExt.start();
- mqAdminExt.createTopic(clusterName, topic, queueNum);
+ mqAdminExt.createTopic(clusterName, topic, queueNum, attributes);
} catch (Exception e) {
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index a4e07fb..32af3bd 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -37,7 +37,7 @@ import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.store.queue.CQType;
+import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
@@ -61,18 +61,14 @@ public class BaseConf {
//the logic queue test need at least three brokers
protected final static String broker3Name;
protected final static String clusterName;
- protected final static String steamClusterName = "steam-cluster";
protected final static int brokerNum;
protected final static int waitTime = 5;
protected final static int consumeTime = 2 * 60 * 1000;
protected final static int QUEUE_NUMBERS = 8;
protected final static NamesrvController namesrvController;
- protected static BrokerController brokerController1;
- protected static BrokerController brokerController2;
- protected static BrokerController brokerController3;
- protected static BrokerController streamBrokerController1;
- protected static BrokerController streamBrokerController2;
- protected static BrokerController streamBrokerController3;
+ protected final static BrokerController brokerController1;
+ protected final static BrokerController brokerController2;
+ protected final static BrokerController brokerController3;
protected final static List<BrokerController> brokerControllerList;
protected final static Map<String, BrokerController> brokerControllerMap;
protected final static List<Object> mqClients = new ArrayList<Object>();
@@ -80,7 +76,7 @@ public class BaseConf {
private final static Logger log = Logger.getLogger(BaseConf.class);
static {
- System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
+ System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
namesrvController = IntegrationTestBase.createAndStartNamesrv();
nsAddr = "127.0.0.1:" + namesrvController.getNettyServerConfig().getListenPort();
brokerController1 = IntegrationTestBase.createAndStartBroker(nsAddr);
@@ -91,9 +87,6 @@ public class BaseConf {
broker2Name = brokerController2.getBrokerConfig().getBrokerName();
broker3Name = brokerController3.getBrokerConfig().getBrokerName();
brokerNum = 3;
- streamBrokerController1 = IntegrationTestBase.createAndStartBroker(nsAddr, CQType.BatchCQ.toString(), steamClusterName);
- streamBrokerController2 = IntegrationTestBase.createAndStartBroker(nsAddr, CQType.BatchCQ.toString(), steamClusterName);
- streamBrokerController3 = IntegrationTestBase.createAndStartBroker(nsAddr, CQType.BatchCQ.toString(), steamClusterName);
brokerControllerList = ImmutableList.of(brokerController1, brokerController2, brokerController3);
brokerControllerMap = brokerControllerList.stream().collect(Collectors.toMap(input -> input.getBrokerConfig().getBrokerName(), Function.identity()));
}
@@ -124,7 +117,7 @@ public class BaseConf {
public static String initTopic() {
String topic = "tt-" + MQRandomUtils.getRandomTopic();
- IntegrationTestBase.initTopic(topic, nsAddr, clusterName);
+ IntegrationTestBase.initTopic(topic, nsAddr, clusterName, CQType.SimpleCQ);
return topic;
}
@@ -171,9 +164,9 @@ public class BaseConf {
}
public static RMQNormalProducer getProducer(String nsAddr, String topic, String producerGoup,
- String instanceName) {
+ String instanceName) {
RMQNormalProducer producer = new RMQNormalProducer(nsAddr, topic, producerGoup,
- instanceName);
+ instanceName);
if (debug) {
producer.setDebug();
}
@@ -191,31 +184,31 @@ public class BaseConf {
}
public static RMQNormalConsumer getConsumer(String nsAddr, String topic, String subExpression,
- AbstractListener listener) {
+ AbstractListener listener) {
return getConsumer(nsAddr, topic, subExpression, listener, false);
}
public static RMQNormalConsumer getConsumer(String nsAddr, String topic, String subExpression,
- AbstractListener listener, boolean useTLS) {
+ AbstractListener listener, boolean useTLS) {
String consumerGroup = initConsumerGroup();
return getConsumer(nsAddr, consumerGroup, topic, subExpression, listener, useTLS);
}
public static RMQNormalConsumer getConsumer(String nsAddr, String consumerGroup, String topic,
- String subExpression, AbstractListener listener) {
+ String subExpression, AbstractListener listener) {
return getConsumer(nsAddr, consumerGroup, topic, subExpression, listener, false);
}
public static RMQNormalConsumer getConsumer(String nsAddr, String consumerGroup, String topic,
- String subExpression, AbstractListener listener, boolean useTLS) {
+ String subExpression, AbstractListener listener, boolean useTLS) {
RMQNormalConsumer consumer = ConsumerFactory.getRMQNormalConsumer(nsAddr, consumerGroup,
- topic, subExpression, listener, useTLS);
+ topic, subExpression, listener, useTLS);
if (debug) {
consumer.setDebug();
}
mqClients.add(consumer);
log.info(String.format("consumer[%s] start,topic[%s],subExpression[%s]", consumerGroup,
- topic, subExpression));
+ topic, subExpression));
return consumer;
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
index a74fd21..398bd19 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
@@ -20,13 +20,18 @@ package org.apache.rocketmq.test.base;
import com.google.common.truth.Truth;
import java.io.File;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.TopicAttributes;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -34,10 +39,8 @@ import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.store.queue.CQType;
import org.apache.rocketmq.test.util.MQAdminTestUtils;
import org.apache.rocketmq.test.util.TestUtils;
-import org.assertj.core.util.Strings;
public class IntegrationTestBase {
public static InternalLogger logger = InternalLoggerFactory.getLogger(IntegrationTestBase.class);
@@ -142,34 +145,6 @@ public class IntegrationTestBase {
storeConfig.setMaxIndexNum(INDEX_NUM);
storeConfig.setMaxHashSlotNum(INDEX_NUM * 4);
storeConfig.setDeleteWhen("01;02;03;04;05;06;07;08;09;10;11;12;13;14;15;16;17;18;19;20;21;22;23;00");
- storeConfig.setDefaultCQType(CQType.SimpleCQ.toString());
- storeConfig.setMaxTransferCountOnMessageInMemory(1024);
- storeConfig.setMaxTransferCountOnMessageInDisk(1024);
- return createAndStartBroker(storeConfig, brokerConfig);
- }
-
- public static BrokerController createAndStartBroker(String nsAddr, String cqType, String cluster) {
- String baseDir = createBaseDir();
- BrokerConfig brokerConfig = new BrokerConfig();
- MessageStoreConfig storeConfig = new MessageStoreConfig();
- brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.incrementAndGet());
- brokerConfig.setBrokerIP1("127.0.0.1");
- brokerConfig.setNamesrvAddr(nsAddr);
- brokerConfig.setEnablePropertyFilter(true);
- brokerConfig.setLoadBalancePollNameServerInterval(500);
- brokerConfig.setBrokerClusterName(cluster);
- storeConfig.setStorePathRootDir(baseDir);
- storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
- storeConfig.setMappedFileSizeCommitLog(COMMIT_LOG_SIZE);
- storeConfig.setMaxIndexNum(INDEX_NUM);
- storeConfig.setMaxHashSlotNum(INDEX_NUM * 4);
- storeConfig.setDeleteWhen("01;02;03;04;05;06;07;08;09;10;11;12;13;14;15;16;17;18;19;20;21;22;23;00");
-
- if (!Strings.isNullOrEmpty(cqType)) {
- storeConfig.setDefaultCQType(cqType);
- } else {
- storeConfig.setDefaultCQType(CQType.SimpleCQ.toString());
- }
storeConfig.setMaxTransferCountOnMessageInMemory(1024);
storeConfig.setMaxTransferCountOnMessageInDisk(1024);
return createAndStartBroker(storeConfig, brokerConfig);
@@ -193,17 +168,21 @@ public class IntegrationTestBase {
return brokerController;
}
- public static boolean initTopic(String topic, String nsAddr, String clusterName, int queueNumbers) {
+ public static boolean initTopic(String topic, String nsAddr, String clusterName, int queueNumbers, CQType cqType) {
long startTime = System.currentTimeMillis();
boolean createResult;
while (true) {
- createResult = MQAdminTestUtils.createTopic(nsAddr, clusterName, topic, queueNumbers);
+ Map<String, String> attributes = new HashMap<>();
+ if (!Objects.equals(CQType.SimpleCQ, cqType)) {
+ attributes.put("+" + TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), cqType.toString());
+ }
+ createResult = MQAdminTestUtils.createTopic(nsAddr, clusterName, topic, queueNumbers, attributes);
if (createResult) {
break;
} else if (System.currentTimeMillis() - startTime > topicCreateTime) {
Truth.assertWithMessage(String.format("topic[%s] is created failed after:%d ms", topic,
- System.currentTimeMillis() - startTime)).fail();
+ System.currentTimeMillis() - startTime)).fail();
break;
} else {
TestUtils.waitForMoment(500);
@@ -214,8 +193,8 @@ public class IntegrationTestBase {
return createResult;
}
- public static boolean initTopic(String topic, String nsAddr, String clusterName) {
- return initTopic(topic, nsAddr, clusterName, BaseConf.QUEUE_NUMBERS);
+ public static boolean initTopic(String topic, String nsAddr, String clusterName, CQType cqType) {
+ return initTopic(topic, nsAddr, clusterName, BaseConf.QUEUE_NUMBERS, cqType);
}
public static void deleteFile(File file) {
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java b/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java
index dd71dd1..0e1bf26 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java
@@ -30,7 +30,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.store.queue.CQType;
+import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.base.IntegrationTestBase;
import org.apache.rocketmq.test.factory.ConsumerFactory;
@@ -63,7 +63,6 @@ public class DLedgerProduceAndConsumeIT {
storeConfig.setdLegerGroup(brokerName);
storeConfig.setdLegerSelfId(selfId);
storeConfig.setdLegerPeers(peers);
- storeConfig.setDefaultCQType(CQType.SimpleCQ.toString());
return storeConfig;
}
@@ -83,7 +82,7 @@ public class DLedgerProduceAndConsumeIT {
String topic = UUID.randomUUID().toString();
String consumerGroup = UUID.randomUUID().toString();
- IntegrationTestBase.initTopic(topic, BaseConf.nsAddr, cluster, 1);
+ IntegrationTestBase.initTopic(topic, BaseConf.nsAddr, cluster, 1, CQType.SimpleCQ);
DefaultMQProducer producer = ProducerFactory.getRMQProducer(BaseConf.nsAddr);
DefaultMQPullConsumer consumer = ConsumerFactory.getRMQPullConsumer(BaseConf.nsAddr, consumerGroup);
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java
index bf0b9e5..3a649ed 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java
@@ -23,7 +23,6 @@ import java.util.Random;
import java.util.UUID;
import org.apache.log4j.Logger;
-import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
@@ -32,14 +31,14 @@ import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.TopicAttributes;
+import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.StreamMessageStore;
-import org.apache.rocketmq.store.queue.CQType;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.base.IntegrationTestBase;
import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT;
@@ -101,23 +100,23 @@ public class BatchSendIT extends BaseConf {
@Test
public void testBatchSend_SysInnerBatch() throws Exception {
- waitBrokerRegistered(nsAddr, steamClusterName, brokerNum);
-
- Assert.assertTrue(streamBrokerController1.getMessageStore() instanceof StreamMessageStore);
- Assert.assertTrue(streamBrokerController2.getMessageStore() instanceof StreamMessageStore);
- Assert.assertTrue(streamBrokerController3.getMessageStore() instanceof StreamMessageStore);
+ waitBrokerRegistered(nsAddr, clusterName, brokerNum);
String batchTopic = UUID.randomUUID().toString();
- IntegrationTestBase.initTopic(batchTopic, nsAddr, steamClusterName);
- Assert.assertEquals(8, streamBrokerController1.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums());
- Assert.assertEquals(8, streamBrokerController2.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums());
- Assert.assertEquals(8, streamBrokerController3.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums());
- Assert.assertEquals(-1, streamBrokerController1.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
- Assert.assertEquals(-1, streamBrokerController2.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
- Assert.assertEquals(-1, streamBrokerController3.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
- Assert.assertEquals(0, streamBrokerController1.getMessageStore().getMaxOffsetInQueue(batchTopic, 0));
- Assert.assertEquals(0, streamBrokerController2.getMessageStore().getMaxOffsetInQueue(batchTopic, 0));
- Assert.assertEquals(0, streamBrokerController3.getMessageStore().getMaxOffsetInQueue(batchTopic, 0));
+ IntegrationTestBase.initTopic(batchTopic, nsAddr, clusterName, CQType.BatchCQ);
+
+ Assert.assertEquals(CQType.BatchCQ.toString(), brokerController1.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getAttributes().get(TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName()));
+ Assert.assertEquals(CQType.BatchCQ.toString(), brokerController2.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getAttributes().get(TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName()));
+ Assert.assertEquals(CQType.BatchCQ.toString(), brokerController3.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getAttributes().get(TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName()));
+ Assert.assertEquals(8, brokerController1.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums());
+ Assert.assertEquals(8, brokerController2.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums());
+ Assert.assertEquals(8, brokerController3.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums());
+ Assert.assertEquals(-1, brokerController1.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
+ Assert.assertEquals(-1, brokerController2.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
+ Assert.assertEquals(-1, brokerController3.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
+ Assert.assertEquals(0, brokerController1.getMessageStore().getMaxOffsetInQueue(batchTopic, 0));
+ Assert.assertEquals(0, brokerController2.getMessageStore().getMaxOffsetInQueue(batchTopic, 0));
+ Assert.assertEquals(0, brokerController3.getMessageStore().getMaxOffsetInQueue(batchTopic, 0));
DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);
MessageQueue messageQueue = producer.fetchPublishMessageQueues(batchTopic).iterator().next();
@@ -168,7 +167,7 @@ public class BatchSendIT extends BaseConf {
Assert.assertTrue(brokerController3.getMessageStore() instanceof DefaultMessageStore);
String batchTopic = UUID.randomUUID().toString();
- IntegrationTestBase.initTopic(batchTopic, nsAddr, clusterName);
+ IntegrationTestBase.initTopic(batchTopic, nsAddr, clusterName, CQType.SimpleCQ);
Assert.assertEquals(8, brokerController1.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums());
Assert.assertEquals(8, brokerController2.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums());
Assert.assertEquals(8, brokerController3.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums());
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 6f551d3..378f7db 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -95,13 +95,13 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
- public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
- createTopic(key, newTopic, queueNum, 0);
+ public void createTopic(String key, String newTopic, int queueNum, Map<String, String> attributes) throws MQClientException {
+ createTopic(key, newTopic, queueNum, 0, attributes);
}
@Override
- public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
- defaultMQAdminExtImpl.createTopic(key, newTopic, queueNum, topicSysFlag);
+ public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map<String, String> attributes) throws MQClientException {
+ defaultMQAdminExtImpl.createTopic(key, newTopic, queueNum, topicSysFlag, attributes);
}
@Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index b7e4816..f9b4822 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -77,11 +77,8 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
-import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
-import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
@@ -95,9 +92,6 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.apache.rocketmq.tools.admin.api.TrackType;
-import static org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils.checkAndBuildMappingItems;
-import static org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils.getMappingDetailFromConfig;
-
public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
private static final Set<String> SYSTEM_GROUP_SET = new HashSet<String>();
@@ -1063,13 +1057,13 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
- createTopic(key, newTopic, queueNum, 0);
- public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
- createTopic(key, newTopic, queueNum, 0);
+ public void createTopic(String key, String newTopic, int queueNum, Map<String, String> attributes) throws MQClientException {
+ createTopic(key, newTopic, queueNum, 0, attributes);
}
@Override
- public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
- this.mqClientInstance.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag);
+ public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map<String, String> attributes) throws MQClientException {
+ this.mqClientInstance.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag, attributes);
}
@Override
@@ -1176,7 +1170,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
@Override
public void setMessageRequestMode(final String brokerAddr, final String topic, final String consumerGroup, final
- MessageRequestMode mode, final int popShareQueueNum, final long timeoutMillis)
+ MessageRequestMode mode, final int popShareQueueNum, final long timeoutMillis)
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQClientException {
this.mqClientInstance.getMQClientAPIImpl().setMessageRequestMode(brokerAddr, topic, consumerGroup, mode, popShareQueueNum, timeoutMillis);
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java
index 3caa477..284900b 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java
@@ -23,7 +23,6 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
-import org.apache.rocketmq.common.TopicAttributes;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.attribute.AttributeParser;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;