You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/09/13 12:09:25 UTC
[incubator-inlong] branch master updated: [INLONG-1544] Add replica
consumption logic in broker node (#1545)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new cb67782 [INLONG-1544] Add replica consumption logic in broker node (#1545)
cb67782 is described below
commit cb677825244f5f9e01f43cd58836343357a20672
Author: gosonzhang <46...@qq.com>
AuthorDate: Mon Sep 13 20:09:18 2021 +0800
[INLONG-1544] Add replica consumption logic in broker node (#1545)
---
.../server/broker/msgstore/MessageStore.java | 139 +++++++++++++++++++--
.../broker/msgstore/MessageStoreManager.java | 17 +++
.../server/broker/msgstore/StoreService.java | 2 +
.../server/broker/msgstore/disk/MsgFileStore.java | 25 ++--
.../server/broker/msgstore/mem/MsgMemStore.java | 54 ++++----
.../server/broker/nodeinfo/ReplicaNodeInfo.java | 62 +++++++++
.../tubemq/server/broker/utils/DataStoreUtils.java | 20 ++-
.../tubemq/server/common/TServerConstants.java | 2 +
.../server/common/utils/WebParameterUtils.java | 6 +-
.../broker/msgstore/mem/MsgMemStoreTest.java | 2 +-
10 files changed, 280 insertions(+), 49 deletions(-)
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
index 4d2d7fd..c108677 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
@@ -21,6 +21,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -47,6 +48,7 @@ import org.apache.inlong.tubemq.server.broker.msgstore.mem.GetCacheMsgResult;
import org.apache.inlong.tubemq.server.broker.msgstore.mem.MsgMemStatisInfo;
import org.apache.inlong.tubemq.server.broker.msgstore.mem.MsgMemStore;
import org.apache.inlong.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
+import org.apache.inlong.tubemq.server.broker.nodeinfo.ReplicaNodeInfo;
import org.apache.inlong.tubemq.server.broker.stats.CountItem;
import org.apache.inlong.tubemq.server.broker.utils.DataStoreUtils;
import org.apache.inlong.tubemq.server.common.utils.AppendResult;
@@ -177,6 +179,7 @@ public class MessageStore implements Closeable {
}
int result = 0;
boolean inMemCache = false;
+ boolean isReplicaCsm = false;
int maxIndexReadLength = memMaxIndexReadCnt.get();
GetCacheMsgResult memMsgRlt = new GetCacheMsgResult(false, TErrCodeConstants.NOT_FOUND,
requestOffset, "Can't found Message by index in cache");
@@ -197,18 +200,20 @@ public class MessageStore implements Closeable {
if (reqSwitch > 2) {
memMsgRlt =
// read from main memory.
- msgMemStore.getMessages(consumerNodeInfo.getLastDataRdOffset(),
+ msgMemStore.getMessages(isReplicaCsm, false,
+ consumerNodeInfo.getLastDataRdOffset(),
requestOffset, msgStoreMgr.getMaxMsgTransferSize(),
- maxIndexReadLength, partitionId, false,
+ maxIndexReadLength, partitionId,
consumerNodeInfo.isFilterConsume(),
consumerNodeInfo.getFilterCondCodeSet());
}
} else {
// read from backup memory.
memMsgRlt =
- msgMemStoreBeingFlush.getMessages(consumerNodeInfo.getLastDataRdOffset(),
+ msgMemStoreBeingFlush.getMessages(isReplicaCsm, true,
+ consumerNodeInfo.getLastDataRdOffset(),
requestOffset, msgStoreMgr.getMaxMsgTransferSize(),
- maxIndexReadLength, partitionId, true,
+ maxIndexReadLength, partitionId,
consumerNodeInfo.isFilterConsume(),
consumerNodeInfo.getFilterCondCodeSet());
}
@@ -228,8 +233,8 @@ public class MessageStore implements Closeable {
final StringBuilder strBuffer = new StringBuilder(512);
for (ByteBuffer dataBuffer : memMsgRlt.cacheMsgList) {
ClientBroker.TransferedMessage transferedMessage =
- DataStoreUtils.getTransferMsg(dataBuffer,
- dataBuffer.array().length,
+ DataStoreUtils.getTransferMsg(isReplicaCsm,
+ dataBuffer, dataBuffer.array().length,
countMap, statisKeyBase, strBuffer);
if (transferedMessage != null) {
transferedMessageList.add(transferedMessage);
@@ -277,7 +282,7 @@ public class MessageStore implements Closeable {
msgSizeLimit = this.maxAllowRdSize;
}
GetMessageResult retResult =
- msgFileStore.getMessages(partitionId,
+ msgFileStore.getMessages(isReplicaCsm, partitionId,
consumerNodeInfo.getLastDataRdOffset(), reqNewOffset,
indexBuffer, consumerNodeInfo.isFilterConsume(),
consumerNodeInfo.getFilterCondCodeSet(),
@@ -362,6 +367,126 @@ public class MessageStore implements Closeable {
return false;
}
+ /***
+ * Get batch message from message store. Support the given offset.
+ *
+ * @param requestOffset
+ * @param partitionId
+ * @param replicaNodeInfo
+ * @param statisKeyBase
+ * @param msgSizeLimit
+ * @return
+ * @throws IOException
+ */
+ public GetMessageResult getBathMessages(long requestOffset, int partitionId,
+ ReplicaNodeInfo replicaNodeInfo,
+ String statisKeyBase,
+ int msgSizeLimit) throws IOException {
+ // #lizard forgives
+ if (this.closed.get()) {
+ throw new IllegalStateException(new StringBuilder(512)
+ .append("[Data Store] Closed MessageStore for storeKey ")
+ .append(this.storeKey).toString());
+ }
+ int result = 0;
+ boolean inMemCache = false;
+ boolean isReplicaCsm = true;
+ int maxIndexReadLength = memMaxIndexReadCnt.get();
+ GetCacheMsgResult memMsgRlt = new GetCacheMsgResult(false, TErrCodeConstants.NOT_FOUND,
+ requestOffset, "Can't found Message by index in cache");
+ // in read memory situation, read main memory or backup memory by consumer's config.
+ long maxIndexOffset = TBaseConstants.META_VALUE_UNDEFINED;
+ if (requestOffset >= this.msgFileStore.getIndexMaxOffset()) {
+ this.writeCacheMutex.readLock().lock();
+ try {
+ maxIndexOffset = this.msgMemStore.getIndexLastWritePos();
+ result = this.msgMemStoreBeingFlush.isOffsetInHold(requestOffset);
+ if (result > 0) {
+ // read from main memory.
+ inMemCache = true;
+ memMsgRlt =
+ msgMemStore.getMessages(isReplicaCsm, false,
+ replicaNodeInfo.getLastDataRdOffset(),
+ requestOffset, msgStoreMgr.getMaxMsgTransferSize(),
+ maxIndexReadLength, partitionId,
+ false, Collections.EMPTY_SET);
+
+ } else if (result == 0) {
+ // read from backup memory.
+ inMemCache = true;
+ memMsgRlt =
+ msgMemStoreBeingFlush.getMessages(isReplicaCsm, true,
+ replicaNodeInfo.getLastDataRdOffset(),
+ requestOffset, msgStoreMgr.getMaxMsgTransferSize(),
+ maxIndexReadLength, partitionId,
+ false, Collections.EMPTY_SET);
+ }
+ } finally {
+ this.writeCacheMutex.readLock().unlock();
+ }
+ }
+ if (inMemCache) {
+ // return not found when data is under memory sink operation.
+ if (memMsgRlt.isSuccess) {
+ HashMap<String, CountItem> countMap =
+ new HashMap<>();
+ List<ClientBroker.TransferedMessage> transferedMessageList =
+ new ArrayList<>();
+ if (!memMsgRlt.cacheMsgList.isEmpty()) {
+ final StringBuilder strBuffer = new StringBuilder(512);
+ for (ByteBuffer dataBuffer : memMsgRlt.cacheMsgList) {
+ ClientBroker.TransferedMessage transferedMessage =
+ DataStoreUtils.getTransferMsg(isReplicaCsm,
+ dataBuffer, dataBuffer.array().length,
+ countMap, statisKeyBase, strBuffer);
+ if (transferedMessage != null) {
+ transferedMessageList.add(transferedMessage);
+ }
+ }
+ }
+ GetMessageResult getResult =
+ new GetMessageResult(true, 0, memMsgRlt.errInfo, requestOffset,
+ memMsgRlt.dltOffset, memMsgRlt.lastRdDataOff,
+ memMsgRlt.totalMsgSize, countMap, transferedMessageList);
+ getResult.setMaxOffset(maxIndexOffset);
+ return getResult;
+ } else {
+ return new GetMessageResult(false, memMsgRlt.retCode, requestOffset,
+ memMsgRlt.dltOffset, memMsgRlt.errInfo);
+ }
+ }
+ // read from file, before reading, adjust request's offset.
+ long reqNewOffset = Math.max(requestOffset, this.msgFileStore.getIndexMinOffset());
+ maxIndexReadLength = fileMaxIndexReadSize.get();
+ final ByteBuffer indexBuffer = ByteBuffer.allocate(maxIndexReadLength);
+ Segment indexRecordView =
+ this.msgFileStore.indexSlice(reqNewOffset, maxIndexReadLength);
+ if (indexRecordView == null) {
+ if (reqNewOffset < this.msgFileStore.getIndexMinOffset()) {
+ return new GetMessageResult(false, TErrCodeConstants.MOVED,
+ reqNewOffset, 0, "current offset is exceed min offset!");
+ } else {
+ return new GetMessageResult(false, TErrCodeConstants.NOT_FOUND,
+ reqNewOffset, 0, "current offset is exceed max offset!");
+ }
+ }
+ indexRecordView.read(indexBuffer, reqNewOffset);
+ indexBuffer.flip();
+ indexRecordView.relViewRef();
+ if ((msgFileStore.getDataHighMaxOffset() - replicaNodeInfo.getLastDataRdOffset()
+ >= this.tubeConfig.getDoubleDefaultDeduceReadSize())
+ && msgSizeLimit > this.maxAllowRdSize) {
+ msgSizeLimit = this.maxAllowRdSize;
+ }
+ GetMessageResult retResult =
+ msgFileStore.getMessages(isReplicaCsm, partitionId,
+ replicaNodeInfo.getLastDataRdOffset(), reqNewOffset,
+ indexBuffer, false, Collections.EMPTY_SET,
+ statisKeyBase, msgSizeLimit);
+ retResult.setMaxOffset(getIndexMaxOffset());
+ return retResult;
+ }
+
public String getCurMemMsgSizeStatisInfo(boolean needRefresh) {
return msgMemStatisInfo.getCurMsgSizeStatisInfo(needRefresh);
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
index 614bdd1..a88bf32 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
@@ -277,6 +277,23 @@ public class MessageStoreManager implements StoreService {
}
/***
+ * Get message store by topic and storeId.
+ *
+ * @param topic topic name
+ * @param storeId store id
+ * @return
+ */
+ @Override
+ public MessageStore getMessageStoresByTopicAndStoreId(String topic, int storeId) {
+ final ConcurrentHashMap<Integer, MessageStore> map
+ = this.dataStores.get(topic);
+ if (map != null) {
+ return map.get(storeId);
+ }
+ return null;
+ }
+
+ /***
* Get or create message store.
*
* @param topic
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/StoreService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/StoreService.java
index 7ab2328..444271f 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/StoreService.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/StoreService.java
@@ -37,6 +37,8 @@ public interface StoreService {
Collection<MessageStore> getMessageStoresByTopic(String topic);
+ MessageStore getMessageStoresByTopicAndStoreId(String topic, int storeId);
+
MessageStore getOrCreateMessageStore(String topic,
int partition) throws Throwable;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
index e47a069..9ee6fe0 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
@@ -220,6 +220,7 @@ public class MsgFileStore implements Closeable {
/***
* Get message from index and data files.
*
+ * @param isReplicaCsm
* @param partitionId
* @param lastRdOffset
* @param reqOffset
@@ -230,12 +231,13 @@ public class MsgFileStore implements Closeable {
* @param maxMsgTransferSize
* @return
*/
- public GetMessageResult getMessages(final int partitionId, final long lastRdOffset,
- final long reqOffset, final ByteBuffer indexBuffer,
- final boolean isFilterConsume,
- final Set<Integer> filterKeySet,
- final String statisKeyBase,
- final int maxMsgTransferSize) {
+ public GetMessageResult getMessages(boolean isReplicaCsm,
+ int partitionId, long lastRdOffset,
+ long reqOffset, ByteBuffer indexBuffer,
+ boolean isFilterConsume,
+ Set<Integer> filterKeySet,
+ String statisKeyBase,
+ int maxMsgTransferSize) {
// #lizard forgives
// Orderly read from index file, then random read from data file.
int retCode = 0;
@@ -285,9 +287,9 @@ public class MsgFileStore implements Closeable {
break;
}
// conduct filter operation.
- if (curIndexPartitionId != partitionId
- || (isFilterConsume
- && !filterKeySet.contains(curIndexKeyCode))) {
+ if (!isReplicaCsm
+ && (curIndexPartitionId != partitionId
+ || (isFilterConsume && !filterKeySet.contains(curIndexKeyCode)))) {
lastRdDataOffset = maxDataLimitOffset;
readedOffset = curIndexOffset + DataStoreUtils.STORE_INDEX_HEAD_LEN;
continue;
@@ -340,8 +342,9 @@ public class MsgFileStore implements Closeable {
readedOffset = curIndexOffset + DataStoreUtils.STORE_INDEX_HEAD_LEN;
lastRdDataOffset = maxDataLimitOffset;
ClientBroker.TransferedMessage transferedMessage =
- DataStoreUtils.getTransferMsg(dataBuffer,
- curIndexDataSize, countMap, statisKeyBase, sBuilder);
+ DataStoreUtils.getTransferMsg(isReplicaCsm,
+ dataBuffer, curIndexDataSize,
+ countMap, statisKeyBase, sBuilder);
if (transferedMessage == null) {
continue;
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
index 4bf7bc4..1550754 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
@@ -124,21 +124,21 @@ public class MsgMemStore implements Closeable {
/***
* Read from memory, read index, then data.
*
+ * @param isReplicaCsm
+ * @param isSecond
* @param lstRdDataOffset
* @param lstRdIndexOffset
* @param maxReadSize
* @param maxReadCount
* @param partitionId
- * @param isSecond
* @param isFilterConsume
* @param filterKeySet
* @return
*/
- public GetCacheMsgResult getMessages(final long lstRdDataOffset, final long lstRdIndexOffset,
- final int maxReadSize, final int maxReadCount,
- final int partitionId, final boolean isSecond,
- final boolean isFilterConsume,
- final Set<Integer> filterKeySet) {
+ public GetCacheMsgResult getMessages(boolean isReplicaCsm, boolean isSecond,
+ long lstRdDataOffset, long lstRdIndexOffset,
+ int maxReadSize, int maxReadCount, int partitionId,
+ boolean isFilterConsume, Set<Integer> filterKeySet) {
// #lizard forgives
Integer lastWritePos = 0;
boolean hasMsg = false;
@@ -159,22 +159,26 @@ public class MsgMemStore implements Closeable {
int startReadOff = (int) (lstRdIndexOffset - this.writeIndexStartPos);
this.writeLock.lock();
try {
- if (isFilterConsume) {
- // filter conduct. accelerate by keysMap.
- for (Integer keyCode : filterKeySet) {
- if (keyCode != null) {
- lastWritePos = this.keysMap.get(keyCode);
- if ((lastWritePos != null) && (lastWritePos >= startReadOff)) {
- hasMsg = true;
- break;
+ if (isReplicaCsm) {
+ hasMsg = true;
+ } else {
+ if (isFilterConsume) {
+ // filter conduct. accelerate by keysMap.
+ for (Integer keyCode : filterKeySet) {
+ if (keyCode != null) {
+ lastWritePos = this.keysMap.get(keyCode);
+ if ((lastWritePos != null) && (lastWritePos >= startReadOff)) {
+ hasMsg = true;
+ break;
+ }
}
}
- }
- } else {
- // orderly consume by partition id.
- lastWritePos = this.queuesMap.get(partitionId);
- if ((lastWritePos != null) && (lastWritePos >= startReadOff)) {
- hasMsg = true;
+ } else {
+ // orderly consume by partition id.
+ lastWritePos = this.queuesMap.get(partitionId);
+ if ((lastWritePos != null) && (lastWritePos >= startReadOff)) {
+ hasMsg = true;
+ }
}
}
currDataOffset = this.cacheDataOffset.get();
@@ -230,10 +234,12 @@ public class MsgMemStore implements Closeable {
readedSize += DataStoreUtils.STORE_INDEX_HEAD_LEN;
continue;
}
- if ((cPartitionId != partitionId)
- || (isFilterConsume && (!filterKeySet.contains(cKeyCode)))) {
- readedSize += DataStoreUtils.STORE_INDEX_HEAD_LEN;
- continue;
+ if (!isReplicaCsm) {
+ if ((cPartitionId != partitionId)
+ || (isFilterConsume && (!filterKeySet.contains(cKeyCode)))) {
+ readedSize += DataStoreUtils.STORE_INDEX_HEAD_LEN;
+ continue;
+ }
}
// read data file.
byte[] tmpArray = new byte[cDataSize];
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ReplicaNodeInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ReplicaNodeInfo.java
new file mode 100644
index 0000000..44292ee
--- /dev/null
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ReplicaNodeInfo.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.broker.nodeinfo;
+
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
+import org.apache.inlong.tubemq.server.broker.msgstore.MessageStoreManager;
+
+
+public class ReplicaNodeInfo {
+
+ private final MessageStoreManager storeManager;
+ // replicaNode id
+ private String replicaNodeId;
+ // replica node's address
+ private String rmtAddrInfo;
+ private String topicName;
+ private int storeId;
+ private String storeKey;
+ private long createTime = System.currentTimeMillis();
+ private long lastGetTime = 0L;
+ private long lastDataRdOffset = TBaseConstants.META_VALUE_UNDEFINED;
+ private int sentMsgSize = 0;
+
+ public ReplicaNodeInfo(MessageStoreManager storeManager,
+ String replicaNodeId, String topicName, int storeId) {
+ this.storeManager = storeManager;
+ this.replicaNodeId = replicaNodeId;
+ this.topicName = topicName;
+ this.storeId = storeId;
+ }
+
+ public long getLastGetTime() {
+ return lastGetTime;
+ }
+
+ public void setLastGetTime(long lastGetTime) {
+ this.lastGetTime = lastGetTime;
+ }
+
+ public long getLastDataRdOffset() {
+ return lastDataRdOffset;
+ }
+
+ public void setLastDataRdOffset(long lastDataRdOffset) {
+ this.lastDataRdOffset = lastDataRdOffset;
+ }
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/utils/DataStoreUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/utils/DataStoreUtils.java
index 3ea36cb..0d0fa4e 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/utils/DataStoreUtils.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/utils/DataStoreUtils.java
@@ -107,6 +107,7 @@ public class DataStoreUtils {
/***
* Convert inner message to protobuf format, then reply to client.
*
+ * @param isReplicaCsm
* @param dataBuffer
* @param dataTotalSize
* @param countMap
@@ -114,10 +115,11 @@ public class DataStoreUtils {
* @param sBuilder
* @return
*/
- public static ClientBroker.TransferedMessage getTransferMsg(final ByteBuffer dataBuffer, int dataTotalSize,
- final HashMap<String, CountItem> countMap,
- final String statisKeyBase,
- final StringBuilder sBuilder) {
+ public static ClientBroker.TransferedMessage getTransferMsg(boolean isReplicaCsm,
+ ByteBuffer dataBuffer, int dataTotalSize,
+ HashMap<String, CountItem> countMap,
+ String statisKeyBase,
+ StringBuilder sBuilder) {
if (dataBuffer.array().length < dataTotalSize) {
return null;
}
@@ -136,6 +138,10 @@ public class DataStoreUtils {
}
final long msgId = dataBuffer.getLong(DataStoreUtils.STORE_HEADER_POS_MSGID);
final int flag = dataBuffer.getInt(DataStoreUtils.STORE_HEADER_POS_MSGFLAG);
+ final int partitionId = dataBuffer.getInt(DataStoreUtils.STORE_HEADER_POS_QUEUEID);
+ final int indexKeyCode = dataBuffer.getInt(DataStoreUtils.STORE_HEADER_POS_KEYCODE);
+ final int reportAddr = dataBuffer.getInt(DataStoreUtils.STORE_HEADER_POS_REPORTADDR);
+ final long recvTimeInMs = dataBuffer.getLong(DataStoreUtils.STORE_HEADER_POS_RECEIVEDTIME);
final int payLoadLen2 = payLoadLen;
final byte[] payLoadData = new byte[payLoadLen];
System.arraycopy(dataBuffer.array(), payLoadOffset, payLoadData, 0, payLoadLen);
@@ -145,6 +151,12 @@ public class DataStoreUtils {
dataBuilder.setCheckSum(checkSum);
dataBuilder.setFlag(flag);
dataBuilder.setPayLoadData(ByteString.copyFrom(payLoadData));
+ if (isReplicaCsm) {
+ dataBuilder.setPartitionId(partitionId);
+ dataBuilder.setIndexKeyCode(indexKeyCode);
+ dataBuilder.setReportAddr(reportAddr);
+ dataBuilder.setRecvTimeInMs(recvTimeInMs);
+ }
// get statistic data
int attrLen = 0;
String attribute = null;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
index fb36112..ebc39c0 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
@@ -37,8 +37,10 @@ public final class TServerConstants {
public static final int QRY_PRIORITY_MAX_VALUE = 303;
public static final int TOPIC_STOREBLOCK_NUM_MIN = 1;
+ public static final int TOPIC_STOREBLOCK_NUM_MAX = 1000;
public static final int TOPIC_PARTITION_NUM_MIN = 1;
+ public static final int TOPIC_PARTITION_NUM_MAX = 9000;
public static final int TOPIC_DSK_UNFLUSHTHRESHOLD_MIN = 0;
public static final int TOPIC_DSK_UNFLUSHTHRESHOLD_DEF = 1000;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
index a5338c4..ac756b7 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
@@ -414,14 +414,16 @@ public class WebParameterUtils {
// get numTopicStores parameter value
if (!WebParameterUtils.getIntParamValue(paramCntr, WebFieldDef.NUMTOPICSTORES, false,
(defVal == null ? TBaseConstants.META_VALUE_UNDEFINED : defVal.getNumTopicStores()),
- TServerConstants.TOPIC_STOREBLOCK_NUM_MIN, sBuffer, result)) {
+ TServerConstants.TOPIC_STOREBLOCK_NUM_MIN, TServerConstants.TOPIC_STOREBLOCK_NUM_MAX,
+ sBuffer, result)) {
return result.isSuccess();
}
newConf.setNumTopicStores((int) result.retData1);
// get numPartitions parameter value
if (!WebParameterUtils.getIntParamValue(paramCntr, WebFieldDef.NUMPARTITIONS, false,
(defVal == null ? TBaseConstants.META_VALUE_UNDEFINED : defVal.getNumPartitions()),
- TServerConstants.TOPIC_PARTITION_NUM_MIN, sBuffer, result)) {
+ TServerConstants.TOPIC_PARTITION_NUM_MIN, TServerConstants.TOPIC_PARTITION_NUM_MAX,
+ sBuffer, result)) {
return result.isSuccess();
}
newConf.setNumPartitions((int) result.retData1);
diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
index 4bccc36..6b7d38d 100644
--- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
+++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
@@ -53,6 +53,6 @@ public class MsgMemStoreTest {
msgMemStore.appendMsg(msgMemStatisInfo, 0, 0,
System.currentTimeMillis(), 3, bf, appendResult);
// get messages
- GetCacheMsgResult getCacheMsgResult = msgMemStore.getMessages(0, 2, 1024, 1000, 0, false, false, null);
+ GetCacheMsgResult getCacheMsgResult = msgMemStore.getMessages(false, false, 0, 2, 1024, 1000, 0, false, null);
}
}