You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/02/27 03:40:28 UTC
[rocketmq] branch develop updated: [ISSUE #6121] Optimize some code style in store module (#6122)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 487473eb7 [ISSUE #6121] Optimize some code style in store module (#6122)
487473eb7 is described below
commit 487473eb74c8a2de720ee20c676405e082276963
Author: TheR1sing3un <87...@users.noreply.github.com>
AuthorDate: Mon Feb 27 11:40:20 2023 +0800
[ISSUE #6121] Optimize some code style in store module (#6122)
* style(store): optimize some typos in store module
1. optimize some typos in store module
* style(store): rename DefaultHAClient#REPORT_HEADER to DefaultHAClient#REPORT_HEADER_SIZE
1. rename DefaultHAClient#REPORT_HEADER to
DefaultHAClient#REPORT_HEADER_SIZE
* style(store): optimize comment about DefaultHAClient#REPORT_HEADER_SIZE and DefaultHAConnection#TRANSFER_HEADER_SIZE
1. optimize comment about DefaultHAClient#REPORT_HEADER_SIZE and
DefaultHAConnection#TRANSFER_HEADER_SIZE
---
.../apache/rocketmq/store/ha/DefaultHAClient.java | 30 ++++++++++++++++------
.../rocketmq/store/ha/DefaultHAConnection.java | 27 ++++++++++++++-----
.../org/apache/rocketmq/store/ha/FlowMonitor.java | 2 +-
.../store/ha/autoswitch/AutoSwitchHAClient.java | 8 +++---
4 files changed, 48 insertions(+), 19 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAClient.java b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAClient.java
index 02668558a..06878c185 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAClient.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAClient.java
@@ -35,11 +35,26 @@ import org.apache.rocketmq.store.DefaultMessageStore;
public class DefaultHAClient extends ServiceThread implements HAClient {
+ /**
+ * Report header buffer size. Schema: slaveMaxOffset. Format:
+ *
+ * <pre>
+ * ┌───────────────────────────────────────────────┐
+ * │ slaveMaxOffset │
+ * │ (8bytes) │
+ * ├───────────────────────────────────────────────┤
+ * │ │
+ * │ Report Header │
+ * </pre>
+ * <p>
+ */
+ public static final int REPORT_HEADER_SIZE = 8;
+
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
private final AtomicReference<String> masterHaAddress = new AtomicReference<>();
private final AtomicReference<String> masterAddress = new AtomicReference<>();
- private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
+ private final ByteBuffer reportOffset = ByteBuffer.allocate(REPORT_HEADER_SIZE);
private SocketChannel socketChannel;
private Selector selector;
/**
@@ -94,10 +109,10 @@ public class DefaultHAClient extends ServiceThread implements HAClient {
private boolean reportSlaveMaxOffset(final long maxOffset) {
this.reportOffset.position(0);
- this.reportOffset.limit(8);
+ this.reportOffset.limit(REPORT_HEADER_SIZE);
this.reportOffset.putLong(maxOffset);
this.reportOffset.position(0);
- this.reportOffset.limit(8);
+ this.reportOffset.limit(REPORT_HEADER_SIZE);
for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
try {
@@ -167,12 +182,11 @@ public class DefaultHAClient extends ServiceThread implements HAClient {
}
private boolean dispatchReadRequest() {
- final int msgHeaderSize = 8 + 4; // phyoffset + size
int readSocketPos = this.byteBufferRead.position();
while (true) {
int diff = this.byteBufferRead.position() - this.dispatchPosition;
- if (diff >= msgHeaderSize) {
+ if (diff >= DefaultHAConnection.TRANSFER_HEADER_SIZE) {
long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
@@ -186,15 +200,15 @@ public class DefaultHAClient extends ServiceThread implements HAClient {
}
}
- if (diff >= (msgHeaderSize + bodySize)) {
+ if (diff >= (DefaultHAConnection.TRANSFER_HEADER_SIZE + bodySize)) {
byte[] bodyData = byteBufferRead.array();
- int dataStart = this.dispatchPosition + msgHeaderSize;
+ int dataStart = this.dispatchPosition + DefaultHAConnection.TRANSFER_HEADER_SIZE;
this.defaultMessageStore.appendToCommitLog(
masterPhyOffset, bodyData, dataStart, bodySize);
this.byteBufferRead.position(readSocketPos);
- this.dispatchPosition += msgHeaderSize + bodySize;
+ this.dispatchPosition += DefaultHAConnection.TRANSFER_HEADER_SIZE + bodySize;
if (!reportSlaveMaxOffsetPlus()) {
return false;
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java
index 8b3598666..e05e0ce23 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java
@@ -31,6 +31,22 @@ import org.apache.rocketmq.remoting.netty.NettySystemConfig;
import org.apache.rocketmq.store.SelectMappedBufferResult;
public class DefaultHAConnection implements HAConnection {
+
+ /**
+ * Transfer Header buffer size. Schema: physic offset and body size. Format:
+ *
+ * <pre>
+ * ┌───────────────────────────────────────────────┬───────────────────────┐
+ * │ physicOffset │ bodySize │
+ * │ (8bytes) │ (4bytes) │
+ * ├───────────────────────────────────────────────┴───────────────────────┤
+ * │ │
+ * │ Transfer Header │
+ * </pre>
+ * <p>
+ */
+ public static final int TRANSFER_HEADER_SIZE = 8 + 4;
+
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private final DefaultHAService haService;
private final SocketChannel socketChannel;
@@ -204,8 +220,8 @@ public class DefaultHAConnection implements HAConnection {
if (readSize > 0) {
readSizeZeroTimes = 0;
this.lastReadTimestamp = DefaultHAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
- if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
- int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
+ if ((this.byteBufferRead.position() - this.processPosition) >= DefaultHAClient.REPORT_HEADER_SIZE) {
+ int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % DefaultHAClient.REPORT_HEADER_SIZE);
long readOffset = this.byteBufferRead.getLong(pos - 8);
this.processPosition = pos;
@@ -239,8 +255,7 @@ public class DefaultHAConnection implements HAConnection {
private final Selector selector;
private final SocketChannel socketChannel;
- private final int headerSize = 8 + 4;
- private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(headerSize);
+ private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(TRANSFER_HEADER_SIZE);
private long nextTransferFromWhere = -1;
private SelectMappedBufferResult selectMappedBufferResult;
private boolean lastWriteOver = true;
@@ -298,7 +313,7 @@ public class DefaultHAConnection implements HAConnection {
// Build Header
this.byteBufferHeader.position(0);
- this.byteBufferHeader.limit(headerSize);
+ this.byteBufferHeader.limit(TRANSFER_HEADER_SIZE);
this.byteBufferHeader.putLong(this.nextTransferFromWhere);
this.byteBufferHeader.putInt(0);
this.byteBufferHeader.flip();
@@ -340,7 +355,7 @@ public class DefaultHAConnection implements HAConnection {
// Build Header
this.byteBufferHeader.position(0);
- this.byteBufferHeader.limit(headerSize);
+ this.byteBufferHeader.limit(TRANSFER_HEADER_SIZE);
this.byteBufferHeader.putLong(thisOffset);
this.byteBufferHeader.putInt(size);
this.byteBufferHeader.flip();
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/FlowMonitor.java b/store/src/main/java/org/apache/rocketmq/store/ha/FlowMonitor.java
index f64fbf33a..810f2865c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/FlowMonitor.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/FlowMonitor.java
@@ -45,7 +45,7 @@ public class FlowMonitor extends ServiceThread {
}
public int canTransferMaxByteNum() {
- //Flow control is not started at present
+ // Flow control is not started at present
if (this.isFlowControlEnable()) {
long res = Math.max(this.maxTransferByteInSecond() - this.transferredByte.get(), 0);
return res > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) res;
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
index 2c3ab85f7..554c64fd1 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
@@ -495,7 +495,7 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
int masterEpoch = byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE - 4);
long masterEpochStartOffset = 0;
long confirmOffset = 0;
- // if master send transfer header data, set masterEpochStartOffset and confirmOffset value.
+ // If master send transfer header data, set masterEpochStartOffset and confirmOffset value.
if (masterState == HAConnectionState.TRANSFER.ordinal() && diff >= AutoSwitchHAConnection.TRANSFER_HEADER_SIZE) {
masterEpochStartOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.TRANSFER_HEADER_SIZE - 16);
confirmOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.TRANSFER_HEADER_SIZE - 8);
@@ -509,12 +509,12 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
return false;
}
- //flag whether the received data is complete
+ // Flag whether the received data is complete
boolean isComplete = true;
switch (AutoSwitchHAClient.this.currentState) {
case HANDSHAKE: {
if (diff < AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE + bodySize) {
- //The received HANDSHAKE data is not complete
+ // The received HANDSHAKE data is not complete
isComplete = false;
break;
}
@@ -540,7 +540,7 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
break;
case TRANSFER: {
if (diff < AutoSwitchHAConnection.TRANSFER_HEADER_SIZE + bodySize) {
- //The received TRANSFER data is not complete
+ // The received TRANSFER data is not complete
isComplete = false;
break;
}