You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/09/13 12:09:25 UTC

[incubator-inlong] branch master updated: [INLONG-1544] Add replica consumption logic in broker node (#1545)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new cb67782  [INLONG-1544] Add replica consumption logic in broker node (#1545)
cb67782 is described below

commit cb677825244f5f9e01f43cd58836343357a20672
Author: gosonzhang <46...@qq.com>
AuthorDate: Mon Sep 13 20:09:18 2021 +0800

    [INLONG-1544] Add replica consumption logic in broker node (#1545)
---
 .../server/broker/msgstore/MessageStore.java       | 139 +++++++++++++++++++--
 .../broker/msgstore/MessageStoreManager.java       |  17 +++
 .../server/broker/msgstore/StoreService.java       |   2 +
 .../server/broker/msgstore/disk/MsgFileStore.java  |  25 ++--
 .../server/broker/msgstore/mem/MsgMemStore.java    |  54 ++++----
 .../server/broker/nodeinfo/ReplicaNodeInfo.java    |  62 +++++++++
 .../tubemq/server/broker/utils/DataStoreUtils.java |  20 ++-
 .../tubemq/server/common/TServerConstants.java     |   2 +
 .../server/common/utils/WebParameterUtils.java     |   6 +-
 .../broker/msgstore/mem/MsgMemStoreTest.java       |   2 +-
 10 files changed, 280 insertions(+), 49 deletions(-)

diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
index 4d2d7fd..c108677 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
@@ -21,6 +21,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -47,6 +48,7 @@ import org.apache.inlong.tubemq.server.broker.msgstore.mem.GetCacheMsgResult;
 import org.apache.inlong.tubemq.server.broker.msgstore.mem.MsgMemStatisInfo;
 import org.apache.inlong.tubemq.server.broker.msgstore.mem.MsgMemStore;
 import org.apache.inlong.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
+import org.apache.inlong.tubemq.server.broker.nodeinfo.ReplicaNodeInfo;
 import org.apache.inlong.tubemq.server.broker.stats.CountItem;
 import org.apache.inlong.tubemq.server.broker.utils.DataStoreUtils;
 import org.apache.inlong.tubemq.server.common.utils.AppendResult;
@@ -177,6 +179,7 @@ public class MessageStore implements Closeable {
         }
         int result = 0;
         boolean inMemCache = false;
+        boolean isReplicaCsm = false;
         int maxIndexReadLength = memMaxIndexReadCnt.get();
         GetCacheMsgResult memMsgRlt = new GetCacheMsgResult(false, TErrCodeConstants.NOT_FOUND,
                 requestOffset, "Can't found Message by index in cache");
@@ -197,18 +200,20 @@ public class MessageStore implements Closeable {
                             if (reqSwitch > 2) {
                                 memMsgRlt =
                                         // read from main memory.
-                                        msgMemStore.getMessages(consumerNodeInfo.getLastDataRdOffset(),
+                                        msgMemStore.getMessages(isReplicaCsm, false,
+                                                consumerNodeInfo.getLastDataRdOffset(),
                                                 requestOffset, msgStoreMgr.getMaxMsgTransferSize(),
-                                                maxIndexReadLength, partitionId, false,
+                                                maxIndexReadLength, partitionId,
                                                 consumerNodeInfo.isFilterConsume(),
                                                 consumerNodeInfo.getFilterCondCodeSet());
                             }
                         } else {
                             // read from backup memory.
                             memMsgRlt =
-                                    msgMemStoreBeingFlush.getMessages(consumerNodeInfo.getLastDataRdOffset(),
+                                    msgMemStoreBeingFlush.getMessages(isReplicaCsm, true,
+                                            consumerNodeInfo.getLastDataRdOffset(),
                                             requestOffset, msgStoreMgr.getMaxMsgTransferSize(),
-                                            maxIndexReadLength, partitionId, true,
+                                            maxIndexReadLength, partitionId,
                                             consumerNodeInfo.isFilterConsume(),
                                             consumerNodeInfo.getFilterCondCodeSet());
                         }
@@ -228,8 +233,8 @@ public class MessageStore implements Closeable {
                         final StringBuilder strBuffer = new StringBuilder(512);
                         for (ByteBuffer dataBuffer : memMsgRlt.cacheMsgList) {
                             ClientBroker.TransferedMessage transferedMessage =
-                                    DataStoreUtils.getTransferMsg(dataBuffer,
-                                            dataBuffer.array().length,
+                                    DataStoreUtils.getTransferMsg(isReplicaCsm,
+                                            dataBuffer, dataBuffer.array().length,
                                             countMap, statisKeyBase, strBuffer);
                             if (transferedMessage != null) {
                                 transferedMessageList.add(transferedMessage);
@@ -277,7 +282,7 @@ public class MessageStore implements Closeable {
             msgSizeLimit = this.maxAllowRdSize;
         }
         GetMessageResult retResult =
-            msgFileStore.getMessages(partitionId,
+            msgFileStore.getMessages(isReplicaCsm, partitionId,
                 consumerNodeInfo.getLastDataRdOffset(), reqNewOffset,
                 indexBuffer, consumerNodeInfo.isFilterConsume(),
                 consumerNodeInfo.getFilterCondCodeSet(),
@@ -362,6 +367,126 @@ public class MessageStore implements Closeable {
         return false;
     }
 
+    /***
+     * Get batch messages for a replica node, starting at the given index offset.
+     * Replica reads skip the partition and filter-key screening used for consumers.
+     *
+     * @param requestOffset    index offset to start reading from
+     * @param partitionId      partition id handed to the memory and file stores
+     * @param replicaNodeInfo  replica node state; supplies the last read data offset
+     * @param statisKeyBase    key prefix handed to the statistic counting logic
+     * @param msgSizeLimit     max transfer size for a file store read, may be lowered here
+     * @return                 the read result, successful or carrying an error code
+     * @throws IOException     if an underlying store read fails
+     * @throws IllegalStateException if this message store has been closed
+     */
+    public GetMessageResult getBathMessages(long requestOffset, int partitionId,
+                                            ReplicaNodeInfo replicaNodeInfo,
+                                            String statisKeyBase,
+                                            int msgSizeLimit) throws IOException {
+        // #lizard forgives
+        if (this.closed.get()) {
+            throw new IllegalStateException(new StringBuilder(512)
+                    .append("[Data Store] Closed MessageStore for storeKey ")
+                    .append(this.storeKey).toString());
+        }
+        int result = 0;
+        boolean inMemCache = false;
+        boolean isReplicaCsm = true;
+        int maxIndexReadLength = memMaxIndexReadCnt.get();
+        GetCacheMsgResult memMsgRlt = new GetCacheMsgResult(false, TErrCodeConstants.NOT_FOUND,
+                requestOffset, "Can't found Message by index in cache");
+        // in read memory situation, read main memory or backup memory by consumer's config.
+        long maxIndexOffset = TBaseConstants.META_VALUE_UNDEFINED;
+        if (requestOffset >= this.msgFileStore.getIndexMaxOffset()) {
+            this.writeCacheMutex.readLock().lock();
+            try {
+                maxIndexOffset = this.msgMemStore.getIndexLastWritePos();
+                result = this.msgMemStoreBeingFlush.isOffsetInHold(requestOffset);
+                if (result > 0) {
+                    // read from main memory.
+                    inMemCache = true;
+                    memMsgRlt = msgMemStore.getMessages(isReplicaCsm, false,
+                                    replicaNodeInfo.getLastDataRdOffset(),
+                                    requestOffset, msgStoreMgr.getMaxMsgTransferSize(),
+                                    maxIndexReadLength, partitionId,
+                                    false, Collections.<Integer>emptySet());
+                } else if (result == 0) {
+                    // read from backup memory.
+                    inMemCache = true;
+                    memMsgRlt = msgMemStoreBeingFlush.getMessages(isReplicaCsm, true,
+                                    replicaNodeInfo.getLastDataRdOffset(),
+                                    requestOffset, msgStoreMgr.getMaxMsgTransferSize(),
+                                    maxIndexReadLength, partitionId,
+                                    false, Collections.<Integer>emptySet());
+                }
+            } finally {
+                this.writeCacheMutex.readLock().unlock();
+            }
+        }
+        if (inMemCache) {
+            // return not found when data is under memory sink operation.
+            if (memMsgRlt.isSuccess) {
+                HashMap<String, CountItem> countMap = new HashMap<>();
+                List<ClientBroker.TransferedMessage> transferedMessageList = new ArrayList<>();
+                if (!memMsgRlt.cacheMsgList.isEmpty()) {
+                    final StringBuilder strBuffer = new StringBuilder(512);
+                    for (ByteBuffer dataBuffer : memMsgRlt.cacheMsgList) {
+                        ClientBroker.TransferedMessage transferedMessage =
+                                DataStoreUtils.getTransferMsg(isReplicaCsm,
+                                        dataBuffer, dataBuffer.array().length,
+                                        countMap, statisKeyBase, strBuffer);
+                        if (transferedMessage != null) {
+                            transferedMessageList.add(transferedMessage);
+                        }
+                    }
+                }
+                GetMessageResult getResult =
+                        new GetMessageResult(true, 0, memMsgRlt.errInfo, requestOffset,
+                                memMsgRlt.dltOffset, memMsgRlt.lastRdDataOff,
+                                memMsgRlt.totalMsgSize, countMap, transferedMessageList);
+                getResult.setMaxOffset(maxIndexOffset);
+                return getResult;
+            } else {
+                return new GetMessageResult(false, memMsgRlt.retCode, requestOffset,
+                        memMsgRlt.dltOffset, memMsgRlt.errInfo);
+            }
+        }
+        // read from file, before reading, adjust request's offset.
+        long reqNewOffset = Math.max(requestOffset, this.msgFileStore.getIndexMinOffset());
+        maxIndexReadLength = fileMaxIndexReadSize.get();
+        final ByteBuffer indexBuffer = ByteBuffer.allocate(maxIndexReadLength);
+        Segment indexRecordView = this.msgFileStore.indexSlice(reqNewOffset, maxIndexReadLength);
+        if (indexRecordView == null) {
+            if (reqNewOffset < this.msgFileStore.getIndexMinOffset()) {
+                return new GetMessageResult(false, TErrCodeConstants.MOVED,
+                        reqNewOffset, 0, "current offset is exceed min offset!");
+            } else {
+                return new GetMessageResult(false, TErrCodeConstants.NOT_FOUND,
+                        reqNewOffset, 0, "current offset is exceed max offset!");
+            }
+        }
+        try {
+            indexRecordView.read(indexBuffer, reqNewOffset);
+        } finally {
+            // always pair indexSlice() with relViewRef(), even if read() throws
+            indexRecordView.relViewRef();
+        }
+        indexBuffer.flip();
+        // cap the read size when the last read offset lags far behind the data high-max offset
+        if ((msgFileStore.getDataHighMaxOffset() - replicaNodeInfo.getLastDataRdOffset()
+                >= this.tubeConfig.getDoubleDefaultDeduceReadSize())
+                && msgSizeLimit > this.maxAllowRdSize) {
+            msgSizeLimit = this.maxAllowRdSize;
+        }
+        GetMessageResult retResult = msgFileStore.getMessages(isReplicaCsm, partitionId,
+                        replicaNodeInfo.getLastDataRdOffset(), reqNewOffset,
+                        indexBuffer, false, Collections.<Integer>emptySet(),
+                        statisKeyBase, msgSizeLimit);
+        retResult.setMaxOffset(getIndexMaxOffset());
+        return retResult;
+    }
+
     public String getCurMemMsgSizeStatisInfo(boolean needRefresh) {
         return msgMemStatisInfo.getCurMsgSizeStatisInfo(needRefresh);
     }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
index 614bdd1..a88bf32 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
@@ -277,6 +277,23 @@ public class MessageStoreManager implements StoreService {
     }
 
     /***
+     * Get message store by topic and storeId.
+     *
+     * @param topic    topic name
+     * @param storeId  store id
+     * @return         the matching message store, or null if the topic or store id is unknown
+     */
+    @Override
+    public MessageStore getMessageStoresByTopicAndStoreId(String topic, int storeId) {
+        // stores are looked up per topic first, then by store id; a missing topic yields null
+        final ConcurrentHashMap<Integer, MessageStore> storesOfTopic = this.dataStores.get(topic);
+        if (storesOfTopic == null) {
+            return null;
+        }
+        return storesOfTopic.get(storeId);
+    }
+
+    /***
      * Get or create message store.
      *
      * @param topic
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/StoreService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/StoreService.java
index 7ab2328..444271f 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/StoreService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/StoreService.java
@@ -37,6 +37,8 @@ public interface StoreService {
 
     Collection<MessageStore> getMessageStoresByTopic(String topic);
 
+    MessageStore getMessageStoresByTopicAndStoreId(String topic, int storeId);
+
     MessageStore getOrCreateMessageStore(String topic,
                                          int partition) throws Throwable;
 
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
index e47a069..9ee6fe0 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
@@ -220,6 +220,7 @@ public class MsgFileStore implements Closeable {
     /***
      * Get message from index and data files.
      *
+     * @param isReplicaCsm
      * @param partitionId
      * @param lastRdOffset
      * @param reqOffset
@@ -230,12 +231,13 @@ public class MsgFileStore implements Closeable {
      * @param maxMsgTransferSize
      * @return
      */
-    public GetMessageResult getMessages(final int partitionId, final long lastRdOffset,
-                                        final long reqOffset, final ByteBuffer indexBuffer,
-                                        final boolean isFilterConsume,
-                                        final Set<Integer> filterKeySet,
-                                        final String statisKeyBase,
-                                        final int maxMsgTransferSize) {
+    public GetMessageResult getMessages(boolean isReplicaCsm,
+                                        int partitionId, long lastRdOffset,
+                                        long reqOffset, ByteBuffer indexBuffer,
+                                        boolean isFilterConsume,
+                                        Set<Integer> filterKeySet,
+                                        String statisKeyBase,
+                                        int maxMsgTransferSize) {
         // #lizard forgives
         // Orderly read from index file, then random read from data file.
         int retCode = 0;
@@ -285,9 +287,9 @@ public class MsgFileStore implements Closeable {
                 break;
             }
             // conduct filter operation.
-            if (curIndexPartitionId != partitionId
-                    || (isFilterConsume
-                    && !filterKeySet.contains(curIndexKeyCode))) {
+            if (!isReplicaCsm
+                    && (curIndexPartitionId != partitionId
+                    || (isFilterConsume && !filterKeySet.contains(curIndexKeyCode)))) {
                 lastRdDataOffset = maxDataLimitOffset;
                 readedOffset = curIndexOffset + DataStoreUtils.STORE_INDEX_HEAD_LEN;
                 continue;
@@ -340,8 +342,9 @@ public class MsgFileStore implements Closeable {
             readedOffset = curIndexOffset + DataStoreUtils.STORE_INDEX_HEAD_LEN;
             lastRdDataOffset = maxDataLimitOffset;
             ClientBroker.TransferedMessage transferedMessage =
-                    DataStoreUtils.getTransferMsg(dataBuffer,
-                            curIndexDataSize, countMap, statisKeyBase, sBuilder);
+                    DataStoreUtils.getTransferMsg(isReplicaCsm,
+                            dataBuffer, curIndexDataSize,
+                            countMap, statisKeyBase, sBuilder);
             if (transferedMessage == null) {
                 continue;
             }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
index 4bf7bc4..1550754 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
@@ -124,21 +124,21 @@ public class MsgMemStore implements Closeable {
     /***
      * Read from memory, read index, then data.
      *
+     * @param isReplicaCsm
+     * @param isSecond
      * @param lstRdDataOffset
      * @param lstRdIndexOffset
      * @param maxReadSize
      * @param maxReadCount
      * @param partitionId
-     * @param isSecond
      * @param isFilterConsume
      * @param filterKeySet
      * @return
      */
-    public GetCacheMsgResult getMessages(final long lstRdDataOffset, final long lstRdIndexOffset,
-                                         final int maxReadSize, final int maxReadCount,
-                                         final int partitionId, final boolean isSecond,
-                                         final boolean isFilterConsume,
-                                         final Set<Integer> filterKeySet) {
+    public GetCacheMsgResult getMessages(boolean isReplicaCsm, boolean isSecond,
+                                         long lstRdDataOffset, long lstRdIndexOffset,
+                                         int maxReadSize, int maxReadCount, int partitionId,
+                                         boolean isFilterConsume, Set<Integer> filterKeySet) {
         // #lizard forgives
         Integer lastWritePos = 0;
         boolean hasMsg = false;
@@ -159,22 +159,26 @@ public class MsgMemStore implements Closeable {
         int startReadOff = (int) (lstRdIndexOffset - this.writeIndexStartPos);
         this.writeLock.lock();
         try {
-            if (isFilterConsume) {
-                // filter conduct. accelerate by keysMap.
-                for (Integer keyCode : filterKeySet) {
-                    if (keyCode != null) {
-                        lastWritePos = this.keysMap.get(keyCode);
-                        if ((lastWritePos != null) && (lastWritePos >= startReadOff)) {
-                            hasMsg = true;
-                            break;
+            if (isReplicaCsm) {
+                hasMsg = true;
+            } else {
+                if (isFilterConsume) {
+                    // filter conduct. accelerate by keysMap.
+                    for (Integer keyCode : filterKeySet) {
+                        if (keyCode != null) {
+                            lastWritePos = this.keysMap.get(keyCode);
+                            if ((lastWritePos != null) && (lastWritePos >= startReadOff)) {
+                                hasMsg = true;
+                                break;
+                            }
                         }
                     }
-                }
-            } else {
-                // orderly consume by partition id.
-                lastWritePos = this.queuesMap.get(partitionId);
-                if ((lastWritePos != null) && (lastWritePos >= startReadOff)) {
-                    hasMsg = true;
+                } else {
+                    // orderly consume by partition id.
+                    lastWritePos = this.queuesMap.get(partitionId);
+                    if ((lastWritePos != null) && (lastWritePos >= startReadOff)) {
+                        hasMsg = true;
+                    }
                 }
             }
             currDataOffset = this.cacheDataOffset.get();
@@ -230,10 +234,12 @@ public class MsgMemStore implements Closeable {
                 readedSize += DataStoreUtils.STORE_INDEX_HEAD_LEN;
                 continue;
             }
-            if ((cPartitionId != partitionId)
-                    || (isFilterConsume && (!filterKeySet.contains(cKeyCode)))) {
-                readedSize += DataStoreUtils.STORE_INDEX_HEAD_LEN;
-                continue;
+            if (!isReplicaCsm) {
+                if ((cPartitionId != partitionId)
+                        || (isFilterConsume && (!filterKeySet.contains(cKeyCode)))) {
+                    readedSize += DataStoreUtils.STORE_INDEX_HEAD_LEN;
+                    continue;
+                }
             }
             // read data file.
             byte[] tmpArray = new byte[cDataSize];
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ReplicaNodeInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ReplicaNodeInfo.java
new file mode 100644
index 0000000..44292ee
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ReplicaNodeInfo.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.broker.nodeinfo;
+
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
+import org.apache.inlong.tubemq.server.broker.msgstore.MessageStoreManager;
+
+
+/** Read state the broker keeps for one replica node of a topic store. */
+public class ReplicaNodeInfo {
+    private final MessageStoreManager storeManager;
+    // replica node id, fixed at construction
+    private final String replicaNodeId;
+    // replica node's address; not assigned anywhere in this class yet
+    private String rmtAddrInfo;
+    private final String topicName;
+    private final int storeId;
+    private String storeKey;
+    private final long createTime = System.currentTimeMillis();
+    private long lastGetTime = 0L;
+    private long lastDataRdOffset = TBaseConstants.META_VALUE_UNDEFINED;
+    private int sentMsgSize = 0;
+
+    public ReplicaNodeInfo(MessageStoreManager storeManager,
+                           String replicaNodeId, String topicName, int storeId) {
+        this.storeManager = storeManager;
+        this.replicaNodeId = replicaNodeId;
+        this.topicName = topicName;
+        this.storeId = storeId;
+    }
+
+    public long getLastGetTime() {
+        return lastGetTime;
+    }
+
+    public void setLastGetTime(long lastGetTime) {
+        this.lastGetTime = lastGetTime;
+    }
+
+    public long getLastDataRdOffset() {
+        return lastDataRdOffset;
+    }
+
+    public void setLastDataRdOffset(long lastDataRdOffset) {
+        this.lastDataRdOffset = lastDataRdOffset;
+    }
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/utils/DataStoreUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/utils/DataStoreUtils.java
index 3ea36cb..0d0fa4e 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/utils/DataStoreUtils.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/utils/DataStoreUtils.java
@@ -107,6 +107,7 @@ public class DataStoreUtils {
     /***
      * Convert inner message to protobuf format, then reply to client.
      *
+     * @param isReplicaCsm
      * @param dataBuffer
      * @param dataTotalSize
      * @param countMap
@@ -114,10 +115,11 @@ public class DataStoreUtils {
      * @param sBuilder
      * @return
      */
-    public static ClientBroker.TransferedMessage getTransferMsg(final ByteBuffer dataBuffer, int dataTotalSize,
-                                                                final HashMap<String, CountItem> countMap,
-                                                                final String statisKeyBase,
-                                                                final StringBuilder sBuilder) {
+    public static ClientBroker.TransferedMessage getTransferMsg(boolean isReplicaCsm,
+                                                                ByteBuffer dataBuffer, int dataTotalSize,
+                                                                HashMap<String, CountItem> countMap,
+                                                                String statisKeyBase,
+                                                                StringBuilder sBuilder) {
         if (dataBuffer.array().length < dataTotalSize) {
             return null;
         }
@@ -136,6 +138,10 @@ public class DataStoreUtils {
         }
         final long msgId = dataBuffer.getLong(DataStoreUtils.STORE_HEADER_POS_MSGID);
         final int flag = dataBuffer.getInt(DataStoreUtils.STORE_HEADER_POS_MSGFLAG);
+        final int partitionId = dataBuffer.getInt(DataStoreUtils.STORE_HEADER_POS_QUEUEID);
+        final int indexKeyCode = dataBuffer.getInt(DataStoreUtils.STORE_HEADER_POS_KEYCODE);
+        final int reportAddr = dataBuffer.getInt(DataStoreUtils.STORE_HEADER_POS_REPORTADDR);
+        final long recvTimeInMs = dataBuffer.getLong(DataStoreUtils.STORE_HEADER_POS_RECEIVEDTIME);
         final int payLoadLen2 = payLoadLen;
         final byte[] payLoadData = new byte[payLoadLen];
         System.arraycopy(dataBuffer.array(), payLoadOffset, payLoadData, 0, payLoadLen);
@@ -145,6 +151,12 @@ public class DataStoreUtils {
         dataBuilder.setCheckSum(checkSum);
         dataBuilder.setFlag(flag);
         dataBuilder.setPayLoadData(ByteString.copyFrom(payLoadData));
+        if (isReplicaCsm) {
+            dataBuilder.setPartitionId(partitionId);
+            dataBuilder.setIndexKeyCode(indexKeyCode);
+            dataBuilder.setReportAddr(reportAddr);
+            dataBuilder.setRecvTimeInMs(recvTimeInMs);
+        }
         // get statistic data
         int attrLen = 0;
         String attribute = null;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
index fb36112..ebc39c0 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
@@ -37,8 +37,10 @@ public final class TServerConstants {
     public static final int QRY_PRIORITY_MAX_VALUE = 303;
 
     public static final int TOPIC_STOREBLOCK_NUM_MIN = 1;
+    public static final int TOPIC_STOREBLOCK_NUM_MAX = 1000;
 
     public static final int TOPIC_PARTITION_NUM_MIN = 1;
+    public static final int TOPIC_PARTITION_NUM_MAX = 9000;
 
     public static final int TOPIC_DSK_UNFLUSHTHRESHOLD_MIN = 0;
     public static final int TOPIC_DSK_UNFLUSHTHRESHOLD_DEF = 1000;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
index a5338c4..ac756b7 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
@@ -414,14 +414,16 @@ public class WebParameterUtils {
         // get numTopicStores parameter value
         if (!WebParameterUtils.getIntParamValue(paramCntr, WebFieldDef.NUMTOPICSTORES, false,
                 (defVal == null ? TBaseConstants.META_VALUE_UNDEFINED : defVal.getNumTopicStores()),
-                TServerConstants.TOPIC_STOREBLOCK_NUM_MIN, sBuffer, result)) {
+                TServerConstants.TOPIC_STOREBLOCK_NUM_MIN, TServerConstants.TOPIC_STOREBLOCK_NUM_MAX,
+                sBuffer, result)) {
             return result.isSuccess();
         }
         newConf.setNumTopicStores((int) result.retData1);
         // get numPartitions parameter value
         if (!WebParameterUtils.getIntParamValue(paramCntr, WebFieldDef.NUMPARTITIONS, false,
                 (defVal == null ? TBaseConstants.META_VALUE_UNDEFINED : defVal.getNumPartitions()),
-                TServerConstants.TOPIC_PARTITION_NUM_MIN, sBuffer, result)) {
+                TServerConstants.TOPIC_PARTITION_NUM_MIN, TServerConstants.TOPIC_PARTITION_NUM_MAX,
+                sBuffer, result)) {
             return result.isSuccess();
         }
         newConf.setNumPartitions((int) result.retData1);
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
index 4bccc36..6b7d38d 100644
--- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
@@ -53,6 +53,6 @@ public class MsgMemStoreTest {
         msgMemStore.appendMsg(msgMemStatisInfo, 0, 0,
                 System.currentTimeMillis(), 3, bf, appendResult);
         // get messages
-        GetCacheMsgResult getCacheMsgResult = msgMemStore.getMessages(0, 2, 1024, 1000, 0, false, false, null);
+        GetCacheMsgResult getCacheMsgResult = msgMemStore.getMessages(false, false, 0, 2, 1024, 1000, 0, false, null);
     }
 }