You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/03/10 05:39:49 UTC
[rocketmq] branch develop updated: [ISSUE #6306] Fix unexpected state from slave (#6307)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 7b23042eb [ISSUE #6306] Fix unexpected state from slave (#6307)
7b23042eb is described below
commit 7b23042eb2a0f445d0e06570f481dd4967eb99c5
Author: fujian-zfj <fu...@alibaba-inc.com>
AuthorDate: Fri Mar 10 13:39:28 2023 +0800
[ISSUE #6306] Fix unexpected state from slave (#6307)
* typo in readme [ecosystem]
* fix unexpected state from slave
---
.../store/ha/autoswitch/AutoSwitchHAClient.java | 24 +++++++++++-----------
.../ha/autoswitch/AutoSwitchHAConnection.java | 2 +-
2 files changed, 13 insertions(+), 13 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
index b95d3814a..49a59e251 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
@@ -332,21 +332,21 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
}
}
- private boolean reportSlaveOffset(final long offsetToReport) throws IOException {
+ private boolean reportSlaveOffset(HAConnectionState currentState, final long offsetToReport) throws IOException {
this.transferHeaderBuffer.position(0);
this.transferHeaderBuffer.limit(TRANSFER_HEADER_SIZE);
- this.transferHeaderBuffer.putInt(this.currentState.ordinal());
+ this.transferHeaderBuffer.putInt(currentState.ordinal());
this.transferHeaderBuffer.putLong(offsetToReport);
this.transferHeaderBuffer.flip();
return this.haWriter.write(this.socketChannel, this.transferHeaderBuffer);
}
- private boolean reportSlaveMaxOffset() throws IOException {
+ private boolean reportSlaveMaxOffset(HAConnectionState currentState) throws IOException {
boolean result = true;
final long maxPhyOffset = this.messageStore.getMaxPhyOffset();
if (maxPhyOffset > this.currentReportedOffset) {
this.currentReportedOffset = maxPhyOffset;
- result = reportSlaveOffset(this.currentReportedOffset);
+ result = reportSlaveOffset(currentState, this.currentReportedOffset);
}
return result;
}
@@ -369,11 +369,11 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
return this.socketChannel != null;
}
- private boolean transferFromMaster() throws IOException {
+ private boolean transferFromMaster(HAConnectionState currentState) throws IOException {
boolean result;
if (isTimeToReportOffset()) {
LOGGER.info("Slave report current offset {}", this.currentReportedOffset);
- result = reportSlaveOffset(this.currentReportedOffset);
+ result = reportSlaveOffset(currentState, this.currentReportedOffset);
if (!result) {
return false;
}
@@ -386,7 +386,7 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
return false;
}
- return this.reportSlaveMaxOffset();
+ return this.reportSlaveMaxOffset(currentState);
}
@Override
@@ -415,7 +415,7 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
handshakeWithMaster();
continue;
case TRANSFER:
- if (!transferFromMaster()) {
+ if (!transferFromMaster(HAConnectionState.TRANSFER)) {
closeMasterAndWait();
continue;
}
@@ -445,7 +445,7 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
/**
* Compare the master and slave's epoch file, find consistent point, do truncate.
*/
- private boolean doTruncate(List<EpochEntry> masterEpochEntries, long masterEndOffset) throws IOException {
+ private boolean doTruncate(List<EpochEntry> masterEpochEntries, long masterEndOffset, HAConnectionState currentState) throws IOException {
if (this.epochCache.getEntrySize() == 0) {
// If epochMap is empty, means the broker is a new replicas
LOGGER.info("Slave local epochCache is empty, skip truncate log");
@@ -475,7 +475,7 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
changeCurrentState(HAConnectionState.TRANSFER);
this.currentReportedOffset = truncateOffset;
}
- if (!reportSlaveMaxOffset()) {
+ if (!reportSlaveMaxOffset(currentState)) {
LOGGER.error("AutoSwitchHAClient report max offset to master failed");
return false;
}
@@ -534,7 +534,7 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
byteBufferRead.position(readSocketPos);
AutoSwitchHAClient.this.processPosition += bodySize;
LOGGER.info("Receive handshake, masterMaxPosition {}, masterEpochEntries:{}, try truncate log", masterOffset, epochEntries);
- if (!doTruncate(epochEntries, masterOffset)) {
+ if (!doTruncate(epochEntries, masterOffset, HAConnectionState.HANDSHAKE)) {
waitForRunning(1000 * 2);
LOGGER.error("AutoSwitchHAClient truncate log failed in handshake state");
return false;
@@ -573,7 +573,7 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
haService.updateConfirmOffset(Math.min(confirmOffset, messageStore.getMaxPhyOffset()));
- if (!reportSlaveMaxOffset()) {
+ if (!reportSlaveMaxOffset(HAConnectionState.TRANSFER)) {
LOGGER.error("AutoSwitchHAClient report max offset to master failed");
return false;
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index 57f9e9619..8f79b55a9 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -363,7 +363,7 @@ public class AutoSwitchHAConnection implements HAConnection {
break;
default:
LOGGER.error("Current state illegal {}", currentState);
- break;
+ return false;
}
if (!slaveState.equals(currentState)) {