You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/05/15 04:58:30 UTC
[rocketmq] branch develop updated: [ISSUE #4315] Optimize ha module's code
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new a28dc651d [ISSUE #4315] Optimize ha module's code
a28dc651d is described below
commit a28dc651dcd1b250cb50ee29f036e04e97d8c3b8
Author: hzh0425 <64...@qq.com>
AuthorDate: Sun May 15 12:58:11 2022 +0800
[ISSUE #4315] Optimize ha module's code
* feature: optimize ha module code
* review
---
.../org/apache/rocketmq/store/ha/HAConnection.java | 48 +++++++++-------------
.../org/apache/rocketmq/store/ha/HAService.java | 7 ++--
2 files changed, 23 insertions(+), 32 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
index 4c26971c0..5d88f00f2 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
@@ -34,8 +34,8 @@ public class HAConnection {
private final HAService haService;
private final SocketChannel socketChannel;
private final String clientAddr;
- private WriteSocketService writeSocketService;
- private ReadSocketService readSocketService;
+ private final WriteSocketService writeSocketService;
+ private final ReadSocketService readSocketService;
private volatile long slaveRequestOffset = -1;
private volatile long slaveAckOffset = -1;
@@ -83,6 +83,22 @@ public class HAConnection {
return socketChannel;
}
+ private void stopChannelAndSelector(SocketChannel channel, Selector selector, String serviceName) {
+ SelectionKey sk = channel.keyFor(selector);
+ if (sk != null) {
+ sk.cancel();
+ }
+
+ try {
+ selector.close();
+ channel.close();
+ } catch (IOException e) {
+ log.error("", e);
+ }
+
+ log.info(serviceName + " service end");
+ }
+
class ReadSocketService extends ServiceThread {
private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;
private final Selector selector;
@@ -130,19 +146,7 @@ public class HAConnection {
HAConnection.this.haService.getConnectionCount().decrementAndGet();
- SelectionKey sk = this.socketChannel.keyFor(this.selector);
- if (sk != null) {
- sk.cancel();
- }
-
- try {
- this.selector.close();
- this.socketChannel.close();
- } catch (IOException e) {
- HAConnection.log.error("", e);
- }
-
- HAConnection.log.info(this.getServiceName() + " service end");
+ HAConnection.this.stopChannelAndSelector(this.socketChannel, this.selector, this.getServiceName());
}
@Override
@@ -323,19 +327,7 @@ public class HAConnection {
haService.removeConnection(HAConnection.this);
- SelectionKey sk = this.socketChannel.keyFor(this.selector);
- if (sk != null) {
- sk.cancel();
- }
-
- try {
- this.selector.close();
- this.socketChannel.close();
- } catch (IOException e) {
- HAConnection.log.error("", e);
- }
-
- HAConnection.log.info(this.getServiceName() + " service end");
+ HAConnection.this.stopChannelAndSelector(this.socketChannel, this.selector, this.getServiceName());
}
private boolean transferData() throws Exception {
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
index 12c5a16f2..aa7ce4f4d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
@@ -330,8 +330,8 @@ public class HAService {
private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
private final AtomicReference<String> masterAddress = new AtomicReference<>();
private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
+ private final Selector selector;
private SocketChannel socketChannel;
- private Selector selector;
private long lastWriteTimestamp = System.currentTimeMillis();
private long currentReportedOffset = 0;
@@ -354,10 +354,9 @@ public class HAService {
private boolean isTimeToReportOffset() {
long interval =
HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
- boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
- .getHaSendHeartbeatInterval();
- return needHeart;
+ return interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
+ .getHaSendHeartbeatInterval();
}
private boolean reportSlaveMaxOffset(final long maxOffset) {