You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/02/27 03:40:28 UTC

[rocketmq] branch develop updated: [ISSUE #6121] Optimize some code style in store module (#6122)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 487473eb7 [ISSUE #6121] Optimize some code style in store module (#6122)
487473eb7 is described below

commit 487473eb74c8a2de720ee20c676405e082276963
Author: TheR1sing3un <87...@users.noreply.github.com>
AuthorDate: Mon Feb 27 11:40:20 2023 +0800

    [ISSUE #6121] Optimize some code style in store module (#6122)
    
    * style(store): optimize some typos in store module
    
    1. optimize some typos in store module
    
    * style(store): rename DefaultHAClient#REPORT_HEADER to DefaultHAClient#REPORT_HEADER_SIZE
    
    1. rename DefaultHAClient#REPORT_HEADER to
    DefaultHAClient#REPORT_HEADER_SIZE
    
    * style(store): optimize comment about DefaultHAClient#REPORT_HEADER_SIZE and DefaultHAConnection#TRANSFER_HEADER_SIZE
    
    1. optimize comment about DefaultHAClient#REPORT_HEADER_SIZE and
    DefaultHAConnection#TRANSFER_HEADER_SIZE
---
 .../apache/rocketmq/store/ha/DefaultHAClient.java  | 30 ++++++++++++++++------
 .../rocketmq/store/ha/DefaultHAConnection.java     | 27 ++++++++++++++-----
 .../org/apache/rocketmq/store/ha/FlowMonitor.java  |  2 +-
 .../store/ha/autoswitch/AutoSwitchHAClient.java    |  8 +++---
 4 files changed, 48 insertions(+), 19 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAClient.java b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAClient.java
index 02668558a..06878c185 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAClient.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAClient.java
@@ -35,11 +35,26 @@ import org.apache.rocketmq.store.DefaultMessageStore;
 
 public class DefaultHAClient extends ServiceThread implements HAClient {
 
+    /**
+     * Report header buffer size. Schema: slaveMaxOffset. Format:
+     *
+     * <pre>
+     * ┌───────────────────────────────────────────────┐
+     * │                  slaveMaxOffset               │
+     * │                    (8bytes)                   │
+     * ├───────────────────────────────────────────────┤
+     * │                                               │
+     * │                  Report Header                │
+     * </pre>
+     * <p>
+     */
+    public static final int REPORT_HEADER_SIZE = 8;
+
     private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
     private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
     private final AtomicReference<String> masterHaAddress = new AtomicReference<>();
     private final AtomicReference<String> masterAddress = new AtomicReference<>();
-    private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
+    private final ByteBuffer reportOffset = ByteBuffer.allocate(REPORT_HEADER_SIZE);
     private SocketChannel socketChannel;
     private Selector selector;
     /**
@@ -94,10 +109,10 @@ public class DefaultHAClient extends ServiceThread implements HAClient {
 
     private boolean reportSlaveMaxOffset(final long maxOffset) {
         this.reportOffset.position(0);
-        this.reportOffset.limit(8);
+        this.reportOffset.limit(REPORT_HEADER_SIZE);
         this.reportOffset.putLong(maxOffset);
         this.reportOffset.position(0);
-        this.reportOffset.limit(8);
+        this.reportOffset.limit(REPORT_HEADER_SIZE);
 
         for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
             try {
@@ -167,12 +182,11 @@ public class DefaultHAClient extends ServiceThread implements HAClient {
     }
 
     private boolean dispatchReadRequest() {
-        final int msgHeaderSize = 8 + 4; // phyoffset + size
         int readSocketPos = this.byteBufferRead.position();
 
         while (true) {
             int diff = this.byteBufferRead.position() - this.dispatchPosition;
-            if (diff >= msgHeaderSize) {
+            if (diff >= DefaultHAConnection.TRANSFER_HEADER_SIZE) {
                 long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
                 int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
 
@@ -186,15 +200,15 @@ public class DefaultHAClient extends ServiceThread implements HAClient {
                     }
                 }
 
-                if (diff >= (msgHeaderSize + bodySize)) {
+                if (diff >= (DefaultHAConnection.TRANSFER_HEADER_SIZE + bodySize)) {
                     byte[] bodyData = byteBufferRead.array();
-                    int dataStart = this.dispatchPosition + msgHeaderSize;
+                    int dataStart = this.dispatchPosition + DefaultHAConnection.TRANSFER_HEADER_SIZE;
 
                     this.defaultMessageStore.appendToCommitLog(
                         masterPhyOffset, bodyData, dataStart, bodySize);
 
                     this.byteBufferRead.position(readSocketPos);
-                    this.dispatchPosition += msgHeaderSize + bodySize;
+                    this.dispatchPosition += DefaultHAConnection.TRANSFER_HEADER_SIZE + bodySize;
 
                     if (!reportSlaveMaxOffsetPlus()) {
                         return false;
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java
index 8b3598666..e05e0ce23 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java
@@ -31,6 +31,22 @@ import org.apache.rocketmq.remoting.netty.NettySystemConfig;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 
 public class DefaultHAConnection implements HAConnection {
+
+    /**
+     * Transfer Header buffer size. Schema: physic offset and body size. Format:
+     *
+     * <pre>
+     * ┌───────────────────────────────────────────────┬───────────────────────┐
+     * │                  physicOffset                 │         bodySize      │
+     * │                    (8bytes)                   │         (4bytes)      │
+     * ├───────────────────────────────────────────────┴───────────────────────┤
+     * │                                                                       │
+     * │                           Transfer Header                             │
+     * </pre>
+     * <p>
+     */
+    public static final int TRANSFER_HEADER_SIZE = 8 + 4;
+
     private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
     private final DefaultHAService haService;
     private final SocketChannel socketChannel;
@@ -204,8 +220,8 @@ public class DefaultHAConnection implements HAConnection {
                     if (readSize > 0) {
                         readSizeZeroTimes = 0;
                         this.lastReadTimestamp = DefaultHAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
-                        if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
-                            int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
+                        if ((this.byteBufferRead.position() - this.processPosition) >= DefaultHAClient.REPORT_HEADER_SIZE) {
+                            int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % DefaultHAClient.REPORT_HEADER_SIZE);
                             long readOffset = this.byteBufferRead.getLong(pos - 8);
                             this.processPosition = pos;
 
@@ -239,8 +255,7 @@ public class DefaultHAConnection implements HAConnection {
         private final Selector selector;
         private final SocketChannel socketChannel;
 
-        private final int headerSize = 8 + 4;
-        private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(headerSize);
+        private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(TRANSFER_HEADER_SIZE);
         private long nextTransferFromWhere = -1;
         private SelectMappedBufferResult selectMappedBufferResult;
         private boolean lastWriteOver = true;
@@ -298,7 +313,7 @@ public class DefaultHAConnection implements HAConnection {
 
                             // Build Header
                             this.byteBufferHeader.position(0);
-                            this.byteBufferHeader.limit(headerSize);
+                            this.byteBufferHeader.limit(TRANSFER_HEADER_SIZE);
                             this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                             this.byteBufferHeader.putInt(0);
                             this.byteBufferHeader.flip();
@@ -340,7 +355,7 @@ public class DefaultHAConnection implements HAConnection {
 
                         // Build Header
                         this.byteBufferHeader.position(0);
-                        this.byteBufferHeader.limit(headerSize);
+                        this.byteBufferHeader.limit(TRANSFER_HEADER_SIZE);
                         this.byteBufferHeader.putLong(thisOffset);
                         this.byteBufferHeader.putInt(size);
                         this.byteBufferHeader.flip();
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/FlowMonitor.java b/store/src/main/java/org/apache/rocketmq/store/ha/FlowMonitor.java
index f64fbf33a..810f2865c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/FlowMonitor.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/FlowMonitor.java
@@ -45,7 +45,7 @@ public class FlowMonitor extends ServiceThread {
     }
 
     public int canTransferMaxByteNum() {
-        //Flow control is not started at present
+        // Flow control is not started at present
         if (this.isFlowControlEnable()) {
             long res = Math.max(this.maxTransferByteInSecond() - this.transferredByte.get(), 0);
             return res > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) res;
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
index 2c3ab85f7..554c64fd1 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
@@ -495,7 +495,7 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
                         int masterEpoch = byteBufferRead.getInt(processPosition + AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE - 4);
                         long masterEpochStartOffset = 0;
                         long confirmOffset = 0;
-                        // if master send transfer header data, set masterEpochStartOffset and confirmOffset value.
+                        // If master send transfer header data, set masterEpochStartOffset and confirmOffset value.
                         if (masterState == HAConnectionState.TRANSFER.ordinal() && diff >= AutoSwitchHAConnection.TRANSFER_HEADER_SIZE) {
                             masterEpochStartOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.TRANSFER_HEADER_SIZE - 16);
                             confirmOffset = byteBufferRead.getLong(processPosition + AutoSwitchHAConnection.TRANSFER_HEADER_SIZE - 8);
@@ -509,12 +509,12 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
                             return false;
                         }
 
-                        //flag whether the received data is complete
+                        // Flag whether the received data is complete
                         boolean isComplete = true;
                         switch (AutoSwitchHAClient.this.currentState) {
                             case HANDSHAKE: {
                                 if (diff < AutoSwitchHAConnection.HANDSHAKE_HEADER_SIZE + bodySize) {
-                                    //The received HANDSHAKE data is not complete
+                                    // The received HANDSHAKE data is not complete
                                     isComplete = false;
                                     break;
                                 }
@@ -540,7 +540,7 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
                             break;
                             case TRANSFER: {
                                 if (diff < AutoSwitchHAConnection.TRANSFER_HEADER_SIZE + bodySize) {
-                                    //The received TRANSFER data is not complete
+                                    // The received TRANSFER data is not complete
                                     isComplete = false;
                                     break;
                                 }