You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/09/13 12:22:53 UTC
[incubator-inlong] branch master updated: Revert "[INLONG-1544] Add replica consumption logic in broker node (#1545)" (#1546)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0b5f5b4 Revert "[INLONG-1544] Add replica consumption logic in broker node (#1545)" (#1546)
0b5f5b4 is described below
commit 0b5f5b41957e74623e1b7652ec179e6358c9ddc5
Author: gosonzhang <46...@qq.com>
AuthorDate: Mon Sep 13 20:20:50 2021 +0800
Revert "[INLONG-1544] Add replica consumption logic in broker node (#1545)" (#1546)
This reverts commit cb677825244f5f9e01f43cd58836343357a20672.
---
.../server/broker/msgstore/MessageStore.java | 139 ++-------------------
.../broker/msgstore/MessageStoreManager.java | 17 ---
.../server/broker/msgstore/StoreService.java | 2 -
.../server/broker/msgstore/disk/MsgFileStore.java | 25 ++--
.../server/broker/msgstore/mem/MsgMemStore.java | 54 ++++----
.../server/broker/nodeinfo/ReplicaNodeInfo.java | 62 ---------
.../tubemq/server/broker/utils/DataStoreUtils.java | 20 +--
.../tubemq/server/common/TServerConstants.java | 2 -
.../server/common/utils/WebParameterUtils.java | 6 +-
.../broker/msgstore/mem/MsgMemStoreTest.java | 2 +-
10 files changed, 49 insertions(+), 280 deletions(-)
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
index c108677..4d2d7fd 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
@@ -21,7 +21,6 @@ import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -48,7 +47,6 @@ import org.apache.inlong.tubemq.server.broker.msgstore.mem.GetCacheMsgResult;
import org.apache.inlong.tubemq.server.broker.msgstore.mem.MsgMemStatisInfo;
import org.apache.inlong.tubemq.server.broker.msgstore.mem.MsgMemStore;
import org.apache.inlong.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
-import org.apache.inlong.tubemq.server.broker.nodeinfo.ReplicaNodeInfo;
import org.apache.inlong.tubemq.server.broker.stats.CountItem;
import org.apache.inlong.tubemq.server.broker.utils.DataStoreUtils;
import org.apache.inlong.tubemq.server.common.utils.AppendResult;
@@ -179,7 +177,6 @@ public class MessageStore implements Closeable {
}
int result = 0;
boolean inMemCache = false;
- boolean isReplicaCsm = false;
int maxIndexReadLength = memMaxIndexReadCnt.get();
GetCacheMsgResult memMsgRlt = new GetCacheMsgResult(false, TErrCodeConstants.NOT_FOUND,
requestOffset, "Can't found Message by index in cache");
@@ -200,20 +197,18 @@ public class MessageStore implements Closeable {
if (reqSwitch > 2) {
memMsgRlt =
// read from main memory.
- msgMemStore.getMessages(isReplicaCsm, false,
- consumerNodeInfo.getLastDataRdOffset(),
+ msgMemStore.getMessages(consumerNodeInfo.getLastDataRdOffset(),
requestOffset, msgStoreMgr.getMaxMsgTransferSize(),
- maxIndexReadLength, partitionId,
+ maxIndexReadLength, partitionId, false,
consumerNodeInfo.isFilterConsume(),
consumerNodeInfo.getFilterCondCodeSet());
}
} else {
// read from backup memory.
memMsgRlt =
- msgMemStoreBeingFlush.getMessages(isReplicaCsm, true,
- consumerNodeInfo.getLastDataRdOffset(),
+ msgMemStoreBeingFlush.getMessages(consumerNodeInfo.getLastDataRdOffset(),
requestOffset, msgStoreMgr.getMaxMsgTransferSize(),
- maxIndexReadLength, partitionId,
+ maxIndexReadLength, partitionId, true,
consumerNodeInfo.isFilterConsume(),
consumerNodeInfo.getFilterCondCodeSet());
}
@@ -233,8 +228,8 @@ public class MessageStore implements Closeable {
final StringBuilder strBuffer = new StringBuilder(512);
for (ByteBuffer dataBuffer : memMsgRlt.cacheMsgList) {
ClientBroker.TransferedMessage transferedMessage =
- DataStoreUtils.getTransferMsg(isReplicaCsm,
- dataBuffer, dataBuffer.array().length,
+ DataStoreUtils.getTransferMsg(dataBuffer,
+ dataBuffer.array().length,
countMap, statisKeyBase, strBuffer);
if (transferedMessage != null) {
transferedMessageList.add(transferedMessage);
@@ -282,7 +277,7 @@ public class MessageStore implements Closeable {
msgSizeLimit = this.maxAllowRdSize;
}
GetMessageResult retResult =
- msgFileStore.getMessages(isReplicaCsm, partitionId,
+ msgFileStore.getMessages(partitionId,
consumerNodeInfo.getLastDataRdOffset(), reqNewOffset,
indexBuffer, consumerNodeInfo.isFilterConsume(),
consumerNodeInfo.getFilterCondCodeSet(),
@@ -367,126 +362,6 @@ public class MessageStore implements Closeable {
return false;
}
- /***
- * Get batch message from message store. Support the given offset.
- *
- * @param requestOffset
- * @param partitionId
- * @param replicaNodeInfo
- * @param statisKeyBase
- * @param msgSizeLimit
- * @return
- * @throws IOException
- */
- public GetMessageResult getBathMessages(long requestOffset, int partitionId,
- ReplicaNodeInfo replicaNodeInfo,
- String statisKeyBase,
- int msgSizeLimit) throws IOException {
- // #lizard forgives
- if (this.closed.get()) {
- throw new IllegalStateException(new StringBuilder(512)
- .append("[Data Store] Closed MessageStore for storeKey ")
- .append(this.storeKey).toString());
- }
- int result = 0;
- boolean inMemCache = false;
- boolean isReplicaCsm = true;
- int maxIndexReadLength = memMaxIndexReadCnt.get();
- GetCacheMsgResult memMsgRlt = new GetCacheMsgResult(false, TErrCodeConstants.NOT_FOUND,
- requestOffset, "Can't found Message by index in cache");
- // in read memory situation, read main memory or backup memory by consumer's config.
- long maxIndexOffset = TBaseConstants.META_VALUE_UNDEFINED;
- if (requestOffset >= this.msgFileStore.getIndexMaxOffset()) {
- this.writeCacheMutex.readLock().lock();
- try {
- maxIndexOffset = this.msgMemStore.getIndexLastWritePos();
- result = this.msgMemStoreBeingFlush.isOffsetInHold(requestOffset);
- if (result > 0) {
- // read from main memory.
- inMemCache = true;
- memMsgRlt =
- msgMemStore.getMessages(isReplicaCsm, false,
- replicaNodeInfo.getLastDataRdOffset(),
- requestOffset, msgStoreMgr.getMaxMsgTransferSize(),
- maxIndexReadLength, partitionId,
- false, Collections.EMPTY_SET);
-
- } else if (result == 0) {
- // read from backup memory.
- inMemCache = true;
- memMsgRlt =
- msgMemStoreBeingFlush.getMessages(isReplicaCsm, true,
- replicaNodeInfo.getLastDataRdOffset(),
- requestOffset, msgStoreMgr.getMaxMsgTransferSize(),
- maxIndexReadLength, partitionId,
- false, Collections.EMPTY_SET);
- }
- } finally {
- this.writeCacheMutex.readLock().unlock();
- }
- }
- if (inMemCache) {
- // return not found when data is under memory sink operation.
- if (memMsgRlt.isSuccess) {
- HashMap<String, CountItem> countMap =
- new HashMap<>();
- List<ClientBroker.TransferedMessage> transferedMessageList =
- new ArrayList<>();
- if (!memMsgRlt.cacheMsgList.isEmpty()) {
- final StringBuilder strBuffer = new StringBuilder(512);
- for (ByteBuffer dataBuffer : memMsgRlt.cacheMsgList) {
- ClientBroker.TransferedMessage transferedMessage =
- DataStoreUtils.getTransferMsg(isReplicaCsm,
- dataBuffer, dataBuffer.array().length,
- countMap, statisKeyBase, strBuffer);
- if (transferedMessage != null) {
- transferedMessageList.add(transferedMessage);
- }
- }
- }
- GetMessageResult getResult =
- new GetMessageResult(true, 0, memMsgRlt.errInfo, requestOffset,
- memMsgRlt.dltOffset, memMsgRlt.lastRdDataOff,
- memMsgRlt.totalMsgSize, countMap, transferedMessageList);
- getResult.setMaxOffset(maxIndexOffset);
- return getResult;
- } else {
- return new GetMessageResult(false, memMsgRlt.retCode, requestOffset,
- memMsgRlt.dltOffset, memMsgRlt.errInfo);
- }
- }
- // read from file, before reading, adjust request's offset.
- long reqNewOffset = Math.max(requestOffset, this.msgFileStore.getIndexMinOffset());
- maxIndexReadLength = fileMaxIndexReadSize.get();
- final ByteBuffer indexBuffer = ByteBuffer.allocate(maxIndexReadLength);
- Segment indexRecordView =
- this.msgFileStore.indexSlice(reqNewOffset, maxIndexReadLength);
- if (indexRecordView == null) {
- if (reqNewOffset < this.msgFileStore.getIndexMinOffset()) {
- return new GetMessageResult(false, TErrCodeConstants.MOVED,
- reqNewOffset, 0, "current offset is exceed min offset!");
- } else {
- return new GetMessageResult(false, TErrCodeConstants.NOT_FOUND,
- reqNewOffset, 0, "current offset is exceed max offset!");
- }
- }
- indexRecordView.read(indexBuffer, reqNewOffset);
- indexBuffer.flip();
- indexRecordView.relViewRef();
- if ((msgFileStore.getDataHighMaxOffset() - replicaNodeInfo.getLastDataRdOffset()
- >= this.tubeConfig.getDoubleDefaultDeduceReadSize())
- && msgSizeLimit > this.maxAllowRdSize) {
- msgSizeLimit = this.maxAllowRdSize;
- }
- GetMessageResult retResult =
- msgFileStore.getMessages(isReplicaCsm, partitionId,
- replicaNodeInfo.getLastDataRdOffset(), reqNewOffset,
- indexBuffer, false, Collections.EMPTY_SET,
- statisKeyBase, msgSizeLimit);
- retResult.setMaxOffset(getIndexMaxOffset());
- return retResult;
- }
-
public String getCurMemMsgSizeStatisInfo(boolean needRefresh) {
return msgMemStatisInfo.getCurMsgSizeStatisInfo(needRefresh);
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
index a88bf32..614bdd1 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
@@ -277,23 +277,6 @@ public class MessageStoreManager implements StoreService {
}
/***
- * Get message store by topic and storeId.
- *
- * @param topic topic name
- * @param storeId store id
- * @return
- */
- @Override
- public MessageStore getMessageStoresByTopicAndStoreId(String topic, int storeId) {
- final ConcurrentHashMap<Integer, MessageStore> map
- = this.dataStores.get(topic);
- if (map != null) {
- return map.get(storeId);
- }
- return null;
- }
-
- /***
* Get or create message store.
*
* @param topic
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/StoreService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/StoreService.java
index 444271f..7ab2328 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/StoreService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/StoreService.java
@@ -37,8 +37,6 @@ public interface StoreService {
Collection<MessageStore> getMessageStoresByTopic(String topic);
- MessageStore getMessageStoresByTopicAndStoreId(String topic, int storeId);
-
MessageStore getOrCreateMessageStore(String topic,
int partition) throws Throwable;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
index 9ee6fe0..e47a069 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
@@ -220,7 +220,6 @@ public class MsgFileStore implements Closeable {
/***
* Get message from index and data files.
*
- * @param isReplicaCsm
* @param partitionId
* @param lastRdOffset
* @param reqOffset
@@ -231,13 +230,12 @@ public class MsgFileStore implements Closeable {
* @param maxMsgTransferSize
* @return
*/
- public GetMessageResult getMessages(boolean isReplicaCsm,
- int partitionId, long lastRdOffset,
- long reqOffset, ByteBuffer indexBuffer,
- boolean isFilterConsume,
- Set<Integer> filterKeySet,
- String statisKeyBase,
- int maxMsgTransferSize) {
+ public GetMessageResult getMessages(final int partitionId, final long lastRdOffset,
+ final long reqOffset, final ByteBuffer indexBuffer,
+ final boolean isFilterConsume,
+ final Set<Integer> filterKeySet,
+ final String statisKeyBase,
+ final int maxMsgTransferSize) {
// #lizard forgives
// Orderly read from index file, then random read from data file.
int retCode = 0;
@@ -287,9 +285,9 @@ public class MsgFileStore implements Closeable {
break;
}
// conduct filter operation.
- if (!isReplicaCsm
- && (curIndexPartitionId != partitionId
- || (isFilterConsume && !filterKeySet.contains(curIndexKeyCode)))) {
+ if (curIndexPartitionId != partitionId
+ || (isFilterConsume
+ && !filterKeySet.contains(curIndexKeyCode))) {
lastRdDataOffset = maxDataLimitOffset;
readedOffset = curIndexOffset + DataStoreUtils.STORE_INDEX_HEAD_LEN;
continue;
@@ -342,9 +340,8 @@ public class MsgFileStore implements Closeable {
readedOffset = curIndexOffset + DataStoreUtils.STORE_INDEX_HEAD_LEN;
lastRdDataOffset = maxDataLimitOffset;
ClientBroker.TransferedMessage transferedMessage =
- DataStoreUtils.getTransferMsg(isReplicaCsm,
- dataBuffer, curIndexDataSize,
- countMap, statisKeyBase, sBuilder);
+ DataStoreUtils.getTransferMsg(dataBuffer,
+ curIndexDataSize, countMap, statisKeyBase, sBuilder);
if (transferedMessage == null) {
continue;
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
index 1550754..4bf7bc4 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
@@ -124,21 +124,21 @@ public class MsgMemStore implements Closeable {
/***
* Read from memory, read index, then data.
*
- * @param isReplicaCsm
- * @param isSecond
* @param lstRdDataOffset
* @param lstRdIndexOffset
* @param maxReadSize
* @param maxReadCount
* @param partitionId
+ * @param isSecond
* @param isFilterConsume
* @param filterKeySet
* @return
*/
- public GetCacheMsgResult getMessages(boolean isReplicaCsm, boolean isSecond,
- long lstRdDataOffset, long lstRdIndexOffset,
- int maxReadSize, int maxReadCount, int partitionId,
- boolean isFilterConsume, Set<Integer> filterKeySet) {
+ public GetCacheMsgResult getMessages(final long lstRdDataOffset, final long lstRdIndexOffset,
+ final int maxReadSize, final int maxReadCount,
+ final int partitionId, final boolean isSecond,
+ final boolean isFilterConsume,
+ final Set<Integer> filterKeySet) {
// #lizard forgives
Integer lastWritePos = 0;
boolean hasMsg = false;
@@ -159,26 +159,22 @@ public class MsgMemStore implements Closeable {
int startReadOff = (int) (lstRdIndexOffset - this.writeIndexStartPos);
this.writeLock.lock();
try {
- if (isReplicaCsm) {
- hasMsg = true;
- } else {
- if (isFilterConsume) {
- // filter conduct. accelerate by keysMap.
- for (Integer keyCode : filterKeySet) {
- if (keyCode != null) {
- lastWritePos = this.keysMap.get(keyCode);
- if ((lastWritePos != null) && (lastWritePos >= startReadOff)) {
- hasMsg = true;
- break;
- }
+ if (isFilterConsume) {
+ // filter conduct. accelerate by keysMap.
+ for (Integer keyCode : filterKeySet) {
+ if (keyCode != null) {
+ lastWritePos = this.keysMap.get(keyCode);
+ if ((lastWritePos != null) && (lastWritePos >= startReadOff)) {
+ hasMsg = true;
+ break;
}
}
- } else {
- // orderly consume by partition id.
- lastWritePos = this.queuesMap.get(partitionId);
- if ((lastWritePos != null) && (lastWritePos >= startReadOff)) {
- hasMsg = true;
- }
+ }
+ } else {
+ // orderly consume by partition id.
+ lastWritePos = this.queuesMap.get(partitionId);
+ if ((lastWritePos != null) && (lastWritePos >= startReadOff)) {
+ hasMsg = true;
}
}
currDataOffset = this.cacheDataOffset.get();
@@ -234,12 +230,10 @@ public class MsgMemStore implements Closeable {
readedSize += DataStoreUtils.STORE_INDEX_HEAD_LEN;
continue;
}
- if (!isReplicaCsm) {
- if ((cPartitionId != partitionId)
- || (isFilterConsume && (!filterKeySet.contains(cKeyCode)))) {
- readedSize += DataStoreUtils.STORE_INDEX_HEAD_LEN;
- continue;
- }
+ if ((cPartitionId != partitionId)
+ || (isFilterConsume && (!filterKeySet.contains(cKeyCode)))) {
+ readedSize += DataStoreUtils.STORE_INDEX_HEAD_LEN;
+ continue;
}
// read data file.
byte[] tmpArray = new byte[cDataSize];
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ReplicaNodeInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ReplicaNodeInfo.java
deleted file mode 100644
index 44292ee..0000000
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ReplicaNodeInfo.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.server.broker.nodeinfo;
-
-import org.apache.inlong.tubemq.corebase.TBaseConstants;
-import org.apache.inlong.tubemq.server.broker.msgstore.MessageStoreManager;
-
-
-public class ReplicaNodeInfo {
-
- private final MessageStoreManager storeManager;
- // replicaNode id
- private String replicaNodeId;
- // replica node's address
- private String rmtAddrInfo;
- private String topicName;
- private int storeId;
- private String storeKey;
- private long createTime = System.currentTimeMillis();
- private long lastGetTime = 0L;
- private long lastDataRdOffset = TBaseConstants.META_VALUE_UNDEFINED;
- private int sentMsgSize = 0;
-
- public ReplicaNodeInfo(MessageStoreManager storeManager,
- String replicaNodeId, String topicName, int storeId) {
- this.storeManager = storeManager;
- this.replicaNodeId = replicaNodeId;
- this.topicName = topicName;
- this.storeId = storeId;
- }
-
- public long getLastGetTime() {
- return lastGetTime;
- }
-
- public void setLastGetTime(long lastGetTime) {
- this.lastGetTime = lastGetTime;
- }
-
- public long getLastDataRdOffset() {
- return lastDataRdOffset;
- }
-
- public void setLastDataRdOffset(long lastDataRdOffset) {
- this.lastDataRdOffset = lastDataRdOffset;
- }
-}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/utils/DataStoreUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/utils/DataStoreUtils.java
index 0d0fa4e..3ea36cb 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/utils/DataStoreUtils.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/utils/DataStoreUtils.java
@@ -107,7 +107,6 @@ public class DataStoreUtils {
/***
* Convert inner message to protobuf format, then reply to client.
*
- * @param isReplicaCsm
* @param dataBuffer
* @param dataTotalSize
* @param countMap
@@ -115,11 +114,10 @@ public class DataStoreUtils {
* @param sBuilder
* @return
*/
- public static ClientBroker.TransferedMessage getTransferMsg(boolean isReplicaCsm,
- ByteBuffer dataBuffer, int dataTotalSize,
- HashMap<String, CountItem> countMap,
- String statisKeyBase,
- StringBuilder sBuilder) {
+ public static ClientBroker.TransferedMessage getTransferMsg(final ByteBuffer dataBuffer, int dataTotalSize,
+ final HashMap<String, CountItem> countMap,
+ final String statisKeyBase,
+ final StringBuilder sBuilder) {
if (dataBuffer.array().length < dataTotalSize) {
return null;
}
@@ -138,10 +136,6 @@ public class DataStoreUtils {
}
final long msgId = dataBuffer.getLong(DataStoreUtils.STORE_HEADER_POS_MSGID);
final int flag = dataBuffer.getInt(DataStoreUtils.STORE_HEADER_POS_MSGFLAG);
- final int partitionId = dataBuffer.getInt(DataStoreUtils.STORE_HEADER_POS_QUEUEID);
- final int indexKeyCode = dataBuffer.getInt(DataStoreUtils.STORE_HEADER_POS_KEYCODE);
- final int reportAddr = dataBuffer.getInt(DataStoreUtils.STORE_HEADER_POS_REPORTADDR);
- final long recvTimeInMs = dataBuffer.getLong(DataStoreUtils.STORE_HEADER_POS_RECEIVEDTIME);
final int payLoadLen2 = payLoadLen;
final byte[] payLoadData = new byte[payLoadLen];
System.arraycopy(dataBuffer.array(), payLoadOffset, payLoadData, 0, payLoadLen);
@@ -151,12 +145,6 @@ public class DataStoreUtils {
dataBuilder.setCheckSum(checkSum);
dataBuilder.setFlag(flag);
dataBuilder.setPayLoadData(ByteString.copyFrom(payLoadData));
- if (isReplicaCsm) {
- dataBuilder.setPartitionId(partitionId);
- dataBuilder.setIndexKeyCode(indexKeyCode);
- dataBuilder.setReportAddr(reportAddr);
- dataBuilder.setRecvTimeInMs(recvTimeInMs);
- }
// get statistic data
int attrLen = 0;
String attribute = null;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
index ebc39c0..fb36112 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
@@ -37,10 +37,8 @@ public final class TServerConstants {
public static final int QRY_PRIORITY_MAX_VALUE = 303;
public static final int TOPIC_STOREBLOCK_NUM_MIN = 1;
- public static final int TOPIC_STOREBLOCK_NUM_MAX = 1000;
public static final int TOPIC_PARTITION_NUM_MIN = 1;
- public static final int TOPIC_PARTITION_NUM_MAX = 9000;
public static final int TOPIC_DSK_UNFLUSHTHRESHOLD_MIN = 0;
public static final int TOPIC_DSK_UNFLUSHTHRESHOLD_DEF = 1000;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
index ac756b7..a5338c4 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
@@ -414,16 +414,14 @@ public class WebParameterUtils {
// get numTopicStores parameter value
if (!WebParameterUtils.getIntParamValue(paramCntr, WebFieldDef.NUMTOPICSTORES, false,
(defVal == null ? TBaseConstants.META_VALUE_UNDEFINED : defVal.getNumTopicStores()),
- TServerConstants.TOPIC_STOREBLOCK_NUM_MIN, TServerConstants.TOPIC_STOREBLOCK_NUM_MAX,
- sBuffer, result)) {
+ TServerConstants.TOPIC_STOREBLOCK_NUM_MIN, sBuffer, result)) {
return result.isSuccess();
}
newConf.setNumTopicStores((int) result.retData1);
// get numPartitions parameter value
if (!WebParameterUtils.getIntParamValue(paramCntr, WebFieldDef.NUMPARTITIONS, false,
(defVal == null ? TBaseConstants.META_VALUE_UNDEFINED : defVal.getNumPartitions()),
- TServerConstants.TOPIC_PARTITION_NUM_MIN, TServerConstants.TOPIC_PARTITION_NUM_MAX,
- sBuffer, result)) {
+ TServerConstants.TOPIC_PARTITION_NUM_MIN, sBuffer, result)) {
return result.isSuccess();
}
newConf.setNumPartitions((int) result.retData1);
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
index 6b7d38d..4bccc36 100644
--- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
@@ -53,6 +53,6 @@ public class MsgMemStoreTest {
msgMemStore.appendMsg(msgMemStatisInfo, 0, 0,
System.currentTimeMillis(), 3, bf, appendResult);
// get messages
- GetCacheMsgResult getCacheMsgResult = msgMemStore.getMessages(false, false, 0, 2, 1024, 1000, 0, false, null);
+ GetCacheMsgResult getCacheMsgResult = msgMemStore.getMessages(0, 2, 1024, 1000, 0, false, false, null);
}
}