You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/05/15 04:58:30 UTC

[rocketmq] branch develop updated: [ISSUE #4315] Optimize ha module's code

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new a28dc651d [ISSUE #4315] Optimize ha module's code
a28dc651d is described below

commit a28dc651dcd1b250cb50ee29f036e04e97d8c3b8
Author: hzh0425 <64...@qq.com>
AuthorDate: Sun May 15 12:58:11 2022 +0800

    [ISSUE #4315] Optimize ha module's code
    
    * feature: optimize ha module code
    
    * review
---
 .../org/apache/rocketmq/store/ha/HAConnection.java | 48 +++++++++-------------
 .../org/apache/rocketmq/store/ha/HAService.java    |  7 ++--
 2 files changed, 23 insertions(+), 32 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
index 4c26971c0..5d88f00f2 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
@@ -34,8 +34,8 @@ public class HAConnection {
     private final HAService haService;
     private final SocketChannel socketChannel;
     private final String clientAddr;
-    private WriteSocketService writeSocketService;
-    private ReadSocketService readSocketService;
+    private final WriteSocketService writeSocketService;
+    private final ReadSocketService readSocketService;
 
     private volatile long slaveRequestOffset = -1;
     private volatile long slaveAckOffset = -1;
@@ -83,6 +83,22 @@ public class HAConnection {
         return socketChannel;
     }
 
+    private void stopChannelAndSelector(SocketChannel channel, Selector selector, String serviceName) {
+        SelectionKey sk = channel.keyFor(selector);
+        if (sk != null) {
+            sk.cancel();
+        }
+
+        try {
+            selector.close();
+            channel.close();
+        } catch (IOException e) {
+            log.error("", e);
+        }
+
+        log.info(serviceName + " service end");
+    }
+
     class ReadSocketService extends ServiceThread {
         private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;
         private final Selector selector;
@@ -130,19 +146,7 @@ public class HAConnection {
 
             HAConnection.this.haService.getConnectionCount().decrementAndGet();
 
-            SelectionKey sk = this.socketChannel.keyFor(this.selector);
-            if (sk != null) {
-                sk.cancel();
-            }
-
-            try {
-                this.selector.close();
-                this.socketChannel.close();
-            } catch (IOException e) {
-                HAConnection.log.error("", e);
-            }
-
-            HAConnection.log.info(this.getServiceName() + " service end");
+            HAConnection.this.stopChannelAndSelector(this.socketChannel, this.selector, this.getServiceName());
         }
 
         @Override
@@ -323,19 +327,7 @@ public class HAConnection {
 
             haService.removeConnection(HAConnection.this);
 
-            SelectionKey sk = this.socketChannel.keyFor(this.selector);
-            if (sk != null) {
-                sk.cancel();
-            }
-
-            try {
-                this.selector.close();
-                this.socketChannel.close();
-            } catch (IOException e) {
-                HAConnection.log.error("", e);
-            }
-
-            HAConnection.log.info(this.getServiceName() + " service end");
+            HAConnection.this.stopChannelAndSelector(this.socketChannel, this.selector, this.getServiceName());
         }
 
         private boolean transferData() throws Exception {
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
index 12c5a16f2..aa7ce4f4d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
@@ -330,8 +330,8 @@ public class HAService {
         private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
         private final AtomicReference<String> masterAddress = new AtomicReference<>();
         private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
+        private final Selector selector;
         private SocketChannel socketChannel;
-        private Selector selector;
         private long lastWriteTimestamp = System.currentTimeMillis();
 
         private long currentReportedOffset = 0;
@@ -354,10 +354,9 @@ public class HAService {
         private boolean isTimeToReportOffset() {
             long interval =
                 HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
-            boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
-                .getHaSendHeartbeatInterval();
 
-            return needHeart;
+            return interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
+                .getHaSendHeartbeatInterval();
         }
 
         private boolean reportSlaveMaxOffset(final long maxOffset) {