You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/08/29 03:37:12 UTC

[GitHub] [rocketmq] lizhimins commented on a diff in pull request #4780: [ISSUE #4778] Design of optimize log transfer protocol

lizhimins commented on code in PR #4780:
URL: https://github.com/apache/rocketmq/pull/4780#discussion_r956869627


##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java:
##########
@@ -17,152 +17,132 @@
 
 package org.apache.rocketmq.store.ha.autoswitch;
 
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.ha.FlowMonitor;
 import org.apache.rocketmq.store.ha.HAClient;
 import org.apache.rocketmq.store.ha.HAConnectionState;
-import org.apache.rocketmq.store.ha.io.AbstractHAReader;
-import org.apache.rocketmq.store.ha.io.HAWriter;
+import org.apache.rocketmq.store.ha.netty.TransferMessage;
+import org.apache.rocketmq.store.ha.netty.TransferType;
+import org.apache.rocketmq.store.ha.netty.NettyTransferClientHandler;
+import org.apache.rocketmq.store.ha.netty.NettyTransferDecoder;
+import org.apache.rocketmq.store.ha.netty.NettyTransferEncoder;
+import org.apache.rocketmq.store.ha.protocol.ConfirmTruncate;
+import org.apache.rocketmq.store.ha.protocol.HandshakeMaster;
+import org.apache.rocketmq.store.ha.protocol.HandshakeResult;
+import org.apache.rocketmq.store.ha.protocol.HandshakeSlave;
+import org.apache.rocketmq.store.ha.protocol.PushCommitLogAck;
+import org.apache.rocketmq.store.ha.protocol.PushCommitLogData;
 
 public class AutoSwitchHAClient extends ServiceThread implements HAClient {
 
-    /**
-     * Handshake header buffer size. Schema: state ordinal + Two flags + slaveAddressLength
-     * Flag: isSyncFromLastFile(short), isAsyncLearner(short)... we can add more flags in the future if needed
-     */
-    public static final int HANDSHAKE_HEADER_SIZE = 4 + 4 + 4;
-
-    /**
-     * Header + slaveAddress.
-     */
-    public static final int HANDSHAKE_SIZE = HANDSHAKE_HEADER_SIZE + 50;
-
-    /**
-     * Transfer header buffer size. Schema: state ordinal + maxOffset.
-     */
-    public static final int TRANSFER_HEADER_SIZE = 4 + 8;
-    public static final int MIN_HEADER_SIZE = Math.min(HANDSHAKE_HEADER_SIZE, TRANSFER_HEADER_SIZE);
     private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
-    private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
+
     private final AtomicReference<String> masterHaAddress = new AtomicReference<>();
     private final AtomicReference<String> masterAddress = new AtomicReference<>();
     private final AtomicReference<Long> slaveId = new AtomicReference<>();
-    private final ByteBuffer handshakeHeaderBuffer = ByteBuffer.allocate(HANDSHAKE_SIZE);
-    private final ByteBuffer transferHeaderBuffer = ByteBuffer.allocate(TRANSFER_HEADER_SIZE);
+
     private final AutoSwitchHAService haService;
-    private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
-    private final DefaultMessageStore messageStore;
-    private final EpochFileCache epochCache;
-
-    private String localAddress;
-    private SocketChannel socketChannel;
-    private Selector selector;
-    private AbstractHAReader haReader;
-    private HAWriter haWriter;
+    private final EpochStore epochCache;
+    private final DefaultMessageStore defaultMessageStore;
+    public EventLoopGroup workerGroup;
+    public Bootstrap bootstrap;
     private FlowMonitor flowMonitor;
-    /**
-     * last time that slave reads date from master.
-     */
-    private long lastReadTimestamp;
-    /**
-     * last time that slave reports offset to master.
-     */
-    private long lastWriteTimestamp;
-
-    private long currentReportedOffset;
-    private int processPosition;
-    private volatile HAConnectionState currentState;
-    /**
-     * Current epoch
-     */
-    private volatile long currentReceivedEpoch;
-
-    public AutoSwitchHAClient(AutoSwitchHAService haService, DefaultMessageStore defaultMessageStore,
-        EpochFileCache epochCache) throws IOException {
+    private volatile HAConnectionState currentState = HAConnectionState.SHUTDOWN;
+    private volatile long currentReceivedEpoch = -1L;
+    private volatile long currentTransferOffset = -1L;
+    private volatile long lastReadTimestamp;
+    private volatile long lastWriteTimestamp;
+    private ChannelFuture future;
+    private ChannelPromise channelPromise;
+
+    public AutoSwitchHAClient(AutoSwitchHAService haService) {
         this.haService = haService;
-        this.messageStore = defaultMessageStore;
-        this.epochCache = epochCache;
-        init();
+        this.defaultMessageStore = haService.getDefaultMessageStore();
+        this.epochCache = haService.getEpochStore();
     }
 
     public void init() throws IOException {
-        this.selector = RemotingUtil.openSelector();
-        this.flowMonitor = new FlowMonitor(this.messageStore.getMessageStoreConfig());
-        this.haReader = new HAClientReader();
-        haReader.registerHook(readSize -> {
-            if (readSize > 0) {
-                AutoSwitchHAClient.this.flowMonitor.addByteCountTransferred(readSize);
-                lastReadTimestamp = System.currentTimeMillis();
-            }
-        });
-        this.haWriter = new HAWriter();
-        haWriter.registerHook(writeSize -> {
-            if (writeSize > 0) {
-                lastWriteTimestamp = System.currentTimeMillis();
-            }
-        });
+        if (this.flowMonitor == null) {
+            this.flowMonitor = new FlowMonitor(this.defaultMessageStore.getMessageStoreConfig());
+        }
+
+        // init offset
+        this.currentReceivedEpoch = -1L;
+        this.currentTransferOffset = -1L;
+
+        startNettyClient();
         changeCurrentState(HAConnectionState.READY);
-        this.currentReceivedEpoch = -1;
-        this.currentReportedOffset = 0;
-        this.processPosition = 0;
-        this.lastReadTimestamp = System.currentTimeMillis();
-        this.lastWriteTimestamp = System.currentTimeMillis();
-        haService.updateConfirmOffset(-1);
     }
 
-    public void reOpen() throws IOException {
-        shutdown();
-        init();
+    public void changePromise(boolean success) {
+        if (this.channelPromise != null && !this.channelPromise.isDone()) {
+            if (success) {
+                this.channelPromise.setSuccess();
+            } else {
+                this.channelPromise.setFailure(new RuntimeException("promise failure"));
+            }
+        }
     }
 
     @Override
     public String getServiceName() {
-        if (haService.getDefaultMessageStore().getBrokerConfig().isInBrokerContainer()) {
-            return haService.getDefaultMessageStore().getBrokerIdentity().getLoggerIdentifier() + AutoSwitchHAClient.class.getSimpleName();
+        if (defaultMessageStore.getBrokerConfig().isInBrokerContainer()) {
+            return defaultMessageStore.getBrokerIdentity().getLoggerIdentifier()
+                + AutoSwitchHAClient.class.getSimpleName();
         }
         return AutoSwitchHAClient.class.getSimpleName();
     }
 
-    public void setLocalAddress(String localAddress) {
-        this.localAddress = localAddress;
+    public void updateSlaveId(Long newId) {
+        this.slaveId.set(newId);
     }
 
-    public void updateSlaveId(Long newId) {
-        Long currentId = this.slaveId.get();
-        if (this.slaveId.compareAndSet(currentId, newId)) {
-            LOGGER.info("Update slave Id, OLD: {}, New: {}", currentId, newId);

Review Comment:
   Why do we need a CAS (compare-and-set) here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org