You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/10/10 10:16:22 UTC

[inlong] branch master updated: [INLONG-6126][TubeMQ] Optimize StoreRepairAdmin logic (#6127)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d25db080e [INLONG-6126][TubeMQ] Optimize StoreRepairAdmin logic (#6127)
d25db080e is described below

commit d25db080e67000ddb5c44df984a2448d891cfe89
Author: Goson Zhang <46...@qq.com>
AuthorDate: Mon Oct 10 18:16:17 2022 +0800

    [INLONG-6126][TubeMQ] Optimize StoreRepairAdmin logic (#6127)
---
 .../inlong/tubemq/server/master/MasterConfig.java  |  2 +-
 .../master/utils/SimpleVisitTokenManager.java      |  3 +-
 .../server/master/web/MasterStatusCheckFilter.java |  5 +-
 .../tubemq/server/tools/StoreRepairAdmin.java      | 83 +++++++++++++---------
 .../tubemq/server/tools/cli/CliBrokerAdmin.java    |  1 -
 .../tubemq/server/tools/cli/CliProducer.java       |  3 +-
 6 files changed, 54 insertions(+), 43 deletions(-)

diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/MasterConfig.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/MasterConfig.java
index 97887b118..6ee01cdfc 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/MasterConfig.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/MasterConfig.java
@@ -62,7 +62,7 @@ public class MasterConfig extends AbstractFileConfig {
     private long stepChgWaitPeriodMs = 12 * 1000;
     private String confModAuthToken = "ASDFGHJKL";
     private String webResourcePath = "../resources";
-    private int maxGroupBrokerConsumeRate = 50;
+    private int maxGroupBrokerConsumeRate = 1000;
     private int maxGroupRebalanceWaitPeriod = 2;
     private int maxAutoForbiddenCnt = 5;
     private long socketSendBuffer = -1;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java
index afa95194c..71edb02ab 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java
@@ -57,7 +57,7 @@ public class SimpleVisitTokenManager extends AbstractDaemonService {
     protected void loopProcess(StringBuilder strBuff) {
         try {
             buildVisitTokens(false, strBuff);
-        }  catch (Throwable t) {
+        } catch (Throwable t) {
             logger.error("[VisitToken Manager] Daemon generator thread throw error ", t);
         }
     }
@@ -80,5 +80,4 @@ public class SimpleVisitTokenManager extends AbstractDaemonService {
                 .append(TokenConstants.ARRAY_SEP).append(freshVisitAuthorized.get()).toString();
         strBuff.delete(0, strBuff.length());
     }
-
 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/MasterStatusCheckFilter.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/MasterStatusCheckFilter.java
index 1a8ede854..efe0b8188 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/MasterStatusCheckFilter.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/MasterStatusCheckFilter.java
@@ -40,7 +40,6 @@ public class MasterStatusCheckFilter implements Filter {
         this.master = master;
         this.defMetaDataService =
                 this.master.getMetaDataService();
-
     }
 
     @Override
@@ -49,8 +48,8 @@ public class MasterStatusCheckFilter implements Filter {
 
     @Override
     public void doFilter(ServletRequest request,
-            ServletResponse response,
-            FilterChain chain) throws IOException, ServletException {
+                         ServletResponse response,
+                         FilterChain chain) throws IOException, ServletException {
         HttpServletRequest req = (HttpServletRequest) request;
         HttpServletResponse resp = (HttpServletResponse) response;
         if (!defMetaDataService.isSelfMaster()) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/StoreRepairAdmin.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/StoreRepairAdmin.java
index 41a19f3c0..dc52c0ca3 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/StoreRepairAdmin.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/StoreRepairAdmin.java
@@ -32,6 +32,9 @@ import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
+import org.apache.inlong.tubemq.corebase.utils.CheckSum;
 import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
 import org.apache.inlong.tubemq.server.broker.msgstore.disk.FileSegment;
 import org.apache.inlong.tubemq.server.broker.msgstore.disk.FileSegmentList;
@@ -164,9 +167,6 @@ public class StoreRepairAdmin {
     }
 
     private static class IndexRepairStore implements Closeable {
-        private static final String DATA_SUFFIX = ".tube";
-        private static final String INDEX_SUFFIX = ".index";
-        private static final int ONE_M_BYTES = 10 * 1024 * 1024;
         private final String topic;
         private final int storeId;
         private final String basePath;
@@ -331,9 +331,9 @@ public class StoreRepairAdmin {
             if (segments.length == 0) {
                 return;
             }
-            long gQueueOffset = -1;
             Segment curPartSeg = null;
-            final ByteBuffer dataBuffer = ByteBuffer.allocate(ONE_M_BYTES);
+            final ByteBuffer dataBuffer =
+                    ByteBuffer.allocate(TBaseConstants.META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT);
             final ByteBuffer indexBuffer =
                     ByteBuffer.allocate(DataStoreUtils.STORE_INDEX_HEAD_LEN);
             for (Segment curSegment : segments) {
@@ -344,54 +344,72 @@ public class StoreRepairAdmin {
                     long curOffset = 0L;
                     while (curOffset < curSegment.getCachedSize()) {
                         dataBuffer.clear();
-                        curSegment.read(dataBuffer, curOffset);
+                        curSegment.relRead(dataBuffer, curOffset);
                         dataBuffer.flip();
-                        int dataStart = 0;
                         int dataRealLimit = dataBuffer.limit();
-                        for (dataStart = 0; dataStart < dataRealLimit; ) {
-                            if (dataRealLimit - dataStart < DataStoreUtils.STORE_DATA_HEADER_LEN) {
-                                dataStart += DataStoreUtils.STORE_DATA_HEADER_LEN;
+                        if (dataRealLimit < DataStoreUtils.STORE_DATA_HEADER_LEN) {
+                            break;
+                        }
+                        int dataStart = 0;
+                        while (dataStart < dataRealLimit) {
+                            if (dataRealLimit - dataStart <= DataStoreUtils.STORE_DATA_HEADER_LEN) {
                                 break;
                             }
+                            // read message fields
                             final int msgLen =
                                     dataBuffer.getInt(dataStart + DataStoreUtils.STORE_HEADER_POS_LENGTH);
                             final int msgToken =
                                     dataBuffer.getInt(dataStart + DataStoreUtils.STORE_HEADER_POS_DATATYPE);
-                            if (msgToken != DataStoreUtils.STORE_DATA_TOKER_BEGIN_VALUE) {
-                                dataStart += 1;
-                                continue;
-                            }
-                            final int msgSize = msgLen + 4;
-                            final long msgOffset =
-                                    curSegment.getStart() + curOffset + dataStart;
-                            final long queueOffset =
-                                    dataBuffer.getLong(dataStart + DataStoreUtils.STORE_HEADER_POS_QUEUE_LOGICOFF);
+                            final int checkSum =
+                                    dataBuffer.getInt(dataStart + DataStoreUtils.STORE_HEADER_POS_CHECKSUM);
                             final int partitionId =
                                     dataBuffer.getInt(dataStart + DataStoreUtils.STORE_HEADER_POS_QUEUEID);
-                            final int keyCode =
-                                    dataBuffer.getInt(dataStart + DataStoreUtils.STORE_HEADER_POS_KEYCODE);
+                            final long queueOffset =
+                                    dataBuffer.getLong(dataStart + DataStoreUtils.STORE_HEADER_POS_QUEUE_LOGICOFF);
                             final long timeRecv =
                                     dataBuffer.getLong(dataStart + DataStoreUtils.STORE_HEADER_POS_RECEIVEDTIME);
-                            dataStart += msgSize;
+                            final int keyCode =
+                                    dataBuffer.getInt(dataStart + DataStoreUtils.STORE_HEADER_POS_KEYCODE);
+                            final int msgSize = msgLen + 4;
+                            final long msgOffset = queueOffset - queueOffset % DataStoreUtils.STORE_INDEX_HEAD_LEN;
+                            int payLoadLen = msgLen - DataStoreUtils.STORE_DATA_PREFX_LEN;
+                            int payLoadOffset = dataStart + DataStoreUtils.STORE_DATA_HEADER_LEN;
+                            if (msgToken != DataStoreUtils.STORE_DATA_TOKER_BEGIN_VALUE
+                                    || payLoadLen <= 0
+                                    || payLoadLen > DataStoreUtils.STORE_MAX_MESSAGE_STORE_LEN) {
+                                dataStart += 1;
+                                continue;
+                            }
+                            if (payLoadLen > (dataRealLimit
+                                    - dataStart - DataStoreUtils.STORE_DATA_HEADER_LEN)) {
+                                break;
+                            }
+                            // check message crc
+                            final byte[] payLoadData = new byte[payLoadLen];
+                            System.arraycopy(dataBuffer.array(), payLoadOffset,
+                                    payLoadData, 0, payLoadLen);
+                            if (checkSum != CheckSum.crc32(payLoadData)) {
+                                dataStart += 1;
+                                continue;
+                            }
+                            // build index item
                             indexBuffer.clear();
                             indexBuffer.putInt(partitionId);
-                            indexBuffer.putLong(msgOffset);
+                            indexBuffer.putLong(curSegment.getStart()
+                                    + curOffset + dataStart);
                             indexBuffer.putInt(msgSize);
                             indexBuffer.putInt(keyCode);
                             indexBuffer.putLong(timeRecv);
                             indexBuffer.flip();
+                            dataStart += msgSize;
                             if (curPartSeg == null) {
-                                if (gQueueOffset < 0) {
-                                    gQueueOffset = queueOffset;
-                                }
-                                File newFile =
-                                        new File(this.indexDir,
-                                                DataStoreUtils.nameFromOffset(gQueueOffset, INDEX_SUFFIX));
+                                File newFile = new File(this.indexDir,
+                                        DataStoreUtils.nameFromOffset(msgOffset, DataStoreUtils.INDEX_FILE_SUFFIX));
                                 curPartSeg =
-                                        new FileSegment(queueOffset, newFile, SegmentType.INDEX);
+                                        new FileSegment(msgOffset, newFile, SegmentType.INDEX);
                             }
+                            // append index message
                             curPartSeg.append(indexBuffer, timeRecv, timeRecv);
-                            gQueueOffset += DataStoreUtils.STORE_INDEX_HEAD_LEN;
                             if (curPartSeg.getCachedSize() >= maxIndexSegmentSize) {
                                 curPartSeg.flush(true);
                                 curPartSeg.close();
@@ -415,13 +433,10 @@ public class StoreRepairAdmin {
                 if (curPartSeg != null) {
                     curPartSeg.flush(true);
                     curPartSeg.close();
-                    curPartSeg = null;
                 }
             } catch (Throwable e2) {
                 logger.error("Close Index file error ", e2);
             }
         }
-
     }
-
 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliBrokerAdmin.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliBrokerAdmin.java
index c19f27bfe..e94a12c25 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliBrokerAdmin.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliBrokerAdmin.java
@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
 /**
  * This class is use to process CLI Broker Admin process for script #{bin/tubemq-broker-admin.sh}.
  *
- *
  */
 public class CliBrokerAdmin extends CliAbstractBase {
 
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java
index d98d7a966..59fc9a5b4 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java
@@ -46,8 +46,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This class is use to process CLI Producer process for script #{bin/tubemq-producer-test.sh}.
- *
+ * This class is used to process CLI Producer process for script #{bin/tubemq-producer-test.sh}.
  *
  */
 public class CliProducer extends CliAbstractBase {