You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/08/04 08:35:42 UTC

[GitHub] [rocketmq] lizhimins opened a new pull request, #4780: [ISSUE #4778] Design of optimize log transfer protocol

lizhimins opened a new pull request, #4780:
URL: https://github.com/apache/rocketmq/pull/4780

   [ISSUE #4778] Design of optimize log transfer protocol


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] lizhimins commented on a diff in pull request #4780: [ISSUE #4778] Design of optimize log transfer protocol

Posted by GitBox <gi...@apache.org>.
lizhimins commented on code in PR #4780:
URL: https://github.com/apache/rocketmq/pull/4780#discussion_r956869627


##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java:
##########
@@ -17,152 +17,132 @@
 
 package org.apache.rocketmq.store.ha.autoswitch;
 
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.ha.FlowMonitor;
 import org.apache.rocketmq.store.ha.HAClient;
 import org.apache.rocketmq.store.ha.HAConnectionState;
-import org.apache.rocketmq.store.ha.io.AbstractHAReader;
-import org.apache.rocketmq.store.ha.io.HAWriter;
+import org.apache.rocketmq.store.ha.netty.TransferMessage;
+import org.apache.rocketmq.store.ha.netty.TransferType;
+import org.apache.rocketmq.store.ha.netty.NettyTransferClientHandler;
+import org.apache.rocketmq.store.ha.netty.NettyTransferDecoder;
+import org.apache.rocketmq.store.ha.netty.NettyTransferEncoder;
+import org.apache.rocketmq.store.ha.protocol.ConfirmTruncate;
+import org.apache.rocketmq.store.ha.protocol.HandshakeMaster;
+import org.apache.rocketmq.store.ha.protocol.HandshakeResult;
+import org.apache.rocketmq.store.ha.protocol.HandshakeSlave;
+import org.apache.rocketmq.store.ha.protocol.PushCommitLogAck;
+import org.apache.rocketmq.store.ha.protocol.PushCommitLogData;
 
 public class AutoSwitchHAClient extends ServiceThread implements HAClient {
 
-    /**
-     * Handshake header buffer size. Schema: state ordinal + Two flags + slaveAddressLength
-     * Flag: isSyncFromLastFile(short), isAsyncLearner(short)... we can add more flags in the future if needed
-     */
-    public static final int HANDSHAKE_HEADER_SIZE = 4 + 4 + 4;
-
-    /**
-     * Header + slaveAddress.
-     */
-    public static final int HANDSHAKE_SIZE = HANDSHAKE_HEADER_SIZE + 50;
-
-    /**
-     * Transfer header buffer size. Schema: state ordinal + maxOffset.
-     */
-    public static final int TRANSFER_HEADER_SIZE = 4 + 8;
-    public static final int MIN_HEADER_SIZE = Math.min(HANDSHAKE_HEADER_SIZE, TRANSFER_HEADER_SIZE);
     private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
-    private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
+
     private final AtomicReference<String> masterHaAddress = new AtomicReference<>();
     private final AtomicReference<String> masterAddress = new AtomicReference<>();
     private final AtomicReference<Long> slaveId = new AtomicReference<>();
-    private final ByteBuffer handshakeHeaderBuffer = ByteBuffer.allocate(HANDSHAKE_SIZE);
-    private final ByteBuffer transferHeaderBuffer = ByteBuffer.allocate(TRANSFER_HEADER_SIZE);
+
     private final AutoSwitchHAService haService;
-    private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
-    private final DefaultMessageStore messageStore;
-    private final EpochFileCache epochCache;
-
-    private String localAddress;
-    private SocketChannel socketChannel;
-    private Selector selector;
-    private AbstractHAReader haReader;
-    private HAWriter haWriter;
+    private final EpochStore epochCache;
+    private final DefaultMessageStore defaultMessageStore;
+    public EventLoopGroup workerGroup;
+    public Bootstrap bootstrap;
     private FlowMonitor flowMonitor;
-    /**
-     * last time that slave reads date from master.
-     */
-    private long lastReadTimestamp;
-    /**
-     * last time that slave reports offset to master.
-     */
-    private long lastWriteTimestamp;
-
-    private long currentReportedOffset;
-    private int processPosition;
-    private volatile HAConnectionState currentState;
-    /**
-     * Current epoch
-     */
-    private volatile long currentReceivedEpoch;
-
-    public AutoSwitchHAClient(AutoSwitchHAService haService, DefaultMessageStore defaultMessageStore,
-        EpochFileCache epochCache) throws IOException {
+    private volatile HAConnectionState currentState = HAConnectionState.SHUTDOWN;
+    private volatile long currentReceivedEpoch = -1L;
+    private volatile long currentTransferOffset = -1L;
+    private volatile long lastReadTimestamp;
+    private volatile long lastWriteTimestamp;
+    private ChannelFuture future;
+    private ChannelPromise channelPromise;
+
+    public AutoSwitchHAClient(AutoSwitchHAService haService) {
         this.haService = haService;
-        this.messageStore = defaultMessageStore;
-        this.epochCache = epochCache;
-        init();
+        this.defaultMessageStore = haService.getDefaultMessageStore();
+        this.epochCache = haService.getEpochStore();
     }
 
     public void init() throws IOException {
-        this.selector = RemotingUtil.openSelector();
-        this.flowMonitor = new FlowMonitor(this.messageStore.getMessageStoreConfig());
-        this.haReader = new HAClientReader();
-        haReader.registerHook(readSize -> {
-            if (readSize > 0) {
-                AutoSwitchHAClient.this.flowMonitor.addByteCountTransferred(readSize);
-                lastReadTimestamp = System.currentTimeMillis();
-            }
-        });
-        this.haWriter = new HAWriter();
-        haWriter.registerHook(writeSize -> {
-            if (writeSize > 0) {
-                lastWriteTimestamp = System.currentTimeMillis();
-            }
-        });
+        if (this.flowMonitor == null) {
+            this.flowMonitor = new FlowMonitor(this.defaultMessageStore.getMessageStoreConfig());
+        }
+
+        // init offset
+        this.currentReceivedEpoch = -1L;
+        this.currentTransferOffset = -1L;
+
+        startNettyClient();
         changeCurrentState(HAConnectionState.READY);
-        this.currentReceivedEpoch = -1;
-        this.currentReportedOffset = 0;
-        this.processPosition = 0;
-        this.lastReadTimestamp = System.currentTimeMillis();
-        this.lastWriteTimestamp = System.currentTimeMillis();
-        haService.updateConfirmOffset(-1);
     }
 
-    public void reOpen() throws IOException {
-        shutdown();
-        init();
+    public void changePromise(boolean success) {
+        if (this.channelPromise != null && !this.channelPromise.isDone()) {
+            if (success) {
+                this.channelPromise.setSuccess();
+            } else {
+                this.channelPromise.setFailure(new RuntimeException("promise failure"));
+            }
+        }
     }
 
     @Override
     public String getServiceName() {
-        if (haService.getDefaultMessageStore().getBrokerConfig().isInBrokerContainer()) {
-            return haService.getDefaultMessageStore().getBrokerIdentity().getLoggerIdentifier() + AutoSwitchHAClient.class.getSimpleName();
+        if (defaultMessageStore.getBrokerConfig().isInBrokerContainer()) {
+            return defaultMessageStore.getBrokerIdentity().getLoggerIdentifier()
+                + AutoSwitchHAClient.class.getSimpleName();
         }
         return AutoSwitchHAClient.class.getSimpleName();
     }
 
-    public void setLocalAddress(String localAddress) {
-        this.localAddress = localAddress;
+    public void updateSlaveId(Long newId) {
+        this.slaveId.set(newId);
     }
 
-    public void updateSlaveId(Long newId) {
-        Long currentId = this.slaveId.get();
-        if (this.slaveId.compareAndSet(currentId, newId)) {
-            LOGGER.info("Update slave Id, OLD: {}, New: {}", currentId, newId);

Review Comment:
   Why do we need CAS here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] hzh0425 commented on pull request #4780: [ISSUE #4778] Design of optimize log transfer protocol

Posted by GitBox <gi...@apache.org>.
hzh0425 commented on PR #4780:
URL: https://github.com/apache/rocketmq/pull/4780#issuecomment-1232404441

   Cool, thanks for your work. @lizhimins 
   Can you provide a performance comparison and a stability test of the old and new versions?
   If we switch to the Netty version, I'm curious how much performance improves and how stable it is.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] lizhanhui commented on a diff in pull request #4780: [ISSUE #4778] Design of optimize log transfer protocol

Posted by GitBox <gi...@apache.org>.
lizhanhui commented on code in PR #4780:
URL: https://github.com/apache/rocketmq/pull/4780#discussion_r954537750


##########
common/src/main/java/org/apache/rocketmq/common/EpochEntry.java:
##########
@@ -64,10 +65,10 @@ public void setEndOffset(long endOffset) {
     @Override
     public String toString() {
         return "EpochEntry{" +
-            "epoch=" + epoch +
-            ", startOffset=" + startOffset +
-            ", endOffset=" + endOffset +
-            '}';
+                "epoch=" + epoch +

Review Comment:
   Can we use a JSON library to generate the string representation instead of manual concatenation? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] zhouxinyu commented on a diff in pull request #4780: [ISSUE #4778] Design of optimize log transfer protocol

Posted by GitBox <gi...@apache.org>.
zhouxinyu commented on code in PR #4780:
URL: https://github.com/apache/rocketmq/pull/4780#discussion_r955596560


##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java:
##########
@@ -237,112 +207,177 @@ public long getTransferredByteInSecond() {
     @Override
     public void shutdown() {
         changeCurrentState(HAConnectionState.SHUTDOWN);
-        // Shutdown thread firstly
+        closeMaster();

Review Comment:
   The close sequence has been reordered without any comments; please add some explaining why.



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java:
##########
@@ -198,35 +182,21 @@ public void changeCurrentState(HAConnectionState haConnectionState) {
         this.currentState = haConnectionState;
     }
 
-    public void closeMasterAndWait() {
-        this.closeMaster();
-        this.waitForRunning(1000 * 5);
-    }
-
     @Override
     public void closeMaster() {
-        if (null != this.socketChannel) {
+        if (channelPromise != null) {
+            channelPromise.setFailure(new RuntimeException("epoch not match"));
+        }
+        // close channel
+        if (future != null && future.channel() != null) {

Review Comment:
   Too many Netty-related objects are exposed to HAClient; I suggest introducing a new model that encapsulates them.



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java:
##########
@@ -237,112 +207,177 @@ public long getTransferredByteInSecond() {
     @Override
     public void shutdown() {
         changeCurrentState(HAConnectionState.SHUTDOWN);
-        // Shutdown thread firstly
+        closeMaster();
         this.flowMonitor.shutdown();
         super.shutdown();
-
-        closeMaster();
-        try {
-            this.selector.close();
-        } catch (IOException e) {
-            LOGGER.warn("Close the selector of AutoSwitchHAClient error, ", e);
-        }
     }
 
-    private boolean isTimeToReportOffset() {
-        long interval = this.messageStore.now() - this.lastWriteTimestamp;
-        return interval > this.messageStore.getMessageStoreConfig().getHaSendHeartbeatInterval();
+    private void sendHandshakeSlave(Channel channel) {
+        BrokerConfig brokerConfig = defaultMessageStore.getBrokerConfig();
+        HandshakeSlave handshakeSlave = new HandshakeSlave();
+        handshakeSlave.setClusterName(brokerConfig.getBrokerClusterName());
+        handshakeSlave.setBrokerName(brokerConfig.getBrokerName());
+        handshakeSlave.setBrokerId(brokerConfig.getBrokerId());
+        handshakeSlave.setBrokerAddr(((AutoSwitchHAService) defaultMessageStore.getHaService()).getLocalAddress());
+        handshakeSlave.setBrokerAppVersion(MQVersion.CURRENT_VERSION);
+        handshakeSlave.setLanguageCode(LanguageCode.JAVA);
+        handshakeSlave.setHaProtocolVersion(2);
+
+        TransferMessage transferMessage = this.haService.buildMessage(TransferType.HANDSHAKE_SLAVE);
+        transferMessage.appendBody(RemotingSerializable.encode(handshakeSlave));
+
+        this.lastWriteTimestamp = System.currentTimeMillis();
+        channel.writeAndFlush(transferMessage);
     }
 
-    private boolean sendHandshakeHeader() throws IOException {
-        this.handshakeHeaderBuffer.position(0);
-        this.handshakeHeaderBuffer.limit(HANDSHAKE_SIZE);
-        // Original state
-        this.handshakeHeaderBuffer.putInt(HAConnectionState.HANDSHAKE.ordinal());
-        // IsSyncFromLastFile
-        short isSyncFromLastFile = this.haService.getDefaultMessageStore().getMessageStoreConfig().isSyncFromLastFile() ? (short) 1 : (short) 0;
-        this.handshakeHeaderBuffer.putShort(isSyncFromLastFile);
-        // IsAsyncLearner role
-        short isAsyncLearner = this.haService.getDefaultMessageStore().getMessageStoreConfig().isAsyncLearner() ? (short) 1 : (short) 0;
-        this.handshakeHeaderBuffer.putShort(isAsyncLearner);
-        // Address length
-        this.handshakeHeaderBuffer.putInt(this.localAddress == null ? 0 : this.localAddress.length());
-        // Slave address
-        this.handshakeHeaderBuffer.put(this.localAddress == null ? new byte[0] : this.localAddress.getBytes());
-
-        this.handshakeHeaderBuffer.flip();
-        return this.haWriter.write(this.socketChannel, this.handshakeHeaderBuffer);
+    public synchronized void startNettyClient() {
+        AutoSwitchHAClient haClient = this;
+        workerGroup = new NioEventLoopGroup();
+        bootstrap = new Bootstrap();
+        bootstrap.group(workerGroup)
+            .channel(NioSocketChannel.class)
+            .option(ChannelOption.SO_KEEPALIVE, true)
+            .option(ChannelOption.SO_SNDBUF, AutoSwitchHAService.WRITE_MAX_BUFFER_SIZE)
+            .option(ChannelOption.SO_RCVBUF, AutoSwitchHAService.READ_MAX_BUFFER_SIZE)
+            .handler(new ChannelInitializer<SocketChannel>() {
+                @Override
+                public void initChannel(SocketChannel ch) {
+                    ch.pipeline()
+                        .addLast(new IdleStateHandler(15, 15, 0))
+                        .addLast(new NettyTransferDecoder(haService))
+                        .addLast(new NettyTransferEncoder(haService))
+                        .addLast(new NettyTransferClientHandler(haClient));
+                }
+            });
     }
 
-    private void handshakeWithMaster() throws IOException {
-        boolean result = this.sendHandshakeHeader();
-        if (!result) {
-            closeMasterAndWait();
+    public void doNettyConnect() throws InterruptedException {
+        if (future != null && future.channel() != null && future.channel().isActive()) {
+            return;
         }
 
-        this.selector.select(5000);
-
-        result = this.haReader.read(this.socketChannel, this.byteBufferRead);
-        if (!result) {
-            closeMasterAndWait();
+        if (StringUtils.isBlank(this.masterHaAddress.get())) {
+            return;
         }
-    }
 
-    private boolean reportSlaveOffset(final long offsetToReport) throws IOException {
-        this.transferHeaderBuffer.position(0);
-        this.transferHeaderBuffer.limit(TRANSFER_HEADER_SIZE);
-        this.transferHeaderBuffer.putInt(this.currentState.ordinal());
-        this.transferHeaderBuffer.putLong(offsetToReport);
-        this.transferHeaderBuffer.flip();
-        return this.haWriter.write(this.socketChannel, this.transferHeaderBuffer);
-    }
+        SocketAddress socketAddress = RemotingUtil.string2SocketAddress(this.masterHaAddress.get());
+        future = bootstrap.connect(socketAddress).addListener((ChannelFutureListener) future -> {
+            if (future.isSuccess()) {
+                LOGGER.info("Client connect to server successfully!");
+            } else {
+                LOGGER.info("Failed to connect to server, try connect after 1000 ms");
+            }
+        });
 
-    private boolean reportSlaveMaxOffset() throws IOException {
-        boolean result = true;
-        final long maxPhyOffset = this.messageStore.getMaxPhyOffset();
-        if (maxPhyOffset > this.currentReportedOffset) {
-            this.currentReportedOffset = maxPhyOffset;
-            result = reportSlaveOffset(this.currentReportedOffset);
+        try {
+            future.await();
+        } catch (InterruptedException e) {
+            throw new RuntimeException("HAClient start server InterruptedException", e);
         }
-        return result;
     }
 
-    public boolean connectMaster() throws IOException {
-        if (null == this.socketChannel) {
-            String addr = this.masterHaAddress.get();
-            if (StringUtils.isNotEmpty(addr)) {
-                SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
-                this.socketChannel = RemotingUtil.connect(socketAddress);
-                if (this.socketChannel != null) {
-                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
-                    LOGGER.info("AutoSwitchHAClient connect to master {}", addr);
+    public synchronized boolean tryConnectToMaster() throws InterruptedException {
+        try {
+            String address = this.masterHaAddress.get();
+            if (StringUtils.isNotEmpty(address)) {
+                doNettyConnect();
+                channelPromise = new DefaultChannelPromise(future.channel());
+                sendHandshakeSlave(future.channel());
+                channelPromise.await(5000);
+                if (channelPromise.isSuccess()) {
+                    channelPromise = null;

Review Comment:
   It's also unusual to maintain the state machine through a shared `ChannelPromise`.



##########
store/src/main/java/org/apache/rocketmq/store/ha/netty/NettyTransferDecoder.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
+
+public class NettyTransferDecoder extends LengthFieldBasedFrameDecoder {
+
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+
+    private static final int FRAME_MAX_LENGTH =
+        Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));
+
+    private final AutoSwitchHAService autoSwitchHAService;
+
+    public NettyTransferDecoder(AutoSwitchHAService autoSwitchHAService) {
+        super(FRAME_MAX_LENGTH, 0, 4, 20, 0);
+        this.autoSwitchHAService = autoSwitchHAService;
+    }
+
+    @Override
+    protected TransferMessage decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
+        try {
+            ByteBuf frame = (ByteBuf) super.decode(ctx, in);
+            if (null == frame) {
+                return null;
+            }
+            int bodyLength = frame.readInt();

Review Comment:
   Do you handle a partially received (half) `TransferMessage`?



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java:
##########
@@ -237,112 +207,177 @@ public long getTransferredByteInSecond() {
     @Override
     public void shutdown() {
         changeCurrentState(HAConnectionState.SHUTDOWN);
-        // Shutdown thread firstly
+        closeMaster();
         this.flowMonitor.shutdown();
         super.shutdown();
-
-        closeMaster();
-        try {
-            this.selector.close();
-        } catch (IOException e) {
-            LOGGER.warn("Close the selector of AutoSwitchHAClient error, ", e);
-        }
     }
 
-    private boolean isTimeToReportOffset() {
-        long interval = this.messageStore.now() - this.lastWriteTimestamp;
-        return interval > this.messageStore.getMessageStoreConfig().getHaSendHeartbeatInterval();
+    private void sendHandshakeSlave(Channel channel) {
+        BrokerConfig brokerConfig = defaultMessageStore.getBrokerConfig();
+        HandshakeSlave handshakeSlave = new HandshakeSlave();
+        handshakeSlave.setClusterName(brokerConfig.getBrokerClusterName());
+        handshakeSlave.setBrokerName(brokerConfig.getBrokerName());
+        handshakeSlave.setBrokerId(brokerConfig.getBrokerId());
+        handshakeSlave.setBrokerAddr(((AutoSwitchHAService) defaultMessageStore.getHaService()).getLocalAddress());
+        handshakeSlave.setBrokerAppVersion(MQVersion.CURRENT_VERSION);
+        handshakeSlave.setLanguageCode(LanguageCode.JAVA);
+        handshakeSlave.setHaProtocolVersion(2);
+
+        TransferMessage transferMessage = this.haService.buildMessage(TransferType.HANDSHAKE_SLAVE);
+        transferMessage.appendBody(RemotingSerializable.encode(handshakeSlave));
+
+        this.lastWriteTimestamp = System.currentTimeMillis();
+        channel.writeAndFlush(transferMessage);
     }
 
-    private boolean sendHandshakeHeader() throws IOException {
-        this.handshakeHeaderBuffer.position(0);
-        this.handshakeHeaderBuffer.limit(HANDSHAKE_SIZE);
-        // Original state
-        this.handshakeHeaderBuffer.putInt(HAConnectionState.HANDSHAKE.ordinal());
-        // IsSyncFromLastFile
-        short isSyncFromLastFile = this.haService.getDefaultMessageStore().getMessageStoreConfig().isSyncFromLastFile() ? (short) 1 : (short) 0;
-        this.handshakeHeaderBuffer.putShort(isSyncFromLastFile);
-        // IsAsyncLearner role
-        short isAsyncLearner = this.haService.getDefaultMessageStore().getMessageStoreConfig().isAsyncLearner() ? (short) 1 : (short) 0;
-        this.handshakeHeaderBuffer.putShort(isAsyncLearner);
-        // Address length
-        this.handshakeHeaderBuffer.putInt(this.localAddress == null ? 0 : this.localAddress.length());
-        // Slave address
-        this.handshakeHeaderBuffer.put(this.localAddress == null ? new byte[0] : this.localAddress.getBytes());
-
-        this.handshakeHeaderBuffer.flip();
-        return this.haWriter.write(this.socketChannel, this.handshakeHeaderBuffer);
+    public synchronized void startNettyClient() {
+        AutoSwitchHAClient haClient = this;
+        workerGroup = new NioEventLoopGroup();
+        bootstrap = new Bootstrap();
+        bootstrap.group(workerGroup)
+            .channel(NioSocketChannel.class)
+            .option(ChannelOption.SO_KEEPALIVE, true)
+            .option(ChannelOption.SO_SNDBUF, AutoSwitchHAService.WRITE_MAX_BUFFER_SIZE)
+            .option(ChannelOption.SO_RCVBUF, AutoSwitchHAService.READ_MAX_BUFFER_SIZE)
+            .handler(new ChannelInitializer<SocketChannel>() {
+                @Override
+                public void initChannel(SocketChannel ch) {
+                    ch.pipeline()
+                        .addLast(new IdleStateHandler(15, 15, 0))
+                        .addLast(new NettyTransferDecoder(haService))
+                        .addLast(new NettyTransferEncoder(haService))
+                        .addLast(new NettyTransferClientHandler(haClient));
+                }
+            });
     }
 
-    private void handshakeWithMaster() throws IOException {
-        boolean result = this.sendHandshakeHeader();
-        if (!result) {
-            closeMasterAndWait();
+    public void doNettyConnect() throws InterruptedException {
+        if (future != null && future.channel() != null && future.channel().isActive()) {
+            return;

Review Comment:
   Adding some logs is recommended.



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java:
##########
@@ -17,152 +17,132 @@
 
 package org.apache.rocketmq.store.ha.autoswitch;
 
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.ha.FlowMonitor;
 import org.apache.rocketmq.store.ha.HAClient;
 import org.apache.rocketmq.store.ha.HAConnectionState;
-import org.apache.rocketmq.store.ha.io.AbstractHAReader;
-import org.apache.rocketmq.store.ha.io.HAWriter;
+import org.apache.rocketmq.store.ha.netty.TransferMessage;
+import org.apache.rocketmq.store.ha.netty.TransferType;
+import org.apache.rocketmq.store.ha.netty.NettyTransferClientHandler;
+import org.apache.rocketmq.store.ha.netty.NettyTransferDecoder;
+import org.apache.rocketmq.store.ha.netty.NettyTransferEncoder;
+import org.apache.rocketmq.store.ha.protocol.ConfirmTruncate;
+import org.apache.rocketmq.store.ha.protocol.HandshakeMaster;
+import org.apache.rocketmq.store.ha.protocol.HandshakeResult;
+import org.apache.rocketmq.store.ha.protocol.HandshakeSlave;
+import org.apache.rocketmq.store.ha.protocol.PushCommitLogAck;
+import org.apache.rocketmq.store.ha.protocol.PushCommitLogData;
 
 public class AutoSwitchHAClient extends ServiceThread implements HAClient {
 
-    /**
-     * Handshake header buffer size. Schema: state ordinal + Two flags + slaveAddressLength
-     * Flag: isSyncFromLastFile(short), isAsyncLearner(short)... we can add more flags in the future if needed
-     */
-    public static final int HANDSHAKE_HEADER_SIZE = 4 + 4 + 4;
-
-    /**
-     * Header + slaveAddress.
-     */
-    public static final int HANDSHAKE_SIZE = HANDSHAKE_HEADER_SIZE + 50;
-
-    /**
-     * Transfer header buffer size. Schema: state ordinal + maxOffset.
-     */
-    public static final int TRANSFER_HEADER_SIZE = 4 + 8;
-    public static final int MIN_HEADER_SIZE = Math.min(HANDSHAKE_HEADER_SIZE, TRANSFER_HEADER_SIZE);
     private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
-    private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
+
     private final AtomicReference<String> masterHaAddress = new AtomicReference<>();
     private final AtomicReference<String> masterAddress = new AtomicReference<>();
     private final AtomicReference<Long> slaveId = new AtomicReference<>();
-    private final ByteBuffer handshakeHeaderBuffer = ByteBuffer.allocate(HANDSHAKE_SIZE);
-    private final ByteBuffer transferHeaderBuffer = ByteBuffer.allocate(TRANSFER_HEADER_SIZE);
+
     private final AutoSwitchHAService haService;
-    private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
-    private final DefaultMessageStore messageStore;
-    private final EpochFileCache epochCache;
-
-    private String localAddress;
-    private SocketChannel socketChannel;
-    private Selector selector;
-    private AbstractHAReader haReader;
-    private HAWriter haWriter;
+    private final EpochStore epochCache;
+    private final DefaultMessageStore defaultMessageStore;
+    public EventLoopGroup workerGroup;
+    public Bootstrap bootstrap;
     private FlowMonitor flowMonitor;
-    /**
-     * last time that slave reads date from master.
-     */
-    private long lastReadTimestamp;
-    /**
-     * last time that slave reports offset to master.
-     */
-    private long lastWriteTimestamp;
-
-    private long currentReportedOffset;
-    private int processPosition;
-    private volatile HAConnectionState currentState;
-    /**
-     * Current epoch
-     */
-    private volatile long currentReceivedEpoch;
-
-    public AutoSwitchHAClient(AutoSwitchHAService haService, DefaultMessageStore defaultMessageStore,
-        EpochFileCache epochCache) throws IOException {
+    private volatile HAConnectionState currentState = HAConnectionState.SHUTDOWN;
+    private volatile long currentReceivedEpoch = -1L;
+    private volatile long currentTransferOffset = -1L;
+    private volatile long lastReadTimestamp;
+    private volatile long lastWriteTimestamp;
+    private ChannelFuture future;
+    private ChannelPromise channelPromise;
+
+    public AutoSwitchHAClient(AutoSwitchHAService haService) {
         this.haService = haService;
-        this.messageStore = defaultMessageStore;
-        this.epochCache = epochCache;
-        init();
+        this.defaultMessageStore = haService.getDefaultMessageStore();
+        this.epochCache = haService.getEpochStore();
     }
 
     public void init() throws IOException {
-        this.selector = RemotingUtil.openSelector();
-        this.flowMonitor = new FlowMonitor(this.messageStore.getMessageStoreConfig());
-        this.haReader = new HAClientReader();
-        haReader.registerHook(readSize -> {
-            if (readSize > 0) {
-                AutoSwitchHAClient.this.flowMonitor.addByteCountTransferred(readSize);
-                lastReadTimestamp = System.currentTimeMillis();
-            }
-        });
-        this.haWriter = new HAWriter();
-        haWriter.registerHook(writeSize -> {
-            if (writeSize > 0) {
-                lastWriteTimestamp = System.currentTimeMillis();
-            }
-        });
+        if (this.flowMonitor == null) {
+            this.flowMonitor = new FlowMonitor(this.defaultMessageStore.getMessageStoreConfig());
+        }
+
+        // init offset
+        this.currentReceivedEpoch = -1L;
+        this.currentTransferOffset = -1L;
+
+        startNettyClient();
         changeCurrentState(HAConnectionState.READY);
-        this.currentReceivedEpoch = -1;
-        this.currentReportedOffset = 0;
-        this.processPosition = 0;
-        this.lastReadTimestamp = System.currentTimeMillis();
-        this.lastWriteTimestamp = System.currentTimeMillis();
-        haService.updateConfirmOffset(-1);
     }
 
-    public void reOpen() throws IOException {
-        shutdown();
-        init();
+    public void changePromise(boolean success) {
+        if (this.channelPromise != null && !this.channelPromise.isDone()) {
+            if (success) {
+                this.channelPromise.setSuccess();
+            } else {
+                this.channelPromise.setFailure(new RuntimeException("promise failure"));
+            }
+        }
     }
 
     @Override
     public String getServiceName() {
-        if (haService.getDefaultMessageStore().getBrokerConfig().isInBrokerContainer()) {
-            return haService.getDefaultMessageStore().getBrokerIdentity().getLoggerIdentifier() + AutoSwitchHAClient.class.getSimpleName();
+        if (defaultMessageStore.getBrokerConfig().isInBrokerContainer()) {
+            return defaultMessageStore.getBrokerIdentity().getLoggerIdentifier()
+                + AutoSwitchHAClient.class.getSimpleName();
         }
         return AutoSwitchHAClient.class.getSimpleName();
     }
 
-    public void setLocalAddress(String localAddress) {
-        this.localAddress = localAddress;
+    public void updateSlaveId(Long newId) {
+        this.slaveId.set(newId);
     }
 
-    public void updateSlaveId(Long newId) {
-        Long currentId = this.slaveId.get();
-        if (this.slaveId.compareAndSet(currentId, newId)) {
-            LOGGER.info("Update slave Id, OLD: {}, New: {}", currentId, newId);

Review Comment:
   Do we need to remove this log?



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java:
##########
@@ -237,112 +207,177 @@ public long getTransferredByteInSecond() {
     @Override
     public void shutdown() {
         changeCurrentState(HAConnectionState.SHUTDOWN);
-        // Shutdown thread firstly
+        closeMaster();
         this.flowMonitor.shutdown();
         super.shutdown();
-
-        closeMaster();
-        try {
-            this.selector.close();
-        } catch (IOException e) {
-            LOGGER.warn("Close the selector of AutoSwitchHAClient error, ", e);
-        }
     }
 
-    private boolean isTimeToReportOffset() {
-        long interval = this.messageStore.now() - this.lastWriteTimestamp;
-        return interval > this.messageStore.getMessageStoreConfig().getHaSendHeartbeatInterval();
+    private void sendHandshakeSlave(Channel channel) {
+        BrokerConfig brokerConfig = defaultMessageStore.getBrokerConfig();
+        HandshakeSlave handshakeSlave = new HandshakeSlave();
+        handshakeSlave.setClusterName(brokerConfig.getBrokerClusterName());
+        handshakeSlave.setBrokerName(brokerConfig.getBrokerName());
+        handshakeSlave.setBrokerId(brokerConfig.getBrokerId());
+        handshakeSlave.setBrokerAddr(((AutoSwitchHAService) defaultMessageStore.getHaService()).getLocalAddress());
+        handshakeSlave.setBrokerAppVersion(MQVersion.CURRENT_VERSION);
+        handshakeSlave.setLanguageCode(LanguageCode.JAVA);
+        handshakeSlave.setHaProtocolVersion(2);
+
+        TransferMessage transferMessage = this.haService.buildMessage(TransferType.HANDSHAKE_SLAVE);
+        transferMessage.appendBody(RemotingSerializable.encode(handshakeSlave));
+
+        this.lastWriteTimestamp = System.currentTimeMillis();
+        channel.writeAndFlush(transferMessage);
     }
 
-    private boolean sendHandshakeHeader() throws IOException {
-        this.handshakeHeaderBuffer.position(0);
-        this.handshakeHeaderBuffer.limit(HANDSHAKE_SIZE);
-        // Original state
-        this.handshakeHeaderBuffer.putInt(HAConnectionState.HANDSHAKE.ordinal());
-        // IsSyncFromLastFile
-        short isSyncFromLastFile = this.haService.getDefaultMessageStore().getMessageStoreConfig().isSyncFromLastFile() ? (short) 1 : (short) 0;
-        this.handshakeHeaderBuffer.putShort(isSyncFromLastFile);
-        // IsAsyncLearner role
-        short isAsyncLearner = this.haService.getDefaultMessageStore().getMessageStoreConfig().isAsyncLearner() ? (short) 1 : (short) 0;
-        this.handshakeHeaderBuffer.putShort(isAsyncLearner);
-        // Address length
-        this.handshakeHeaderBuffer.putInt(this.localAddress == null ? 0 : this.localAddress.length());
-        // Slave address
-        this.handshakeHeaderBuffer.put(this.localAddress == null ? new byte[0] : this.localAddress.getBytes());
-
-        this.handshakeHeaderBuffer.flip();
-        return this.haWriter.write(this.socketChannel, this.handshakeHeaderBuffer);
+    public synchronized void startNettyClient() {
+        AutoSwitchHAClient haClient = this;
+        workerGroup = new NioEventLoopGroup();
+        bootstrap = new Bootstrap();
+        bootstrap.group(workerGroup)
+            .channel(NioSocketChannel.class)
+            .option(ChannelOption.SO_KEEPALIVE, true)
+            .option(ChannelOption.SO_SNDBUF, AutoSwitchHAService.WRITE_MAX_BUFFER_SIZE)
+            .option(ChannelOption.SO_RCVBUF, AutoSwitchHAService.READ_MAX_BUFFER_SIZE)
+            .handler(new ChannelInitializer<SocketChannel>() {
+                @Override
+                public void initChannel(SocketChannel ch) {
+                    ch.pipeline()
+                        .addLast(new IdleStateHandler(15, 15, 0))
+                        .addLast(new NettyTransferDecoder(haService))
+                        .addLast(new NettyTransferEncoder(haService))
+                        .addLast(new NettyTransferClientHandler(haClient));
+                }
+            });
     }
 
-    private void handshakeWithMaster() throws IOException {
-        boolean result = this.sendHandshakeHeader();
-        if (!result) {
-            closeMasterAndWait();
+    public void doNettyConnect() throws InterruptedException {
+        if (future != null && future.channel() != null && future.channel().isActive()) {
+            return;
         }
 
-        this.selector.select(5000);
-
-        result = this.haReader.read(this.socketChannel, this.byteBufferRead);
-        if (!result) {
-            closeMasterAndWait();
+        if (StringUtils.isBlank(this.masterHaAddress.get())) {
+            return;
         }
-    }
 
-    private boolean reportSlaveOffset(final long offsetToReport) throws IOException {
-        this.transferHeaderBuffer.position(0);
-        this.transferHeaderBuffer.limit(TRANSFER_HEADER_SIZE);
-        this.transferHeaderBuffer.putInt(this.currentState.ordinal());
-        this.transferHeaderBuffer.putLong(offsetToReport);
-        this.transferHeaderBuffer.flip();
-        return this.haWriter.write(this.socketChannel, this.transferHeaderBuffer);
-    }
+        SocketAddress socketAddress = RemotingUtil.string2SocketAddress(this.masterHaAddress.get());
+        future = bootstrap.connect(socketAddress).addListener((ChannelFutureListener) future -> {
+            if (future.isSuccess()) {
+                LOGGER.info("Client connect to server successfully!");
+            } else {
+                LOGGER.info("Failed to connect to server, try connect after 1000 ms");
+            }
+        });
 
-    private boolean reportSlaveMaxOffset() throws IOException {
-        boolean result = true;
-        final long maxPhyOffset = this.messageStore.getMaxPhyOffset();
-        if (maxPhyOffset > this.currentReportedOffset) {
-            this.currentReportedOffset = maxPhyOffset;
-            result = reportSlaveOffset(this.currentReportedOffset);
+        try {
+            future.await();
+        } catch (InterruptedException e) {
+            throw new RuntimeException("HAClient start server InterruptedException", e);
         }
-        return result;
     }
 
-    public boolean connectMaster() throws IOException {
-        if (null == this.socketChannel) {
-            String addr = this.masterHaAddress.get();
-            if (StringUtils.isNotEmpty(addr)) {
-                SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
-                this.socketChannel = RemotingUtil.connect(socketAddress);
-                if (this.socketChannel != null) {
-                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
-                    LOGGER.info("AutoSwitchHAClient connect to master {}", addr);
+    public synchronized boolean tryConnectToMaster() throws InterruptedException {
+        try {
+            String address = this.masterHaAddress.get();
+            if (StringUtils.isNotEmpty(address)) {
+                doNettyConnect();
+                channelPromise = new DefaultChannelPromise(future.channel());
+                sendHandshakeSlave(future.channel());
+                channelPromise.await(5000);
+                if (channelPromise.isSuccess()) {
+                    channelPromise = null;
                     changeCurrentState(HAConnectionState.HANDSHAKE);
+                    return true;
+                } else {
+                    LOGGER.warn("Connector to master failed");
                 }
+                channelPromise = null;
             }
-            this.currentReportedOffset = this.messageStore.getMaxPhyOffset();
-            this.lastReadTimestamp = System.currentTimeMillis();
+        } catch (InterruptedException e) {
+            LOGGER.error("HAClient send handshake but not receive response, masterAddr:{}", masterHaAddress.get(), e);
+            future.channel().close().sync();
         }
-        return this.socketChannel != null;
+        return false;
     }
 
-    private boolean transferFromMaster() throws IOException {
-        boolean result;
-        if (isTimeToReportOffset()) {
-            LOGGER.info("Slave report current offset {}", this.currentReportedOffset);
-            result = reportSlaveOffset(this.currentReportedOffset);
-            if (!result) {
-                return false;
-            }
+    private boolean checkConnectionTimeout() {
+        long interval = this.defaultMessageStore.now() - this.lastReadTimestamp;
+        if (interval > this.defaultMessageStore.getMessageStoreConfig().getHaHousekeepingInterval()) {
+            LOGGER.warn("NettyHAClient housekeeping, found this connection {} expired, interval={}",
+                this.masterHaAddress, interval);
+            return false;
         }
+        return true;
+    }
 
-        this.selector.select(1000);
+    public void masterHandshake(HandshakeMaster handshakeMaster) {
+        if (handshakeMaster != null
+            && HandshakeResult.ACCEPT.equals(handshakeMaster.getHandshakeResult())) {
+            channelPromise.setSuccess();
+            return;
+        }
+        LOGGER.error("Master reject build connection, {}", handshakeMaster);
+        channelPromise.setFailure(new Exception("Master reject build connection"));
+    }
 
-        result = this.haReader.read(this.socketChannel, this.byteBufferRead);
-        if (!result) {
+    private boolean queryMasterEpoch() throws InterruptedException {
+        try {
+            TransferMessage transferMessage = haService.buildMessage(TransferType.QUERY_EPOCH);
+            channelPromise = new DefaultChannelPromise(future.channel());
+            this.lastWriteTimestamp = System.currentTimeMillis();
+            future.channel().writeAndFlush(transferMessage);

Review Comment:
   There seems to be no need to hold on to `future`, since it has already completed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [ISSUE #4778] Design of optimize log transfer protocol [rocketmq]

Posted by "lizhimins (via GitHub)" <gi...@apache.org>.
lizhimins closed pull request #4780: [ISSUE #4778] Design of optimize log transfer protocol
URL: https://github.com/apache/rocketmq/pull/4780


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org