You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/09/13 12:22:53 UTC

[incubator-inlong] branch master updated: Revert "[INLONG-1544] Add replica consumption logic in broker node (#1545)" (#1546)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b5f5b4  Revert "[INLONG-1544] Add replica consumption logic in broker node (#1545)" (#1546)
0b5f5b4 is described below

commit 0b5f5b41957e74623e1b7652ec179e6358c9ddc5
Author: gosonzhang <46...@qq.com>
AuthorDate: Mon Sep 13 20:20:50 2021 +0800

    Revert "[INLONG-1544] Add replica consumption logic in broker node (#1545)" (#1546)
    
    This reverts commit cb677825244f5f9e01f43cd58836343357a20672.
---
 .../server/broker/msgstore/MessageStore.java       | 139 ++-------------------
 .../broker/msgstore/MessageStoreManager.java       |  17 ---
 .../server/broker/msgstore/StoreService.java       |   2 -
 .../server/broker/msgstore/disk/MsgFileStore.java  |  25 ++--
 .../server/broker/msgstore/mem/MsgMemStore.java    |  54 ++++----
 .../server/broker/nodeinfo/ReplicaNodeInfo.java    |  62 ---------
 .../tubemq/server/broker/utils/DataStoreUtils.java |  20 +--
 .../tubemq/server/common/TServerConstants.java     |   2 -
 .../server/common/utils/WebParameterUtils.java     |   6 +-
 .../broker/msgstore/mem/MsgMemStoreTest.java       |   2 +-
 10 files changed, 49 insertions(+), 280 deletions(-)

diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
index c108677..4d2d7fd 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
@@ -21,7 +21,6 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -48,7 +47,6 @@ import org.apache.inlong.tubemq.server.broker.msgstore.mem.GetCacheMsgResult;
 import org.apache.inlong.tubemq.server.broker.msgstore.mem.MsgMemStatisInfo;
 import org.apache.inlong.tubemq.server.broker.msgstore.mem.MsgMemStore;
 import org.apache.inlong.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
-import org.apache.inlong.tubemq.server.broker.nodeinfo.ReplicaNodeInfo;
 import org.apache.inlong.tubemq.server.broker.stats.CountItem;
 import org.apache.inlong.tubemq.server.broker.utils.DataStoreUtils;
 import org.apache.inlong.tubemq.server.common.utils.AppendResult;
@@ -179,7 +177,6 @@ public class MessageStore implements Closeable {
         }
         int result = 0;
         boolean inMemCache = false;
-        boolean isReplicaCsm = false;
         int maxIndexReadLength = memMaxIndexReadCnt.get();
         GetCacheMsgResult memMsgRlt = new GetCacheMsgResult(false, TErrCodeConstants.NOT_FOUND,
                 requestOffset, "Can't found Message by index in cache");
@@ -200,20 +197,18 @@ public class MessageStore implements Closeable {
                             if (reqSwitch > 2) {
                                 memMsgRlt =
                                         // read from main memory.
-                                        msgMemStore.getMessages(isReplicaCsm, false,
-                                                consumerNodeInfo.getLastDataRdOffset(),
+                                        msgMemStore.getMessages(consumerNodeInfo.getLastDataRdOffset(),
                                                 requestOffset, msgStoreMgr.getMaxMsgTransferSize(),
-                                                maxIndexReadLength, partitionId,
+                                                maxIndexReadLength, partitionId, false,
                                                 consumerNodeInfo.isFilterConsume(),
                                                 consumerNodeInfo.getFilterCondCodeSet());
                             }
                         } else {
                             // read from backup memory.
                             memMsgRlt =
-                                    msgMemStoreBeingFlush.getMessages(isReplicaCsm, true,
-                                            consumerNodeInfo.getLastDataRdOffset(),
+                                    msgMemStoreBeingFlush.getMessages(consumerNodeInfo.getLastDataRdOffset(),
                                             requestOffset, msgStoreMgr.getMaxMsgTransferSize(),
-                                            maxIndexReadLength, partitionId,
+                                            maxIndexReadLength, partitionId, true,
                                             consumerNodeInfo.isFilterConsume(),
                                             consumerNodeInfo.getFilterCondCodeSet());
                         }
@@ -233,8 +228,8 @@ public class MessageStore implements Closeable {
                         final StringBuilder strBuffer = new StringBuilder(512);
                         for (ByteBuffer dataBuffer : memMsgRlt.cacheMsgList) {
                             ClientBroker.TransferedMessage transferedMessage =
-                                    DataStoreUtils.getTransferMsg(isReplicaCsm,
-                                            dataBuffer, dataBuffer.array().length,
+                                    DataStoreUtils.getTransferMsg(dataBuffer,
+                                            dataBuffer.array().length,
                                             countMap, statisKeyBase, strBuffer);
                             if (transferedMessage != null) {
                                 transferedMessageList.add(transferedMessage);
@@ -282,7 +277,7 @@ public class MessageStore implements Closeable {
             msgSizeLimit = this.maxAllowRdSize;
         }
         GetMessageResult retResult =
-            msgFileStore.getMessages(isReplicaCsm, partitionId,
+            msgFileStore.getMessages(partitionId,
                 consumerNodeInfo.getLastDataRdOffset(), reqNewOffset,
                 indexBuffer, consumerNodeInfo.isFilterConsume(),
                 consumerNodeInfo.getFilterCondCodeSet(),
@@ -367,126 +362,6 @@ public class MessageStore implements Closeable {
         return false;
     }
 
-    /***
-     * Get batch message from message store. Support the given offset.
-     *
-     * @param requestOffset
-     * @param partitionId
-     * @param replicaNodeInfo
-     * @param statisKeyBase
-     * @param msgSizeLimit
-     * @return
-     * @throws IOException
-     */
-    public GetMessageResult getBathMessages(long requestOffset, int partitionId,
-                                            ReplicaNodeInfo replicaNodeInfo,
-                                            String statisKeyBase,
-                                            int msgSizeLimit) throws IOException {
-        // #lizard forgives
-        if (this.closed.get()) {
-            throw new IllegalStateException(new StringBuilder(512)
-                    .append("[Data Store] Closed MessageStore for storeKey ")
-                    .append(this.storeKey).toString());
-        }
-        int result = 0;
-        boolean inMemCache = false;
-        boolean isReplicaCsm = true;
-        int maxIndexReadLength = memMaxIndexReadCnt.get();
-        GetCacheMsgResult memMsgRlt = new GetCacheMsgResult(false, TErrCodeConstants.NOT_FOUND,
-                requestOffset, "Can't found Message by index in cache");
-        // in read memory situation, read main memory or backup memory by consumer's config.
-        long maxIndexOffset = TBaseConstants.META_VALUE_UNDEFINED;
-        if (requestOffset >= this.msgFileStore.getIndexMaxOffset()) {
-            this.writeCacheMutex.readLock().lock();
-            try {
-                maxIndexOffset = this.msgMemStore.getIndexLastWritePos();
-                result = this.msgMemStoreBeingFlush.isOffsetInHold(requestOffset);
-                if (result > 0) {
-                    // read from main memory.
-                    inMemCache = true;
-                    memMsgRlt =
-                            msgMemStore.getMessages(isReplicaCsm, false,
-                                    replicaNodeInfo.getLastDataRdOffset(),
-                                    requestOffset, msgStoreMgr.getMaxMsgTransferSize(),
-                                    maxIndexReadLength, partitionId,
-                                    false, Collections.EMPTY_SET);
-
-                } else if (result == 0) {
-                    // read from backup memory.
-                    inMemCache = true;
-                    memMsgRlt =
-                            msgMemStoreBeingFlush.getMessages(isReplicaCsm, true,
-                                    replicaNodeInfo.getLastDataRdOffset(),
-                                    requestOffset, msgStoreMgr.getMaxMsgTransferSize(),
-                                    maxIndexReadLength, partitionId,
-                                    false, Collections.EMPTY_SET);
-                }
-            } finally {
-                this.writeCacheMutex.readLock().unlock();
-            }
-        }
-        if (inMemCache) {
-            // return not found when data is under memory sink operation.
-            if (memMsgRlt.isSuccess) {
-                HashMap<String, CountItem> countMap =
-                        new HashMap<>();
-                List<ClientBroker.TransferedMessage> transferedMessageList =
-                        new ArrayList<>();
-                if (!memMsgRlt.cacheMsgList.isEmpty()) {
-                    final StringBuilder strBuffer = new StringBuilder(512);
-                    for (ByteBuffer dataBuffer : memMsgRlt.cacheMsgList) {
-                        ClientBroker.TransferedMessage transferedMessage =
-                                DataStoreUtils.getTransferMsg(isReplicaCsm,
-                                        dataBuffer, dataBuffer.array().length,
-                                        countMap, statisKeyBase, strBuffer);
-                        if (transferedMessage != null) {
-                            transferedMessageList.add(transferedMessage);
-                        }
-                    }
-                }
-                GetMessageResult getResult =
-                        new GetMessageResult(true, 0, memMsgRlt.errInfo, requestOffset,
-                                memMsgRlt.dltOffset, memMsgRlt.lastRdDataOff,
-                                memMsgRlt.totalMsgSize, countMap, transferedMessageList);
-                getResult.setMaxOffset(maxIndexOffset);
-                return getResult;
-            } else {
-                return new GetMessageResult(false, memMsgRlt.retCode, requestOffset,
-                        memMsgRlt.dltOffset, memMsgRlt.errInfo);
-            }
-        }
-        // read from file, before reading, adjust request's offset.
-        long reqNewOffset = Math.max(requestOffset, this.msgFileStore.getIndexMinOffset());
-        maxIndexReadLength = fileMaxIndexReadSize.get();
-        final ByteBuffer indexBuffer = ByteBuffer.allocate(maxIndexReadLength);
-        Segment indexRecordView =
-                this.msgFileStore.indexSlice(reqNewOffset, maxIndexReadLength);
-        if (indexRecordView == null) {
-            if (reqNewOffset < this.msgFileStore.getIndexMinOffset()) {
-                return new GetMessageResult(false, TErrCodeConstants.MOVED,
-                        reqNewOffset, 0, "current offset is exceed min offset!");
-            } else {
-                return new GetMessageResult(false, TErrCodeConstants.NOT_FOUND,
-                        reqNewOffset, 0, "current offset is exceed max offset!");
-            }
-        }
-        indexRecordView.read(indexBuffer, reqNewOffset);
-        indexBuffer.flip();
-        indexRecordView.relViewRef();
-        if ((msgFileStore.getDataHighMaxOffset() - replicaNodeInfo.getLastDataRdOffset()
-                >= this.tubeConfig.getDoubleDefaultDeduceReadSize())
-                && msgSizeLimit > this.maxAllowRdSize) {
-            msgSizeLimit = this.maxAllowRdSize;
-        }
-        GetMessageResult retResult =
-                msgFileStore.getMessages(isReplicaCsm, partitionId,
-                        replicaNodeInfo.getLastDataRdOffset(), reqNewOffset,
-                        indexBuffer, false, Collections.EMPTY_SET,
-                        statisKeyBase, msgSizeLimit);
-        retResult.setMaxOffset(getIndexMaxOffset());
-        return retResult;
-    }
-
     public String getCurMemMsgSizeStatisInfo(boolean needRefresh) {
         return msgMemStatisInfo.getCurMsgSizeStatisInfo(needRefresh);
     }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
index a88bf32..614bdd1 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
@@ -277,23 +277,6 @@ public class MessageStoreManager implements StoreService {
     }
 
     /***
-     * Get message store by topic and storeId.
-     *
-     * @param topic    topic name
-     * @param storeId  store id
-     * @return
-     */
-    @Override
-    public MessageStore getMessageStoresByTopicAndStoreId(String topic, int storeId) {
-        final ConcurrentHashMap<Integer, MessageStore> map
-                = this.dataStores.get(topic);
-        if (map != null) {
-            return map.get(storeId);
-        }
-        return null;
-    }
-
-    /***
      * Get or create message store.
      *
      * @param topic
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/StoreService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/StoreService.java
index 444271f..7ab2328 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/StoreService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/StoreService.java
@@ -37,8 +37,6 @@ public interface StoreService {
 
     Collection<MessageStore> getMessageStoresByTopic(String topic);
 
-    MessageStore getMessageStoresByTopicAndStoreId(String topic, int storeId);
-
     MessageStore getOrCreateMessageStore(String topic,
                                          int partition) throws Throwable;
 
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
index 9ee6fe0..e47a069 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
@@ -220,7 +220,6 @@ public class MsgFileStore implements Closeable {
     /***
      * Get message from index and data files.
      *
-     * @param isReplicaCsm
      * @param partitionId
      * @param lastRdOffset
      * @param reqOffset
@@ -231,13 +230,12 @@ public class MsgFileStore implements Closeable {
      * @param maxMsgTransferSize
      * @return
      */
-    public GetMessageResult getMessages(boolean isReplicaCsm,
-                                        int partitionId, long lastRdOffset,
-                                        long reqOffset, ByteBuffer indexBuffer,
-                                        boolean isFilterConsume,
-                                        Set<Integer> filterKeySet,
-                                        String statisKeyBase,
-                                        int maxMsgTransferSize) {
+    public GetMessageResult getMessages(final int partitionId, final long lastRdOffset,
+                                        final long reqOffset, final ByteBuffer indexBuffer,
+                                        final boolean isFilterConsume,
+                                        final Set<Integer> filterKeySet,
+                                        final String statisKeyBase,
+                                        final int maxMsgTransferSize) {
         // #lizard forgives
         // Orderly read from index file, then random read from data file.
         int retCode = 0;
@@ -287,9 +285,9 @@ public class MsgFileStore implements Closeable {
                 break;
             }
             // conduct filter operation.
-            if (!isReplicaCsm
-                    && (curIndexPartitionId != partitionId
-                    || (isFilterConsume && !filterKeySet.contains(curIndexKeyCode)))) {
+            if (curIndexPartitionId != partitionId
+                    || (isFilterConsume
+                    && !filterKeySet.contains(curIndexKeyCode))) {
                 lastRdDataOffset = maxDataLimitOffset;
                 readedOffset = curIndexOffset + DataStoreUtils.STORE_INDEX_HEAD_LEN;
                 continue;
@@ -342,9 +340,8 @@ public class MsgFileStore implements Closeable {
             readedOffset = curIndexOffset + DataStoreUtils.STORE_INDEX_HEAD_LEN;
             lastRdDataOffset = maxDataLimitOffset;
             ClientBroker.TransferedMessage transferedMessage =
-                    DataStoreUtils.getTransferMsg(isReplicaCsm,
-                            dataBuffer, curIndexDataSize,
-                            countMap, statisKeyBase, sBuilder);
+                    DataStoreUtils.getTransferMsg(dataBuffer,
+                            curIndexDataSize, countMap, statisKeyBase, sBuilder);
             if (transferedMessage == null) {
                 continue;
             }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
index 1550754..4bf7bc4 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
@@ -124,21 +124,21 @@ public class MsgMemStore implements Closeable {
     /***
      * Read from memory, read index, then data.
      *
-     * @param isReplicaCsm
-     * @param isSecond
      * @param lstRdDataOffset
      * @param lstRdIndexOffset
      * @param maxReadSize
      * @param maxReadCount
      * @param partitionId
+     * @param isSecond
      * @param isFilterConsume
      * @param filterKeySet
      * @return
      */
-    public GetCacheMsgResult getMessages(boolean isReplicaCsm, boolean isSecond,
-                                         long lstRdDataOffset, long lstRdIndexOffset,
-                                         int maxReadSize, int maxReadCount, int partitionId,
-                                         boolean isFilterConsume, Set<Integer> filterKeySet) {
+    public GetCacheMsgResult getMessages(final long lstRdDataOffset, final long lstRdIndexOffset,
+                                         final int maxReadSize, final int maxReadCount,
+                                         final int partitionId, final boolean isSecond,
+                                         final boolean isFilterConsume,
+                                         final Set<Integer> filterKeySet) {
         // #lizard forgives
         Integer lastWritePos = 0;
         boolean hasMsg = false;
@@ -159,26 +159,22 @@ public class MsgMemStore implements Closeable {
         int startReadOff = (int) (lstRdIndexOffset - this.writeIndexStartPos);
         this.writeLock.lock();
         try {
-            if (isReplicaCsm) {
-                hasMsg = true;
-            } else {
-                if (isFilterConsume) {
-                    // filter conduct. accelerate by keysMap.
-                    for (Integer keyCode : filterKeySet) {
-                        if (keyCode != null) {
-                            lastWritePos = this.keysMap.get(keyCode);
-                            if ((lastWritePos != null) && (lastWritePos >= startReadOff)) {
-                                hasMsg = true;
-                                break;
-                            }
+            if (isFilterConsume) {
+                // filter conduct. accelerate by keysMap.
+                for (Integer keyCode : filterKeySet) {
+                    if (keyCode != null) {
+                        lastWritePos = this.keysMap.get(keyCode);
+                        if ((lastWritePos != null) && (lastWritePos >= startReadOff)) {
+                            hasMsg = true;
+                            break;
                         }
                     }
-                } else {
-                    // orderly consume by partition id.
-                    lastWritePos = this.queuesMap.get(partitionId);
-                    if ((lastWritePos != null) && (lastWritePos >= startReadOff)) {
-                        hasMsg = true;
-                    }
+                }
+            } else {
+                // orderly consume by partition id.
+                lastWritePos = this.queuesMap.get(partitionId);
+                if ((lastWritePos != null) && (lastWritePos >= startReadOff)) {
+                    hasMsg = true;
                 }
             }
             currDataOffset = this.cacheDataOffset.get();
@@ -234,12 +230,10 @@ public class MsgMemStore implements Closeable {
                 readedSize += DataStoreUtils.STORE_INDEX_HEAD_LEN;
                 continue;
             }
-            if (!isReplicaCsm) {
-                if ((cPartitionId != partitionId)
-                        || (isFilterConsume && (!filterKeySet.contains(cKeyCode)))) {
-                    readedSize += DataStoreUtils.STORE_INDEX_HEAD_LEN;
-                    continue;
-                }
+            if ((cPartitionId != partitionId)
+                    || (isFilterConsume && (!filterKeySet.contains(cKeyCode)))) {
+                readedSize += DataStoreUtils.STORE_INDEX_HEAD_LEN;
+                continue;
             }
             // read data file.
             byte[] tmpArray = new byte[cDataSize];
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ReplicaNodeInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ReplicaNodeInfo.java
deleted file mode 100644
index 44292ee..0000000
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ReplicaNodeInfo.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.server.broker.nodeinfo;
-
-import org.apache.inlong.tubemq.corebase.TBaseConstants;
-import org.apache.inlong.tubemq.server.broker.msgstore.MessageStoreManager;
-
-
-public class ReplicaNodeInfo {
-
-    private final MessageStoreManager storeManager;
-    // replicaNode id
-    private String replicaNodeId;
-    // replica node's address
-    private String rmtAddrInfo;
-    private String topicName;
-    private int storeId;
-    private String storeKey;
-    private long createTime = System.currentTimeMillis();
-    private long lastGetTime = 0L;
-    private long lastDataRdOffset = TBaseConstants.META_VALUE_UNDEFINED;
-    private int sentMsgSize = 0;
-
-    public ReplicaNodeInfo(MessageStoreManager storeManager,
-                           String replicaNodeId, String topicName, int storeId) {
-        this.storeManager = storeManager;
-        this.replicaNodeId = replicaNodeId;
-        this.topicName = topicName;
-        this.storeId = storeId;
-    }
-
-    public long getLastGetTime() {
-        return lastGetTime;
-    }
-
-    public void setLastGetTime(long lastGetTime) {
-        this.lastGetTime = lastGetTime;
-    }
-
-    public long getLastDataRdOffset() {
-        return lastDataRdOffset;
-    }
-
-    public void setLastDataRdOffset(long lastDataRdOffset) {
-        this.lastDataRdOffset = lastDataRdOffset;
-    }
-}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/utils/DataStoreUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/utils/DataStoreUtils.java
index 0d0fa4e..3ea36cb 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/utils/DataStoreUtils.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/utils/DataStoreUtils.java
@@ -107,7 +107,6 @@ public class DataStoreUtils {
     /***
      * Convert inner message to protobuf format, then reply to client.
      *
-     * @param isReplicaCsm
      * @param dataBuffer
      * @param dataTotalSize
      * @param countMap
@@ -115,11 +114,10 @@ public class DataStoreUtils {
      * @param sBuilder
      * @return
      */
-    public static ClientBroker.TransferedMessage getTransferMsg(boolean isReplicaCsm,
-                                                                ByteBuffer dataBuffer, int dataTotalSize,
-                                                                HashMap<String, CountItem> countMap,
-                                                                String statisKeyBase,
-                                                                StringBuilder sBuilder) {
+    public static ClientBroker.TransferedMessage getTransferMsg(final ByteBuffer dataBuffer, int dataTotalSize,
+                                                                final HashMap<String, CountItem> countMap,
+                                                                final String statisKeyBase,
+                                                                final StringBuilder sBuilder) {
         if (dataBuffer.array().length < dataTotalSize) {
             return null;
         }
@@ -138,10 +136,6 @@ public class DataStoreUtils {
         }
         final long msgId = dataBuffer.getLong(DataStoreUtils.STORE_HEADER_POS_MSGID);
         final int flag = dataBuffer.getInt(DataStoreUtils.STORE_HEADER_POS_MSGFLAG);
-        final int partitionId = dataBuffer.getInt(DataStoreUtils.STORE_HEADER_POS_QUEUEID);
-        final int indexKeyCode = dataBuffer.getInt(DataStoreUtils.STORE_HEADER_POS_KEYCODE);
-        final int reportAddr = dataBuffer.getInt(DataStoreUtils.STORE_HEADER_POS_REPORTADDR);
-        final long recvTimeInMs = dataBuffer.getLong(DataStoreUtils.STORE_HEADER_POS_RECEIVEDTIME);
         final int payLoadLen2 = payLoadLen;
         final byte[] payLoadData = new byte[payLoadLen];
         System.arraycopy(dataBuffer.array(), payLoadOffset, payLoadData, 0, payLoadLen);
@@ -151,12 +145,6 @@ public class DataStoreUtils {
         dataBuilder.setCheckSum(checkSum);
         dataBuilder.setFlag(flag);
         dataBuilder.setPayLoadData(ByteString.copyFrom(payLoadData));
-        if (isReplicaCsm) {
-            dataBuilder.setPartitionId(partitionId);
-            dataBuilder.setIndexKeyCode(indexKeyCode);
-            dataBuilder.setReportAddr(reportAddr);
-            dataBuilder.setRecvTimeInMs(recvTimeInMs);
-        }
         // get statistic data
         int attrLen = 0;
         String attribute = null;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
index ebc39c0..fb36112 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
@@ -37,10 +37,8 @@ public final class TServerConstants {
     public static final int QRY_PRIORITY_MAX_VALUE = 303;
 
     public static final int TOPIC_STOREBLOCK_NUM_MIN = 1;
-    public static final int TOPIC_STOREBLOCK_NUM_MAX = 1000;
 
     public static final int TOPIC_PARTITION_NUM_MIN = 1;
-    public static final int TOPIC_PARTITION_NUM_MAX = 9000;
 
     public static final int TOPIC_DSK_UNFLUSHTHRESHOLD_MIN = 0;
     public static final int TOPIC_DSK_UNFLUSHTHRESHOLD_DEF = 1000;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
index ac756b7..a5338c4 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
@@ -414,16 +414,14 @@ public class WebParameterUtils {
         // get numTopicStores parameter value
         if (!WebParameterUtils.getIntParamValue(paramCntr, WebFieldDef.NUMTOPICSTORES, false,
                 (defVal == null ? TBaseConstants.META_VALUE_UNDEFINED : defVal.getNumTopicStores()),
-                TServerConstants.TOPIC_STOREBLOCK_NUM_MIN, TServerConstants.TOPIC_STOREBLOCK_NUM_MAX,
-                sBuffer, result)) {
+                TServerConstants.TOPIC_STOREBLOCK_NUM_MIN, sBuffer, result)) {
             return result.isSuccess();
         }
         newConf.setNumTopicStores((int) result.retData1);
         // get numPartitions parameter value
         if (!WebParameterUtils.getIntParamValue(paramCntr, WebFieldDef.NUMPARTITIONS, false,
                 (defVal == null ? TBaseConstants.META_VALUE_UNDEFINED : defVal.getNumPartitions()),
-                TServerConstants.TOPIC_PARTITION_NUM_MIN, TServerConstants.TOPIC_PARTITION_NUM_MAX,
-                sBuffer, result)) {
+                TServerConstants.TOPIC_PARTITION_NUM_MIN, sBuffer, result)) {
             return result.isSuccess();
         }
         newConf.setNumPartitions((int) result.retData1);
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
index 6b7d38d..4bccc36 100644
--- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
@@ -53,6 +53,6 @@ public class MsgMemStoreTest {
         msgMemStore.appendMsg(msgMemStatisInfo, 0, 0,
                 System.currentTimeMillis(), 3, bf, appendResult);
         // get messages
-        GetCacheMsgResult getCacheMsgResult = msgMemStore.getMessages(false, false, 0, 2, 1024, 1000, 0, false, null);
+        GetCacheMsgResult getCacheMsgResult = msgMemStore.getMessages(0, 2, 1024, 1000, 0, false, false, null);
     }
 }