You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/10/10 10:16:22 UTC
[inlong] branch master updated: [INLONG-6126][TubeMQ] Optimize StoreRepairAdmin logic (#6127)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d25db080e [INLONG-6126][TubeMQ] Optimize StoreRepairAdmin logic (#6127)
d25db080e is described below
commit d25db080e67000ddb5c44df984a2448d891cfe89
Author: Goson Zhang <46...@qq.com>
AuthorDate: Mon Oct 10 18:16:17 2022 +0800
[INLONG-6126][TubeMQ] Optimize StoreRepairAdmin logic (#6127)
---
.../inlong/tubemq/server/master/MasterConfig.java | 2 +-
.../master/utils/SimpleVisitTokenManager.java | 3 +-
.../server/master/web/MasterStatusCheckFilter.java | 5 +-
.../tubemq/server/tools/StoreRepairAdmin.java | 83 +++++++++++++---------
.../tubemq/server/tools/cli/CliBrokerAdmin.java | 1 -
.../tubemq/server/tools/cli/CliProducer.java | 3 +-
6 files changed, 54 insertions(+), 43 deletions(-)
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/MasterConfig.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/MasterConfig.java
index 97887b118..6ee01cdfc 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/MasterConfig.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/MasterConfig.java
@@ -62,7 +62,7 @@ public class MasterConfig extends AbstractFileConfig {
private long stepChgWaitPeriodMs = 12 * 1000;
private String confModAuthToken = "ASDFGHJKL";
private String webResourcePath = "../resources";
- private int maxGroupBrokerConsumeRate = 50;
+ private int maxGroupBrokerConsumeRate = 1000;
private int maxGroupRebalanceWaitPeriod = 2;
private int maxAutoForbiddenCnt = 5;
private long socketSendBuffer = -1;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java
index afa95194c..71edb02ab 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java
@@ -57,7 +57,7 @@ public class SimpleVisitTokenManager extends AbstractDaemonService {
protected void loopProcess(StringBuilder strBuff) {
try {
buildVisitTokens(false, strBuff);
- } catch (Throwable t) {
+ } catch (Throwable t) {
logger.error("[VisitToken Manager] Daemon generator thread throw error ", t);
}
}
@@ -80,5 +80,4 @@ public class SimpleVisitTokenManager extends AbstractDaemonService {
.append(TokenConstants.ARRAY_SEP).append(freshVisitAuthorized.get()).toString();
strBuff.delete(0, strBuff.length());
}
-
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/MasterStatusCheckFilter.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/MasterStatusCheckFilter.java
index 1a8ede854..efe0b8188 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/MasterStatusCheckFilter.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/MasterStatusCheckFilter.java
@@ -40,7 +40,6 @@ public class MasterStatusCheckFilter implements Filter {
this.master = master;
this.defMetaDataService =
this.master.getMetaDataService();
-
}
@Override
@@ -49,8 +48,8 @@ public class MasterStatusCheckFilter implements Filter {
@Override
public void doFilter(ServletRequest request,
- ServletResponse response,
- FilterChain chain) throws IOException, ServletException {
+ ServletResponse response,
+ FilterChain chain) throws IOException, ServletException {
HttpServletRequest req = (HttpServletRequest) request;
HttpServletResponse resp = (HttpServletResponse) response;
if (!defMetaDataService.isSelfMaster()) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/StoreRepairAdmin.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/StoreRepairAdmin.java
index 41a19f3c0..dc52c0ca3 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/StoreRepairAdmin.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/StoreRepairAdmin.java
@@ -32,6 +32,9 @@ import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
+import org.apache.inlong.tubemq.corebase.utils.CheckSum;
import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
import org.apache.inlong.tubemq.server.broker.msgstore.disk.FileSegment;
import org.apache.inlong.tubemq.server.broker.msgstore.disk.FileSegmentList;
@@ -164,9 +167,6 @@ public class StoreRepairAdmin {
}
private static class IndexRepairStore implements Closeable {
- private static final String DATA_SUFFIX = ".tube";
- private static final String INDEX_SUFFIX = ".index";
- private static final int ONE_M_BYTES = 10 * 1024 * 1024;
private final String topic;
private final int storeId;
private final String basePath;
@@ -331,9 +331,9 @@ public class StoreRepairAdmin {
if (segments.length == 0) {
return;
}
- long gQueueOffset = -1;
Segment curPartSeg = null;
- final ByteBuffer dataBuffer = ByteBuffer.allocate(ONE_M_BYTES);
+ final ByteBuffer dataBuffer =
+ ByteBuffer.allocate(TBaseConstants.META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT);
final ByteBuffer indexBuffer =
ByteBuffer.allocate(DataStoreUtils.STORE_INDEX_HEAD_LEN);
for (Segment curSegment : segments) {
@@ -344,54 +344,72 @@ public class StoreRepairAdmin {
long curOffset = 0L;
while (curOffset < curSegment.getCachedSize()) {
dataBuffer.clear();
- curSegment.read(dataBuffer, curOffset);
+ curSegment.relRead(dataBuffer, curOffset);
dataBuffer.flip();
- int dataStart = 0;
int dataRealLimit = dataBuffer.limit();
- for (dataStart = 0; dataStart < dataRealLimit; ) {
- if (dataRealLimit - dataStart < DataStoreUtils.STORE_DATA_HEADER_LEN) {
- dataStart += DataStoreUtils.STORE_DATA_HEADER_LEN;
+ if (dataRealLimit < DataStoreUtils.STORE_DATA_HEADER_LEN) {
+ break;
+ }
+ int dataStart = 0;
+ while (dataStart < dataRealLimit) {
+ if (dataRealLimit - dataStart <= DataStoreUtils.STORE_DATA_HEADER_LEN) {
break;
}
+ // read message fields
final int msgLen =
dataBuffer.getInt(dataStart + DataStoreUtils.STORE_HEADER_POS_LENGTH);
final int msgToken =
dataBuffer.getInt(dataStart + DataStoreUtils.STORE_HEADER_POS_DATATYPE);
- if (msgToken != DataStoreUtils.STORE_DATA_TOKER_BEGIN_VALUE) {
- dataStart += 1;
- continue;
- }
- final int msgSize = msgLen + 4;
- final long msgOffset =
- curSegment.getStart() + curOffset + dataStart;
- final long queueOffset =
- dataBuffer.getLong(dataStart + DataStoreUtils.STORE_HEADER_POS_QUEUE_LOGICOFF);
+ final int checkSum =
+ dataBuffer.getInt(dataStart + DataStoreUtils.STORE_HEADER_POS_CHECKSUM);
final int partitionId =
dataBuffer.getInt(dataStart + DataStoreUtils.STORE_HEADER_POS_QUEUEID);
- final int keyCode =
- dataBuffer.getInt(dataStart + DataStoreUtils.STORE_HEADER_POS_KEYCODE);
+ final long queueOffset =
+ dataBuffer.getLong(dataStart + DataStoreUtils.STORE_HEADER_POS_QUEUE_LOGICOFF);
final long timeRecv =
dataBuffer.getLong(dataStart + DataStoreUtils.STORE_HEADER_POS_RECEIVEDTIME);
- dataStart += msgSize;
+ final int keyCode =
+ dataBuffer.getInt(dataStart + DataStoreUtils.STORE_HEADER_POS_KEYCODE);
+ final int msgSize = msgLen + 4;
+ final long msgOffset = queueOffset - queueOffset % DataStoreUtils.STORE_INDEX_HEAD_LEN;
+ int payLoadLen = msgLen - DataStoreUtils.STORE_DATA_PREFX_LEN;
+ int payLoadOffset = dataStart + DataStoreUtils.STORE_DATA_HEADER_LEN;
+ if (msgToken != DataStoreUtils.STORE_DATA_TOKER_BEGIN_VALUE
+ || payLoadLen <= 0
+ || payLoadLen > DataStoreUtils.STORE_MAX_MESSAGE_STORE_LEN) {
+ dataStart += 1;
+ continue;
+ }
+ if (payLoadLen > (dataRealLimit
+ - dataStart - DataStoreUtils.STORE_DATA_HEADER_LEN)) {
+ break;
+ }
+ // check message crc
+ final byte[] payLoadData = new byte[payLoadLen];
+ System.arraycopy(dataBuffer.array(), payLoadOffset,
+ payLoadData, 0, payLoadLen);
+ if (checkSum != CheckSum.crc32(payLoadData)) {
+ dataStart += 1;
+ continue;
+ }
+ // build index item
indexBuffer.clear();
indexBuffer.putInt(partitionId);
- indexBuffer.putLong(msgOffset);
+ indexBuffer.putLong(curSegment.getStart()
+ + curOffset + dataStart);
indexBuffer.putInt(msgSize);
indexBuffer.putInt(keyCode);
indexBuffer.putLong(timeRecv);
indexBuffer.flip();
+ dataStart += msgSize;
if (curPartSeg == null) {
- if (gQueueOffset < 0) {
- gQueueOffset = queueOffset;
- }
- File newFile =
- new File(this.indexDir,
- DataStoreUtils.nameFromOffset(gQueueOffset, INDEX_SUFFIX));
+ File newFile = new File(this.indexDir,
+ DataStoreUtils.nameFromOffset(msgOffset, DataStoreUtils.INDEX_FILE_SUFFIX));
curPartSeg =
- new FileSegment(queueOffset, newFile, SegmentType.INDEX);
+ new FileSegment(msgOffset, newFile, SegmentType.INDEX);
}
+ // append index message
curPartSeg.append(indexBuffer, timeRecv, timeRecv);
- gQueueOffset += DataStoreUtils.STORE_INDEX_HEAD_LEN;
if (curPartSeg.getCachedSize() >= maxIndexSegmentSize) {
curPartSeg.flush(true);
curPartSeg.close();
@@ -415,13 +433,10 @@ public class StoreRepairAdmin {
if (curPartSeg != null) {
curPartSeg.flush(true);
curPartSeg.close();
- curPartSeg = null;
}
} catch (Throwable e2) {
logger.error("Close Index file error ", e2);
}
}
-
}
-
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliBrokerAdmin.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliBrokerAdmin.java
index c19f27bfe..e94a12c25 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliBrokerAdmin.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliBrokerAdmin.java
@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
/**
* This class is use to process CLI Broker Admin process for script #{bin/tubemq-broker-admin.sh}.
*
- *
*/
public class CliBrokerAdmin extends CliAbstractBase {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java
index d98d7a966..59fc9a5b4 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java
@@ -46,8 +46,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * This class is use to process CLI Producer process for script #{bin/tubemq-producer-test.sh}.
- *
+ * This class is used to process CLI Producer process for script #{bin/tubemq-producer-test.sh}.
*
*/
public class CliProducer extends CliAbstractBase {