You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2021/01/13 03:06:10 UTC

[incubator-tubemq] 01/02: [TUBEMQ-509] Adjust the packet length check when data is loaded

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git

commit cee03a65d0b39af6601af2ff7608bc341ca8f638
Author: gosonzhang <go...@tencent.com>
AuthorDate: Tue Jan 12 14:30:06 2021 +0800

    [TUBEMQ-509] Adjust the packet length check when data is loaded
---
 .../apache/tubemq/server/broker/metadata/ClusterConfigHolder.java | 8 --------
 .../apache/tubemq/server/broker/msgstore/disk/FileSegment.java    | 5 ++---
 .../apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java   | 3 +--
 .../org/apache/tubemq/server/broker/utils/DataStoreUtils.java     | 7 +++----
 4 files changed, 6 insertions(+), 17 deletions(-)

diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java
index c72427d..134e2ef 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java
@@ -33,10 +33,6 @@ public class ClusterConfigHolder {
                     + TBaseConstants.META_MAX_MESSAGE_HEADER_SIZE);
     private static AtomicInteger minMemCacheSize =
             new AtomicInteger(TBaseConstants.META_MIN_MEM_BUFFER_SIZE);
-    private static AtomicInteger maxMsgStoreLength =
-            new AtomicInteger(TBaseConstants.META_MAX_MESSAGE_DATA_SIZE
-                    + TBaseConstants.META_MAX_MESSAGE_HEADER_SIZE
-                    + DataStoreUtils.STORE_DATA_HEADER_LEN);
 
     public ClusterConfigHolder() {
 
@@ -58,7 +54,6 @@ public class ClusterConfigHolder {
                     maxMsgSize.set(tmpMaxSize);
                     minMemCacheSize.set(tmpMaxSize +
                             (tmpMaxSize % 4 + 1) * TBaseConstants.META_MESSAGE_SIZE_ADJUST);
-                    maxMsgStoreLength.set(tmpMaxSize + DataStoreUtils.STORE_DATA_HEADER_LEN);
                 }
             }
         }
@@ -76,7 +71,4 @@ public class ClusterConfigHolder {
         return minMemCacheSize.get();
     }
 
-    public static int getMaxMsgStoreLength() {
-        return maxMsgStoreLength.get();
-    }
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegment.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegment.java
index 6fda352..949d149 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegment.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegment.java
@@ -25,7 +25,6 @@ import java.nio.channels.FileChannel;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.tubemq.corebase.utils.CheckSum;
-import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder;
 import org.apache.tubemq.server.broker.utils.DataStoreUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -366,7 +365,7 @@ public class FileSegment implements Segment {
                 itemNext = validBytes + DataStoreUtils.STORE_DATA_HEADER_LEN + itemMsglen;
                 if ((itemMsgToken != DataStoreUtils.STORE_DATA_TOKER_BEGIN_VALUE)
                         || (itemMsglen <= 0)
-                        || (itemMsglen > ClusterConfigHolder.getMaxMsgSize())
+                        || (itemMsglen > DataStoreUtils.MAX_MSG_DATA_STORE_SIZE)
                         || (itemNext > totalBytes)) {
                     next = -1;
                     break;
@@ -438,7 +437,7 @@ public class FileSegment implements Segment {
                 if ((itemMsgPartId < 0)
                         || (itemMsgOffset < 0)
                         || (itemMsglen <= 0)
-                        || (itemMsglen > ClusterConfigHolder.getMaxMsgStoreLength())
+                        || (itemMsglen > DataStoreUtils.STORE_MAX_MESSAGE_STORE_LEN)
                         || (itemNext > totalBytes)) {
                     next = -1;
                     break;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
index fe23dd3..400b666 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
@@ -35,7 +35,6 @@ import org.apache.tubemq.corebase.TErrCodeConstants;
 import org.apache.tubemq.corebase.protobuf.generated.ClientBroker;
 import org.apache.tubemq.corebase.utils.ServiceStatusHolder;
 import org.apache.tubemq.server.broker.BrokerConfig;
-import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder;
 import org.apache.tubemq.server.broker.msgstore.MessageStore;
 import org.apache.tubemq.server.broker.stats.CountItem;
 import org.apache.tubemq.server.broker.utils.DataStoreUtils;
@@ -274,7 +273,7 @@ public class MsgFileStore implements Closeable {
             // skip when mismatch condition
             if (curIndexDataOffset < 0
                     || curIndexDataSize <= 0
-                    || curIndexDataSize > ClusterConfigHolder.getMaxMsgStoreLength()
+                    || curIndexDataSize > DataStoreUtils.STORE_MAX_MESSAGE_STORE_LEN
                     || curIndexDataOffset < curDataMinOffset) {
                 readedOffset = curIndexOffset + DataStoreUtils.STORE_INDEX_HEAD_LEN;
                 continue;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/DataStoreUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/DataStoreUtils.java
index 6f0fa13..6d79bf4 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/DataStoreUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/DataStoreUtils.java
@@ -49,9 +49,6 @@ public class DataStoreUtils {
     // + data               0
     //
     public static final int MAX_MSG_TRANSFER_SIZE = 1024 * 1024;
-    public static final int MAX_MSG_DATA_STORE_SIZE =
-            TBaseConstants.META_MAX_MESSAGE_DATA_SIZE * 2;
-    public static final int MAX_READ_BUFFER_ADJUST = MAX_MSG_DATA_STORE_SIZE * 10;
 
     public static final int STORE_DATA_PREFX_LEN = 48;
     public static final int STORE_DATA_HEADER_LEN = STORE_DATA_PREFX_LEN + 4;
@@ -85,7 +82,9 @@ public class DataStoreUtils {
     public static final int INDEX_POS_KEY_CODE = 16;
     public static final int INDEX_POS_TIME_RECV = 20;
 
-
+    public static final int MAX_MSG_DATA_STORE_SIZE =
+            TBaseConstants.META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT
+                    + TBaseConstants.META_MB_UNIT_SIZE * 8;
     public static final int STORE_MAX_MESSAGE_STORE_LEN
             = STORE_DATA_HEADER_LEN + MAX_MSG_DATA_STORE_SIZE;