Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/12/23 12:06:03 UTC

[GitHub] [rocketmq] Erik1288 opened a new pull request #3671: Improve Batch Message Processing Throughput

Erik1288 opened a new pull request #3671:
URL: https://github.com/apache/rocketmq/pull/3671


   **Make sure to set the target branch to `develop`**
   
   ## What is the purpose of the change
   
   Improve Batch Message Processing Throughput.
   
   RIP: https://docs.google.com/document/d/10hlfG6Jg36D8Kid6t8Zuf3g_63pb8yQZqwRqV2GZkEQ/edit#
   
   ## Brief changelog
   
   1. Introduce a BatchConsumeQueue index to improve throughput for batch messages.
   2. Refactor the consume queue interface.
   
   ## Verifying this change
   
   Follow this checklist to help us incorporate your contribution quickly and easily. Note: `it would be helpful if you could finish the following 5 checklist items (the last one is optional) before requesting the community to review your PR`.
   
   - [x] Make sure there is a [GitHub issue](https://github.com/apache/rocketmq/issues) filed for the change (usually before you start working on it). Trivial changes like typos do not require a GitHub issue. Your pull request should address just this issue, without pulling in other changes: one PR resolves one issue.
   - [x] Format the pull request title like `[ISSUE #123] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body.
   - [x] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
   - [x] Write the necessary unit tests (over 80% coverage) to verify your logic correction; mock as much as practical when cross-module dependencies exist. If a new feature or significant change is committed, please remember to add an integration test in the [test module](https://github.com/apache/rocketmq/tree/master/test).
   - [x] Run `mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle` to make sure basic checks pass. Run `mvn clean install -DskipITs` to make sure unit tests pass. Run `mvn clean test-compile failsafe:integration-test` to make sure integration tests pass.
   - [ ] If this contribution is large, please file an [Apache Individual Contributor License Agreement](http://www.apache.org/licenses/#clas).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] Git-Yang commented on a change in pull request #3671: [RIP-26] Improve Batch Message Processing Throughput

Posted by GitBox <gi...@apache.org>.
Git-Yang commented on a change in pull request #3671:
URL: https://github.com/apache/rocketmq/pull/3671#discussion_r774950441



##########
File path: broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
##########
@@ -296,14 +298,19 @@ public boolean initialize() throws CloneNotSupportedException {
 
         if (result) {
             try {
-                this.messageStore =
-                    new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
-                        this.brokerConfig);
+                MessageStore messageStore;
+                if (Objects.equals(CQType.BatchCQ.toString(), this.messageStoreConfig.getDefaultCQType())) {
+                    messageStore = new StreamMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig);
+                } else {
+                    messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig);
+                }
+
+                this.messageStore = messageStore;
                 if (messageStoreConfig.isEnableDLegerCommitLog()) {
                     DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);

Review comment:
       If BatchCQ is enabled, `messageStore` is a `StreamMessageStore`, so the `(DefaultMessageStore)` cast here is invalid and will fail at runtime.
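
To illustrate the concern, here is a minimal, self-contained sketch. The nested types are hypothetical stand-ins that only mirror the relationship visible in the diff (`StreamMessageStore` implements `MessageStore` and is not a `DefaultMessageStore`); `asDefaultStore` is an illustrative helper, not part of RocketMQ.

```java
import java.util.Optional;

public class StoreCastSketch {
    // Hypothetical stand-ins for the real RocketMQ types; only the type relationship matters here.
    interface MessageStore { }
    static class DefaultMessageStore implements MessageStore { }
    static class StreamMessageStore implements MessageStore { }

    // Illustrative helper: yields the store as a DefaultMessageStore only if that is its runtime type.
    static Optional<DefaultMessageStore> asDefaultStore(MessageStore store) {
        if (store instanceof DefaultMessageStore) {
            return Optional.of((DefaultMessageStore) store);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        MessageStore batchStore = new StreamMessageStore();
        try {
            // Compiles, because MessageStore is an interface, but the runtime type does not match.
            DefaultMessageStore unchecked = (DefaultMessageStore) batchStore;
            System.out.println("cast succeeded: " + unchecked);
        } catch (ClassCastException e) {
            System.out.println("unchecked cast failed with ClassCastException");
        }
        System.out.println("guarded cast present: " + asDefaultStore(batchStore).isPresent());
    }
}
```

One possible direction, not necessarily what the PR adopted, is to guard the DLedger branch with a check like this, or to fail fast with a clear message when BatchCQ and DLedger are both enabled.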







[GitHub] [rocketmq] dongeforever merged pull request #3671: [RIP-26] Improve Batch Message Processing Throughput

Posted by GitBox <gi...@apache.org>.
dongeforever merged pull request #3671:
URL: https://github.com/apache/rocketmq/pull/3671


   





[GitHub] [rocketmq] Git-Yang commented on a change in pull request #3671: [RIP-26] Improve Batch Message Processing Throughput

Posted by GitBox <gi...@apache.org>.
Git-Yang commented on a change in pull request #3671:
URL: https://github.com/apache/rocketmq/pull/3671#discussion_r774945006



##########
File path: store/src/main/java/org/apache/rocketmq/store/StreamMessageStore.java
##########
@@ -0,0 +1,2573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store;
+
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.SystemClock;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.running.RunningStats;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.config.FlushDiskType;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.config.StorePathConfigHelper;
+import org.apache.rocketmq.store.ha.HAService;
+import org.apache.rocketmq.store.index.IndexService;
+import org.apache.rocketmq.store.index.QueryOffsetResult;
+import org.apache.rocketmq.store.logfile.DefaultMappedFile;
+import org.apache.rocketmq.store.logfile.MappedFile;
+import org.apache.rocketmq.store.queue.BatchConsumeQueue;
+import org.apache.rocketmq.store.queue.CQType;
+import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
+import org.apache.rocketmq.store.queue.ConsumeQueueStore;
+import org.apache.rocketmq.store.queue.CqUnit;
+import org.apache.rocketmq.store.queue.ReferredIterator;
+import org.apache.rocketmq.store.schedule.ScheduleMessageService;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.apache.rocketmq.store.util.PerfCounter;
+import org.apache.rocketmq.store.util.QueueTypeUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileLock;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.lang.String.format;
+
+public class StreamMessageStore implements MessageStore {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+
+    public final PerfCounter.Ticks perfs = new PerfCounter.Ticks(log);
+
+    private final MessageStoreConfig messageStoreConfig;
+    // CommitLog
+    private final CommitLog commitLog;
+
+    private final ConsumeQueueStore consumeQueueStore;
+
+    private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueueInterface>> consumeQueueTable;
+
+    protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
+
+    protected HashMap<String/* topic-queueid */, Long/* offset */> batchTopicQueueTable = new HashMap<String, Long>(1024);
+
+    private final FlushConsumeQueueService flushConsumeQueueService;
+
+    private final CleanCommitLogService cleanCommitLogService;
+
+    private final CleanConsumeQueueService cleanConsumeQueueService;
+
+    private final CorrectLogicOffsetService correctLogicOffsetService;
+
+    private final IndexService indexService;
+
+    private final AllocateMappedFileService allocateMappedFileService;
+
+    private final ReputMessageService reputMessageService;
+
+    private final HAService haService;
+
+    private final ScheduleMessageService scheduleMessageService;
+
+    private final StoreStatsService storeStatsService;
+
+    private final TransientStorePool transientStorePool;
+
+    private final RunningFlags runningFlags = new RunningFlags();
+    private final SystemClock systemClock = new SystemClock();
+
+    private final ScheduledExecutorService scheduledExecutorService =
+        Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread"));
+    private final BrokerStatsManager brokerStatsManager;
+    private final MessageArrivingListener messageArrivingListener;
+    private final BrokerConfig brokerConfig;
+
+    private volatile boolean shutdown = true;
+
+    private StoreCheckpoint storeCheckpoint;
+
+    private AtomicLong printTimes = new AtomicLong(0);
+
+    private final LinkedList<CommitLogDispatcher> dispatcherList;
+
+    private RandomAccessFile lockFile;
+
+    private FileLock lock;
+
+    boolean shutDownNormal = false;
+
+    //polish for reput
+    private ThreadPoolExecutor[] reputExecutors;
+
+    private BlockingQueue<Runnable>[] reputQueues;
+
+    private boolean isDispatchFromSenderThread;
+
+    private static final Future EMPTY_FUTURE = new Future() {
+        @Override
+        public boolean cancel(final boolean mayInterruptIfRunning) {
+            return false;
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return false;
+        }
+
+        @Override
+        public boolean isDone() {
+            return true;
+        }
+
+        @Override
+        public Object get() {
+            return null;
+        }
+
+        @Override
+        public Object get(final long timeout, final TimeUnit unit) {
+            return null;
+        }
+    };
+
+    // Max pull msg size
+    private final static int MAX_PULL_MSG_SIZE = 128 * 1024 * 1024;
+
+    public StreamMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
+                               final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
+        this.messageArrivingListener = messageArrivingListener;
+        this.brokerConfig = brokerConfig;
+        this.messageStoreConfig = messageStoreConfig;
+        this.brokerStatsManager = brokerStatsManager;
+        this.allocateMappedFileService = new AllocateMappedFileService(this);
+        if (messageStoreConfig.isEnableDLegerCommitLog()) {
+            throw new RuntimeException("dleger is not supported in this message store.");
+        }
+        this.isDispatchFromSenderThread = messageStoreConfig.isDispatchFromSenderThread();
+        this.commitLog = new CommitLog(this);
+        this.consumeQueueTable = new ConcurrentHashMap<>(32);
+        this.consumeQueueStore = new ConsumeQueueStore(this, this.messageStoreConfig, this.consumeQueueTable);
+
+        this.flushConsumeQueueService = new FlushConsumeQueueService();
+        this.cleanCommitLogService = new CleanCommitLogService();
+        this.cleanConsumeQueueService = new CleanConsumeQueueService();
+        this.correctLogicOffsetService = new CorrectLogicOffsetService();
+        this.storeStatsService = new StoreStatsService();
+        this.indexService = new IndexService(this);
+        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
+            this.haService = new HAService(this);
+        } else {
+            this.haService = null;
+        }
+        if (isDispatchFromSenderThread) {
+            this.reputMessageService = new SyncReputMessageService();
+        } else {
+            this.reputMessageService = new ReputMessageService();
+        }
+
+        this.scheduleMessageService = new ScheduleMessageService(this);
+
+        this.transientStorePool = new TransientStorePool(messageStoreConfig);
+
+        if (messageStoreConfig.isTransientStorePoolEnable()) {
+            this.transientStorePool.init();
+        }
+
+        this.allocateMappedFileService.start();
+
+        this.indexService.start();
+
+        this.dispatcherList = new LinkedList<>();
+        this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
+        this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
+
+        File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
+        DefaultMappedFile.ensureDirOK(file.getParent());
+        lockFile = new RandomAccessFile(file, "rw");
+        initAsyncReputThreads(messageStoreConfig.getDispatchCqThreads(), messageStoreConfig.getDispatchCqCacheNum());
+    }
+
+    /**
+     * @throws IOException
+     */
+    @Override
+    public boolean load() {
+        boolean result = true;
+
+        try {
+            long start = System.currentTimeMillis();
+            boolean lastExitOK = !this.isTempFileExist();
+            log.info("last shutdown {}, root dir: {}", lastExitOK ? "normally" : "abnormally", messageStoreConfig.getStorePathRootDir());
+
+            if (null != scheduleMessageService) {
+                result = result && this.scheduleMessageService.load();
+            }
+
+            // load Commit Log
+            result = result && this.commitLog.load();
+
+            // load Batch Consume Queue
+            result = result && this.loadBatchConsumeQueue();
+
+            if (result) {
+                this.storeCheckpoint =
+                    new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
+
+                this.indexService.load(lastExitOK);
+
+                this.recover(lastExitOK);
+
+                log.info("load over, and the max phy offset = {} cost = {}", this.getMaxPhyOffset(), System.currentTimeMillis() - start);
+            }
+        } catch (Exception e) {
+            log.error("load exception", e);
+            result = false;
+        }
+
+        if (!result) {
+            this.allocateMappedFileService.shutdown();
+        }
+
+        return result;
+    }
+
+    /**
+     * @throws Exception
+     */
+    @Override
+    public void start() throws Exception {
+
+        lock = lockFile.getChannel().tryLock(0, 1, false);
+        if (lock == null || lock.isShared() || !lock.isValid()) {
+            throw new RuntimeException("Lock failed,MQ already started");
+        }
+
+        lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
+        lockFile.getChannel().force(true);
+
+        if (this.getMessageStoreConfig().isDuplicationEnable()) {
+            this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
+        } else {
+            this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
+        }
+        this.reputMessageService.start();
+
+        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
+            this.haService.start();
+            this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
+        }
+
+        this.flushConsumeQueueService.start();
+        this.commitLog.start();
+        this.storeStatsService.start();
+
+        this.createTempFile();
+        this.addScheduleTask();
+        this.perfs.start();
+        this.shutdown = false;
+    }
+
+    @Override
+    public void shutdown() {
+        if (!this.shutdown) {
+            this.shutdown = true;
+
+            this.scheduledExecutorService.shutdown();
+
+            try {
+
+                Thread.sleep(1000 * 3);
+            } catch (InterruptedException e) {
+                log.error("shutdown Exception, ", e);
+            }
+
+            if (this.scheduleMessageService != null) {
+                this.scheduleMessageService.shutdown();
+            }
+            if (this.haService != null) {
+                this.haService.shutdown();
+            }
+
+            this.storeStatsService.shutdown();
+            this.indexService.shutdown();
+            this.commitLog.shutdown();
+            this.reputMessageService.shutdown();
+            this.flushConsumeQueueService.shutdown();
+            this.allocateMappedFileService.shutdown();
+            this.storeCheckpoint.flush();
+            this.storeCheckpoint.shutdown();
+
+            this.perfs.shutdown();
+
+            if (this.runningFlags.isWriteable() && dispatchBehindBytes() == 0) {
+                this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
+                shutDownNormal = true;
+            } else {
+                log.warn("the store may be wrong, so shutdown abnormally, and keep abort file. writable: {}, dispatchBehindBytes: {}, abort file: {}",
+                        this.runningFlags.isWriteable(), dispatchBehindBytes(), StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
+            }
+        }
+
+        this.transientStorePool.destroy();
+
+        if (lockFile != null && lock != null) {
+            try {
+                lock.release();
+                lockFile.close();
+            } catch (IOException e) {
+            }
+        }
+    }
+
+    @Override
+    public void destroy() {
+        this.destroyLogics();
+        this.commitLog.destroy();
+        this.indexService.destroy();
+        this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
+        this.deleteFile(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
+    }
+
+    @Override
+    public void destroyLogics() {
+        for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
+            for (ConsumeQueueInterface logic : maps.values()) {
+                this.consumeQueueStore.destroy(logic);
+            }
+        }
+    }
+
+    @Override
+    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
+        if (this.shutdown) {
+            log.warn("message store has shutdown, so putMessage is forbidden");
+            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
+        }
+
+        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
+            long value = this.printTimes.getAndIncrement();
+            if ((value % 50000) == 0) {
+                log.warn("message store is slave mode, so putMessage is forbidden ");
+            }
+
+            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
+        }
+
+        if (msg.getProperties().containsKey(MessageConst.PROPERTY_INNER_NUM)
+                && !MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG)) {
+            log.warn("[BUG]The message had property {} but is not an inner batch", MessageConst.PROPERTY_INNER_NUM);
+            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+        }
+
+        if (MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG)) {
+            CQType cqType = QueueTypeUtils.getCQType(this);
+
+            if (!CQType.BatchCQ.equals(cqType)) {
+                log.warn("[BUG]The message is an inner batch but cq type is not batch consume queue");
+                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+            }
+        }
+
+        if (!this.runningFlags.isWriteable()) {
+            long value = this.printTimes.getAndIncrement();
+            if ((value % 50000) == 0) {
+                log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
+            }
+
+            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
+        } else {
+            this.printTimes.set(0);
+        }
+
+        int topicLen = msg.getTopic().length();
+        if (topicLen > this.messageStoreConfig.getMaxTopicLength()) {
+            log.warn("putMessage message topic[{}] length too long {}", msg.getTopic(), topicLen);
+            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+        }
+
+        if (topicLen > Byte.MAX_VALUE) {
+            log.warn("putMessage message topic[{}] length too long {}, but it is not supported by broker",
+                    msg.getTopic(), topicLen);
+            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+        }
+
+        if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
+            log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
+            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null));
+        }
+
+        if (this.isOSPageCacheBusy()) {
+            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null));
+        }
+
+        long beginTime = this.getSystemClock().now();
+        perfs.startTick("PUT_MESSAGE_TIME_MS");
+        CompletableFuture<PutMessageResult> result = this.commitLog.asyncPutMessage(msg);
+        perfs.endTick("PUT_MESSAGE_TIME_MS");
+
+        long eclipseTime = this.getSystemClock().now() - beginTime;
+        if (eclipseTime > 500) {
+            log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
+        }
+
+        return result;
+    }
+
+    @Override
+    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
+        CompletableFuture<PutMessageResult> future = asyncPutMessage(msg);
+        try {
+            return future.get(3, TimeUnit.SECONDS);
+        } catch (Throwable t) {
+            log.error("Get async put result failed", t);
+            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
+        }
+    }
+
+    @Override
+    public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
+        CompletableFuture<PutMessageResult> future = asyncPutMessages(messageExtBatch);
+        try {
+            return future.get(3, TimeUnit.SECONDS);
+        } catch (Throwable t) {
+            log.error("Get async put result failed", t);
+            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
+        }
+    }
+
+    @Override
+    public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
+        if (this.shutdown) {
+            log.warn("StreamMessageStore has shutdown, so putMessages is forbidden");
+            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
+        }
+
+        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
+            long value = this.printTimes.getAndIncrement();
+            if ((value % 50000) == 0) {
+                log.warn("StreamMessageStore is in slave mode, so putMessages is forbidden ");
+            }
+
+            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
+        }
+
+        if (!this.runningFlags.isWriteable()) {
+            long value = this.printTimes.getAndIncrement();
+            if ((value % 50000) == 0) {
+                log.warn("StreamMessageStore is not writable, so putMessages is forbidden " + this.runningFlags.getFlagBits());
+            }
+
+            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
+        } else {
+            this.printTimes.set(0);
+        }
+
+        int topicLen = messageExtBatch.getTopic().length();
+        if (topicLen > this.messageStoreConfig.getMaxTopicLength()) {
+            log.warn("putMessage batch message topic[{}] length too long {}", messageExtBatch.getTopic(), topicLen);
+            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+        }
+
+        if (topicLen > Byte.MAX_VALUE) {
+            log.warn("putMessage batch message topic[{}] length too long {}, but it is not supported by broker",
+                    messageExtBatch.getTopic(), topicLen);
+            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+        }
+
+        if (messageExtBatch.getBody().length > messageStoreConfig.getMaxMessageSize()) {
+            log.warn("PutMessages body length too long " + messageExtBatch.getBody().length);
+            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+        }
+
+        if (this.isOSPageCacheBusy()) {
+            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null));
+        }
+
+        long beginTime = this.getSystemClock().now();
+        CompletableFuture<PutMessageResult> result = this.commitLog.asyncPutMessages(messageExtBatch);
+
+        long eclipseTime = this.getSystemClock().now() - beginTime;
+        if (eclipseTime > 500) {
+            log.warn("not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, messageExtBatch.getBody().length);
+        }
+        this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
+
+        return result;
+    }
+
+    @Override
+    public boolean isOSPageCacheBusy() {
+        long begin = this.getCommitLog().getBeginTimeInLock();
+        long diff = this.systemClock.now() - begin;
+
+        return diff < 10000000
+            && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
+    }
+
+    @Override
+    public long lockTimeMills() {
+        return this.commitLog.lockTimeMills();
+    }
+
+    @Override
+    public SystemClock getSystemClock() {
+        return systemClock;
+    }
+
+    @Override
+    public CommitLog getCommitLog() {
+        return commitLog;
+    }
+
+    public boolean isDispatchFromSenderThread() {
+        return isDispatchFromSenderThread;
+    }
+
+    @Override
+    public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
+        final int maxMsgNums,
+        final MessageFilter messageFilter) {
+        return getMessage(group, topic, queueId, offset, maxMsgNums, MAX_PULL_MSG_SIZE, messageFilter);
+    }
+
+    @Override
+    public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
+        final int maxMsgNums,
+        final int maxTotalMsgSize,
+        final MessageFilter messageFilter) {
+        if (this.shutdown) {
+            log.warn("message store has shutdown, so getMessage is forbidden");
+            return null;
+        }
+
+        if (!this.runningFlags.isReadable()) {
+            log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
+            return null;
+        }
+
+        long beginTime = this.getSystemClock().now();
+
+        GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
+        long nextBeginOffset = offset;
+        long minOffset = 0;
+        long maxOffset = 0;
+
+        GetMessageResult getResult = new GetMessageResult();
+
+        final long maxOffsetPy = this.commitLog.getMaxOffset();
+
+        ConsumeQueueInterface consumeQueue = getConsumeQueue(topic, queueId);
+        if (consumeQueue != null) {
+            minOffset = consumeQueue.getMinOffsetInQueue();
+            maxOffset = consumeQueue.getMaxOffsetInQueue();
+
+            if (maxOffset == 0) {
+                status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
+                nextBeginOffset = nextOffsetCorrection(offset, 0);
+            } else if (offset < minOffset) {
+                status = GetMessageStatus.OFFSET_TOO_SMALL;
+                nextBeginOffset = nextOffsetCorrection(offset, minOffset);
+            } else if (offset == maxOffset) {
+                status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
+                nextBeginOffset = nextOffsetCorrection(offset, offset);
+            } else if (offset > maxOffset) {
+                status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
+                if (0 == minOffset) {
+                    nextBeginOffset = nextOffsetCorrection(offset, minOffset);
+                } else {
+                    nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
+                }
+            } else {
+                final int maxFilterMessageCount = Math.max(messageStoreConfig.getPullBatchMaxMessageCount(), maxMsgNums);
+                final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
+
+                long maxPullSize = Math.max(maxTotalMsgSize, 100);
+                if (maxPullSize > MAX_PULL_MSG_SIZE) {
+                    log.warn("The max pull size is too large maxPullSize={} topic={} queueId={}", maxPullSize, topic, queueId);
+                    maxPullSize = MAX_PULL_MSG_SIZE;
+                }
+                status = GetMessageStatus.NO_MATCHED_MESSAGE;
+                long maxPhyOffsetPulling = 0;
+                int cqFileNum = 0;
+
+                while (getResult.getBufferTotalSize() <= 0
+                        && nextBeginOffset < maxOffset
+                        && cqFileNum++ < this.messageStoreConfig.getTravelCqFileNumWhenGetMessage()) {
+                    ReferredIterator<CqUnit> bufferConsumeQueue = consumeQueue.iterateFrom(nextBeginOffset);
+
+                    if (bufferConsumeQueue == null) {
+                        status = GetMessageStatus.OFFSET_FOUND_NULL;
+                        nextBeginOffset = nextOffsetCorrection(nextBeginOffset, this.consumeQueueStore.rollNextFile(consumeQueue, nextBeginOffset));
+                        log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
+                                + maxOffset + ", but access logic queue failed. Correct nextBeginOffset to " + nextBeginOffset);
+                        break;
+                    }
+
+                    try {
+                        long nextPhyFileStartOffset = Long.MIN_VALUE;
+                        while (bufferConsumeQueue.hasNext()
+                                && nextBeginOffset < maxOffset) {
+                            CqUnit cqUnit = bufferConsumeQueue.next();
+                            long offsetPy = cqUnit.getPos();
+                            int sizePy = cqUnit.getSize();
+
+                            boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
+
+                            if (cqUnit.getQueueOffset() - offset > maxFilterMessageCount) {
+                                break;
+                            }
+
+                            if (this.isTheBatchFull(sizePy, cqUnit.getBatchNum(), maxMsgNums, maxPullSize, getResult.getBufferTotalSize(), getResult.getMessageCount(),
+                                    isInDisk)) {
+                                break;
+                            }
+
+                            if (getResult.getBufferTotalSize() >= maxPullSize) {
+                                break;
+                            }
+
+                            maxPhyOffsetPulling = offsetPy;
+
+                            //Be careful, here should before the isTheBatchFull
+                            nextBeginOffset = cqUnit.getQueueOffset() + cqUnit.getBatchNum();
+
+                            if (nextPhyFileStartOffset != Long.MIN_VALUE) {
+                                if (offsetPy < nextPhyFileStartOffset) {
+                                    continue;
+                                }
+                            }
+
+                            if (messageFilter != null
+                                    && !messageFilter.isMatchedByConsumeQueue(cqUnit.getValidTagsCodeAsLong(), cqUnit.getCqExtUnit())) {

Review comment:
       For batch messages, if a consumer subscribes with a tag filter, will the messages fail to be consumed?
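
The question can be made concrete with a small sketch. It assumes, as the excerpt suggests, that one index entry covers `getBatchNum()` messages and carries a single tags code, and it simplifies matching to an equality test on that code. `Unit` and `deliveredCount` are hypothetical names; how BatchConsumeQueue actually populates the tags code is not shown in this excerpt.

```java
import java.util.Arrays;
import java.util.List;

public class BatchTagFilterSketch {
    // Hypothetical, simplified index entry: one entry stands for batchNum consecutive messages.
    static final class Unit {
        final long queueOffset;
        final int batchNum;
        final long tagsCode;

        Unit(long queueOffset, int batchNum, long tagsCode) {
            this.queueOffset = queueOffset;
            this.batchNum = batchNum;
            this.tagsCode = tagsCode;
        }
    }

    // Counts the messages delivered when the tag check runs once per index entry.
    // Assumption: matching is plain equality on a single tags code per entry.
    static int deliveredCount(List<Unit> units, long subscribedTagsCode) {
        int delivered = 0;
        for (Unit unit : units) {
            if (unit.tagsCode != subscribedTagsCode) {
                continue; // the whole batch behind this entry is skipped
            }
            delivered += unit.batchNum;
        }
        return delivered;
    }

    public static void main(String[] args) {
        long tagA = "TagA".hashCode();
        long tagB = "TagB".hashCode();
        List<Unit> units = Arrays.asList(
            new Unit(0L, 10, tagA),   // offsets 0..9
            new Unit(10L, 5, tagB));  // offsets 10..14
        System.out.println("delivered for TagA: " + deliveredCount(units, tagA));
    }
}
```

Under these assumptions the filter decision is all-or-nothing per batch: if the entry's tags code does not represent every message inside the batch, a tag subscriber could miss messages it should receive.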



