You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/03/10 05:39:49 UTC

[rocketmq] branch develop updated: [ISSUE #6306] Fix unexpected state from slave (#6307)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7b23042eb [ISSUE #6306] Fix unexpected state from slave (#6307)
7b23042eb is described below

commit 7b23042eb2a0f445d0e06570f481dd4967eb99c5
Author: fujian-zfj <fu...@alibaba-inc.com>
AuthorDate: Fri Mar 10 13:39:28 2023 +0800

    [ISSUE #6306] Fix unexpected state from slave (#6307)
    
    * typo int readme[ecosystem]
    
    * fix unexpected state from slave
---
 .../store/ha/autoswitch/AutoSwitchHAClient.java    | 24 +++++++++++-----------
 .../ha/autoswitch/AutoSwitchHAConnection.java      |  2 +-
 2 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
index b95d3814a..49a59e251 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
@@ -332,21 +332,21 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
         }
     }
 
-    private boolean reportSlaveOffset(final long offsetToReport) throws IOException {
+    private boolean reportSlaveOffset(HAConnectionState currentState, final long offsetToReport) throws IOException {
         this.transferHeaderBuffer.position(0);
         this.transferHeaderBuffer.limit(TRANSFER_HEADER_SIZE);
-        this.transferHeaderBuffer.putInt(this.currentState.ordinal());
+        this.transferHeaderBuffer.putInt(currentState.ordinal());
         this.transferHeaderBuffer.putLong(offsetToReport);
         this.transferHeaderBuffer.flip();
         return this.haWriter.write(this.socketChannel, this.transferHeaderBuffer);
     }
 
-    private boolean reportSlaveMaxOffset() throws IOException {
+    private boolean reportSlaveMaxOffset(HAConnectionState currentState) throws IOException {
         boolean result = true;
         final long maxPhyOffset = this.messageStore.getMaxPhyOffset();
         if (maxPhyOffset > this.currentReportedOffset) {
             this.currentReportedOffset = maxPhyOffset;
-            result = reportSlaveOffset(this.currentReportedOffset);
+            result = reportSlaveOffset(currentState, this.currentReportedOffset);
         }
         return result;
     }
@@ -369,11 +369,11 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
         return this.socketChannel != null;
     }
 
-    private boolean transferFromMaster() throws IOException {
+    private boolean transferFromMaster(HAConnectionState currentState) throws IOException {
         boolean result;
         if (isTimeToReportOffset()) {
             LOGGER.info("Slave report current offset {}", this.currentReportedOffset);
-            result = reportSlaveOffset(this.currentReportedOffset);
+            result = reportSlaveOffset(currentState, this.currentReportedOffset);
             if (!result) {
                 return false;
             }
@@ -386,7 +386,7 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
             return false;
         }
 
-        return this.reportSlaveMaxOffset();
+        return this.reportSlaveMaxOffset(currentState);
     }
 
     @Override
@@ -415,7 +415,7 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
                         handshakeWithMaster();
                         continue;
                     case TRANSFER:
-                        if (!transferFromMaster()) {
+                        if (!transferFromMaster(HAConnectionState.TRANSFER)) {
                             closeMasterAndWait();
                             continue;
                         }
@@ -445,7 +445,7 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
     /**
      * Compare the master and slave's epoch file, find consistent point, do truncate.
      */
-    private boolean doTruncate(List<EpochEntry> masterEpochEntries, long masterEndOffset) throws IOException {
+    private boolean doTruncate(List<EpochEntry> masterEpochEntries, long masterEndOffset, HAConnectionState currentState) throws IOException {
         if (this.epochCache.getEntrySize() == 0) {
             // If epochMap is empty, means the broker is a new replicas
             LOGGER.info("Slave local epochCache is empty, skip truncate log");
@@ -475,7 +475,7 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
             changeCurrentState(HAConnectionState.TRANSFER);
             this.currentReportedOffset = truncateOffset;
         }
-        if (!reportSlaveMaxOffset()) {
+        if (!reportSlaveMaxOffset(currentState)) {
             LOGGER.error("AutoSwitchHAClient report max offset to master failed");
             return false;
         }
@@ -534,7 +534,7 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
                                 byteBufferRead.position(readSocketPos);
                                 AutoSwitchHAClient.this.processPosition += bodySize;
                                 LOGGER.info("Receive handshake, masterMaxPosition {}, masterEpochEntries:{}, try truncate log", masterOffset, epochEntries);
-                                if (!doTruncate(epochEntries, masterOffset)) {
+                                if (!doTruncate(epochEntries, masterOffset, HAConnectionState.HANDSHAKE)) {
                                     waitForRunning(1000 * 2);
                                     LOGGER.error("AutoSwitchHAClient truncate log failed in handshake state");
                                     return false;
@@ -573,7 +573,7 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
 
                                 haService.updateConfirmOffset(Math.min(confirmOffset, messageStore.getMaxPhyOffset()));
 
-                                if (!reportSlaveMaxOffset()) {
+                                if (!reportSlaveMaxOffset(HAConnectionState.TRANSFER)) {
                                     LOGGER.error("AutoSwitchHAClient report max offset to master failed");
                                     return false;
                                 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index 57f9e9619..8f79b55a9 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -363,7 +363,7 @@ public class AutoSwitchHAConnection implements HAConnection {
                                 break;
                             default:
                                 LOGGER.error("Current state illegal {}", currentState);
-                                break;
+                                return false;
                         }
 
                         if (!slaveState.equals(currentState)) {