You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2021/01/13 03:06:10 UTC
[incubator-tubemq] 01/02: [TUBEMQ-509] Adjust the packet length check when data is loaded
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit cee03a65d0b39af6601af2ff7608bc341ca8f638
Author: gosonzhang <go...@tencent.com>
AuthorDate: Tue Jan 12 14:30:06 2021 +0800
[TUBEMQ-509] Adjust the packet length check when data is loaded
---
.../apache/tubemq/server/broker/metadata/ClusterConfigHolder.java | 8 --------
.../apache/tubemq/server/broker/msgstore/disk/FileSegment.java | 5 ++---
.../apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java | 3 +--
.../org/apache/tubemq/server/broker/utils/DataStoreUtils.java | 7 +++----
4 files changed, 6 insertions(+), 17 deletions(-)
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java
index c72427d..134e2ef 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java
@@ -33,10 +33,6 @@ public class ClusterConfigHolder {
+ TBaseConstants.META_MAX_MESSAGE_HEADER_SIZE);
private static AtomicInteger minMemCacheSize =
new AtomicInteger(TBaseConstants.META_MIN_MEM_BUFFER_SIZE);
- private static AtomicInteger maxMsgStoreLength =
- new AtomicInteger(TBaseConstants.META_MAX_MESSAGE_DATA_SIZE
- + TBaseConstants.META_MAX_MESSAGE_HEADER_SIZE
- + DataStoreUtils.STORE_DATA_HEADER_LEN);
public ClusterConfigHolder() {
@@ -58,7 +54,6 @@ public class ClusterConfigHolder {
maxMsgSize.set(tmpMaxSize);
minMemCacheSize.set(tmpMaxSize +
(tmpMaxSize % 4 + 1) * TBaseConstants.META_MESSAGE_SIZE_ADJUST);
- maxMsgStoreLength.set(tmpMaxSize + DataStoreUtils.STORE_DATA_HEADER_LEN);
}
}
}
@@ -76,7 +71,4 @@ public class ClusterConfigHolder {
return minMemCacheSize.get();
}
- public static int getMaxMsgStoreLength() {
- return maxMsgStoreLength.get();
- }
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegment.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegment.java
index 6fda352..949d149 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegment.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegment.java
@@ -25,7 +25,6 @@ import java.nio.channels.FileChannel;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.tubemq.corebase.utils.CheckSum;
-import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder;
import org.apache.tubemq.server.broker.utils.DataStoreUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -366,7 +365,7 @@ public class FileSegment implements Segment {
itemNext = validBytes + DataStoreUtils.STORE_DATA_HEADER_LEN + itemMsglen;
if ((itemMsgToken != DataStoreUtils.STORE_DATA_TOKER_BEGIN_VALUE)
|| (itemMsglen <= 0)
- || (itemMsglen > ClusterConfigHolder.getMaxMsgSize())
+ || (itemMsglen > DataStoreUtils.MAX_MSG_DATA_STORE_SIZE)
|| (itemNext > totalBytes)) {
next = -1;
break;
@@ -438,7 +437,7 @@ public class FileSegment implements Segment {
if ((itemMsgPartId < 0)
|| (itemMsgOffset < 0)
|| (itemMsglen <= 0)
- || (itemMsglen > ClusterConfigHolder.getMaxMsgStoreLength())
+ || (itemMsglen > DataStoreUtils.STORE_MAX_MESSAGE_STORE_LEN)
|| (itemNext > totalBytes)) {
next = -1;
break;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
index fe23dd3..400b666 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
@@ -35,7 +35,6 @@ import org.apache.tubemq.corebase.TErrCodeConstants;
import org.apache.tubemq.corebase.protobuf.generated.ClientBroker;
import org.apache.tubemq.corebase.utils.ServiceStatusHolder;
import org.apache.tubemq.server.broker.BrokerConfig;
-import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder;
import org.apache.tubemq.server.broker.msgstore.MessageStore;
import org.apache.tubemq.server.broker.stats.CountItem;
import org.apache.tubemq.server.broker.utils.DataStoreUtils;
@@ -274,7 +273,7 @@ public class MsgFileStore implements Closeable {
// skip when mismatch condition
if (curIndexDataOffset < 0
|| curIndexDataSize <= 0
- || curIndexDataSize > ClusterConfigHolder.getMaxMsgStoreLength()
+ || curIndexDataSize > DataStoreUtils.STORE_MAX_MESSAGE_STORE_LEN
|| curIndexDataOffset < curDataMinOffset) {
readedOffset = curIndexOffset + DataStoreUtils.STORE_INDEX_HEAD_LEN;
continue;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/DataStoreUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/DataStoreUtils.java
index 6f0fa13..6d79bf4 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/DataStoreUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/DataStoreUtils.java
@@ -49,9 +49,6 @@ public class DataStoreUtils {
// + data 0
//
public static final int MAX_MSG_TRANSFER_SIZE = 1024 * 1024;
- public static final int MAX_MSG_DATA_STORE_SIZE =
- TBaseConstants.META_MAX_MESSAGE_DATA_SIZE * 2;
- public static final int MAX_READ_BUFFER_ADJUST = MAX_MSG_DATA_STORE_SIZE * 10;
public static final int STORE_DATA_PREFX_LEN = 48;
public static final int STORE_DATA_HEADER_LEN = STORE_DATA_PREFX_LEN + 4;
@@ -85,7 +82,9 @@ public class DataStoreUtils {
public static final int INDEX_POS_KEY_CODE = 16;
public static final int INDEX_POS_TIME_RECV = 20;
-
+ public static final int MAX_MSG_DATA_STORE_SIZE =
+ TBaseConstants.META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT
+ + TBaseConstants.META_MB_UNIT_SIZE * 8;
public static final int STORE_MAX_MESSAGE_STORE_LEN
= STORE_DATA_HEADER_LEN + MAX_MSG_DATA_STORE_SIZE;