You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/05/21 02:48:06 UTC

[GitHub] [rocketmq] hzh0425 opened a new pull request, #4355: [Summer of code] Modify the definition of syncStateSet

hzh0425 opened a new pull request, #4355:
URL: https://github.com/apache/rocketmq/pull/4355

   ## What is the purpose of the change
   
   tracking issue: https://github.com/apache/rocketmq/issues/4330
   
   ## Brief changelog
   
   Modify the definition of syncStateSet in AutoSwitchHASerivce, and introduce the confirmOffset mechanism.
   
   ## Verifying this change
   
   XXXX
   
   Follow this checklist to help us incorporate your contribution quickly and easily. Notice, `it would be helpful if you could finish the following 5 checklist(the last one is not necessary)before request the community to review your PR`.
   
   - [x] Make sure there is a [Github issue](https://github.com/apache/rocketmq/issues) filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue. 
   - [x] Format the pull request title like `[ISSUE #123] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body.
   - [x] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
   - [x] Write necessary unit-test(over 80% coverage) to verify your logic correction, more mock a little better when cross module dependency exist. If the new feature or significant change is committed, please remember to add integration-test in [test module](https://github.com/apache/rocketmq/tree/master/test).
   - [x] Run `mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle` to make sure basic checks pass. Run `mvn clean install -DskipITs` to make sure unit-test pass. Run `mvn clean test-compile failsafe:integration-test`  to make sure integration-test pass.
   - [ ] If this contribution is large, please file an [Apache Individual Contributor License Agreement](http://www.apache.org/licenses/#clas).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #4355: [Summer of code] Shrink and expand InSyncStateSet

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on code in PR #4355:
URL: https://github.com/apache/rocketmq/pull/4355#discussion_r879109393


##########
store/src/main/java/org/apache/rocketmq/store/CommitLog.java:
##########
@@ -1077,13 +1077,13 @@ private boolean needHandleHA(MessageExt messageExt) {
     }
 
     private CompletableFuture<PutMessageResult> handleDiskFlushAndHA(PutMessageResult putMessageResult,
-        MessageExt messageExt, int needAckNums, boolean needHandleHA) {
+        MessageExt messageExt, int needAckNums, boolean needHandleHA, boolean allAckInSyncStateSet) {

Review Comment:
   是不是可以needAckNums=-1代表allAckInSyncStateSet,而不需要allAckInSyncStateSet参数一直往下传



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java:
##########
@@ -89,12 +103,14 @@ public boolean changeToMaster(int masterEpoch) {
         this.epochCache.appendEntry(newEpochEntry);
 
         this.defaultMessageStore.recoverTopicQueueTable();
+
+        this.syncStateSet.clear();
+        this.syncStateSet.add(this.localAddress);

Review Comment:
   再另一个线程里面会用syncStateSet.size判断ack,但这里更新分成了两步,所以应该会并发问题



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java:
##########
@@ -118,28 +134,93 @@ public boolean changeToSlave(String newMasterAddr, int newMasterEpoch, Long slav
         }
     }
 
-    @Override
+    @Override public HAClient getHAClient() {
+        return this.haClient;
+    }
+
+    @Override public void updateHaMasterAddress(String newAddr) {
+        if (this.haClient != null) {
+            this.haClient.updateHaMasterAddress(newAddr);
+        }
+    }
+
+    @Override public void updateMasterAddress(String newAddr) {
+    }
+
+    public void registerSyncStateSetChangedListener(final Consumer<Set<String>> listener) {
+        this.syncStateSetChangedListeners.add(listener);
+    }
+
+    public void notifySyncStateSetChanged(final Set<String> newSyncStateSet) {
+        this.executorService.submit(() -> {
+            for (Consumer<Set<String>> listener : syncStateSetChangedListeners) {
+                listener.accept(newSyncStateSet);
+            }
+        });
+    }
+
     public void setSyncStateSet(final Set<String> syncStateSet) {
-        this.syncStateSet = new HashSet<>(syncStateSet);
+        this.syncStateSet.clear();
+        this.syncStateSet.addAll(syncStateSet);

Review Comment:
   再另一个线程里面会用syncStateSet.size判断ack,但这里更新分成了两步,所以应该会并发问题



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java:
##########
@@ -279,12 +296,17 @@ protected boolean processReadResult(ByteBuffer byteBufferRead) {
                                     long slaveMaxOffset = byteBufferRead.getLong(readPosition + 4);
                                     ReadSocketService.this.processPosition += AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
 
-                                    slaveAckOffset = slaveMaxOffset;
+                                    AutoSwitchHAConnection.this.slaveAckOffset = slaveMaxOffset;
                                     if (slaveRequestOffset < 0) {
                                         slaveRequestOffset = slaveMaxOffset;
                                     }
                                     LOGGER.info("slave[" + clientAddress + "] request offset " + slaveMaxOffset);
                                     byteBufferRead.position(readSocketPos);
+                                    if (slaveMaxOffset >= AutoSwitchHAConnection.this.lastMasterMaxOffset) {
+                                        AutoSwitchHAConnection.this.lastCatchUpTimeMs = Math.max(AutoSwitchHAConnection.this.lastTransferTimeMs, AutoSwitchHAConnection.this.lastCatchUpTimeMs);
+                                        AutoSwitchHAConnection.this.haService.maybeExpandInSyncStateSet(AutoSwitchHAConnection.this.slaveAddress, slaveMaxOffset);
+                                    }

Review Comment:
   这里应该也有并发问题,比如slaveMaxOffset >= AutoSwitchHAConnection.this.lastMasterMaxOffset,但写那里lastTransferTimeMs已经更新了,导致lastCatchUpTimeMs错误。



##########
store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java:
##########
@@ -71,28 +74,51 @@ private void doWaitTransfer() {
                     boolean transferOK = false;
 
                     long deadLine = req.getDeadLine();
+                    final boolean allAckInSyncStateSet = req.isAllAckInSyncStateSet();
 
                     for (int i = 0; !transferOK && deadLine - System.nanoTime() > 0; i++) {
                         if (i > 0) {
                             this.notifyTransferObject.waitForRunning(1000);
                         }
 
-                        if (req.getAckNums() <= 1) {
+                        if (req.getAckNums() <= 1 && !allAckInSyncStateSet) {
                             transferOK = haService.getPush2SlaveMaxOffset().get() >= req.getNextOffset();
                             continue;
                         }
 
-                        int ackNums = 0;
-                        for (HAConnection conn : haService.getConnectionList()) {
-                            // TODO: We must ensure every HAConnection represents a different slave
-                            // Solution: Consider assign a unique and fixed IP:ADDR for each different slave
-                            if (conn.getSlaveAckOffset() >= req.getNextOffset()) {
-                                ackNums++;
-                            }
-                            if (ackNums >= req.getAckNums()) {
+                        if (allAckInSyncStateSet && this.haService instanceof AutoSwitchHAService) {
+                            // In this mode, we must wait for all replicas that in InSyncStateSet.
+                            final AutoSwitchHAService autoSwitchHAService = (AutoSwitchHAService) this.haService;
+                            final Set<String> syncStateSet = autoSwitchHAService.getSyncStateSet();
+                            if (syncStateSet.size() <= 1) {
+                                // Only master
                                 transferOK = true;
                                 break;
                             }
+                            int ackNums = 0;
+                            for (HAConnection conn : haService.getConnectionList()) {
+                                final AutoSwitchHAConnection autoSwitchHAConnection = (AutoSwitchHAConnection) conn;
+                                if (syncStateSet.contains(autoSwitchHAConnection.getClientAddress()) && autoSwitchHAConnection.getSlaveAckOffset() >= req.getNextOffset()) {
+                                    ackNums ++;
+                                }
+                                if (ackNums >= syncStateSet.size()) {
+                                    transferOK = true;
+                                    break;
+                                }
+                            }

Review Comment:
   这里的ackNums没有把master自己算进去。



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] hzh0425 commented on pull request #4355: [Summer of code] Shrink and expand InSyncStateSet

Posted by GitBox <gi...@apache.org>.
hzh0425 commented on PR #4355:
URL: https://github.com/apache/rocketmq/pull/4355#issuecomment-1134146082

   /cc @RongtongJin 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] coveralls commented on pull request #4355: [Summer of code] Modify the definition of syncStateSet

Posted by GitBox <gi...@apache.org>.
coveralls commented on PR #4355:
URL: https://github.com/apache/rocketmq/pull/4355#issuecomment-1133527727

   
   [![Coverage Status](https://coveralls.io/builds/49333221/badge)](https://coveralls.io/builds/49333221)
   
   Coverage decreased (-0.05%) to 47.646% when pulling **51de8dce6340599710ae970fadb31dbd93ca0bbb on hzh0425:feature/confirm_offset** into **f905da411a71eecba9f896db52227a7b3081b6a6 on apache:5.0.0-beta-dledger-controller**.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] codecov-commenter commented on pull request #4355: [Summer of code] Modify the definition of syncStateSet

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #4355:
URL: https://github.com/apache/rocketmq/pull/4355#issuecomment-1133525867

   # [Codecov](https://codecov.io/gh/apache/rocketmq/pull/4355?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#4355](https://codecov.io/gh/apache/rocketmq/pull/4355?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (51de8dc) into [5.0.0-beta-dledger-controller](https://codecov.io/gh/apache/rocketmq/commit/f905da411a71eecba9f896db52227a7b3081b6a6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (f905da4) will **decrease** coverage by `0.06%`.
   > The diff coverage is `65.11%`.
   
   ```diff
   @@                         Coverage Diff                         @@
   ##             5.0.0-beta-dledger-controller    #4355      +/-   ##
   ===================================================================
   - Coverage                            43.50%   43.44%   -0.07%     
   - Complexity                            6317     6323       +6     
   ===================================================================
     Files                                  835      835              
     Lines                                59285    59373      +88     
     Branches                              8082     8095      +13     
   ===================================================================
   + Hits                                 25791    25792       +1     
   - Misses                               30164    30240      +76     
   - Partials                              3330     3341      +11     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/rocketmq/pull/4355?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../rocketmq/broker/hacontroller/ReplicasManager.java](https://codecov.io/gh/apache/rocketmq/pull/4355/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvaGFjb250cm9sbGVyL1JlcGxpY2FzTWFuYWdlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...org/apache/rocketmq/store/ha/DefaultHAService.java](https://codecov.io/gh/apache/rocketmq/pull/4355/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL2hhL0RlZmF1bHRIQVNlcnZpY2UuamF2YQ==) | `66.66% <ø> (+0.79%)` | :arrow_up: |
   | [...ache/rocketmq/store/config/MessageStoreConfig.java](https://codecov.io/gh/apache/rocketmq/pull/4355/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL2NvbmZpZy9NZXNzYWdlU3RvcmVDb25maWcuamF2YQ==) | `55.92% <62.50%> (+0.10%)` | :arrow_up: |
   | [...apache/rocketmq/store/ha/GroupTransferService.java](https://codecov.io/gh/apache/rocketmq/pull/4355/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL2hhL0dyb3VwVHJhbnNmZXJTZXJ2aWNlLmphdmE=) | `83.33% <62.50%> (-10.11%)` | :arrow_down: |
   | [...ketmq/store/ha/autoswitch/AutoSwitchHAService.java](https://codecov.io/gh/apache/rocketmq/pull/4355/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL2hhL2F1dG9zd2l0Y2gvQXV0b1N3aXRjaEhBU2VydmljZS5qYXZh) | `56.52% <70.14%> (+14.55%)` | :arrow_up: |
   | [...main/java/org/apache/rocketmq/store/CommitLog.java](https://codecov.io/gh/apache/rocketmq/pull/4355/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL0NvbW1pdExvZy5qYXZh) | `67.96% <88.88%> (+0.29%)` | :arrow_up: |
   | [...mq/store/ha/autoswitch/AutoSwitchHAConnection.java](https://codecov.io/gh/apache/rocketmq/pull/4355/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL2hhL2F1dG9zd2l0Y2gvQXV0b1N3aXRjaEhBQ29ubmVjdGlvbi5qYXZh) | `73.18% <90.00%> (-2.18%)` | :arrow_down: |
   | [.../apache/rocketmq/store/ha/io/AbstractHAReader.java](https://codecov.io/gh/apache/rocketmq/pull/4355/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL2hhL2lvL0Fic3RyYWN0SEFSZWFkZXIuamF2YQ==) | `79.31% <0.00%> (-10.35%)` | :arrow_down: |
   | [...e/rocketmq/store/ha/autoswitch/EpochFileCache.java](https://codecov.io/gh/apache/rocketmq/pull/4355/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL2hhL2F1dG9zd2l0Y2gvRXBvY2hGaWxlQ2FjaGUuamF2YQ==) | `79.16% <0.00%> (-7.64%)` | :arrow_down: |
   | [...cketmq/store/ha/autoswitch/AutoSwitchHAClient.java](https://codecov.io/gh/apache/rocketmq/pull/4355/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL2hhL2F1dG9zd2l0Y2gvQXV0b1N3aXRjaEhBQ2xpZW50LmphdmE=) | `75.56% <0.00%> (-3.39%)` | :arrow_down: |
   | ... and [22 more](https://codecov.io/gh/apache/rocketmq/pull/4355/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/rocketmq/pull/4355?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/rocketmq/pull/4355?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [f905da4...51de8dc](https://codecov.io/gh/apache/rocketmq/pull/4355?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #4355: [Summer of code] Shrink and expand InSyncStateSet

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on code in PR #4355:
URL: https://github.com/apache/rocketmq/pull/4355#discussion_r880001214


##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java:
##########
@@ -89,12 +103,14 @@ public boolean changeToMaster(int masterEpoch) {
         this.epochCache.appendEntry(newEpochEntry);
 
         this.defaultMessageStore.recoverTopicQueueTable();
+
+        this.syncStateSet.clear();
+        this.syncStateSet.add(this.localAddress);

Review Comment:
   应该调用setSyncStateSet方法



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #4355: [Summer of code] Shrink and expand InSyncStateSet

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on code in PR #4355:
URL: https://github.com/apache/rocketmq/pull/4355#discussion_r880143036


##########
store/src/main/java/org/apache/rocketmq/store/CommitLog.java:
##########
@@ -805,13 +805,17 @@ public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBroke
         boolean needHandleHA = needHandleHA(msg);
         int needAckNums = 1;
 
-        if (needHandleHA) {
-            int inSyncReplicas = Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
-                this.defaultMessageStore.getHaService().inSyncSlaveNums(currOffset) + 1);
-            needAckNums = calcNeedAckNums(inSyncReplicas);
-            if (needAckNums > inSyncReplicas) {
-                // Tell the producer, don't have enough slaves to handle the send request
-                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
+        if (this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) {
+            needAckNums = -1;
+        } else {
+            if (needHandleHA) {
+                int inSyncReplicas = Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
+                    this.defaultMessageStore.getHaService().inSyncSlaveNums(currOffset) + 1);
+                needAckNums = calcNeedAckNums(inSyncReplicas);
+                if (needAckNums > inSyncReplicas) {
+                    // Tell the producer, don't have enough slaves to handle the send request
+                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
+                }

Review Comment:
   可以放在一个if else里,不需要else里面嵌套



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] hzh0425 commented on a diff in pull request #4355: [Summer of code] Shrink and expand InSyncStateSet

Posted by GitBox <gi...@apache.org>.
hzh0425 commented on code in PR #4355:
URL: https://github.com/apache/rocketmq/pull/4355#discussion_r879522362


##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java:
##########
@@ -279,12 +296,17 @@ protected boolean processReadResult(ByteBuffer byteBufferRead) {
                                     long slaveMaxOffset = byteBufferRead.getLong(readPosition + 4);
                                     ReadSocketService.this.processPosition += AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
 
-                                    slaveAckOffset = slaveMaxOffset;
+                                    AutoSwitchHAConnection.this.slaveAckOffset = slaveMaxOffset;
                                     if (slaveRequestOffset < 0) {
                                         slaveRequestOffset = slaveMaxOffset;
                                     }
                                     LOGGER.info("slave[" + clientAddress + "] request offset " + slaveMaxOffset);
                                     byteBufferRead.position(readSocketPos);
+                                    if (slaveMaxOffset >= AutoSwitchHAConnection.this.lastMasterMaxOffset) {
+                                        AutoSwitchHAConnection.this.lastCatchUpTimeMs = Math.max(AutoSwitchHAConnection.this.lastTransferTimeMs, AutoSwitchHAConnection.this.lastCatchUpTimeMs);
+                                        AutoSwitchHAConnection.this.haService.maybeExpandInSyncStateSet(AutoSwitchHAConnection.this.slaveAddress, slaveMaxOffset);
+                                    }

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] RongtongJin merged pull request #4355: [Summer of code] Shrink and expand InSyncStateSet

Posted by GitBox <gi...@apache.org>.
RongtongJin merged PR #4355:
URL: https://github.com/apache/rocketmq/pull/4355


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org