Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/05/02 09:26:50 UTC

[GitHub] [rocketmq] hzh0425 opened a new pull request, #4236: [Summer of Code] Support switch role for ha service

hzh0425 opened a new pull request, #4236:
URL: https://github.com/apache/rocketmq/pull/4236

   ## What is the purpose of the change
   We want unified log replication that uses RocketMQ's original HaService instead of DLedger mode.
   Previously, I did the following work with @RongtongJin:
   1. Added a state machine mode for DLedger: https://github.com/openmessaging/dledger/pull/128
   2. Embedded a strongly consistent controller, based on the DLedger implementation, in the name server: https://github.com/apache/rocketmq/pull/4195
   
   In this PR, I add an HA service, AutoSwitchHAService, that can switch roles. It cooperates with the controller to achieve master-slave switching.
   
   In follow-up work, I will modify the broker code to fully implement master-slave switching.
   
   ## Brief changelog
   
   The ha service protocol is:
   ![Master-slave replication](https://user-images.githubusercontent.com/58988019/166210140-ed82e24f-4279-46c5-bca3-dd68a250ebec.png)
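
As a rough illustration of the protocol, the master-to-slave message header can be sketched from the `MSG_HEADER_SIZE` comment in `AutoSwitchHAConnection` in this PR (state + body size + offset + epoch + additional info). The class `HaHeaderSketch` and its `encode`/`decode` helpers are hypothetical names used only for this example; they are not part of the PR.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of the PR: packs and unpacks the 28-byte header.
final class HaHeaderSketch {
    // Same arithmetic as MSG_HEADER_SIZE in AutoSwitchHAConnection:
    // state (int) + body size (int) + offset (long) + epoch (int) + additional info (long).
    static final int MSG_HEADER_SIZE = 4 + 4 + 8 + 4 + 8;

    static ByteBuffer encode(int stateOrdinal, int bodySize, long offset, int epoch, long additionalInfo) {
        ByteBuffer header = ByteBuffer.allocate(MSG_HEADER_SIZE);
        header.putInt(stateOrdinal);
        header.putInt(bodySize);
        header.putLong(offset);
        header.putInt(epoch);
        header.putLong(additionalInfo);
        header.flip(); // switch the buffer from writing to reading
        return header;
    }

    // Reads the five fields with absolute gets, so the buffer position is left untouched.
    static long[] decode(ByteBuffer header, int start) {
        return new long[] {
            header.getInt(start),       // state ordinal
            header.getInt(start + 4),   // body size
            header.getLong(start + 8),  // offset
            header.getInt(start + 16),  // epoch
            header.getLong(start + 20)  // additional info (confirm offset in transfer messages)
        };
    }
}
```

For a handshake message, the diff sets the body size to the number of epoch entries times `EPOCH_ENTRY_SIZE` and writes `0` as the additional info; for a transfer message, the additional info carries the confirm offset.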
   
   
   ## Verifying this change
   
   Follow this checklist to help us incorporate your contribution quickly and easily. Note: `it would be helpful if you could complete the following 5 checklist items (the last one is optional) before asking the community to review your PR`.
   
   - [x] Make sure there is a [Github issue](https://github.com/apache/rocketmq/issues) filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue. 
   - [x] Format the pull request title like `[ISSUE #123] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body.
   - [x] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
   - [x] Write the necessary unit tests (over 80% coverage) to verify your logic; prefer mocks when cross-module dependencies exist. If a new feature or significant change is committed, please remember to add an integration test in the [test module](https://github.com/apache/rocketmq/tree/master/test).
   - [x] Run `mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle` to make sure basic checks pass. Run `mvn clean install -DskipITs` to make sure unit tests pass. Run `mvn clean test-compile failsafe:integration-test` to make sure integration tests pass.
   - [ ] If this contribution is large, please file an [Apache Individual Contributor License Agreement](http://www.apache.org/licenses/#clas).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] hzh0425 commented on a diff in pull request #4236: [Summer of Code] Support switch role for ha service

Posted by GitBox <gi...@apache.org>.
hzh0425 commented on code in PR #4236:
URL: https://github.com/apache/rocketmq/pull/4236#discussion_r864397400


##########
common/src/main/java/org/apache/rocketmq/common/EpochEntry.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common;
+
+public class EpochEntry {
+
+    private final int epoch;
+    private final long startOffset;
+    private long endOffset = Long.MAX_VALUE;

Review Comment:
   Changing this to -1 may not be appropriate here, because some places need to compute min(endOffset); with -1 we would need an extra check.
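
The trade-off raised in this comment can be shown with a minimal sketch. It assumes only that an open-ended epoch needs some sentinel for "no end offset yet"; the class and method names below are hypothetical and do not appear in the PR.

```java
// Hypothetical illustration of the two sentinel choices for an open-ended endOffset.
final class EndOffsetSentinelSketch {
    // With Long.MAX_VALUE as the sentinel, a plain Math.min already gives the right answer.
    static long minEndWithMaxValue(long endA, long endB) {
        return Math.min(endA, endB);
    }

    // With -1 as the sentinel, each operand has to be special-cased before comparing,
    // because Math.min(-1, x) would otherwise return the sentinel itself.
    static long minEndWithMinusOne(long endA, long endB) {
        if (endA == -1L) {
            return endB;
        }
        if (endB == -1L) {
            return endA;
        }
        return Math.min(endA, endB);
    }
}
```

Both methods return the same bounded offset when one side is open-ended, but only the `-1` variant needs the extra branches.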





[GitHub] [rocketmq] hzh0425 commented on a diff in pull request #4236: [Summer of Code] Support switch role for ha service

Posted by GitBox <gi...@apache.org>.
hzh0425 commented on code in PR #4236:
URL: https://github.com/apache/rocketmq/pull/4236#discussion_r864394714


##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java:
##########
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.List;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.ha.FlowMonitor;
+import org.apache.rocketmq.store.ha.HAConnection;
+import org.apache.rocketmq.store.ha.HAConnectionState;
+import org.apache.rocketmq.store.ha.io.AbstractHAReader;
+import org.apache.rocketmq.store.ha.io.HAWriter;
+
+public class AutoSwitchHAConnection implements HAConnection {
+    /**
+     * Header protocol in syncing msg from master.
+     * Format: current state + body size + offset + epoch + additionalInfo(confirmOffset).
+     * If the msg is a handshake msg, the body size = EpochEntrySize * EpochEntryNums and the offset is the maxOffset in master.
+     */
+    public static final int MSG_HEADER_SIZE = 4 + 4 + 8 + 4 + 8;
+    public static final int EPOCH_ENTRY_SIZE = 12;
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private final AutoSwitchHAService haService;
+    private final SocketChannel socketChannel;
+    private final String clientAddress;
+    private final EpochFileCache epochCache;
+    private final AbstractWriteSocketService writeSocketService;
+    private final ReadSocketService readSocketService;
+    private volatile HAConnectionState currentState = HAConnectionState.HANDSHAKE;
+    private volatile long slaveRequestOffset = -1;
+    private volatile long slaveAckOffset = -1;
+    /**
+     * Whether the slave has already sent a handshake message
+     */
+    private volatile boolean isSlaveSendHandshake = false;
+    private volatile int currentTransferEpoch = -1;
+    private volatile long currentTransferEpochEndOffset = 0;
+    private final FlowMonitor flowMonitor;
+
+    public AutoSwitchHAConnection(AutoSwitchHAService haService, SocketChannel socketChannel,
+        EpochFileCache epochCache) throws IOException {
+        this.haService = haService;
+        this.socketChannel = socketChannel;
+        this.epochCache = epochCache;
+        this.clientAddress = this.socketChannel.socket().getRemoteSocketAddress().toString();
+        this.socketChannel.configureBlocking(false);
+        this.socketChannel.socket().setSoLinger(false, -1);
+        this.socketChannel.socket().setTcpNoDelay(true);
+        this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
+        this.socketChannel.socket().setSendBufferSize(1024 * 64);
+        this.writeSocketService = new WriteSocketService(this.socketChannel);
+        this.readSocketService = new ReadSocketService(this.socketChannel);
+        this.haService.getConnectionCount().incrementAndGet();
+        this.flowMonitor = new FlowMonitor(haService.getDefaultMessageStore().getMessageStoreConfig());
+    }
+
+    @Override public void start() {
+        changeCurrentState(HAConnectionState.HANDSHAKE);
+        this.flowMonitor.start();
+        this.readSocketService.start();
+        this.writeSocketService.start();
+    }
+
+    @Override public void shutdown() {
+        changeCurrentState(HAConnectionState.SHUTDOWN);
+        this.flowMonitor.shutdown(true);
+        this.writeSocketService.shutdown(true);
+        this.readSocketService.shutdown(true);
+        this.close();
+    }
+
+    @Override public void close() {
+        if (this.socketChannel != null) {
+            try {
+                this.socketChannel.close();
+            } catch (final IOException e) {
+                LOGGER.error("", e);
+            }
+        }
+    }
+
+    public void changeCurrentState(HAConnectionState connectionState) {
+        LOGGER.info("change state to {}", connectionState);
+        this.currentState = connectionState;
+    }
+
+    @Override public HAConnectionState getCurrentState() {
+        return currentState;
+    }
+
+    @Override public SocketChannel getSocketChannel() {
+        return socketChannel;
+    }
+
+    @Override public String getClientAddress() {
+        return clientAddress;
+    }
+
+    @Override public long getSlaveAckOffset() {
+        return slaveAckOffset;
+    }
+
+    @Override public long getTransferredByteInSecond() {
+        return flowMonitor.getTransferredByteInSecond();
+    }
+
+    @Override public long getTransferFromWhere() {
+        return this.writeSocketService.getNextTransferFromWhere();
+    }
+
+    private void changeTransferEpochToNext(final EpochEntry entry) {
+        this.currentTransferEpoch = entry.getEpoch();
+        this.currentTransferEpochEndOffset = entry.getEndOffset();
+        if (entry.getEpoch() == this.epochCache.lastEpoch()) {
+            // Use -1 to stand for +∞
+            this.currentTransferEpochEndOffset = -1;
+        }
+    }
+
+    class ReadSocketService extends ServiceThread {
+        private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;
+        private final Selector selector;
+        private final SocketChannel socketChannel;
+        private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+        private final AbstractHAReader haReader;
+        private int processPosition = 0;
+        private volatile long lastReadTimestamp = System.currentTimeMillis();
+
+        public ReadSocketService(final SocketChannel socketChannel) throws IOException {
+            this.selector = RemotingUtil.openSelector();
+            this.socketChannel = socketChannel;
+            this.socketChannel.register(this.selector, SelectionKey.OP_READ);
+            this.setDaemon(true);
+            haReader = new HAServerReader();
+            haReader.registerHook(readSize -> {
+                if (readSize > 0) {
+                    ReadSocketService.this.lastReadTimestamp =
+                        haService.getDefaultMessageStore().getSystemClock().now();
+                }
+            });
+        }
+
+        @Override
+        public void run() {
+            LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.selector.select(1000);
+                    boolean ok = this.haReader.read(this.socketChannel, this.byteBufferRead);
+                    if (!ok) {
+                        AutoSwitchHAConnection.LOGGER.error("processReadEvent error");
+                        break;
+                    }
+
+                    long interval = haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
+                    if (interval > haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
+                        LOGGER.warn("ha housekeeping, found this connection[" + clientAddress + "] expired, " + interval);
+                        break;
+                    }
+                } catch (Exception e) {
+                    AutoSwitchHAConnection.LOGGER.error(this.getServiceName() + " service has exception.", e);
+                    break;
+                }
+            }
+
+            this.makeStop();
+
+            changeCurrentState(HAConnectionState.SHUTDOWN);
+
+            writeSocketService.makeStop();
+
+            haService.removeConnection(AutoSwitchHAConnection.this);
+
+            haService.getConnectionCount().decrementAndGet();
+
+            SelectionKey sk = this.socketChannel.keyFor(this.selector);
+            if (sk != null) {
+                sk.cancel();
+            }
+
+            try {
+                this.selector.close();
+                this.socketChannel.close();
+            } catch (IOException e) {
+                AutoSwitchHAConnection.LOGGER.error("", e);
+            }
+
+            AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            return ReadSocketService.class.getSimpleName();
+        }
+
+        class HAServerReader extends AbstractHAReader {
+            @Override
+            protected boolean processReadResult(ByteBuffer byteBufferRead) {
+                while (true) {
+                    int diff = byteBufferRead.position() - ReadSocketService.this.processPosition;
+                    if (diff >= AutoSwitchHAClient.TRANSFER_HEADER_SIZE) {
+                        int readPosition = ReadSocketService.this.processPosition;
+                        HAConnectionState slaveState = HAConnectionState.values()[byteBufferRead.getInt(readPosition)];
+
+                        switch (slaveState) {
+                            case HANDSHAKE:
+                                isSlaveSendHandshake = true;
+                                ReadSocketService.this.processPosition += AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
+                                break;
+                            case TRANSFER:
+                                long slaveMaxOffset = byteBufferRead.getLong(readPosition + 4);
+                                ReadSocketService.this.processPosition += AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
+
+                                slaveAckOffset = slaveMaxOffset;
+                                if (slaveRequestOffset < 0) {
+                                    slaveRequestOffset = slaveMaxOffset;
+                                }
+                                LOGGER.info("slave[" + clientAddress + "] request offset " + slaveMaxOffset);
+                                AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset);
+                                break;
+                            default:
+                                LOGGER.error("Current state illegal {}", currentState);
+                                break;
+                        }
+
+                        if (!slaveState.equals(currentState)) {
+                            LOGGER.warn("Master change state from {} to {}", currentState, slaveState);
+                            changeCurrentState(slaveState);
+                        }
+                        continue;
+                    }
+
+                    if (!byteBufferRead.hasRemaining()) {
+                        byteBufferRead.position(ReadSocketService.this.processPosition);
+                        byteBufferRead.compact();
+                        ReadSocketService.this.processPosition = 0;
+                    }
+                    break;
+                }
+
+                return true;
+            }
+        }
+    }
+
+    class WriteSocketService extends AbstractWriteSocketService {
+        private SelectMappedBufferResult selectMappedBufferResult;
+
+        public WriteSocketService(final SocketChannel socketChannel) throws IOException {
+            super(socketChannel);
+        }
+
+        @Override
+        protected int getNextTransferDataSize() {
+            SelectMappedBufferResult selectResult = haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
+            if (selectResult == null || selectResult.getSize() <= 0) {
+                return 0;
+            }
+            this.selectMappedBufferResult = selectResult;
+            return selectResult.getSize();
+        }
+
+        @Override
+        protected void releaseData() {
+            this.selectMappedBufferResult.release();
+            this.selectMappedBufferResult = null;
+        }
+
+        @Override
+        protected boolean transferData(int maxTransferSize) {
+
+            if (null != this.selectMappedBufferResult && maxTransferSize >= 0) {
+                this.selectMappedBufferResult.getByteBuffer().limit(maxTransferSize);
+            }
+
+            // Write Header
+            boolean result = haWriter.write(this.socketChannel, this.byteBufferHeader);
+
+            if (!result) {
+                return false;
+            }
+
+            if (null == this.selectMappedBufferResult) {
+                return true;
+            }
+
+            // Write Body
+            result = haWriter.write(this.socketChannel, this.selectMappedBufferResult.getByteBuffer());
+
+            if (result) {
+                releaseData();
+            }
+            return result;
+        }
+
+        @Override
+        protected void onStop() {
+            if (this.selectMappedBufferResult != null) {
+                this.selectMappedBufferResult.release();
+            }
+        }
+
+        @Override
+        public String getServiceName() {
+            if (haService.getDefaultMessageStore().getBrokerConfig().isInBrokerContainer()) {
+                return haService.getDefaultMessageStore().getBrokerConfig().getLoggerIdentifier() + WriteSocketService.class.getSimpleName();
+            }
+            return WriteSocketService.class.getSimpleName();
+        }
+    }
+
+    abstract class AbstractWriteSocketService extends ServiceThread {
+        protected final Selector selector;
+        protected final SocketChannel socketChannel;
+        protected final HAWriter haWriter;
+
+        protected final ByteBuffer byteBufferHeader = ByteBuffer.allocate(MSG_HEADER_SIZE);
+        // Store master epochFileCache: (Epoch + startOffset) * 1000
+        private final ByteBuffer handShakeBuffer = ByteBuffer.allocate(EPOCH_ENTRY_SIZE * 1000);
+        protected long nextTransferFromWhere = -1;
+        protected boolean lastWriteOver = true;
+        protected long lastWriteTimestamp = System.currentTimeMillis();
+        protected long lastPrintTimestamp = System.currentTimeMillis();
+        protected long transferOffset = 0;
+
+        public AbstractWriteSocketService(final SocketChannel socketChannel) throws IOException {
+            this.selector = RemotingUtil.openSelector();
+            this.socketChannel = socketChannel;
+            this.socketChannel.register(this.selector, SelectionKey.OP_WRITE);
+            this.setDaemon(true);
+            haWriter = new HAWriter();
+            haWriter.registerHook(writeSize -> {
+                flowMonitor.addByteCountTransferred(writeSize);
+                if (writeSize > 0) {
+                    AbstractWriteSocketService.this.lastWriteTimestamp =
+                        haService.getDefaultMessageStore().getSystemClock().now();
+                }
+            });
+        }
+
+        public long getNextTransferFromWhere() {
+            return this.nextTransferFromWhere;
+        }
+
+        // Handshake method
+        private boolean buildHandshakeBuffer() {
+            final List<EpochEntry> epochEntries = AutoSwitchHAConnection.this.epochCache.getAllEntries();
+            final int lastEpoch = AutoSwitchHAConnection.this.epochCache.lastEpoch();
+            final long maxPhyOffset = AutoSwitchHAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset();
+            this.byteBufferHeader.position(0);
+            this.byteBufferHeader.limit(MSG_HEADER_SIZE);
+            // State
+            this.byteBufferHeader.putInt(currentState.ordinal());
+            // Body size
+            this.byteBufferHeader.putInt(epochEntries.size() * EPOCH_ENTRY_SIZE);
+            // Offset
+            this.byteBufferHeader.putLong(maxPhyOffset);
+            // Epoch
+            this.byteBufferHeader.putInt(lastEpoch);
+            // Additional info
+            this.byteBufferHeader.putLong(0L);
+            this.byteBufferHeader.flip();
+
+            // EpochEntries
+            this.handShakeBuffer.position(0);
+            this.handShakeBuffer.limit(EPOCH_ENTRY_SIZE * epochEntries.size());
+            for (final EpochEntry entry : epochEntries) {
+                if (entry != null) {
+                    this.handShakeBuffer.putInt(entry.getEpoch());
+                    this.handShakeBuffer.putLong(entry.getStartOffset());
+                }
+            }
+            this.handShakeBuffer.flip();
+            LOGGER.info("Master build handshake header: maxEpoch:{}, maxOffset:{}, epochEntries:{}", lastEpoch, maxPhyOffset, epochEntries);
+            return true;
+        }
+
+        private boolean handshakeWithSlave() {
+            // Write Header
+            boolean result = this.haWriter.write(this.socketChannel, this.byteBufferHeader);
+
+            if (!result) {
+                return false;
+            }
+
+            // Write Body
+            return this.haWriter.write(this.socketChannel, this.handShakeBuffer);
+        }
+
+        // Normal transfer method
+        private void buildTransferHeaderBuffer(long nextOffset, int bodySize) {
+            // Build Header
+            this.byteBufferHeader.position(0);
+            this.byteBufferHeader.limit(MSG_HEADER_SIZE);
+            // State
+            this.byteBufferHeader.putInt(currentState.ordinal());
+            // Body size
+            this.byteBufferHeader.putInt(bodySize);
+            // Offset
+            this.byteBufferHeader.putLong(nextOffset);
+            // Epoch
+            this.byteBufferHeader.putInt(AutoSwitchHAConnection.this.currentTransferEpoch);
+            // Additional info(confirm offset)
+            final long confirmOffset = AutoSwitchHAConnection.this.haService.getConfirmOffset();
+            this.byteBufferHeader.putLong(confirmOffset);
+            this.byteBufferHeader.flip();
+            LOGGER.info("Master send msg, state:{}, size:{}, offset:{}, epoch:{}, confirmOffset:{}",
+                currentState, bodySize, nextOffset, AutoSwitchHAConnection.this.currentTransferEpoch, confirmOffset);
+        }
+
+        private void transferToSlave() throws Exception {
+            if (this.lastWriteOver) {
+                long interval =
+                    haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
+
+                if (interval > haService.getDefaultMessageStore().getMessageStoreConfig()
+                    .getHaSendHeartbeatInterval()) {
+
+                    buildTransferHeaderBuffer(this.nextTransferFromWhere, 0);
+
+                    this.lastWriteOver = this.transferData(0);
+                    if (!this.lastWriteOver) {
+                        return;
+                    }
+                }
+            } else {
+                // maxTransferSize == -1 means to continue transfer remaining data.
+                this.lastWriteOver = this.transferData(-1);
+                if (!this.lastWriteOver) {
+                    return;
+                }
+            }
+
+            int size = this.getNextTransferDataSize();
+            if (size > 0) {
+                if (size > haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
+                    size = haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
+                }
+                int canTransferMaxBytes = flowMonitor.canTransferMaxByteNum();
+                if (size > canTransferMaxBytes) {
+                    if (System.currentTimeMillis() - lastPrintTimestamp > 1000) {
+                        LOGGER.warn("Trigger HA flow control, max transfer speed {}KB/s, current speed: {}KB/s",
+                            String.format("%.2f", flowMonitor.maxTransferByteInSecond() / 1024.0),
+                            String.format("%.2f", flowMonitor.getTransferredByteInSecond() / 1024.0));
+                        lastPrintTimestamp = System.currentTimeMillis();
+                    }
+                    size = canTransferMaxBytes;
+                }
+                if (size <= 0) {
+                    this.releaseData();
+                    this.waitForRunning(100);
+                    return;
+                }
+
+                // We must ensure that the transmitted logs are within the same epoch
+                // If currentEpochEndOffset == -1, means that currentTransferEpoch = last epoch, so the endOffset = +∞.
+                final long currentEpochEndOffset = AutoSwitchHAConnection.this.currentTransferEpochEndOffset;
+                if (currentEpochEndOffset != -1 && this.nextTransferFromWhere + size > currentEpochEndOffset) {
+                    final EpochEntry epochEntry = AutoSwitchHAConnection.this.epochCache.nextEntry(AutoSwitchHAConnection.this.currentTransferEpoch);
+                    if (epochEntry == null) {
+                        LOGGER.error("Can't find a bigger epochEntry than epoch {}", AutoSwitchHAConnection.this.currentTransferEpoch);
+                        waitForRunning(100);
+                        return;
+                    }
+                    size = (int) (currentEpochEndOffset - this.nextTransferFromWhere);
+                    changeTransferEpochToNext(epochEntry);
+                }

Review Comment:
   If the master epoch changes, this can go through a unified interface. The master will clear all HA connections and have them reconnect, so {currentTransferEpoch} and
   {currentTransferEpochEndOffset} will both change dynamically.
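
The rule in the diff above, that one batch must stay within a single epoch, can be sketched in isolation. The helper below is hypothetical and only mirrors the size calculation in `transferToSlave`; it follows the diff's convention that an end offset of `-1` stands for "+∞" (the last epoch).

```java
// Hypothetical helper, not part of the PR: limits a batch to the current epoch.
final class EpochBoundarySketch {
    static int clampToEpoch(long nextTransferFromWhere, int size, long currentEpochEndOffset) {
        // -1 means the current epoch is the last one, so there is no upper bound.
        if (currentEpochEndOffset != -1 && nextTransferFromWhere + size > currentEpochEndOffset) {
            // Send only the bytes up to the epoch's end; the rest belongs to the next epoch.
            return (int) (currentEpochEndOffset - nextTransferFromWhere);
        }
        return size;
    }
}
```

For example, with the next transfer starting at offset 900, a requested size of 200, and an epoch ending at 1000, only 100 bytes would go out in this batch. In the diff, the connection then advances to the next epoch entry via `changeTransferEpochToNext`.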





[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #4236: [Summer of Code] Support switch role for ha service

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on code in PR #4236:
URL: https://github.com/apache/rocketmq/pull/4236#discussion_r864411830


##########
common/src/main/java/org/apache/rocketmq/common/EpochEntry.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common;
+
+public class EpochEntry {
+
+    private final int epoch;
+    private final long startOffset;
+    private long endOffset = Long.MAX_VALUE;

Review Comment:
   OK, I hadn't considered that.





[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #4236: [Summer of Code] Support switch role for ha service

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on code in PR #4236:
URL: https://github.com/apache/rocketmq/pull/4236#discussion_r866426430


##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java:
##########
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.ha.FlowMonitor;
+import org.apache.rocketmq.store.ha.HAClient;
+import org.apache.rocketmq.store.ha.HAConnectionState;
+import org.apache.rocketmq.store.ha.io.AbstractHAReader;
+import org.apache.rocketmq.store.ha.io.HAWriter;
+
+public class AutoSwitchHAClient extends ServiceThread implements HAClient {
+    /**
+     * Transfer header buffer size. Schema: state ordinal + maxOffset
+     */
+    public static final int TRANSFER_HEADER_SIZE = 4 + 8;
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
+    private final AtomicReference<String> masterHaAddress = new AtomicReference<>();
+    private final AtomicReference<String> masterAddress = new AtomicReference<>();
+    private final ByteBuffer transferHeaderBuffer = ByteBuffer.allocate(TRANSFER_HEADER_SIZE);
+    private final AutoSwitchHAService haService;
+    private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+    private final DefaultMessageStore messageStore;
+    private final EpochFileCache epochCache;
+
+    private SocketChannel socketChannel;
+    private Selector selector;
+    private AbstractHAReader haReader;
+    private HAWriter haWriter;
+    private FlowMonitor flowMonitor;
+    /**
+     * last time that slave reads data from master.
+     */
+    private long lastReadTimestamp;
+    /**
+     * last time that slave reports offset to master.
+     */
+    private long lastWriteTimestamp;
+
+    private long currentReportedOffset;
+    private int processPosition;
+    private volatile HAConnectionState currentState;
+    /**
+     * Current epoch
+     */
+    private volatile long currentReceivedEpoch;
+
+    /**
+     * Confirm offset = min(localMaxOffset, master confirm offset).
+     */
+    private volatile long confirmOffset;
+
+    public AutoSwitchHAClient(AutoSwitchHAService haService, DefaultMessageStore defaultMessageStore,
+        EpochFileCache epochCache) throws IOException {
+        this.haService = haService;
+        this.messageStore = defaultMessageStore;
+        this.epochCache = epochCache;
+        init();
+    }
+
+    public void init() throws IOException {
+        this.selector = RemotingUtil.openSelector();
+        this.flowMonitor = new FlowMonitor(this.messageStore.getMessageStoreConfig());
+        this.haReader = new HAClientReader();
+        haReader.registerHook(readSize -> {
+            if (readSize > 0) {
+                AutoSwitchHAClient.this.flowMonitor.addByteCountTransferred(readSize);
+                lastReadTimestamp = System.currentTimeMillis();
+            }
+        });
+        this.haWriter = new HAWriter();
+        haWriter.registerHook(writeSize -> {
+            if (writeSize > 0) {
+                lastWriteTimestamp = System.currentTimeMillis();
+            }
+        });
+        changeCurrentState(HAConnectionState.READY);
+        this.currentReceivedEpoch = -1;
+        this.currentReportedOffset = 0;
+        this.processPosition = 0;
+        this.confirmOffset = -1;
+        this.lastReadTimestamp = System.currentTimeMillis();
+        this.lastWriteTimestamp = System.currentTimeMillis();
+    }
+
+    public void reOpen() throws IOException {
+        shutdown();
+        init();
+    }
+
+    @Override public String getServiceName() {
+        return AutoSwitchHAClient.class.getSimpleName();
+    }
+
+    @Override public void updateMasterAddress(String newAddress) {
+        String currentAddr = this.masterAddress.get();
+        if (masterAddress.compareAndSet(currentAddr, newAddress)) {
+            LOGGER.info("update master address, OLD: " + currentAddr + " NEW: " + newAddress);
+        }
+    }
+
+    @Override public void updateHaMasterAddress(String newAddress) {
+        String currentAddr = this.masterHaAddress.get();
+        if (masterHaAddress.compareAndSet(currentAddr, newAddress)) {
+            LOGGER.info("update master ha address, OLD: " + currentAddr + " NEW: " + newAddress);
+        }
+    }
+
+    @Override public String getMasterAddress() {
+        return this.masterAddress.get();
+    }
+
+    @Override public String getHaMasterAddress() {
+        return this.masterHaAddress.get();
+    }
+
+    @Override public long getLastReadTimestamp() {
+        return this.lastReadTimestamp;
+    }
+
+    @Override public long getLastWriteTimestamp() {
+        return this.lastWriteTimestamp;
+    }
+
+    @Override public HAConnectionState getCurrentState() {
+        return this.currentState;
+    }
+
+    @Override public void changeCurrentState(HAConnectionState haConnectionState) {
+        LOGGER.info("change state to {}", haConnectionState);
+        this.currentState = haConnectionState;
+    }
+
+    public void closeMasterAndWait() {
+        this.closeMaster();
+        this.waitForRunning(1000 * 5);
+    }
+
+    @Override public void closeMaster() {
+        if (null != this.socketChannel) {
+            try {
+                SelectionKey sk = this.socketChannel.keyFor(this.selector);
+                if (sk != null) {
+                    sk.cancel();
+                }
+
+                this.socketChannel.close();
+                this.socketChannel = null;
+
+                LOGGER.info("AutoSwitchHAClient close connection with master {}", this.masterHaAddress.get());
+                this.changeCurrentState(HAConnectionState.READY);
+            } catch (IOException e) {
+                LOGGER.warn("CloseMaster exception. ", e);
+            }
+
+            this.lastReadTimestamp = 0;
+            this.processPosition = 0;
+
+            this.byteBufferRead.position(0);
+            this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
+        }
+    }
+
+    @Override public long getTransferredByteInSecond() {
+        return this.flowMonitor.getTransferredByteInSecond();
+    }
+
+    @Override public void shutdown() {
+        changeCurrentState(HAConnectionState.SHUTDOWN);
+        // Shutdown thread firstly
+        this.flowMonitor.shutdown();
+        super.shutdown();
+
+        closeMaster();
+        try {
+            this.selector.close();
+        } catch (IOException e) {
+            LOGGER.warn("Close the selector of AutoSwitchHAClient error, ", e);
+        }
+    }
+
+    private boolean isTimeToReportOffset() {
+        long interval = this.messageStore.now() - this.lastWriteTimestamp;
+        return interval > this.messageStore.getMessageStoreConfig().getHaSendHeartbeatInterval();
+    }
+
+    private boolean sendHandshakeHeader() {
+        this.transferHeaderBuffer.position(0);
+        this.transferHeaderBuffer.limit(TRANSFER_HEADER_SIZE);
+        this.transferHeaderBuffer.putInt(HAConnectionState.HANDSHAKE.ordinal());
+        this.transferHeaderBuffer.putLong(0L);
+        this.transferHeaderBuffer.flip();
+        return this.haWriter.write(this.socketChannel, this.transferHeaderBuffer);
+    }
+
+    private void handshakeWithMaster() throws IOException {
+        boolean result = this.sendHandshakeHeader();
+        if (!result) {
+            closeMasterAndWait();
+        }
+
+        this.selector.select(5000);
+
+        result = this.haReader.read(this.socketChannel, this.byteBufferRead);
+        if (!result) {
+            closeMasterAndWait();
+            return;
+        }
+    }
+
+    private boolean reportSlaveOffset(final long offsetToReport) {
+        this.transferHeaderBuffer.position(0);
+        this.transferHeaderBuffer.limit(TRANSFER_HEADER_SIZE);
+        this.transferHeaderBuffer.putInt(this.currentState.ordinal());
+        this.transferHeaderBuffer.putLong(offsetToReport);
+        this.transferHeaderBuffer.flip();
+        return this.haWriter.write(this.socketChannel, this.transferHeaderBuffer);
+    }
+
+    private boolean reportSlaveMaxOffset() {
+        boolean result = true;
+        final long maxPhyOffset = this.messageStore.getMaxPhyOffset();
+        if (maxPhyOffset > this.currentReportedOffset) {
+            this.currentReportedOffset = maxPhyOffset;
+            result = reportSlaveOffset(this.currentReportedOffset);
+        }
+        return result;
+    }
+
+    public boolean connectMaster() throws ClosedChannelException {
+        if (null == this.socketChannel) {
+            String addr = this.masterHaAddress.get();
+            if (addr != null) {
+                SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
+                this.socketChannel = RemotingUtil.connect(socketAddress);
+                if (this.socketChannel != null) {
+                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
+                    LOGGER.info("AutoSwitchHAClient connect to master {}", addr);
+                    changeCurrentState(HAConnectionState.HANDSHAKE);
+                }
+            }
+            this.currentReportedOffset = this.messageStore.getMaxPhyOffset();
+            this.lastReadTimestamp = System.currentTimeMillis();
+        }
+        return this.socketChannel != null;
+    }
+
+    private boolean transferFromMaster() throws IOException {
+        boolean result;
+        if (isTimeToReportOffset()) {
+            LOGGER.info("Slave report current offset {}", this.currentReportedOffset);
+            result = reportSlaveOffset(this.currentReportedOffset);
+            if (!result) {
+                return false;
+            }
+        }
+
+        this.selector.select(1000);
+
+        result = this.haReader.read(this.socketChannel, this.byteBufferRead);
+        if (!result) {
+            return false;
+        }
+
+        return this.reportSlaveMaxOffset();
+    }
+
+    @Override public void run() {
+        LOGGER.info(this.getServiceName() + " service started");
+
+        this.flowMonitor.start();
+        while (!this.isStopped()) {
+            try {
+                switch (this.currentState) {
+                    case SHUTDOWN:
+                        return;
+                    case READY:
+                        // Truncate invalid msg first
+                        final long truncateOffset = AutoSwitchHAClient.this.haService.truncateInvalidMsg();
+                        if (truncateOffset >= 0) {
+                            AutoSwitchHAClient.this.epochCache.truncateSuffixByOffset(truncateOffset);
+                        }
+                        if (!connectMaster()) {
+                            LOGGER.warn("AutoSwitchHAClient connect to master {} failed", this.masterHaAddress.get());
+                            waitForRunning(1000 * 5);
+                        }
+                        continue;
+                    case HANDSHAKE:
+                        handshakeWithMaster();
+                        continue;
+                    case TRANSFER:
+                        if (!transferFromMaster()) {
+                            closeMasterAndWait();
+                            continue;
+                        }
+                        break;
+                    case SUSPEND:
+                    default:
+                        waitForRunning(1000 * 5);
+                        continue;
+                }
+                long interval = this.messageStore.now() - this.lastReadTimestamp;
+                if (interval > this.messageStore.getMessageStoreConfig().getHaHousekeepingInterval()) {
+                    LOGGER.warn("AutoSwitchHAClient, housekeeping, found this connection[" + this.masterHaAddress
+                        + "] expired, " + interval);
+                    closeMaster();
+                    LOGGER.warn("AutoSwitchHAClient, master not response some time, so close connection");
+                }
+            } catch (Exception e) {
+                LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                closeMasterAndWait();
+            }
+        }
+
+    }
+
+    /**
+     * Compare the master and slave's epoch file, find consistent point, do truncate.
+     */
+    private boolean doTruncate(List<EpochEntry> masterEpochEntries, long masterEndOffset) {
+        final EpochFileCache masterEpochCache = new EpochFileCache();
+        masterEpochCache.initCacheFromEntries(masterEpochEntries);
+        masterEpochCache.setLastEpochEntryEndOffset(masterEndOffset);
+        final EpochFileCache localEpochCache = new EpochFileCache();
+        localEpochCache.initCacheFromEntries(this.epochCache.getAllEntries());
+        localEpochCache.setLastEpochEntryEndOffset(this.messageStore.getMaxPhyOffset());
+
+        final long truncateOffset = localEpochCache.findConsistentPoint(masterEpochCache);

Review Comment:
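   1. If, for some reason (for example, the log has been deleted), no consistent point between master and slave can be found, we should wait for manual handling instead of continuing.
   2. If the slave is empty, could we skip the truncation flow entirely? The current code only happens to work here because no consistent point is found, so currentReportedOffset = -1, and reportSlaveMaxOffset then corrects it to 0.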
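
The two points above can be sketched as a small decision step that runs before truncation. This is only an illustration under stated assumptions, not code from this PR: `TruncateDecisionSketch`, `Entry`, `Decision`, and `decide` are hypothetical names, `Entry` is a simplified stand-in for `EpochEntry`, and the matching rule (same epoch and same start offset, scanning the local entries from newest to oldest) is an assumption about how a consistent point could be found, not a statement of what `EpochFileCache.findConsistentPoint` does.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: decide whether a slave should truncate, skip truncation, or stop and wait.
class TruncateDecisionSketch {
    /** Simplified stand-in for EpochEntry: data in [startOffset, endOffset) was written under one epoch. */
    public static final class Entry {
        public final int epoch;
        public final long startOffset;
        public final long endOffset;

        public Entry(int epoch, long startOffset, long endOffset) {
            this.epoch = epoch;
            this.startOffset = startOffset;
            this.endOffset = endOffset;
        }
    }

    public enum Decision { SKIP_TRUNCATE, TRUNCATE, WAIT_FOR_OPERATOR }

    /**
     * Scans local entries from newest to oldest (the list is assumed to be sorted by ascending epoch)
     * and returns the smaller end offset of the first entry that matches a master entry
     * with the same epoch and start offset, or -1 if no entry matches.
     */
    public static long findConsistentPoint(List<Entry> local, List<Entry> master) {
        Map<Integer, Entry> masterByEpoch = new HashMap<>();
        for (Entry e : master) {
            masterByEpoch.put(e.epoch, e);
        }
        for (int i = local.size() - 1; i >= 0; i--) {
            Entry mine = local.get(i);
            Entry theirs = masterByEpoch.get(mine.epoch);
            if (theirs != null && theirs.startOffset == mine.startOffset) {
                return Math.min(mine.endOffset, theirs.endOffset);
            }
        }
        return -1L;
    }

    public static Decision decide(List<Entry> local, long localMaxOffset, List<Entry> master) {
        // Point 2: an empty slave has nothing to truncate, so skip the flow explicitly.
        if (local.isEmpty() && localMaxOffset == 0L) {
            return Decision.SKIP_TRUNCATE;
        }
        // Point 1: a non-empty slave without a consistent point should not continue on its own.
        return findConsistentPoint(local, master) >= 0 ? Decision.TRUNCATE : Decision.WAIT_FOR_OPERATOR;
    }
}
```

For example, with master entries (epoch 1, 0 to 100) and (epoch 2, 100 to 300) and a slave that only has (epoch 1, 0 to 120), the sketch finds the consistent point 100 and returns `TRUNCATE`. A slave with no entries and max offset 0 gets `SKIP_TRUNCATE`, and a non-empty slave with no matching entry gets `WAIT_FOR_OPERATOR`. In the client this last case might map to something like the existing `SUSPEND` state, but that mapping is a suggestion only.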



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java:
##########
@@ -0,0 +1,624 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.List;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.ha.FlowMonitor;
+import org.apache.rocketmq.store.ha.HAConnection;
+import org.apache.rocketmq.store.ha.HAConnectionState;
+import org.apache.rocketmq.store.ha.io.AbstractHAReader;
+import org.apache.rocketmq.store.ha.io.HAWriter;
+
+public class AutoSwitchHAConnection implements HAConnection {
+    /**
+     * Header protocol in syncing msg from master.
+     * Format: current state + body size + offset + epoch + epochStartOffset + additionalInfo(confirmOffset).
+     * If the msg is a handshake msg, the body size = EpochEntrySize * EpochEntryNums, and the offset is the maxOffset in master.
+     */
+    public static final int MSG_HEADER_SIZE = 4 + 4 + 8 + 4 + 8 + 8;
+    public static final int EPOCH_ENTRY_SIZE = 12;
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private final AutoSwitchHAService haService;
+    private final SocketChannel socketChannel;
+    private final String clientAddress;
+    private final EpochFileCache epochCache;
+    private final AbstractWriteSocketService writeSocketService;
+    private final ReadSocketService readSocketService;
+    private volatile HAConnectionState currentState = HAConnectionState.HANDSHAKE;
+    private volatile long slaveRequestOffset = -1;
+    private volatile long slaveAckOffset = -1;
+    /**
+     * Whether the slave has already sent a handshake message
+     */
+    private volatile boolean isSlaveSendHandshake = false;
+    private volatile int currentTransferEpoch = -1;
+    private volatile long currentTransferEpochEndOffset = 0;
+    private final FlowMonitor flowMonitor;
+
+    public AutoSwitchHAConnection(AutoSwitchHAService haService, SocketChannel socketChannel,
+        EpochFileCache epochCache) throws IOException {
+        this.haService = haService;
+        this.socketChannel = socketChannel;
+        this.epochCache = epochCache;
+        this.clientAddress = this.socketChannel.socket().getRemoteSocketAddress().toString();
+        this.socketChannel.configureBlocking(false);
+        this.socketChannel.socket().setSoLinger(false, -1);
+        this.socketChannel.socket().setTcpNoDelay(true);
+        this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
+        this.socketChannel.socket().setSendBufferSize(1024 * 64);
+        this.writeSocketService = new WriteSocketService(this.socketChannel);
+        this.readSocketService = new ReadSocketService(this.socketChannel);
+        this.haService.getConnectionCount().incrementAndGet();
+        this.flowMonitor = new FlowMonitor(haService.getDefaultMessageStore().getMessageStoreConfig());
+    }
+
+    @Override public void start() {
+        changeCurrentState(HAConnectionState.HANDSHAKE);
+        this.flowMonitor.start();
+        this.readSocketService.start();
+        this.writeSocketService.start();
+    }
+
+    @Override public void shutdown() {
+        changeCurrentState(HAConnectionState.SHUTDOWN);
+        this.flowMonitor.shutdown(true);
+        this.writeSocketService.shutdown(true);
+        this.readSocketService.shutdown(true);
+        this.close();
+    }
+
+    @Override public void close() {
+        if (this.socketChannel != null) {
+            try {
+                this.socketChannel.close();
+            } catch (final IOException e) {
+                LOGGER.error("", e);
+            }
+        }
+    }
+
+    public void changeCurrentState(HAConnectionState connectionState) {
+        LOGGER.info("change state to {}", connectionState);
+        this.currentState = connectionState;
+    }
+
+    @Override public HAConnectionState getCurrentState() {
+        return currentState;
+    }
+
+    @Override public SocketChannel getSocketChannel() {
+        return socketChannel;
+    }
+
+    @Override public String getClientAddress() {
+        return clientAddress;
+    }
+
+    @Override public long getSlaveAckOffset() {
+        return slaveAckOffset;
+    }
+
+    @Override public long getTransferredByteInSecond() {
+        return flowMonitor.getTransferredByteInSecond();
+    }
+
+    @Override public long getTransferFromWhere() {
+        return this.writeSocketService.getNextTransferFromWhere();
+    }
+
+    private void changeTransferEpochToNext(final EpochEntry entry) {
+        this.currentTransferEpoch = entry.getEpoch();
+        this.currentTransferEpochEndOffset = entry.getEndOffset();
+        if (entry.getEpoch() == this.epochCache.lastEpoch()) {
+            // Use -1 to stand for Long.max
+            this.currentTransferEpochEndOffset = -1;
+        }
+    }
+
+    class ReadSocketService extends ServiceThread {
+        private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;
+        private final Selector selector;
+        private final SocketChannel socketChannel;
+        private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+        private final AbstractHAReader haReader;
+        private int processPosition = 0;
+        private volatile long lastReadTimestamp = System.currentTimeMillis();
+
+        public ReadSocketService(final SocketChannel socketChannel) throws IOException {
+            this.selector = RemotingUtil.openSelector();
+            this.socketChannel = socketChannel;
+            this.socketChannel.register(this.selector, SelectionKey.OP_READ);
+            this.setDaemon(true);
+            haReader = new HAServerReader();
+            haReader.registerHook(readSize -> {
+                if (readSize > 0) {
+                    ReadSocketService.this.lastReadTimestamp =
+                        haService.getDefaultMessageStore().getSystemClock().now();
+                }
+            });
+        }
+
+        @Override
+        public void run() {
+            LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.selector.select(1000);
+                    boolean ok = this.haReader.read(this.socketChannel, this.byteBufferRead);
+                    if (!ok) {
+                        AutoSwitchHAConnection.LOGGER.error("processReadEvent error");
+                        break;
+                    }
+
+                    long interval = haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
+                    if (interval > haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
+                        LOGGER.warn("ha housekeeping, found this connection[" + clientAddress + "] expired, " + interval);
+                        break;
+                    }
+                } catch (Exception e) {
+                    AutoSwitchHAConnection.LOGGER.error(this.getServiceName() + " service has exception.", e);
+                    break;
+                }
+            }
+
+            this.makeStop();
+
+            changeCurrentState(HAConnectionState.SHUTDOWN);
+
+            writeSocketService.makeStop();
+
+            haService.removeConnection(AutoSwitchHAConnection.this);
+
+            haService.getConnectionCount().decrementAndGet();
+
+            SelectionKey sk = this.socketChannel.keyFor(this.selector);
+            if (sk != null) {
+                sk.cancel();
+            }
+
+            try {
+                this.selector.close();
+                this.socketChannel.close();
+            } catch (IOException e) {
+                AutoSwitchHAConnection.LOGGER.error("", e);
+            }
+
+            AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            return ReadSocketService.class.getSimpleName();
+        }
+
+        class HAServerReader extends AbstractHAReader {
+            @Override
+            protected boolean processReadResult(ByteBuffer byteBufferRead) {
+                while (true) {
+                    int diff = byteBufferRead.position() - ReadSocketService.this.processPosition;
+                    if (diff >= AutoSwitchHAClient.TRANSFER_HEADER_SIZE) {
+                        int readPosition = ReadSocketService.this.processPosition;
+                        HAConnectionState slaveState = HAConnectionState.values()[byteBufferRead.getInt(readPosition)];
+
+                        switch (slaveState) {
+                            case HANDSHAKE:
+                                isSlaveSendHandshake = true;
+                                ReadSocketService.this.processPosition += AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
+                                break;
+                            case TRANSFER:
+                                long slaveMaxOffset = byteBufferRead.getLong(readPosition + 4);
+                                ReadSocketService.this.processPosition += AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
+
+                                slaveAckOffset = slaveMaxOffset;
+                                if (slaveRequestOffset < 0) {
+                                    slaveRequestOffset = slaveMaxOffset;
+                                }
+                                LOGGER.info("slave[" + clientAddress + "] request offset " + slaveMaxOffset);
+                                AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset);
+                                break;
+                            default:
+                                LOGGER.error("Current state illegal {}", currentState);
+                                break;
+                        }
+
+                        if (!slaveState.equals(currentState)) {
+                            LOGGER.warn("Master change state from {} to {}", currentState, slaveState);
+                            changeCurrentState(slaveState);
+                        }
+                        continue;
+                    }
+
+                    if (!byteBufferRead.hasRemaining()) {
+                        byteBufferRead.position(ReadSocketService.this.processPosition);
+                        byteBufferRead.compact();
+                        ReadSocketService.this.processPosition = 0;
+                    }
+                    break;
+                }
+
+                return true;
+            }
+        }
+    }
+
+    class WriteSocketService extends AbstractWriteSocketService {
+        private SelectMappedBufferResult selectMappedBufferResult;
+
+        public WriteSocketService(final SocketChannel socketChannel) throws IOException {
+            super(socketChannel);
+        }
+
+        @Override
+        protected int getNextTransferDataSize() {
+            SelectMappedBufferResult selectResult = haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
+            if (selectResult == null || selectResult.getSize() <= 0) {
+                return 0;
+            }
+            this.selectMappedBufferResult = selectResult;
+            return selectResult.getSize();
+        }
+
+        @Override
+        protected void releaseData() {
+            this.selectMappedBufferResult.release();
+            this.selectMappedBufferResult = null;
+        }
+
+        @Override
+        protected boolean transferData(int maxTransferSize) {
+
+            if (null != this.selectMappedBufferResult && maxTransferSize >= 0) {
+                this.selectMappedBufferResult.getByteBuffer().limit(maxTransferSize);
+            }
+
+            // Write Header
+            boolean result = haWriter.write(this.socketChannel, this.byteBufferHeader);
+
+            if (!result) {
+                return false;
+            }
+
+            if (null == this.selectMappedBufferResult) {
+                return true;
+            }
+
+            // Write Body
+            result = haWriter.write(this.socketChannel, this.selectMappedBufferResult.getByteBuffer());
+
+            if (result) {
+                releaseData();
+            }
+            return result;
+        }
+
+        @Override
+        protected void onStop() {
+            if (this.selectMappedBufferResult != null) {
+                this.selectMappedBufferResult.release();
+            }
+        }
+
+        @Override
+        public String getServiceName() {
+            if (haService.getDefaultMessageStore().getBrokerConfig().isInBrokerContainer()) {
+                return haService.getDefaultMessageStore().getBrokerConfig().getLoggerIdentifier() + WriteSocketService.class.getSimpleName();
+            }
+            return WriteSocketService.class.getSimpleName();
+        }
+    }
+
+    abstract class AbstractWriteSocketService extends ServiceThread {
+        protected final Selector selector;
+        protected final SocketChannel socketChannel;
+        protected final HAWriter haWriter;
+
+        protected final ByteBuffer byteBufferHeader = ByteBuffer.allocate(MSG_HEADER_SIZE);
+        // Store master epochFileCache: (Epoch + startOffset) * 1000
+        private final ByteBuffer handShakeBuffer = ByteBuffer.allocate(EPOCH_ENTRY_SIZE * 1000);
+        protected long nextTransferFromWhere = -1;
+        protected boolean lastWriteOver = true;
+        protected long lastWriteTimestamp = System.currentTimeMillis();
+        protected long lastPrintTimestamp = System.currentTimeMillis();
+        protected long transferOffset = 0;
+
+        public AbstractWriteSocketService(final SocketChannel socketChannel) throws IOException {
+            this.selector = RemotingUtil.openSelector();
+            this.socketChannel = socketChannel;
+            this.socketChannel.register(this.selector, SelectionKey.OP_WRITE);
+            this.setDaemon(true);
+            haWriter = new HAWriter();
+            haWriter.registerHook(writeSize -> {
+                flowMonitor.addByteCountTransferred(writeSize);
+                if (writeSize > 0) {
+                    AbstractWriteSocketService.this.lastWriteTimestamp =
+                        haService.getDefaultMessageStore().getSystemClock().now();
+                }
+            });
+        }
+
+        public long getNextTransferFromWhere() {
+            return this.nextTransferFromWhere;
+        }
+
+        // Handshake method
+        private boolean buildHandshakeBuffer() {
+            final List<EpochEntry> epochEntries = AutoSwitchHAConnection.this.epochCache.getAllEntries();
+            final int lastEpoch = AutoSwitchHAConnection.this.epochCache.lastEpoch();
+            final long maxPhyOffset = AutoSwitchHAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset();
+            this.byteBufferHeader.position(0);
+            this.byteBufferHeader.limit(MSG_HEADER_SIZE);
+            // State
+            this.byteBufferHeader.putInt(currentState.ordinal());
+            // Body size
+            this.byteBufferHeader.putInt(epochEntries.size() * EPOCH_ENTRY_SIZE);
+            // Offset
+            this.byteBufferHeader.putLong(maxPhyOffset);
+            // Epoch
+            this.byteBufferHeader.putInt(lastEpoch);
+            // EpochStartOffset (not needed in handshake)
+            this.byteBufferHeader.putLong(0L);
+            // Additional info (not needed in handshake)
+            this.byteBufferHeader.putLong(0L);
+            this.byteBufferHeader.flip();
+
+            // EpochEntries
+            this.handShakeBuffer.position(0);
+            this.handShakeBuffer.limit(EPOCH_ENTRY_SIZE * epochEntries.size());
+            for (final EpochEntry entry : epochEntries) {
+                if (entry != null) {
+                    this.handShakeBuffer.putInt(entry.getEpoch());
+                    this.handShakeBuffer.putLong(entry.getStartOffset());
+                }
+            }
+            this.handShakeBuffer.flip();
+            LOGGER.info("Master build handshake header: maxEpoch:{}, maxOffset:{}, epochEntries:{}", lastEpoch, maxPhyOffset, epochEntries);
+            return true;
+        }
+
+        private boolean handshakeWithSlave() {
+            // Write Header
+            boolean result = this.haWriter.write(this.socketChannel, this.byteBufferHeader);
+
+            if (!result) {
+                return false;
+            }
+
+            // Write Body
+            return this.haWriter.write(this.socketChannel, this.handShakeBuffer);
+        }
+
+        // Normal transfer method
+        private void buildTransferHeaderBuffer(long nextOffset, int bodySize) {
+            final EpochEntry entry = AutoSwitchHAConnection.this.epochCache.getEntry(AutoSwitchHAConnection.this.currentTransferEpoch);
+            if (entry == null) {
+                LOGGER.error("Failed to find epochEntry with epoch {} when build msg header", AutoSwitchHAConnection.this.currentTransferEpoch);
+                return;
+            }
+            // Build Header
+            this.byteBufferHeader.position(0);
+            this.byteBufferHeader.limit(MSG_HEADER_SIZE);
+            // State
+            this.byteBufferHeader.putInt(currentState.ordinal());
+            // Body size
+            this.byteBufferHeader.putInt(bodySize);
+            // Offset
+            this.byteBufferHeader.putLong(nextOffset);
+            // Epoch
+            this.byteBufferHeader.putInt(entry.getEpoch());
+            // EpochStartOffset
+            this.byteBufferHeader.putLong(entry.getStartOffset());
+            // Additional info(confirm offset)
+            final long confirmOffset = AutoSwitchHAConnection.this.haService.getConfirmOffset();
+            this.byteBufferHeader.putLong(confirmOffset);
+            this.byteBufferHeader.flip();
+            LOGGER.info("Master send msg, state:{}, size:{}, offset:{}, epoch:{}, epochStartOffset:{}, confirmOffset:{}",
+                currentState, bodySize, nextOffset, entry.getEpoch(), entry.getStartOffset(), confirmOffset);
+        }
+
+        private void transferToSlave() throws Exception {
+            if (this.lastWriteOver) {
+                long interval =
+                    haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
+
+                if (interval > haService.getDefaultMessageStore().getMessageStoreConfig()
+                    .getHaSendHeartbeatInterval()) {
+
+                    buildTransferHeaderBuffer(this.nextTransferFromWhere, 0);
+
+                    this.lastWriteOver = this.transferData(0);
+                    if (!this.lastWriteOver) {
+                        return;
+                    }
+                }
+            } else {
+                // maxTransferSize == -1 means to continue transfer remaining data.
+                this.lastWriteOver = this.transferData(-1);
+                if (!this.lastWriteOver) {
+                    return;
+                }
+            }
+
+            int size = this.getNextTransferDataSize();
+            if (size > 0) {
+                if (size > haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
+                    size = haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
+                }
+                int canTransferMaxBytes = flowMonitor.canTransferMaxByteNum();
+                if (size > canTransferMaxBytes) {
+                    if (System.currentTimeMillis() - lastPrintTimestamp > 1000) {
+                        LOGGER.warn("Trigger HA flow control, max transfer speed {}KB/s, current speed: {}KB/s",
+                            String.format("%.2f", flowMonitor.maxTransferByteInSecond() / 1024.0),
+                            String.format("%.2f", flowMonitor.getTransferredByteInSecond() / 1024.0));
+                        lastPrintTimestamp = System.currentTimeMillis();
+                    }
+                    size = canTransferMaxBytes;
+                }
+                if (size <= 0) {
+                    this.releaseData();
+                    this.waitForRunning(100);
+                    return;
+                }
+
+                // We must ensure that the transmitted logs are within the same epoch
+                // currentEpochEndOffset == -1 means currentTransferEpoch is the last epoch, so its end offset is unbounded (treated as Long.MAX_VALUE)
+                final long currentEpochEndOffset = AutoSwitchHAConnection.this.currentTransferEpochEndOffset;
+                if (currentEpochEndOffset != -1 && this.nextTransferFromWhere + size > currentEpochEndOffset) {
+                    final EpochEntry epochEntry = AutoSwitchHAConnection.this.epochCache.nextEntry(AutoSwitchHAConnection.this.currentTransferEpoch);
+                    if (epochEntry == null) {
+                        LOGGER.error("Can't find a bigger epochEntry than epoch {}", AutoSwitchHAConnection.this.currentTransferEpoch);
+                        waitForRunning(100);
+                        return;
+                    }
+                    size = (int) (currentEpochEndOffset - this.nextTransferFromWhere);
+                    changeTransferEpochToNext(epochEntry);
+                }
+
+                this.transferOffset = this.nextTransferFromWhere;
+                this.nextTransferFromWhere += size;
+
+                // Build Header
+                buildTransferHeaderBuffer(this.transferOffset, size);
+
+                this.lastWriteOver = this.transferData(size);
+            } else {
+                haService.getWaitNotifyObject().allWaitForRunning(100);
+            }
+        }
+
+        @Override public void run() {
+            AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.selector.select(1000);
+
+                    switch (currentState) {
+                        case HANDSHAKE:
+                            // Wait until the slave sends its handshake msg to the master.
+                            if (!isSlaveSendHandshake) {
+                                this.waitForRunning(10);
+                                continue;
+                            }
+
+                            if (this.lastWriteOver) {
+                                if (!buildHandshakeBuffer()) {
+                                    LOGGER.error("AutoSwitchHAConnection build handshake buffer failed");
+                                    this.waitForRunning(5000);
+                                    continue;
+                                }
+                            }
+
+                            this.lastWriteOver = handshakeWithSlave();
+                            if (this.lastWriteOver) {
+                                // change flag to {false} to wait for slave notification
+                                isSlaveSendHandshake = false;
+                            }
+                            break;
+                        case TRANSFER:
+                            if (-1 == slaveRequestOffset) {
+                                this.waitForRunning(10);
+                                continue;
+                            }
+
+                            if (-1 == this.nextTransferFromWhere) {
+                                if (0 == slaveRequestOffset) {
+                                    // We must ensure that the starting point of syncing log
+                                    // must be the startOffset of a file (maybe the last file, or the minOffset)
+                                    final MessageStoreConfig config = haService.getDefaultMessageStore().getMessageStoreConfig();
+                                    if (config.isSyncFromLastFile()) {

Review Comment:
   Semantically, this parameter should take effect on the slave.
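
   For context, the code comment just above the flagged line says the sync starting point must be the start offset of a file (the last file, or the minOffset). The quoted diff stops before the branch body, so the following is only a sketch of that arithmetic under assumptions: `SyncStartSketch`, `startOfLastFile`, `chooseStart`, and the `fileSize` parameter are hypothetical names, not taken from the PR.

```java
/** Hypothetical helper, not part of the PR: illustrates file-aligned sync start offsets. */
class SyncStartSketch {
    /** Rounds maxOffset down to the start offset of the commitlog file that contains it. */
    static long startOfLastFile(long maxOffset, long fileSize) {
        long start = maxOffset - (maxOffset % fileSize);
        return Math.max(start, 0L);
    }

    /** Assumed policy: start at the last file when syncFromLastFile is set, otherwise at minOffset. */
    static long chooseStart(boolean syncFromLastFile, long minOffset, long maxOffset, long fileSize) {
        return syncFromLastFile ? startOfLastFile(maxOffset, fileSize) : minOffset;
    }

    public static void main(String[] args) {
        long fileSize = 1024L * 1024 * 1024; // assumed 1 GiB commitlog file size
        long maxOffset = 3 * fileSize + 12345;
        System.out.println(chooseStart(true, 0L, maxOffset, fileSize));
        System.out.println(chooseStart(false, 0L, maxOffset, fileSize));
    }
}
```

   Whichever side ends up evaluating the flag (the master, as in the diff, or the slave, as suggested here), the alignment arithmetic would stay the same; only the owner of the configuration changes.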





[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #4236: [Summer of Code] Support switch role for ha service

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on code in PR #4236:
URL: https://github.com/apache/rocketmq/pull/4236#discussion_r863323873


##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java:
##########
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.ha.FlowMonitor;
+import org.apache.rocketmq.store.ha.HAClient;
+import org.apache.rocketmq.store.ha.HAConnectionState;
+import org.apache.rocketmq.store.ha.io.AbstractHAReader;
+import org.apache.rocketmq.store.ha.io.HAWriter;
+
+public class AutoSwitchHAClient extends ServiceThread implements HAClient {
+    /**
+     * Transfer header buffer size. Schema: state ordinal + maxOffset
+     */
+    public static final int TRANSFER_HEADER_SIZE = 4 + 8;
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
+    private final AtomicReference<String> masterHaAddress = new AtomicReference<>();
+    private final AtomicReference<String> masterAddress = new AtomicReference<>();
+    private final ByteBuffer transferHeaderBuffer = ByteBuffer.allocate(TRANSFER_HEADER_SIZE);
+    private final AutoSwitchHAService haService;
+    private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+    private final DefaultMessageStore messageStore;
+    private final EpochFileCache epochCache;
+
+    private SocketChannel socketChannel;
+    private Selector selector;
+    private AbstractHAReader haReader;
+    private HAWriter haWriter;
+    private FlowMonitor flowMonitor;
+    /**
+     * last time that slave reads data from master.
+     */
+    private long lastReadTimestamp;
+    /**
+     * last time that slave reports offset to master.
+     */
+    private long lastWriteTimestamp;
+
+    private long currentReportedOffset;
+    private int processPosition;
+    private volatile HAConnectionState currentState;
+    /**
+     * Current epoch
+     */
+    private volatile long currentReceivedEpoch;
+
+    /**
+     * Confirm offset = min(localMaxOffset, master confirm offset).
+     */
+    private volatile long confirmOffset;
+
+    public AutoSwitchHAClient(AutoSwitchHAService haService, DefaultMessageStore defaultMessageStore,
+        EpochFileCache epochCache) throws IOException {
+        this.haService = haService;
+        this.messageStore = defaultMessageStore;
+        this.epochCache = epochCache;
+        init();
+    }
+
+    public void init() throws IOException {
+        this.selector = RemotingUtil.openSelector();
+        this.flowMonitor = new FlowMonitor(this.messageStore.getMessageStoreConfig());
+        this.haReader = new HAClientReader();
+        haReader.registerHook(readSize -> {
+            if (readSize > 0) {
+                AutoSwitchHAClient.this.flowMonitor.addByteCountTransferred(readSize);
+                lastReadTimestamp = System.currentTimeMillis();
+            }
+        });
+        this.haWriter = new HAWriter();
+        haWriter.registerHook(writeSize -> {
+            if (writeSize > 0) {
+                lastWriteTimestamp = System.currentTimeMillis();
+            }
+        });
+        changeCurrentState(HAConnectionState.READY);
+        this.currentReceivedEpoch = -1;
+        this.currentReportedOffset = 0;
+        this.processPosition = 0;
+        this.confirmOffset = -1;
+        this.lastReadTimestamp = System.currentTimeMillis();
+        this.lastWriteTimestamp = System.currentTimeMillis();
+    }
+
+    public void reOpen() throws IOException {
+        shutdown();
+        init();
+    }
+
+    @Override public String getServiceName() {
+        return AutoSwitchHAClient.class.getSimpleName();
+    }
+
+    @Override public void updateMasterAddress(String newAddress) {
+        String currentAddr = this.masterAddress.get();
+        if (masterAddress.compareAndSet(currentAddr, newAddress)) {
+            LOGGER.info("update master address, OLD: " + currentAddr + " NEW: " + newAddress);
+        }
+    }
+
+    @Override public void updateHaMasterAddress(String newAddress) {
+        String currentAddr = this.masterHaAddress.get();
+        if (masterHaAddress.compareAndSet(currentAddr, newAddress)) {
+            LOGGER.info("update master ha address, OLD: " + currentAddr + " NEW: " + newAddress);
+        }
+    }
+
+    @Override public String getMasterAddress() {
+        return this.masterAddress.get();
+    }
+
+    @Override public String getHaMasterAddress() {
+        return this.masterHaAddress.get();
+    }
+
+    @Override public long getLastReadTimestamp() {
+        return this.lastReadTimestamp;
+    }
+
+    @Override public long getLastWriteTimestamp() {
+        return this.lastWriteTimestamp;
+    }
+
+    @Override public HAConnectionState getCurrentState() {
+        return this.currentState;
+    }
+
+    @Override public void changeCurrentState(HAConnectionState haConnectionState) {
+        LOGGER.info("change state to {}", haConnectionState);
+        this.currentState = haConnectionState;
+    }
+
+    public void closeMasterAndWait() {
+        this.closeMaster();
+        this.waitForRunning(1000 * 5);
+    }
+
+    @Override public void closeMaster() {
+        if (null != this.socketChannel) {
+            try {
+                SelectionKey sk = this.socketChannel.keyFor(this.selector);
+                if (sk != null) {
+                    sk.cancel();
+                }
+
+                this.socketChannel.close();
+                this.socketChannel = null;
+
+                LOGGER.info("AutoRecoverHAClient close connection with master {}", this.masterHaAddress.get());
+                this.changeCurrentState(HAConnectionState.READY);
+            } catch (IOException e) {
+                LOGGER.warn("CloseMaster exception. ", e);
+            }
+
+            this.lastReadTimestamp = 0;
+            this.processPosition = 0;
+
+            this.byteBufferRead.position(0);
+            this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
+        }
+    }
+
+    @Override public long getTransferredByteInSecond() {
+        return this.flowMonitor.getTransferredByteInSecond();
+    }
+
+    @Override public void shutdown() {
+        changeCurrentState(HAConnectionState.SHUTDOWN);
+        // Shut down the thread first
+        this.flowMonitor.shutdown();
+        super.shutdown();
+
+        closeMaster();
+        try {
+            this.selector.close();
+        } catch (IOException e) {
+            LOGGER.warn("Close the selector of AutoRecoverHAClient error, ", e);
+        }
+    }
+
+    private boolean isTimeToReportOffset() {
+        long interval = this.messageStore.now() - this.lastWriteTimestamp;
+        return interval > this.messageStore.getMessageStoreConfig().getHaSendHeartbeatInterval();
+    }
+
+    private boolean sendHandshakeHeader() {
+        this.transferHeaderBuffer.position(0);
+        this.transferHeaderBuffer.limit(TRANSFER_HEADER_SIZE);
+        this.transferHeaderBuffer.putInt(HAConnectionState.HANDSHAKE.ordinal());
+        this.transferHeaderBuffer.putLong(0L);
+        this.transferHeaderBuffer.flip();
+        return this.haWriter.write(this.socketChannel, this.transferHeaderBuffer);
+    }
+
+    private void handshakeWithMaster() throws IOException {
+        // Send the handshake header once; give up on this connection if the write fails.
+        boolean result = this.sendHandshakeHeader();
+        if (!result) {
+            closeMasterAndWait();
+            return;
+        }
+
+        this.selector.select(5000);
+
+        result = this.haReader.read(this.socketChannel, this.byteBufferRead);
+        if (!result) {
+            closeMasterAndWait();
+            return;
+        }
+    }
+
+    private boolean reportSlaveOffset(final long offsetToReport) {
+        this.transferHeaderBuffer.position(0);
+        this.transferHeaderBuffer.limit(TRANSFER_HEADER_SIZE);
+        this.transferHeaderBuffer.putInt(this.currentState.ordinal());
+        this.transferHeaderBuffer.putLong(offsetToReport);
+        this.transferHeaderBuffer.flip();
+        return this.haWriter.write(this.socketChannel, this.transferHeaderBuffer);
+    }
+
+    private boolean reportSlaveMaxOffset() {
+        boolean result = true;
+        final long maxPhyOffset = this.messageStore.getMaxPhyOffset();
+        if (maxPhyOffset > this.currentReportedOffset) {
+            this.currentReportedOffset = maxPhyOffset;
+            result = reportSlaveOffset(this.currentReportedOffset);
+        }
+        return result;
+    }
+
+    public boolean connectMaster() throws ClosedChannelException {
+        if (null == this.socketChannel) {
+            String addr = this.masterHaAddress.get();
+            if (addr != null) {
+                SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
+                this.socketChannel = RemotingUtil.connect(socketAddress);
+                if (this.socketChannel != null) {
+                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
+                    LOGGER.info("AutoRecoverHAClient connect to master {}", addr);
+                    changeCurrentState(HAConnectionState.HANDSHAKE);
+                }
+            }
+            this.currentReportedOffset = this.messageStore.getMaxPhyOffset();
+            this.lastReadTimestamp = System.currentTimeMillis();
+        }
+        return this.socketChannel != null;
+    }
+
+    private boolean transferFromMaster() throws IOException {
+        boolean result;
+        if (isTimeToReportOffset()) {
+            LOGGER.info("Slave report current offset {}", this.currentReportedOffset);
+            result = reportSlaveOffset(this.currentReportedOffset);
+            if (!result) {
+                return false;
+            }
+        }
+
+        this.selector.select(1000);
+
+        result = this.haReader.read(this.socketChannel, this.byteBufferRead);
+        if (!result) {
+            return false;
+        }
+
+        return this.reportSlaveMaxOffset();
+    }
+
+    @Override public void run() {
+        LOGGER.info(this.getServiceName() + " service started");
+
+        this.flowMonitor.start();
+        while (!this.isStopped()) {
+            try {
+                switch (this.currentState) {
+                    case SHUTDOWN:
+                        return;
+                    case READY:
+                        // Truncate invalid msg first
+                        final long truncateOffset = AutoSwitchHAClient.this.haService.truncateInvalidMsg();
+                        if (truncateOffset >= 0) {
+                            AutoSwitchHAClient.this.epochCache.truncateFromOffset(truncateOffset);
+                        }
+                        if (!connectMaster()) {
+                            LOGGER.warn("AutoRecoverHAClient connect to master {} failed", this.masterHaAddress.get());
+                            waitForRunning(1000 * 5);
+                        }
+                        continue;
+                    case HANDSHAKE:
+                        handshakeWithMaster();
+                        continue;
+                    case TRANSFER:
+                        if (!transferFromMaster()) {
+                            closeMasterAndWait();
+                            continue;
+                        }
+                        break;
+                    case SUSPEND:
+                    default:
+                        waitForRunning(1000 * 5);
+                        continue;
+                }
+                long interval = this.messageStore.now() - this.lastReadTimestamp;
+                if (interval > this.messageStore.getMessageStoreConfig().getHaHousekeepingInterval()) {
+                    LOGGER.warn("AutoRecoverHAClient, housekeeping, found this connection[" + this.masterHaAddress

Review Comment:
   AutoSwitchHAClient
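
   As a side note on the quoted client code: `TRANSFER_HEADER_SIZE = 4 + 8` describes a 12-byte header (state ordinal as an int, then an offset as a long) that the slave writes in `sendHandshakeHeader` and `reportSlaveOffset`. A minimal sketch of that layout follows; `SlaveHeaderSketch`, `encode`, and `decode` are hypothetical names for illustration, and the master-side parsing is not shown in this diff.

```java
import java.nio.ByteBuffer;

/** Hypothetical illustration of the 12-byte slave-to-master header; not code from the PR. */
class SlaveHeaderSketch {
    static final int HEADER_SIZE = 4 + 8; // state ordinal (int) + offset (long)

    /** Writes the two fields in the same order as the quoted reportSlaveOffset. */
    static ByteBuffer encode(int stateOrdinal, long offset) {
        ByteBuffer buffer = ByteBuffer.allocate(HEADER_SIZE);
        buffer.putInt(stateOrdinal);
        buffer.putLong(offset);
        buffer.flip(); // switch from writing to reading: position = 0, limit = 12
        return buffer;
    }

    /** Reads the fields back; returns {stateOrdinal, offset}. */
    static long[] decode(ByteBuffer buffer) {
        int stateOrdinal = buffer.getInt();
        long offset = buffer.getLong();
        return new long[] {stateOrdinal, offset};
    }

    public static void main(String[] args) {
        ByteBuffer header = encode(2, 4096L);
        System.out.println(header.remaining());
        long[] fields = decode(header);
        System.out.println(fields[0] + " " + fields[1]);
    }
}
```

   `ByteBuffer` defaults to big-endian byte order, so both ends agree on the encoding as long as neither changes the order.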



##########
store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java:
##########
@@ -33,6 +33,11 @@ public class MessageStoreConfig {
     private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
         + File.separator + "commitlog";
 
+    // The directory in which the epoch file is kept
+    @ImportantField
+    private String storePathEpochFile = System.getProperty("user.home") + File.separator + "store"
+        + File.separator + "epochFileCheckpoint";

Review Comment:
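   It would be better to put this at the same directory level as the commitlog files; right now it is at the same level as the commitlog directory.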



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java:
##########
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.ha.FlowMonitor;
+import org.apache.rocketmq.store.ha.HAClient;
+import org.apache.rocketmq.store.ha.HAConnectionState;
+import org.apache.rocketmq.store.ha.io.AbstractHAReader;
+import org.apache.rocketmq.store.ha.io.HAWriter;
+
+public class AutoSwitchHAClient extends ServiceThread implements HAClient {
+    /**
+     * Transfer header buffer size. Schema: state ordinal + maxOffset
+     */
+    public static final int TRANSFER_HEADER_SIZE = 4 + 8;
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
+    private final AtomicReference<String> masterHaAddress = new AtomicReference<>();
+    private final AtomicReference<String> masterAddress = new AtomicReference<>();
+    private final ByteBuffer transferHeaderBuffer = ByteBuffer.allocate(TRANSFER_HEADER_SIZE);
+    private final AutoSwitchHAService haService;
+    private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+    private final DefaultMessageStore messageStore;
+    private final EpochFileCache epochCache;
+
+    private SocketChannel socketChannel;
+    private Selector selector;
+    private AbstractHAReader haReader;
+    private HAWriter haWriter;
+    private FlowMonitor flowMonitor;
+    /**
+     * last time that slave reads data from master.
+     */
+    private long lastReadTimestamp;
+    /**
+     * last time that slave reports offset to master.
+     */
+    private long lastWriteTimestamp;
+
+    private long currentReportedOffset;
+    private int processPosition;
+    private volatile HAConnectionState currentState;
+    /**
+     * Current epoch
+     */
+    private volatile long currentReceivedEpoch;
+
+    /**
+     * Confirm offset = min(localMaxOffset, master confirm offset).
+     */
+    private volatile long confirmOffset;
+
+    public AutoSwitchHAClient(AutoSwitchHAService haService, DefaultMessageStore defaultMessageStore,
+        EpochFileCache epochCache) throws IOException {
+        this.haService = haService;
+        this.messageStore = defaultMessageStore;
+        this.epochCache = epochCache;
+        init();
+    }
+
+    public void init() throws IOException {
+        this.selector = RemotingUtil.openSelector();
+        this.flowMonitor = new FlowMonitor(this.messageStore.getMessageStoreConfig());
+        this.haReader = new HAClientReader();
+        haReader.registerHook(readSize -> {
+            if (readSize > 0) {
+                AutoSwitchHAClient.this.flowMonitor.addByteCountTransferred(readSize);
+                lastReadTimestamp = System.currentTimeMillis();
+            }
+        });
+        this.haWriter = new HAWriter();
+        haWriter.registerHook(writeSize -> {
+            if (writeSize > 0) {
+                lastWriteTimestamp = System.currentTimeMillis();
+            }
+        });
+        changeCurrentState(HAConnectionState.READY);
+        this.currentReceivedEpoch = -1;
+        this.currentReportedOffset = 0;
+        this.processPosition = 0;
+        this.confirmOffset = -1;
+        this.lastReadTimestamp = System.currentTimeMillis();
+        this.lastWriteTimestamp = System.currentTimeMillis();
+    }
+
+    public void reOpen() throws IOException {
+        shutdown();
+        init();
+    }
+
+    @Override public String getServiceName() {
+        return AutoSwitchHAClient.class.getSimpleName();
+    }
+
+    @Override public void updateMasterAddress(String newAddress) {
+        String currentAddr = this.masterAddress.get();
+        if (masterAddress.compareAndSet(currentAddr, newAddress)) {
+            LOGGER.info("update master address, OLD: " + currentAddr + " NEW: " + newAddress);
+        }
+    }
+
+    @Override public void updateHaMasterAddress(String newAddress) {
+        String currentAddr = this.masterHaAddress.get();
+        if (masterHaAddress.compareAndSet(currentAddr, newAddress)) {
+            LOGGER.info("update master ha address, OLD: " + currentAddr + " NEW: " + newAddress);
+        }
+    }
+
+    @Override public String getMasterAddress() {
+        return this.masterAddress.get();
+    }
+
+    @Override public String getHaMasterAddress() {
+        return this.masterHaAddress.get();
+    }
+
+    @Override public long getLastReadTimestamp() {
+        return this.lastReadTimestamp;
+    }
+
+    @Override public long getLastWriteTimestamp() {
+        return this.lastWriteTimestamp;
+    }
+
+    @Override public HAConnectionState getCurrentState() {
+        return this.currentState;
+    }
+
+    @Override public void changeCurrentState(HAConnectionState haConnectionState) {
+        LOGGER.info("change state to {}", haConnectionState);
+        this.currentState = haConnectionState;
+    }
+
+    public void closeMasterAndWait() {
+        this.closeMaster();
+        this.waitForRunning(1000 * 5);
+    }
+
+    @Override public void closeMaster() {
+        if (null != this.socketChannel) {
+            try {
+                SelectionKey sk = this.socketChannel.keyFor(this.selector);
+                if (sk != null) {
+                    sk.cancel();
+                }
+
+                this.socketChannel.close();
+                this.socketChannel = null;
+
+                LOGGER.info("AutoRecoverHAClient close connection with master {}", this.masterHaAddress.get());

Review Comment:
   AutoSwitchHAClient



##########
store/src/test/java/org/apache/rocketmq/store/ha/EpochFileCacheTest.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.ha;
+
+import java.io.File;
+import java.nio.file.Paths;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.store.ha.autoswitch.EpochFileCache;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class EpochFileCacheTest {
+    private EpochFileCache epochCache;
+    private EpochFileCache epochCache2;
+    private String path;
+    private String path2;
+
+    @Before
+    public void setup() {
+        this.path = Paths.get(File.separator + "tmp", "EpochCheckpoint").toString();
+        this.epochCache = new EpochFileCache(path);
+        assertTrue(this.epochCache.appendEntry(new EpochEntry(1, 100)));
+        assertTrue(this.epochCache.appendEntry(new EpochEntry(2, 300)));
+        assertTrue(this.epochCache.appendEntry(new EpochEntry(3, 500)));
+        final EpochEntry entry = this.epochCache.getEntry(2);
+        assertEquals(entry.getEpoch(), 2);
+        assertEquals(entry.getStartOffset(), 300);
+        assertEquals(entry.getEndOffset(), 500);
+    }
+
+    @After
+    public void shutdown() {
+        new File(this.path).delete();
+        if (this.path2 != null) {
+            new File(this.path2).delete();
+        }
+    }
+
+    @Test
+    public void testInitFromFile() {
+        // Remove entries, init from file
+        assertTrue(this.epochCache.initCacheFromFile());
+        final EpochEntry entry = this.epochCache.getEntry(2);
+        assertEquals(entry.getEpoch(), 2);
+        assertEquals(entry.getStartOffset(), 300);
+        assertEquals(entry.getEndOffset(), 500);
+    }
+
+    @Test
+    public void testTruncate() {
+        this.epochCache.truncateFromOffset(150);
+        assertNotNull(this.epochCache.getEntry(1));
+        assertNull(this.epochCache.getEntry(2));
+    }
+
+    @Test
+    public void testFindConsistentPointSample1() {
+        this.path2 = Paths.get(File.separator + "tmp", "EpochCheckpoint2").toString();
+        this.epochCache2 = new EpochFileCache(path2);
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(1, 100)));
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(2, 300)));
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(3, 450)));
+        /**
+         *  cache1: <Epoch1, 100>, <Epoch2, 300>, <Epoch3, 500>
+         *  cache2: <Epoch1, 100>, <Epoch2, 300>, <Epoch3, 450>
+         *  The consistent point should be 450
+         */
+        final long consistentPoint = this.epochCache.findConsistentPoint(this.epochCache2);
+        assertEquals(consistentPoint, 450);
+    }
+
+    @Test
+    public void testFindConsistentPointSample2() {
+        this.path2 = Paths.get(File.separator + "tmp", "EpochCheckpoint2").toString();
+        this.epochCache2 = new EpochFileCache(path2);
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(1, 100)));
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(2, 300)));
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(3, 500)));
+        /**
+         *  cache1: <Epoch1, 100>, <Epoch2, 300>, <Epoch3, 500, 700>
+         *  cache2: <Epoch1, 100>, <Epoch2, 300>, <Epoch3, 500, 600>
+         *  The consistent point should be 600
+         */
+        this.epochCache.setLastEpochEntryEndOffset(700);
+        this.epochCache2.setLastEpochEntryEndOffset(600);
+        final long consistentPoint = this.epochCache.findConsistentPoint(this.epochCache2);
+        assertEquals(consistentPoint, 600);
+    }

Review Comment:
   It would be better to add a few more samples here :-)
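
   As a rough illustration of the kind of extra samples meant here, the sketch below models the descending scan of `findConsistentPoint` on a plain `TreeMap`. It is not the PR's `EpochFileCache`: the class `ConsistentPointSamples`, the `cache` helper, and the `{startOffset, endOffset}` array layout are assumptions made only so the example is self-contained. The three cases are a divergent start offset, no common epoch, and a follower that is a strict prefix.

```java
import java.util.Map;
import java.util.TreeMap;

/** Simplified stand-in for the epoch cache (hypothetical, not the PR's EpochFileCache). */
final class ConsistentPointSamples {

    /** Builds epoch -> {startOffset, endOffset} from (epoch, startOffset) pairs. */
    static TreeMap<Integer, long[]> cache(long lastEndOffset, long... epochStartPairs) {
        TreeMap<Integer, long[]> map = new TreeMap<>();
        long[] previous = null;
        for (int i = 0; i < epochStartPairs.length; i += 2) {
            long[] entry = {epochStartPairs[i + 1], lastEndOffset};
            if (previous != null) {
                previous[1] = entry[0]; // an epoch ends where the next one starts
            }
            map.put((int) epochStartPairs[i], entry);
            previous = entry;
        }
        return map;
    }

    /** Walks local epochs from newest to oldest and stops at the first matching entry. */
    static long findConsistentPoint(TreeMap<Integer, long[]> local, TreeMap<Integer, long[]> remote) {
        for (Map.Entry<Integer, long[]> e : local.descendingMap().entrySet()) {
            long[] other = remote.get(e.getKey());
            if (other != null && other[0] == e.getValue()[0]) {
                return Math.min(e.getValue()[1], other[1]);
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // local: <Epoch1, 100, 300>, <Epoch2, 300, 500>, <Epoch3, 500, 700>
        TreeMap<Integer, long[]> local = cache(700, 1, 100, 2, 300, 3, 500);
        // Epoch 2 starts at a different offset remotely, so only epoch 1 matches.
        System.out.println(findConsistentPoint(local, cache(600, 1, 100, 2, 250, 4, 400)));
        // No epoch in common.
        System.out.println(findConsistentPoint(local, cache(600, 5, 50)));
        // Remote holds only a prefix of the local epochs.
        System.out.println(findConsistentPoint(local, cache(400, 1, 100, 2, 300)));
    }
}
```

   Each call could become its own `testFindConsistentPointSampleN` case against the real cache, assuming the real implementation behaves like this model.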



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/EpochFileCache.java:
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.CheckpointFile;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+/**
+ * Cache for epochFile.
+ * Mapping (Epoch -> StartOffset)
+ */
+public class EpochFileCache {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    private final Lock readLock = this.readWriteLock.readLock();
+    private final Lock writeLock = this.readWriteLock.writeLock();
+    private final TreeMap<Integer, EpochEntry> epochMap;
+    private CheckpointFile<EpochEntry> checkpoint;
+
+    public EpochFileCache() {
+        this.epochMap = new TreeMap<>();
+    }
+
+    public EpochFileCache(final String path) {
+        this.epochMap = new TreeMap<>();
+        this.checkpoint = new CheckpointFile<>(path, new EpochEntrySerializer());
+    }
+
+    public boolean initCacheFromFile() {
+        this.writeLock.lock();
+        try {
+            final List<EpochEntry> entries = this.checkpoint.read();
+            initEntries(entries);
+            return true;
+        } catch (final IOException e) {
+            log.error("Error happen when init epoch entries from epochFile", e);
+            return false;
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    public void initCacheFromEntries(final List<EpochEntry> entries) {
+        this.writeLock.lock();
+        try {
+            initEntries(entries);
+            flush();
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    private void initEntries(final List<EpochEntry> entries) {
+        this.epochMap.clear();
+        EpochEntry preEntry = null;
+        for (final EpochEntry entry : entries) {
+            this.epochMap.put(entry.getEpoch(), entry);
+            if (preEntry != null) {
+                preEntry.setEndOffset(entry.getStartOffset());
+            }
+            preEntry = entry;
+        }
+    }
+
+    public boolean appendEntry(final EpochEntry entry) {
+        this.writeLock.lock();
+        try {
+            if (!this.epochMap.isEmpty()) {
+                final EpochEntry lastEntry = this.epochMap.lastEntry().getValue();
+                if (lastEntry.getEpoch() >= entry.getEpoch() || lastEntry.getStartOffset() >= entry.getStartOffset()) {
+                    return false;
+                }
+                lastEntry.setEndOffset(entry.getStartOffset());
+            }
+            this.epochMap.put(entry.getEpoch(), new EpochEntry(entry));
+            flush();
+            return true;
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    /**
+     * Set endOffset for lastEpochEntry.
+     */
+    public void setLastEpochEntryEndOffset(final long endOffset) {
+        this.writeLock.lock();
+        try {
+            if (!this.epochMap.isEmpty()) {
+                final EpochEntry lastEntry = this.epochMap.lastEntry().getValue();
+                if (lastEntry.getStartOffset() <= endOffset) {
+                    lastEntry.setEndOffset(endOffset);
+                }
+            }
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    public EpochEntry firstEntry() {
+        this.readLock.lock();
+        try {
+            if (this.epochMap.isEmpty()) {
+                return null;
+            }
+            return new EpochEntry(this.epochMap.firstEntry().getValue());
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public EpochEntry lastEntry() {
+        this.readLock.lock();
+        try {
+            if (this.epochMap.isEmpty()) {
+                return null;
+            }
+            return new EpochEntry(this.epochMap.lastEntry().getValue());
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public int lastEpoch() {
+        final EpochEntry entry = lastEntry();
+        if (entry != null) {
+            return entry.getEpoch();
+        }
+        return -1;
+    }
+
+    public EpochEntry getEntry(final int epoch) {
+        this.readLock.lock();
+        try {
+            if (this.epochMap.containsKey(epoch)) {
+                final EpochEntry entry = this.epochMap.get(epoch);
+                return new EpochEntry(entry);
+            }
+            return null;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public EpochEntry findEpochEntryByOffset(final long offset) {
+        this.readLock.lock();
+        try {
+            if (!this.epochMap.isEmpty()) {
+                for (Map.Entry<Integer, EpochEntry> entry : this.epochMap.entrySet()) {
+                    if (entry.getValue().getStartOffset() <= offset && entry.getValue().getEndOffset() > offset) {
+                        return new EpochEntry(entry.getValue());
+                    }
+                }
+            }
+            return null;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public EpochEntry nextEntry(final int epoch) {
+        this.readLock.lock();
+        try {
+            final Map.Entry<Integer, EpochEntry> entry = this.epochMap.ceilingEntry(epoch + 1);
+            if (entry != null) {
+                return new EpochEntry(entry.getValue());
+            }
+            return null;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public List<EpochEntry> getAllEntries() {
+        this.readLock.lock();
+        try {
+            final ArrayList<EpochEntry> result = new ArrayList<>(this.epochMap.size());
+            this.epochMap.forEach((key, value) -> result.add(new EpochEntry(value)));
+            return result;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    /**
+     * Find the consistentPoint between compareCache and local.
+     *
+     * @return the consistent offset
+     */
+    public long findConsistentPoint(final EpochFileCache compareCache) {
+        this.readLock.lock();
+        try {
+            long consistentOffset = -1;
+            final Map<Integer, EpochEntry> descendingMap = new TreeMap<>(this.epochMap).descendingMap();
+            final Iterator<Map.Entry<Integer, EpochEntry>> iter = descendingMap.entrySet().iterator();
+            while (iter.hasNext()) {
+                final Map.Entry<Integer, EpochEntry> curLocalEntry = iter.next();
+                final EpochEntry compareEntry = compareCache.getEntry(curLocalEntry.getKey());
+                if (compareEntry != null && compareEntry.getStartOffset() == curLocalEntry.getValue().getStartOffset()) {
+                    consistentOffset = Math.min(curLocalEntry.getValue().getEndOffset(), compareEntry.getEndOffset());
+                    break;
+                }
+            }
+            return consistentOffset;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    /**
+     * Remove epochEntries with epoch >= truncateEpoch.
+     */
+    public void truncateFromEpoch(final int truncateEpoch) {
+        doTruncate((entry) -> entry.getEpoch() >= truncateEpoch);
+    }
+
+    /**
+     * Remove epochEntries with startOffset >= truncateOffset.
+     */
+    public void truncateFromOffset(final long truncateOffset) {
+        doTruncate((entry) -> entry.getStartOffset() >= truncateOffset);
+    }
+
+    /**
+     * Clear all epochEntries
+     */
+    public void clearAll() {
+        doTruncate((entry) -> true);
+    }
+
+    private void doTruncate(final Predicate<EpochEntry> predict) {
+        this.writeLock.lock();
+        try {
+            this.epochMap.entrySet().removeIf(entry -> predict.test(entry.getValue()));
+            final EpochEntry entry = lastEntry();
+            if (entry != null) {
+                entry.setEndOffset(Long.MAX_VALUE);
+            }
+            flush();
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    private void flush() {
+        this.writeLock.lock();
+        try {
+            final ArrayList<EpochEntry> entries = new ArrayList<>(this.epochMap.values());
+            try {
+                if (this.checkpoint != null) {
+                    this.checkpoint.write(entries);
+                }
+            } catch (final IOException e) {
+                log.error("Error happen when flush epochEntries to epochCheckpointFile", e);
+            }

Review Comment:
   It would be better to initialize `entries` only when the checkpoint is not null.
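
   A minimal sketch of that ordering, assuming a simplified cache: the null check comes first, and the list copy is made only when there is a checkpoint to write to. `GuardedFlushSketch` and its `Checkpoint` interface are hypothetical stand-ins, not `EpochFileCache` or `CheckpointFile`, and locking and error handling are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Simplified stand-in; Checkpoint is a hypothetical interface, not CheckpointFile. */
final class GuardedFlushSketch {
    interface Checkpoint {
        void write(List<Long> entries);
    }

    private final TreeMap<Integer, Long> epochMap = new TreeMap<>();
    private final Checkpoint checkpoint; // null for a purely in-memory cache

    GuardedFlushSketch(Checkpoint checkpoint) {
        this.checkpoint = checkpoint;
    }

    void put(int epoch, long startOffset) {
        epochMap.put(epoch, startOffset);
    }

    /** Returns true if entries were written, false if there is no checkpoint. */
    boolean flush() {
        if (checkpoint == null) {
            return false; // nothing to write to, so skip the copy entirely
        }
        checkpoint.write(new ArrayList<>(epochMap.values()));
        return true;
    }
}
```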



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/EpochFileCache.java:
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.CheckpointFile;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+/**
+ * Cache for epochFile.
+ * Mapping (Epoch -> StartOffset)
+ */
+public class EpochFileCache {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    private final Lock readLock = this.readWriteLock.readLock();
+    private final Lock writeLock = this.readWriteLock.writeLock();
+    private final TreeMap<Integer, EpochEntry> epochMap;
+    private CheckpointFile<EpochEntry> checkpoint;
+
+    public EpochFileCache() {
+        this.epochMap = new TreeMap<>();
+    }
+
+    public EpochFileCache(final String path) {
+        this.epochMap = new TreeMap<>();
+        this.checkpoint = new CheckpointFile<>(path, new EpochEntrySerializer());
+    }
+
+    public boolean initCacheFromFile() {
+        this.writeLock.lock();
+        try {
+            final List<EpochEntry> entries = this.checkpoint.read();
+            initEntries(entries);
+            return true;
+        } catch (final IOException e) {
+            log.error("Error happen when init epoch entries from epochFile", e);
+            return false;
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    public void initCacheFromEntries(final List<EpochEntry> entries) {
+        this.writeLock.lock();
+        try {
+            initEntries(entries);
+            flush();
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    private void initEntries(final List<EpochEntry> entries) {
+        this.epochMap.clear();
+        EpochEntry preEntry = null;
+        for (final EpochEntry entry : entries) {
+            this.epochMap.put(entry.getEpoch(), entry);
+            if (preEntry != null) {
+                preEntry.setEndOffset(entry.getStartOffset());
+            }
+            preEntry = entry;
+        }
+    }
+
+    public boolean appendEntry(final EpochEntry entry) {
+        this.writeLock.lock();
+        try {
+            if (!this.epochMap.isEmpty()) {
+                final EpochEntry lastEntry = this.epochMap.lastEntry().getValue();
+                if (lastEntry.getEpoch() >= entry.getEpoch() || lastEntry.getStartOffset() >= entry.getStartOffset()) {
+                    return false;
+                }

Review Comment:
   It would be better to log a message when appending an entry fails.
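
   One possible shape for this, sketched on a simplified map of epoch to start offset. `AppendWithLoggingSketch` is hypothetical, and `java.util.logging` is used only to keep the example self-contained; the PR would presumably use its own `InternalLogger`. The message wording is likewise an assumption.

```java
import java.util.TreeMap;
import java.util.logging.Logger;

/** Simplified stand-in for the append path (hypothetical, not EpochFileCache). */
final class AppendWithLoggingSketch {
    private static final Logger LOG = Logger.getLogger(AppendWithLoggingSketch.class.getName());
    private final TreeMap<Integer, Long> epochMap = new TreeMap<>();

    /** Appends only if both epoch and startOffset are strictly greater than the last entry's. */
    boolean appendEntry(int epoch, long startOffset) {
        if (!epochMap.isEmpty()) {
            int lastEpoch = epochMap.lastKey();
            long lastStart = epochMap.lastEntry().getValue();
            if (lastEpoch >= epoch || lastStart >= startOffset) {
                // Log both entries so the rejected append can be diagnosed later.
                LOG.warning(String.format("Reject epoch entry <%d, %d>: last entry is <%d, %d>",
                    epoch, startOffset, lastEpoch, lastStart));
                return false;
            }
        }
        epochMap.put(epoch, startOffset);
        return true;
    }
}
```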



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java:
##########
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.List;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.ha.FlowMonitor;
+import org.apache.rocketmq.store.ha.HAConnection;
+import org.apache.rocketmq.store.ha.HAConnectionState;
+import org.apache.rocketmq.store.ha.io.AbstractHAReader;
+import org.apache.rocketmq.store.ha.io.HAWriter;
+
+public class AutoSwitchHAConnection implements HAConnection {
+    /**
+     * Header protocol in syncing msg from master.
+     * Format: current state + body size + offset + epoch + additionalInfo(confirmOffset / lastFlushOffset).
+     * If the msg is normalMsg, the additionalInfo is confirmOffset
+     * If the msg is handshakeMsg, the body size = EpochEntrySize * EpochEntryNums, the additionalInfo is lastFlushOffset
+     */
+    public static final int MSG_HEADER_SIZE = 4 + 4 + 8 + 4 + 8;
+    public static final int EPOCH_ENTRY_SIZE = 12;
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private final AutoSwitchHAService haService;
+    private final SocketChannel socketChannel;
+    private final String clientAddress;
+    private final EpochFileCache epochCache;
+    private final AbstractWriteSocketService writeSocketService;
+    private final ReadSocketService readSocketService;
+    private volatile HAConnectionState currentState = HAConnectionState.HANDSHAKE;
+    private volatile long slaveRequestOffset = -1;
+    private volatile long slaveAckOffset = -1;
+    /**
+     * Whether the slave has already sent a handshake message
+     */
+    private volatile boolean isSlaveSendHandshake = false;
+    private volatile int currentTransferEpoch = -1;
+    private volatile long currentTransferEpochEndOffset = 0;
+    private final FlowMonitor flowMonitor;
+
+    public AutoSwitchHAConnection(AutoSwitchHAService haService, SocketChannel socketChannel,
+        EpochFileCache epochCache) throws IOException {
+        this.haService = haService;
+        this.socketChannel = socketChannel;
+        this.epochCache = epochCache;
+        this.clientAddress = this.socketChannel.socket().getRemoteSocketAddress().toString();
+        this.socketChannel.configureBlocking(false);
+        this.socketChannel.socket().setSoLinger(false, -1);
+        this.socketChannel.socket().setTcpNoDelay(true);
+        this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
+        this.socketChannel.socket().setSendBufferSize(1024 * 64);
+        this.writeSocketService = new WriteSocketService(this.socketChannel);
+        this.readSocketService = new ReadSocketService(this.socketChannel);
+        this.haService.getConnectionCount().incrementAndGet();
+        this.flowMonitor = new FlowMonitor(haService.getDefaultMessageStore().getMessageStoreConfig());
+    }
+
+    @Override public void start() {
+        changeCurrentState(HAConnectionState.HANDSHAKE);
+        this.flowMonitor.start();
+        this.readSocketService.start();
+        this.writeSocketService.start();
+    }
+
+    @Override public void shutdown() {
+        changeCurrentState(HAConnectionState.SHUTDOWN);
+        this.flowMonitor.shutdown(true);
+        this.writeSocketService.shutdown(true);
+        this.readSocketService.shutdown(true);
+        this.close();
+    }
+
+    @Override public void close() {
+        if (this.socketChannel != null) {
+            try {
+                this.socketChannel.close();
+            } catch (final IOException e) {
+                LOGGER.error("", e);
+            }
+        }
+    }
+
+    public void changeCurrentState(HAConnectionState connectionState) {
+        LOGGER.info("change state to {}", connectionState);
+        this.currentState = connectionState;
+    }
+
+    @Override public HAConnectionState getCurrentState() {
+        return currentState;
+    }
+
+    @Override public SocketChannel getSocketChannel() {
+        return socketChannel;
+    }
+
+    @Override public String getClientAddress() {
+        return clientAddress;
+    }
+
+    @Override public long getSlaveAckOffset() {
+        return slaveAckOffset;
+    }
+
+    @Override public long getTransferredByteInSecond() {
+        return flowMonitor.getTransferredByteInSecond();
+    }
+
+    @Override public long getTransferFromWhere() {
+        return this.writeSocketService.getNextTransferFromWhere();
+    }
+
+    class ReadSocketService extends ServiceThread {
+        private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;
+        private final Selector selector;
+        private final SocketChannel socketChannel;
+        private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+        private final AbstractHAReader haReader;
+        private int processPosition = 0;
+        private volatile long lastReadTimestamp = System.currentTimeMillis();
+
+        public ReadSocketService(final SocketChannel socketChannel) throws IOException {
+            this.selector = RemotingUtil.openSelector();
+            this.socketChannel = socketChannel;
+            this.socketChannel.register(this.selector, SelectionKey.OP_READ);
+            this.setDaemon(true);
+            haReader = new HAServerReader();
+            haReader.registerHook(readSize -> {
+                if (readSize > 0) {
+                    ReadSocketService.this.lastReadTimestamp =
+                        haService.getDefaultMessageStore().getSystemClock().now();
+                }
+            });
+        }
+
+        @Override
+        public void run() {
+            LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.selector.select(1000);
+                    boolean ok = this.haReader.read(this.socketChannel, this.byteBufferRead);
+                    if (!ok) {
+                        AutoSwitchHAConnection.LOGGER.error("processReadEvent error");
+                        break;
+                    }
+
+                    long interval = haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
+                    if (interval > haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
+                        LOGGER.warn("ha housekeeping, found this connection[" + clientAddress + "] expired, " + interval);
+                        break;
+                    }
+                } catch (Exception e) {
+                    AutoSwitchHAConnection.LOGGER.error(this.getServiceName() + " service has exception.", e);
+                    break;
+                }
+            }
+
+            this.makeStop();
+
+            changeCurrentState(HAConnectionState.SHUTDOWN);
+
+            writeSocketService.makeStop();
+
+            haService.removeConnection(AutoSwitchHAConnection.this);
+
+            haService.getConnectionCount().decrementAndGet();
+
+            SelectionKey sk = this.socketChannel.keyFor(this.selector);
+            if (sk != null) {
+                sk.cancel();
+            }
+
+            try {
+                this.selector.close();
+                this.socketChannel.close();
+            } catch (IOException e) {
+                AutoSwitchHAConnection.LOGGER.error("", e);
+            }
+
+            AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            return ReadSocketService.class.getSimpleName();
+        }
+
+        class HAServerReader extends AbstractHAReader {
+            @Override
+            protected boolean processReadResult(ByteBuffer byteBufferRead) {
+                while (true) {
+                    int diff = byteBufferRead.position() - ReadSocketService.this.processPosition;
+                    if (diff >= AutoSwitchHAClient.TRANSFER_HEADER_SIZE) {
+                        int readPosition = ReadSocketService.this.processPosition;
+                        HAConnectionState slaveState = HAConnectionState.values()[byteBufferRead.getInt(readPosition)];
+
+                        switch (slaveState) {
+                            case HANDSHAKE:
+                                isSlaveSendHandshake = true;
+                                ReadSocketService.this.processPosition += AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
+                                break;
+                            case TRANSFER:
+                                long slaveMaxOffset = byteBufferRead.getLong(readPosition + 4);
+                                ReadSocketService.this.processPosition += AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
+
+                                slaveAckOffset = slaveMaxOffset;
+                                if (slaveRequestOffset < 0) {
+                                    slaveRequestOffset = slaveMaxOffset;
+                                }
+                                LOGGER.info("slave[" + clientAddress + "] request offset " + slaveMaxOffset);
+                                AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset);
+                                break;
+                            default:
+                                LOGGER.error("Current state illegal {}", currentState);
+                                break;
+                        }
+
+                        if (!slaveState.equals(currentState)) {
+                            LOGGER.warn("Master change state from {} to {}", currentState, slaveState);
+                            changeCurrentState(slaveState);
+                        }
+                        continue;
+                    }
+
+                    if (!byteBufferRead.hasRemaining()) {
+                        byteBufferRead.position(ReadSocketService.this.processPosition);
+                        byteBufferRead.compact();
+                        ReadSocketService.this.processPosition = 0;
+                    }
+                    break;
+                }
+
+                return true;
+            }
+        }
+    }
+
+    class WriteSocketService extends AbstractWriteSocketService {
+        private SelectMappedBufferResult selectMappedBufferResult;
+
+        public WriteSocketService(final SocketChannel socketChannel) throws IOException {
+            super(socketChannel);
+        }
+
+        @Override
+        protected int getNextTransferDataSize() {
+            SelectMappedBufferResult selectResult = haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
+            if (selectResult == null || selectResult.getSize() <= 0) {
+                return 0;
+            }
+            this.selectMappedBufferResult = selectResult;
+            return selectResult.getSize();
+        }
+
+        @Override
+        protected void releaseData() {
+            this.selectMappedBufferResult.release();
+            this.selectMappedBufferResult = null;
+        }
+
+        @Override
+        protected boolean transferData(int maxTransferSize) {
+
+            if (null != this.selectMappedBufferResult && maxTransferSize >= 0) {
+                this.selectMappedBufferResult.getByteBuffer().limit(maxTransferSize);
+            }
+
+            // Write Header
+            boolean result = haWriter.write(this.socketChannel, this.byteBufferHeader);
+
+            if (!result) {
+                return false;
+            }
+
+            if (null == this.selectMappedBufferResult) {
+                return true;
+            }
+
+            // Write Body
+            result = haWriter.write(this.socketChannel, this.selectMappedBufferResult.getByteBuffer());
+
+            if (result) {
+                releaseData();
+            }
+            return result;
+        }
+
+        @Override
+        protected void onStop() {
+            if (this.selectMappedBufferResult != null) {
+                this.selectMappedBufferResult.release();
+            }
+        }
+
+        @Override
+        public String getServiceName() {
+            if (haService.getDefaultMessageStore().getBrokerConfig().isInBrokerContainer()) {
+                return haService.getDefaultMessageStore().getBrokerConfig().getLoggerIdentifier() + WriteSocketService.class.getSimpleName();
+            }
+            return WriteSocketService.class.getSimpleName();
+        }
+    }
+
+    abstract class AbstractWriteSocketService extends ServiceThread {
+        protected final Selector selector;
+        protected final SocketChannel socketChannel;
+        protected final HAWriter haWriter;
+
+        protected final ByteBuffer byteBufferHeader = ByteBuffer.allocate(MSG_HEADER_SIZE);
+        // Store master epochFileCache: (Epoch + startOffset) * 1000
+        private final ByteBuffer handShakeBuffer = ByteBuffer.allocate(EPOCH_ENTRY_SIZE * 1000);
+        protected long nextTransferFromWhere = -1;
+        protected boolean lastWriteOver = true;
+        protected long lastWriteTimestamp = System.currentTimeMillis();
+        protected long lastPrintTimestamp = System.currentTimeMillis();
+        protected long transferOffset = 0;
+
+        public AbstractWriteSocketService(final SocketChannel socketChannel) throws IOException {
+            this.selector = RemotingUtil.openSelector();
+            this.socketChannel = socketChannel;
+            this.socketChannel.register(this.selector, SelectionKey.OP_WRITE);
+            this.setDaemon(true);
+            haWriter = new HAWriter();
+            haWriter.registerHook(writeSize -> {
+                flowMonitor.addByteCountTransferred(writeSize);
+                if (writeSize > 0) {
+                    AbstractWriteSocketService.this.lastWriteTimestamp =
+                        haService.getDefaultMessageStore().getSystemClock().now();
+                }
+            });
+        }
+
+        public long getNextTransferFromWhere() {
+            return this.nextTransferFromWhere;
+        }
+
+        // Handshake method
+        private boolean buildHandshakeBuffer() {
+            final List<EpochEntry> epochEntries = AutoSwitchHAConnection.this.epochCache.getAllEntries();
+            final int lastEpoch = AutoSwitchHAConnection.this.epochCache.lastEpoch();
+            final long maxPhyOffset = AutoSwitchHAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset();
+            this.byteBufferHeader.position(0);
+            this.byteBufferHeader.limit(MSG_HEADER_SIZE);
+            // State
+            this.byteBufferHeader.putInt(currentState.ordinal());
+            // Body size
+            this.byteBufferHeader.putInt(epochEntries.size() * EPOCH_ENTRY_SIZE);
+            // Offset
+            this.byteBufferHeader.putLong(0L);

Review Comment:
   Why is masterMaxOffset not put here, rather than in the additionalInfo field?
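
   To make the question concrete, here is a sketch of the 28-byte header with masterMaxOffset written into the offset slot. The field order follows the `MSG_HEADER_SIZE` comment (state, body size, offset, epoch, additionalInfo); the class `HandshakeHeaderSketch` and its `encode` method are hypothetical, and what the trailing additionalInfo field should then carry is left open.

```java
import java.nio.ByteBuffer;

/** Hypothetical encoder for the handshake header; not the PR's buildHandshakeBuffer. */
final class HandshakeHeaderSketch {
    // state(4) + bodySize(4) + offset(8) + epoch(4) + additionalInfo(8)
    static final int MSG_HEADER_SIZE = 4 + 4 + 8 + 4 + 8;
    static final int EPOCH_ENTRY_SIZE = 12;

    static ByteBuffer encode(int state, int entryCount, long masterMaxOffset,
        int lastEpoch, long additionalInfo) {
        ByteBuffer header = ByteBuffer.allocate(MSG_HEADER_SIZE);
        header.putInt(state);
        header.putInt(entryCount * EPOCH_ENTRY_SIZE); // body size
        header.putLong(masterMaxOffset);              // suggested placement, instead of 0L
        header.putInt(lastEpoch);
        header.putLong(additionalInfo);
        header.flip(); // ready for reading or writing to a channel
        return header;
    }
}
```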



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java:
##########
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.List;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.ha.FlowMonitor;
+import org.apache.rocketmq.store.ha.HAConnection;
+import org.apache.rocketmq.store.ha.HAConnectionState;
+import org.apache.rocketmq.store.ha.io.AbstractHAReader;
+import org.apache.rocketmq.store.ha.io.HAWriter;
+
+public class AutoSwitchHAConnection implements HAConnection {
+    /**
+     * Header protocol in syncing msg from master.
+     * Format: current state + body size + offset + epoch + additionalInfo(confirmOffset / lastFlushOffset).
+     * If the msg is a normal msg, the additionalInfo is confirmOffset.
+     * If the msg is a handshake msg, the body size = EpochEntrySize * EpochEntryNums and the additionalInfo is lastFlushOffset.
+     */
+    public static final int MSG_HEADER_SIZE = 4 + 4 + 8 + 4 + 8;
+    public static final int EPOCH_ENTRY_SIZE = 12;
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private final AutoSwitchHAService haService;
+    private final SocketChannel socketChannel;
+    private final String clientAddress;
+    private final EpochFileCache epochCache;
+    private final AbstractWriteSocketService writeSocketService;
+    private final ReadSocketService readSocketService;
+    private volatile HAConnectionState currentState = HAConnectionState.HANDSHAKE;
+    private volatile long slaveRequestOffset = -1;
+    private volatile long slaveAckOffset = -1;
+    /**
+     * Whether the slave has already sent a handshake message
+     */
+    private volatile boolean isSlaveSendHandshake = false;
+    private volatile int currentTransferEpoch = -1;
+    private volatile long currentTransferEpochEndOffset = 0;
+    private final FlowMonitor flowMonitor;
+
+    public AutoSwitchHAConnection(AutoSwitchHAService haService, SocketChannel socketChannel,
+        EpochFileCache epochCache) throws IOException {
+        this.haService = haService;
+        this.socketChannel = socketChannel;
+        this.epochCache = epochCache;
+        this.clientAddress = this.socketChannel.socket().getRemoteSocketAddress().toString();
+        this.socketChannel.configureBlocking(false);
+        this.socketChannel.socket().setSoLinger(false, -1);
+        this.socketChannel.socket().setTcpNoDelay(true);
+        this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
+        this.socketChannel.socket().setSendBufferSize(1024 * 64);
+        this.writeSocketService = new WriteSocketService(this.socketChannel);
+        this.readSocketService = new ReadSocketService(this.socketChannel);
+        this.haService.getConnectionCount().incrementAndGet();
+        this.flowMonitor = new FlowMonitor(haService.getDefaultMessageStore().getMessageStoreConfig());
+    }
+
+    @Override public void start() {
+        changeCurrentState(HAConnectionState.HANDSHAKE);
+        this.flowMonitor.start();
+        this.readSocketService.start();
+        this.writeSocketService.start();
+    }
+
+    @Override public void shutdown() {
+        changeCurrentState(HAConnectionState.SHUTDOWN);
+        this.flowMonitor.shutdown(true);
+        this.writeSocketService.shutdown(true);
+        this.readSocketService.shutdown(true);
+        this.close();
+    }
+
+    @Override public void close() {
+        if (this.socketChannel != null) {
+            try {
+                this.socketChannel.close();
+            } catch (final IOException e) {
+                LOGGER.error("", e);
+            }
+        }
+    }
+
+    public void changeCurrentState(HAConnectionState connectionState) {
+        LOGGER.info("change state to {}", connectionState);
+        this.currentState = connectionState;
+    }
+
+    @Override public HAConnectionState getCurrentState() {
+        return currentState;
+    }
+
+    @Override public SocketChannel getSocketChannel() {
+        return socketChannel;
+    }
+
+    @Override public String getClientAddress() {
+        return clientAddress;
+    }
+
+    @Override public long getSlaveAckOffset() {
+        return slaveAckOffset;
+    }
+
+    @Override public long getTransferredByteInSecond() {
+        return flowMonitor.getTransferredByteInSecond();
+    }
+
+    @Override public long getTransferFromWhere() {
+        return this.writeSocketService.getNextTransferFromWhere();
+    }
+
+    class ReadSocketService extends ServiceThread {
+        private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;
+        private final Selector selector;
+        private final SocketChannel socketChannel;
+        private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+        private final AbstractHAReader haReader;
+        private int processPosition = 0;
+        private volatile long lastReadTimestamp = System.currentTimeMillis();
+
+        public ReadSocketService(final SocketChannel socketChannel) throws IOException {
+            this.selector = RemotingUtil.openSelector();
+            this.socketChannel = socketChannel;
+            this.socketChannel.register(this.selector, SelectionKey.OP_READ);
+            this.setDaemon(true);
+            haReader = new HAServerReader();
+            haReader.registerHook(readSize -> {
+                if (readSize > 0) {
+                    ReadSocketService.this.lastReadTimestamp =
+                        haService.getDefaultMessageStore().getSystemClock().now();
+                }
+            });
+        }
+
+        @Override
+        public void run() {
+            LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.selector.select(1000);
+                    boolean ok = this.haReader.read(this.socketChannel, this.byteBufferRead);
+                    if (!ok) {
+                        AutoSwitchHAConnection.LOGGER.error("processReadEvent error");
+                        break;
+                    }
+
+                    long interval = haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
+                    if (interval > haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
+                        LOGGER.warn("ha housekeeping, found this connection[" + clientAddress + "] expired, " + interval);
+                        break;
+                    }
+                } catch (Exception e) {
+                    AutoSwitchHAConnection.LOGGER.error(this.getServiceName() + " service has exception.", e);
+                    break;
+                }
+            }
+
+            this.makeStop();
+
+            changeCurrentState(HAConnectionState.SHUTDOWN);
+
+            writeSocketService.makeStop();
+
+            haService.removeConnection(AutoSwitchHAConnection.this);
+
+            haService.getConnectionCount().decrementAndGet();
+
+            SelectionKey sk = this.socketChannel.keyFor(this.selector);
+            if (sk != null) {
+                sk.cancel();
+            }
+
+            try {
+                this.selector.close();
+                this.socketChannel.close();
+            } catch (IOException e) {
+                AutoSwitchHAConnection.LOGGER.error("", e);
+            }
+
+            AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            return ReadSocketService.class.getSimpleName();
+        }
+
+        class HAServerReader extends AbstractHAReader {
+            @Override
+            protected boolean processReadResult(ByteBuffer byteBufferRead) {
+                while (true) {
+                    int diff = byteBufferRead.position() - ReadSocketService.this.processPosition;
+                    if (diff >= AutoSwitchHAClient.TRANSFER_HEADER_SIZE) {
+                        int readPosition = ReadSocketService.this.processPosition;
+                        HAConnectionState slaveState = HAConnectionState.values()[byteBufferRead.getInt(readPosition)];
+
+                        switch (slaveState) {
+                            case HANDSHAKE:
+                                isSlaveSendHandshake = true;
+                                ReadSocketService.this.processPosition += AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
+                                break;
+                            case TRANSFER:
+                                long slaveMaxOffset = byteBufferRead.getLong(readPosition + 4);
+                                ReadSocketService.this.processPosition += AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
+
+                                slaveAckOffset = slaveMaxOffset;
+                                if (slaveRequestOffset < 0) {
+                                    slaveRequestOffset = slaveMaxOffset;
+                                }
+                                LOGGER.info("slave[" + clientAddress + "] request offset " + slaveMaxOffset);
+                                AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset);
+                                break;
+                            default:
+                                LOGGER.error("Current state illegal {}", currentState);
+                                break;
+                        }
+
+                        if (!slaveState.equals(currentState)) {
+                            LOGGER.warn("Master change state from {} to {}", currentState, slaveState);
+                            changeCurrentState(slaveState);
+                        }
+                        continue;
+                    }
+
+                    if (!byteBufferRead.hasRemaining()) {
+                        byteBufferRead.position(ReadSocketService.this.processPosition);
+                        byteBufferRead.compact();
+                        ReadSocketService.this.processPosition = 0;
+                    }
+                    break;
+                }
+
+                return true;
+            }
+        }
+    }
+
+    class WriteSocketService extends AbstractWriteSocketService {
+        private SelectMappedBufferResult selectMappedBufferResult;
+
+        public WriteSocketService(final SocketChannel socketChannel) throws IOException {
+            super(socketChannel);
+        }
+
+        @Override
+        protected int getNextTransferDataSize() {
+            SelectMappedBufferResult selectResult = haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
+            if (selectResult == null || selectResult.getSize() <= 0) {
+                return 0;
+            }
+            this.selectMappedBufferResult = selectResult;
+            return selectResult.getSize();
+        }
+
+        @Override
+        protected void releaseData() {
+            this.selectMappedBufferResult.release();
+            this.selectMappedBufferResult = null;
+        }
+
+        @Override
+        protected boolean transferData(int maxTransferSize) {
+
+            if (null != this.selectMappedBufferResult && maxTransferSize >= 0) {
+                this.selectMappedBufferResult.getByteBuffer().limit(maxTransferSize);
+            }
+
+            // Write Header
+            boolean result = haWriter.write(this.socketChannel, this.byteBufferHeader);
+
+            if (!result) {
+                return false;
+            }
+
+            if (null == this.selectMappedBufferResult) {
+                return true;
+            }
+
+            // Write Body
+            result = haWriter.write(this.socketChannel, this.selectMappedBufferResult.getByteBuffer());
+
+            if (result) {
+                releaseData();
+            }
+            return result;
+        }
+
+        @Override
+        protected void onStop() {
+            if (this.selectMappedBufferResult != null) {
+                this.selectMappedBufferResult.release();
+            }
+        }
+
+        @Override
+        public String getServiceName() {
+            if (haService.getDefaultMessageStore().getBrokerConfig().isInBrokerContainer()) {
+                return haService.getDefaultMessageStore().getBrokerConfig().getLoggerIdentifier() + WriteSocketService.class.getSimpleName();
+            }
+            return WriteSocketService.class.getSimpleName();
+        }
+    }
+
+    abstract class AbstractWriteSocketService extends ServiceThread {
+        protected final Selector selector;
+        protected final SocketChannel socketChannel;
+        protected final HAWriter haWriter;
+
+        protected final ByteBuffer byteBufferHeader = ByteBuffer.allocate(MSG_HEADER_SIZE);
+        // Store master epochFileCache: (Epoch + startOffset) * 1000
+        private final ByteBuffer handShakeBuffer = ByteBuffer.allocate(EPOCH_ENTRY_SIZE * 1000);
+        protected long nextTransferFromWhere = -1;
+        protected boolean lastWriteOver = true;
+        protected long lastWriteTimestamp = System.currentTimeMillis();
+        protected long lastPrintTimestamp = System.currentTimeMillis();
+        protected long transferOffset = 0;
+
+        public AbstractWriteSocketService(final SocketChannel socketChannel) throws IOException {
+            this.selector = RemotingUtil.openSelector();
+            this.socketChannel = socketChannel;
+            this.socketChannel.register(this.selector, SelectionKey.OP_WRITE);
+            this.setDaemon(true);
+            haWriter = new HAWriter();
+            haWriter.registerHook(writeSize -> {
+                flowMonitor.addByteCountTransferred(writeSize);
+                if (writeSize > 0) {
+                    AbstractWriteSocketService.this.lastWriteTimestamp =
+                        haService.getDefaultMessageStore().getSystemClock().now();
+                }
+            });
+        }
+
+        public long getNextTransferFromWhere() {
+            return this.nextTransferFromWhere;
+        }
+
+        // Handshake method
+        private boolean buildHandshakeBuffer() {
+            final List<EpochEntry> epochEntries = AutoSwitchHAConnection.this.epochCache.getAllEntries();
+            final int lastEpoch = AutoSwitchHAConnection.this.epochCache.lastEpoch();
+            final long maxPhyOffset = AutoSwitchHAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset();
+            this.byteBufferHeader.position(0);
+            this.byteBufferHeader.limit(MSG_HEADER_SIZE);
+            // State
+            this.byteBufferHeader.putInt(currentState.ordinal());
+            // Body size
+            this.byteBufferHeader.putInt(epochEntries.size() * EPOCH_ENTRY_SIZE);
+            // Offset
+            this.byteBufferHeader.putLong(0L);
+            // Epoch
+            this.byteBufferHeader.putInt(lastEpoch);
+            // Additional info(Flush position)
+            this.byteBufferHeader.putLong(maxPhyOffset);
+            this.byteBufferHeader.flip();
+
+            // EpochEntries
+            this.handShakeBuffer.position(0);
+            this.handShakeBuffer.limit(EPOCH_ENTRY_SIZE * epochEntries.size());
+            for (final EpochEntry entry : epochEntries) {
+                if (entry != null) {
+                    this.handShakeBuffer.putInt(entry.getEpoch());
+                    this.handShakeBuffer.putLong(entry.getStartOffset());
+                }
+            }
+            this.handShakeBuffer.flip();
+            LOGGER.info("Master build handshake header: maxEpoch:{}, maxOffset:{}, epochEntries:{}", lastEpoch, maxPhyOffset, epochEntries);
+            return true;
+        }
+
+        private boolean handshakeWithSlave() {
+            // Write Header
+            boolean result = this.haWriter.write(this.socketChannel, this.byteBufferHeader);
+
+            if (!result) {
+                return false;
+            }
+
+            // Write Body
+            return this.haWriter.write(this.socketChannel, this.handShakeBuffer);
+        }
+
+        // Normal transfer method
+        private void buildTransferHeaderBuffer(long nextOffset, int bodySize) {
+            // Build Header
+            this.byteBufferHeader.position(0);
+            this.byteBufferHeader.limit(MSG_HEADER_SIZE);
+            // State
+            this.byteBufferHeader.putInt(currentState.ordinal());
+            // Body size
+            this.byteBufferHeader.putInt(bodySize);
+            // Offset
+            this.byteBufferHeader.putLong(nextOffset);
+            // Epoch
+            this.byteBufferHeader.putInt(AutoSwitchHAConnection.this.currentTransferEpoch);
+            // Additional info(confirm offset)
+            final long confirmOffset = AutoSwitchHAConnection.this.haService.getConfirmOffset();
+            this.byteBufferHeader.putLong(confirmOffset);
+            this.byteBufferHeader.flip();
+            LOGGER.info("Master send msg, state:{}, size:{}, offset:{}, epoch:{}, confirmOffset:{}",
+                currentState, bodySize, nextOffset, AutoSwitchHAConnection.this.currentTransferEpoch, confirmOffset);
+        }
+
+        private void transferToSlave() throws Exception {
+            if (this.lastWriteOver) {
+                long interval =
+                    haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
+
+                if (interval > haService.getDefaultMessageStore().getMessageStoreConfig()
+                    .getHaSendHeartbeatInterval()) {
+
+                    buildTransferHeaderBuffer(this.nextTransferFromWhere, 0);
+
+                    this.lastWriteOver = this.transferData(0);
+                    if (!this.lastWriteOver) {
+                        return;
+                    }
+                }
+            } else {
+                // maxTransferSize == -1 means to continue transfer remaining data.
+                this.lastWriteOver = this.transferData(-1);
+                if (!this.lastWriteOver) {
+                    return;
+                }
+            }
+
+            int size = this.getNextTransferDataSize();
+            if (size > 0) {
+                if (size > haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
+                    size = haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
+                }
+                int canTransferMaxBytes = flowMonitor.canTransferMaxByteNum();
+                if (size > canTransferMaxBytes) {
+                    if (System.currentTimeMillis() - lastPrintTimestamp > 1000) {
+                        LOGGER.warn("Trigger HA flow control, max transfer speed {}KB/s, current speed: {}KB/s",
+                            String.format("%.2f", flowMonitor.maxTransferByteInSecond() / 1024.0),
+                            String.format("%.2f", flowMonitor.getTransferredByteInSecond() / 1024.0));
+                        lastPrintTimestamp = System.currentTimeMillis();
+                    }
+                    size = canTransferMaxBytes;
+                }
+                if (size <= 0) {
+                    this.releaseData();
+                    this.waitForRunning(100);
+                    return;
+                }
+
+                // We must ensure that the transmitted logs are within the same epoch
+                if (this.nextTransferFromWhere + size > AutoSwitchHAConnection.this.currentTransferEpochEndOffset) {
+                    final EpochEntry epochEntry = AutoSwitchHAConnection.this.epochCache.nextEntry(AutoSwitchHAConnection.this.currentTransferEpoch);
+                    if (epochEntry == null) {
+                        LOGGER.error("Can't find a bigger epochEntry than epoch {}", AutoSwitchHAConnection.this.currentTransferEpoch);
+                        waitForRunning(100);
+                        return;
+                    }
+                    size = (int) (AutoSwitchHAConnection.this.currentTransferEpochEndOffset - this.nextTransferFromWhere);
+                    AutoSwitchHAConnection.this.currentTransferEpoch = epochEntry.getEpoch();
+                    AutoSwitchHAConnection.this.currentTransferEpochEndOffset = epochEntry.getEndOffset();
+                }
+
+                this.transferOffset = this.nextTransferFromWhere;
+                this.nextTransferFromWhere += size;
+
+                // Build Header
+                buildTransferHeaderBuffer(this.transferOffset, size);
+
+                this.lastWriteOver = this.transferData(size);
+            } else {
+                haService.getWaitNotifyObject().allWaitForRunning(100);
+            }
+        }
+
+        @Override public void run() {
+            AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.selector.select(1000);
+
+                    switch (currentState) {
+                        case HANDSHAKE:
+                            // Wait until the slave sends its handshake msg to the master.
+                            if (!isSlaveSendHandshake) {
+                                this.waitForRunning(10);
+                                continue;
+                            }
+
+                            if (this.lastWriteOver) {
+                                if (!buildHandshakeBuffer()) {
+                                    LOGGER.error("AutoSwitchHAConnection build handshake buffer failed");
+                                    this.waitForRunning(5000);
+                                    continue;
+                                }
+                            }
+
+                            this.lastWriteOver = handshakeWithSlave();
+                            if (this.lastWriteOver) {
+                                // change flag to {false} to wait for slave notification
+                                isSlaveSendHandshake = false;
+                            }
+                            break;
+                        case TRANSFER:
+                            if (-1 == slaveRequestOffset) {
+                                this.waitForRunning(10);
+                                continue;
+                            }
+
+                            if (-1 == this.nextTransferFromWhere) {
+                                if (0 == slaveRequestOffset) {
+                                    this.nextTransferFromWhere = haService.getDefaultMessageStore().getCommitLog().getMinOffset();
+                                } else {
+                                    this.nextTransferFromWhere = slaveRequestOffset;
+                                }
+                                // Setup initial transferEpoch
+                                EpochEntry epochEntry = AutoSwitchHAConnection.this.epochCache.findEpochEntryByOffset(this.nextTransferFromWhere);
+                                if (epochEntry == null) {
+                                    LOGGER.error("Failed to find an epochEntry to match slaveRequestOffset {}", this.nextTransferFromWhere);
+                                    waitForRunning(500);
+                                    break;
+                                }
+                                AutoSwitchHAConnection.this.currentTransferEpoch = epochEntry.getEpoch();
+                                AutoSwitchHAConnection.this.currentTransferEpochEndOffset = epochEntry.getEndOffset();
+                                if (epochEntry.getEpoch() == AutoSwitchHAConnection.this.epochCache.lastEpoch()) {
+                                    AutoSwitchHAConnection.this.currentTransferEpochEndOffset = Long.MAX_VALUE;

Review Comment:
   Although it is almost impossible to exceed Long.MAX_VALUE, it would be better to change it to -1 to represent infinity.
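
   A minimal sketch of what the -1 convention could look like in the "stay within one epoch" check. The class, constant, and method names are hypothetical and not part of the PR; the sketch only assumes that -1 marks an epoch whose end is not known yet.

```java
// Hypothetical sketch of the suggestion: -1 marks an epoch with no end yet.
final class EpochBoundarySketch {
    static final long UNBOUNDED = -1L;

    // Returns how many bytes may be sent starting at nextOffset
    // without crossing the end of the current epoch.
    static int clampToEpoch(long nextOffset, int size, long epochEndOffset) {
        if (epochEndOffset == UNBOUNDED) {
            return size; // last (open) epoch: nothing to clamp against
        }
        long room = epochEndOffset - nextOffset;
        if (room <= 0) {
            return 0; // already at the boundary, caller must advance to the next epoch
        }
        return (int) Math.min((long) size, room);
    }

    public static void main(String[] args) {
        System.out.println(clampToEpoch(100L, 50, UNBOUNDED));
        System.out.println(clampToEpoch(100L, 50, 120L));
    }
}
```

   One trade-off to keep in mind: with Long.MAX_VALUE a plain comparison such as `endOffset > offset` works unchanged, whereas with -1 every comparison needs the explicit sentinel check shown above.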





[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #4236: [Summer of Code] Support switch role for ha service

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on code in PR #4236:
URL: https://github.com/apache/rocketmq/pull/4236#discussion_r863946877


##########
common/src/main/java/org/apache/rocketmq/common/EpochEntry.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common;
+
+public class EpochEntry {
+
+    private final int epoch;
+    private final long startOffset;
+    private long endOffset = Long.MAX_VALUE;

Review Comment:
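   This also needs to be changed to -1. Also, once it is changed to -1 here, is it still necessary to check below whether the entry is the last one?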
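
   As an illustration of the second point, here is a minimal sketch in which the entry itself treats -1 as "no end yet", so a lookup by offset does not need to special-case the last epoch. All names are hypothetical stand-ins rather than the PR's EpochEntry and EpochFileCache, and the sketch assumes entries are kept in epoch order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the PR's classes.
final class EpochLookupSketch {
    static final long UNBOUNDED = -1L;

    static final class Entry {
        final int epoch;
        final long startOffset;
        long endOffset = UNBOUNDED; // open until the next epoch starts

        Entry(int epoch, long startOffset) {
            this.epoch = epoch;
            this.startOffset = startOffset;
        }

        // An open entry covers every offset from startOffset onwards.
        boolean contains(long offset) {
            return offset >= startOffset && (endOffset == UNBOUNDED || offset < endOffset);
        }
    }

    // Linear scan; returns null when no entry covers the offset.
    static Entry find(List<Entry> entries, long offset) {
        for (Entry entry : entries) {
            if (entry.contains(offset)) {
                return entry;
            }
        }
        return null;
    }

    // Two epochs: [0, 100) is closed, [100, ...) is still open.
    static List<Entry> sample() {
        Entry first = new Entry(1, 0L);
        first.endOffset = 100L;
        Entry second = new Entry(2, 100L);
        List<Entry> entries = new ArrayList<>();
        entries.add(first);
        entries.add(second);
        return entries;
    }

    public static void main(String[] args) {
        System.out.println(find(sample(), 50L).epoch);
        System.out.println(find(sample(), 5000L).epoch);
    }
}
```

   Under these assumptions the caller can use the returned entry's endOffset directly instead of comparing its epoch against lastEpoch().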



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/EpochFileCache.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.CheckpointFile;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+/**
+ * Cache for epochFile.
+ * Mapping (Epoch -> StartOffset)
+ */
+public class EpochFileCache {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    private final Lock readLock = this.readWriteLock.readLock();
+    private final Lock writeLock = this.readWriteLock.writeLock();
+    private final TreeMap<Integer, EpochEntry> epochMap;
+    private CheckpointFile<EpochEntry> checkpoint;
+
+    public EpochFileCache() {
+        this.epochMap = new TreeMap<>();
+    }
+
+    public EpochFileCache(final String path) {
+        this.epochMap = new TreeMap<>();
+        this.checkpoint = new CheckpointFile<>(path, new EpochEntrySerializer());
+    }
+
+    public boolean initCacheFromFile() {
+        this.writeLock.lock();
+        try {
+            final List<EpochEntry> entries = this.checkpoint.read();
+            initEntries(entries);
+            return true;
+        } catch (final IOException e) {
+            log.error("Error happen when init epoch entries from epochFile", e);
+            return false;
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    public void initCacheFromEntries(final List<EpochEntry> entries) {
+        this.writeLock.lock();
+        try {
+            initEntries(entries);
+            flush();
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    private void initEntries(final List<EpochEntry> entries) {
+        this.epochMap.clear();
+        EpochEntry preEntry = null;
+        for (final EpochEntry entry : entries) {
+            this.epochMap.put(entry.getEpoch(), entry);
+            if (preEntry != null) {
+                preEntry.setEndOffset(entry.getStartOffset());
+            }
+            preEntry = entry;
+        }
+    }
+
+    public boolean appendEntry(final EpochEntry entry) {
+        this.writeLock.lock();
+        try {
+            if (!this.epochMap.isEmpty()) {
+                final EpochEntry lastEntry = this.epochMap.lastEntry().getValue();
+                if (lastEntry.getEpoch() >= entry.getEpoch() || lastEntry.getStartOffset() >= entry.getStartOffset()) {
+                    log.error("The appending entry's epoch or startOffset {} is not bigger than lastEntry {}, append failed", entry, lastEntry);
+                    return false;
+                }
+                lastEntry.setEndOffset(entry.getStartOffset());
+            }
+            this.epochMap.put(entry.getEpoch(), new EpochEntry(entry));
+            flush();
+            return true;
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    /**
+     * Set endOffset for lastEpochEntry.
+     */
+    public void setLastEpochEntryEndOffset(final long endOffset) {
+        this.writeLock.lock();
+        try {
+            if (!this.epochMap.isEmpty()) {
+                final EpochEntry lastEntry = this.epochMap.lastEntry().getValue();
+                if (lastEntry.getStartOffset() <= endOffset) {
+                    lastEntry.setEndOffset(endOffset);
+                }
+            }
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    public EpochEntry firstEntry() {
+        this.readLock.lock();
+        try {
+            if (this.epochMap.isEmpty()) {
+                return null;
+            }
+            return new EpochEntry(this.epochMap.firstEntry().getValue());
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public EpochEntry lastEntry() {
+        this.readLock.lock();
+        try {
+            if (this.epochMap.isEmpty()) {
+                return null;
+            }
+            return new EpochEntry(this.epochMap.lastEntry().getValue());
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public int lastEpoch() {
+        final EpochEntry entry = lastEntry();
+        if (entry != null) {
+            return entry.getEpoch();
+        }
+        return -1;
+    }
+
+    public EpochEntry getEntry(final int epoch) {
+        this.readLock.lock();
+        try {
+            if (this.epochMap.containsKey(epoch)) {
+                final EpochEntry entry = this.epochMap.get(epoch);
+                return new EpochEntry(entry);
+            }
+            return null;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public EpochEntry findEpochEntryByOffset(final long offset) {
+        this.readLock.lock();
+        try {
+            if (!this.epochMap.isEmpty()) {
+                for (Map.Entry<Integer, EpochEntry> entry : this.epochMap.entrySet()) {
+                    if (entry.getValue().getStartOffset() <= offset && entry.getValue().getEndOffset() > offset) {
+                        return new EpochEntry(entry.getValue());
+                    }
+                }
+            }
+            return null;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public EpochEntry nextEntry(final int epoch) {
+        this.readLock.lock();
+        try {
+            final Map.Entry<Integer, EpochEntry> entry = this.epochMap.ceilingEntry(epoch + 1);
+            if (entry != null) {
+                return new EpochEntry(entry.getValue());
+            }
+            return null;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public List<EpochEntry> getAllEntries() {
+        this.readLock.lock();
+        try {
+            final ArrayList<EpochEntry> result = new ArrayList<>(this.epochMap.size());
+            this.epochMap.forEach((key, value) -> result.add(new EpochEntry(value)));
+            return result;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    /**
+     * Find the consistentPoint between compareCache and local.
+     *
+     * @return the consistent offset
+     */
+    public long findConsistentPoint(final EpochFileCache compareCache) {
+        this.readLock.lock();
+        try {
+            long consistentOffset = -1;
+            final Map<Integer, EpochEntry> descendingMap = new TreeMap<>(this.epochMap).descendingMap();
+            final Iterator<Map.Entry<Integer, EpochEntry>> iter = descendingMap.entrySet().iterator();
+            while (iter.hasNext()) {
+                final Map.Entry<Integer, EpochEntry> curLocalEntry = iter.next();
+                final EpochEntry compareEntry = compareCache.getEntry(curLocalEntry.getKey());
+                if (compareEntry != null && compareEntry.getStartOffset() == curLocalEntry.getValue().getStartOffset()) {
+                    consistentOffset = Math.min(curLocalEntry.getValue().getEndOffset(), compareEntry.getEndOffset());
+                    break;
+                }
+            }
+            return consistentOffset;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    /**
+     * Remove epochEntries with epoch >= truncateEpoch.
+     */
+    public void truncateFromEpoch(final int truncateEpoch) {
+        doTruncate((entry) -> entry.getEpoch() >= truncateEpoch);
+    }
+
+    /**
+     * Remove epochEntries with startOffset >= truncateOffset.
+     */
+    public void truncateFromOffset(final long truncateOffset) {
+        doTruncate((entry) -> entry.getStartOffset() >= truncateOffset);
+    }
+
+    /**
+     * Clear all epochEntries
+     */
+    public void clearAll() {
+        doTruncate((entry) -> true);
+    }
+
+    private void doTruncate(final Predicate<EpochEntry> predict) {
+        this.writeLock.lock();
+        try {
+            this.epochMap.entrySet().removeIf(entry -> predict.test(entry.getValue()));
+            final EpochEntry entry = lastEntry();
+            if (entry != null) {
+                entry.setEndOffset(Long.MAX_VALUE);
+            }
+            flush();
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    private void flush() {
+        this.writeLock.lock();
+        try {
+            try {
+                if (this.checkpoint != null) {
+                    final ArrayList<EpochEntry> entries = new ArrayList<>(this.epochMap.values());
+                    this.checkpoint.write(entries);
+                }
+            } catch (final IOException e) {
+                log.error("Error happen when flush epochEntries to epochCheckpointFile", e);
+            }
+        } finally {
+            this.writeLock.unlock();
+        }

Review Comment:
   A single try-catch is enough here; there is no need for two nested try blocks.
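
A minimal sketch of what this comment suggests, assuming the intent is to fold the nested `try` blocks in `flush()` into one `try`/`catch`/`finally` statement. The class `SingleTryFlushSketch`, the `Checkpoint` interface and the `failedFlushes` counter are hypothetical stand-ins for illustration, not the PR's actual types; the PR logs the exception instead of counting it.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class SingleTryFlushSketch {
    /** Hypothetical stand-in for the checkpoint file written by the cache. */
    interface Checkpoint {
        void write(List<Long> entries) throws IOException;
    }

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // epoch -> startOffset, a simplified stand-in for the epoch map
    private final TreeMap<Integer, Long> epochMap = new TreeMap<>();
    private final Checkpoint checkpoint;
    private int failedFlushes = 0;

    SingleTryFlushSketch(Checkpoint checkpoint) {
        this.checkpoint = checkpoint;
    }

    void put(int epoch, long startOffset) {
        lock.writeLock().lock();
        try {
            epochMap.put(epoch, startOffset);
        } finally {
            lock.writeLock().unlock();
        }
    }

    void flush() {
        lock.writeLock().lock();
        try {
            if (checkpoint != null) {
                checkpoint.write(new ArrayList<>(epochMap.values()));
            }
        } catch (IOException e) {
            // The PR logs the error here; this sketch only counts it.
            failedFlushes++;
        } finally {
            // Runs whether or not write() threw, so the lock is always released.
            lock.writeLock().unlock();
        }
    }

    int failedFlushes() {
        return failedFlushes;
    }

    boolean isWriteLocked() {
        return lock.isWriteLocked();
    }

    public static void main(String[] args) {
        SingleTryFlushSketch sketch = new SingleTryFlushSketch(entries -> {
            throw new IOException("simulated disk failure");
        });
        sketch.put(1, 0L);
        sketch.flush();
        System.out.println("failed flushes: " + sketch.failedFlushes());
        System.out.println("write lock still held: " + sketch.isWriteLocked());
    }
}
```

Because `catch` and `finally` can be attached to the same `try`, behaviour should be unchanged: the exception is still handled inside the method and the write lock is still released on every path.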



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java:
##########
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.List;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.ha.FlowMonitor;
+import org.apache.rocketmq.store.ha.HAConnection;
+import org.apache.rocketmq.store.ha.HAConnectionState;
+import org.apache.rocketmq.store.ha.io.AbstractHAReader;
+import org.apache.rocketmq.store.ha.io.HAWriter;
+
+public class AutoSwitchHAConnection implements HAConnection {
+    /**
+     * Header protocol in syncing msg from master.
+     * Format: current state + body size + offset + epoch + additionalInfo(confirmOffset).
+     * If the msg is a handshake msg, the body size = EpochEntrySize * EpochEntryNums, the offset is maxOffset in master.
+     */
+    public static final int MSG_HEADER_SIZE = 4 + 4 + 8 + 4 + 8;
+    public static final int EPOCH_ENTRY_SIZE = 12;
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private final AutoSwitchHAService haService;
+    private final SocketChannel socketChannel;
+    private final String clientAddress;
+    private final EpochFileCache epochCache;
+    private final AbstractWriteSocketService writeSocketService;
+    private final ReadSocketService readSocketService;
+    private volatile HAConnectionState currentState = HAConnectionState.HANDSHAKE;
+    private volatile long slaveRequestOffset = -1;
+    private volatile long slaveAckOffset = -1;
+    /**
+     * Whether the slave has already sent a handshake message
+     */
+    private volatile boolean isSlaveSendHandshake = false;
+    private volatile int currentTransferEpoch = -1;
+    private volatile long currentTransferEpochEndOffset = 0;
+    private final FlowMonitor flowMonitor;
+
+    public AutoSwitchHAConnection(AutoSwitchHAService haService, SocketChannel socketChannel,
+        EpochFileCache epochCache) throws IOException {
+        this.haService = haService;
+        this.socketChannel = socketChannel;
+        this.epochCache = epochCache;
+        this.clientAddress = this.socketChannel.socket().getRemoteSocketAddress().toString();
+        this.socketChannel.configureBlocking(false);
+        this.socketChannel.socket().setSoLinger(false, -1);
+        this.socketChannel.socket().setTcpNoDelay(true);
+        this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
+        this.socketChannel.socket().setSendBufferSize(1024 * 64);
+        this.writeSocketService = new WriteSocketService(this.socketChannel);
+        this.readSocketService = new ReadSocketService(this.socketChannel);
+        this.haService.getConnectionCount().incrementAndGet();
+        this.flowMonitor = new FlowMonitor(haService.getDefaultMessageStore().getMessageStoreConfig());
+    }
+
+    @Override public void start() {
+        changeCurrentState(HAConnectionState.HANDSHAKE);
+        this.flowMonitor.start();
+        this.readSocketService.start();
+        this.writeSocketService.start();
+    }
+
+    @Override public void shutdown() {
+        changeCurrentState(HAConnectionState.SHUTDOWN);
+        this.flowMonitor.shutdown(true);
+        this.writeSocketService.shutdown(true);
+        this.readSocketService.shutdown(true);
+        this.close();
+    }
+
+    @Override public void close() {
+        if (this.socketChannel != null) {
+            try {
+                this.socketChannel.close();
+            } catch (final IOException e) {
+                LOGGER.error("", e);
+            }
+        }
+    }
+
+    public void changeCurrentState(HAConnectionState connectionState) {
+        LOGGER.info("change state to {}", connectionState);
+        this.currentState = connectionState;
+    }
+
+    @Override public HAConnectionState getCurrentState() {
+        return currentState;
+    }
+
+    @Override public SocketChannel getSocketChannel() {
+        return socketChannel;
+    }
+
+    @Override public String getClientAddress() {
+        return clientAddress;
+    }
+
+    @Override public long getSlaveAckOffset() {
+        return slaveAckOffset;
+    }
+
+    @Override public long getTransferredByteInSecond() {
+        return flowMonitor.getTransferredByteInSecond();
+    }
+
+    @Override public long getTransferFromWhere() {
+        return this.writeSocketService.getNextTransferFromWhere();
+    }
+
+    private void changeTransferEpochToNext(final EpochEntry entry) {
+        this.currentTransferEpoch = entry.getEpoch();
+        this.currentTransferEpochEndOffset = entry.getEndOffset();
+        if (entry.getEpoch() == this.epochCache.lastEpoch()) {
+            // Use -1 to stand for +∞
+            this.currentTransferEpochEndOffset = -1;
+        }
+    }
+
+    class ReadSocketService extends ServiceThread {
+        private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;
+        private final Selector selector;
+        private final SocketChannel socketChannel;
+        private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+        private final AbstractHAReader haReader;
+        private int processPosition = 0;
+        private volatile long lastReadTimestamp = System.currentTimeMillis();
+
+        public ReadSocketService(final SocketChannel socketChannel) throws IOException {
+            this.selector = RemotingUtil.openSelector();
+            this.socketChannel = socketChannel;
+            this.socketChannel.register(this.selector, SelectionKey.OP_READ);
+            this.setDaemon(true);
+            haReader = new HAServerReader();
+            haReader.registerHook(readSize -> {
+                if (readSize > 0) {
+                    ReadSocketService.this.lastReadTimestamp =
+                        haService.getDefaultMessageStore().getSystemClock().now();
+                }
+            });
+        }
+
+        @Override
+        public void run() {
+            LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.selector.select(1000);
+                    boolean ok = this.haReader.read(this.socketChannel, this.byteBufferRead);
+                    if (!ok) {
+                        AutoSwitchHAConnection.LOGGER.error("processReadEvent error");
+                        break;
+                    }
+
+                    long interval = haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
+                    if (interval > haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
+                        LOGGER.warn("ha housekeeping, found this connection[" + clientAddress + "] expired, " + interval);
+                        break;
+                    }
+                } catch (Exception e) {
+                    AutoSwitchHAConnection.LOGGER.error(this.getServiceName() + " service has exception.", e);
+                    break;
+                }
+            }
+
+            this.makeStop();
+
+            changeCurrentState(HAConnectionState.SHUTDOWN);
+
+            writeSocketService.makeStop();
+
+            haService.removeConnection(AutoSwitchHAConnection.this);
+
+            haService.getConnectionCount().decrementAndGet();
+
+            SelectionKey sk = this.socketChannel.keyFor(this.selector);
+            if (sk != null) {
+                sk.cancel();
+            }
+
+            try {
+                this.selector.close();
+                this.socketChannel.close();
+            } catch (IOException e) {
+                AutoSwitchHAConnection.LOGGER.error("", e);
+            }
+
+            AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            return ReadSocketService.class.getSimpleName();
+        }
+
+        class HAServerReader extends AbstractHAReader {
+            @Override
+            protected boolean processReadResult(ByteBuffer byteBufferRead) {
+                while (true) {
+                    int diff = byteBufferRead.position() - ReadSocketService.this.processPosition;
+                    if (diff >= AutoSwitchHAClient.TRANSFER_HEADER_SIZE) {
+                        int readPosition = ReadSocketService.this.processPosition;
+                        HAConnectionState slaveState = HAConnectionState.values()[byteBufferRead.getInt(readPosition)];
+
+                        switch (slaveState) {
+                            case HANDSHAKE:
+                                isSlaveSendHandshake = true;
+                                ReadSocketService.this.processPosition += AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
+                                break;
+                            case TRANSFER:
+                                long slaveMaxOffset = byteBufferRead.getLong(readPosition + 4);
+                                ReadSocketService.this.processPosition += AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
+
+                                slaveAckOffset = slaveMaxOffset;
+                                if (slaveRequestOffset < 0) {
+                                    slaveRequestOffset = slaveMaxOffset;
+                                }
+                                LOGGER.info("slave[" + clientAddress + "] request offset " + slaveMaxOffset);
+                                AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset);
+                                break;
+                            default:
+                                LOGGER.error("Current state illegal {}", currentState);
+                                break;
+                        }
+
+                        if (!slaveState.equals(currentState)) {
+                            LOGGER.warn("Master change state from {} to {}", currentState, slaveState);
+                            changeCurrentState(slaveState);
+                        }
+                        continue;
+                    }
+
+                    if (!byteBufferRead.hasRemaining()) {
+                        byteBufferRead.position(ReadSocketService.this.processPosition);
+                        byteBufferRead.compact();
+                        ReadSocketService.this.processPosition = 0;
+                    }
+                    break;
+                }
+
+                return true;
+            }
+        }
+    }
+
+    class WriteSocketService extends AbstractWriteSocketService {
+        private SelectMappedBufferResult selectMappedBufferResult;
+
+        public WriteSocketService(final SocketChannel socketChannel) throws IOException {
+            super(socketChannel);
+        }
+
+        @Override
+        protected int getNextTransferDataSize() {
+            SelectMappedBufferResult selectResult = haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
+            if (selectResult == null || selectResult.getSize() <= 0) {
+                return 0;
+            }
+            this.selectMappedBufferResult = selectResult;
+            return selectResult.getSize();
+        }
+
+        @Override
+        protected void releaseData() {
+            this.selectMappedBufferResult.release();
+            this.selectMappedBufferResult = null;
+        }
+
+        @Override
+        protected boolean transferData(int maxTransferSize) {
+
+            if (null != this.selectMappedBufferResult && maxTransferSize >= 0) {
+                this.selectMappedBufferResult.getByteBuffer().limit(maxTransferSize);
+            }
+
+            // Write Header
+            boolean result = haWriter.write(this.socketChannel, this.byteBufferHeader);
+
+            if (!result) {
+                return false;
+            }
+
+            if (null == this.selectMappedBufferResult) {
+                return true;
+            }
+
+            // Write Body
+            result = haWriter.write(this.socketChannel, this.selectMappedBufferResult.getByteBuffer());
+
+            if (result) {
+                releaseData();
+            }
+            return result;
+        }
+
+        @Override
+        protected void onStop() {
+            if (this.selectMappedBufferResult != null) {
+                this.selectMappedBufferResult.release();
+            }
+        }
+
+        @Override
+        public String getServiceName() {
+            if (haService.getDefaultMessageStore().getBrokerConfig().isInBrokerContainer()) {
+                return haService.getDefaultMessageStore().getBrokerConfig().getLoggerIdentifier() + WriteSocketService.class.getSimpleName();
+            }
+            return WriteSocketService.class.getSimpleName();
+        }
+    }
+
+    abstract class AbstractWriteSocketService extends ServiceThread {
+        protected final Selector selector;
+        protected final SocketChannel socketChannel;
+        protected final HAWriter haWriter;
+
+        protected final ByteBuffer byteBufferHeader = ByteBuffer.allocate(MSG_HEADER_SIZE);
+        // Store master epochFileCache: (Epoch + startOffset) * 1000
+        private final ByteBuffer handShakeBuffer = ByteBuffer.allocate(EPOCH_ENTRY_SIZE * 1000);
+        protected long nextTransferFromWhere = -1;
+        protected boolean lastWriteOver = true;
+        protected long lastWriteTimestamp = System.currentTimeMillis();
+        protected long lastPrintTimestamp = System.currentTimeMillis();
+        protected long transferOffset = 0;
+
+        public AbstractWriteSocketService(final SocketChannel socketChannel) throws IOException {
+            this.selector = RemotingUtil.openSelector();
+            this.socketChannel = socketChannel;
+            this.socketChannel.register(this.selector, SelectionKey.OP_WRITE);
+            this.setDaemon(true);
+            haWriter = new HAWriter();
+            haWriter.registerHook(writeSize -> {
+                flowMonitor.addByteCountTransferred(writeSize);
+                if (writeSize > 0) {
+                    AbstractWriteSocketService.this.lastWriteTimestamp =
+                        haService.getDefaultMessageStore().getSystemClock().now();
+                }
+            });
+        }
+
+        public long getNextTransferFromWhere() {
+            return this.nextTransferFromWhere;
+        }
+
+        // Handshake method
+        private boolean buildHandshakeBuffer() {
+            final List<EpochEntry> epochEntries = AutoSwitchHAConnection.this.epochCache.getAllEntries();
+            final int lastEpoch = AutoSwitchHAConnection.this.epochCache.lastEpoch();
+            final long maxPhyOffset = AutoSwitchHAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset();
+            this.byteBufferHeader.position(0);
+            this.byteBufferHeader.limit(MSG_HEADER_SIZE);
+            // State
+            this.byteBufferHeader.putInt(currentState.ordinal());
+            // Body size
+            this.byteBufferHeader.putInt(epochEntries.size() * EPOCH_ENTRY_SIZE);
+            // Offset
+            this.byteBufferHeader.putLong(maxPhyOffset);
+            // Epoch
+            this.byteBufferHeader.putInt(lastEpoch);
+            // Additional info
+            this.byteBufferHeader.putLong(0L);
+            this.byteBufferHeader.flip();
+
+            // EpochEntries
+            this.handShakeBuffer.position(0);
+            this.handShakeBuffer.limit(EPOCH_ENTRY_SIZE * epochEntries.size());
+            for (final EpochEntry entry : epochEntries) {
+                if (entry != null) {
+                    this.handShakeBuffer.putInt(entry.getEpoch());
+                    this.handShakeBuffer.putLong(entry.getStartOffset());
+                }
+            }
+            this.handShakeBuffer.flip();
+            LOGGER.info("Master build handshake header: maxEpoch:{}, maxOffset:{}, epochEntries:{}", lastEpoch, maxPhyOffset, epochEntries);
+            return true;
+        }
+
+        private boolean handshakeWithSlave() {
+            // Write Header
+            boolean result = this.haWriter.write(this.socketChannel, this.byteBufferHeader);
+
+            if (!result) {
+                return false;
+            }
+
+            // Write Body
+            return this.haWriter.write(this.socketChannel, this.handShakeBuffer);
+        }
+
+        // Normal transfer method
+        private void buildTransferHeaderBuffer(long nextOffset, int bodySize) {
+            // Build Header
+            this.byteBufferHeader.position(0);
+            this.byteBufferHeader.limit(MSG_HEADER_SIZE);
+            // State
+            this.byteBufferHeader.putInt(currentState.ordinal());
+            // Body size
+            this.byteBufferHeader.putInt(bodySize);
+            // Offset
+            this.byteBufferHeader.putLong(nextOffset);
+            // Epoch
+            this.byteBufferHeader.putInt(AutoSwitchHAConnection.this.currentTransferEpoch);
+            // Additional info(confirm offset)
+            final long confirmOffset = AutoSwitchHAConnection.this.haService.getConfirmOffset();
+            this.byteBufferHeader.putLong(confirmOffset);
+            this.byteBufferHeader.flip();
+            LOGGER.info("Master send msg, state:{}, size:{}, offset:{}, epoch:{}, confirmOffset:{}",
+                currentState, bodySize, nextOffset, AutoSwitchHAConnection.this.currentTransferEpoch, confirmOffset);
+        }
+
+        private void transferToSlave() throws Exception {
+            if (this.lastWriteOver) {
+                long interval =
+                    haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
+
+                if (interval > haService.getDefaultMessageStore().getMessageStoreConfig()
+                    .getHaSendHeartbeatInterval()) {
+
+                    buildTransferHeaderBuffer(this.nextTransferFromWhere, 0);
+
+                    this.lastWriteOver = this.transferData(0);
+                    if (!this.lastWriteOver) {
+                        return;
+                    }
+                }
+            } else {
+                // maxTransferSize == -1 means to continue transfer remaining data.
+                this.lastWriteOver = this.transferData(-1);
+                if (!this.lastWriteOver) {
+                    return;
+                }
+            }
+
+            int size = this.getNextTransferDataSize();
+            if (size > 0) {
+                if (size > haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
+                    size = haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
+                }
+                int canTransferMaxBytes = flowMonitor.canTransferMaxByteNum();
+                if (size > canTransferMaxBytes) {
+                    if (System.currentTimeMillis() - lastPrintTimestamp > 1000) {
+                        LOGGER.warn("Trigger HA flow control, max transfer speed {}KB/s, current speed: {}KB/s",
+                            String.format("%.2f", flowMonitor.maxTransferByteInSecond() / 1024.0),
+                            String.format("%.2f", flowMonitor.getTransferredByteInSecond() / 1024.0));
+                        lastPrintTimestamp = System.currentTimeMillis();
+                    }
+                    size = canTransferMaxBytes;
+                }
+                if (size <= 0) {
+                    this.releaseData();
+                    this.waitForRunning(100);
+                    return;
+                }
+
+                // We must ensure that the transmitted logs are within the same epoch
+                // If currentEpochEndOffset == -1, means that currentTransferEpoch = last epoch, so the endOffset = +∞.
+                final long currentEpochEndOffset = AutoSwitchHAConnection.this.currentTransferEpochEndOffset;
+                if (currentEpochEndOffset != -1 && this.nextTransferFromWhere + size > currentEpochEndOffset) {
+                    final EpochEntry epochEntry = AutoSwitchHAConnection.this.epochCache.nextEntry(AutoSwitchHAConnection.this.currentTransferEpoch);
+                    if (epochEntry == null) {
+                        LOGGER.error("Can't find a bigger epochEntry than epoch {}", AutoSwitchHAConnection.this.currentTransferEpoch);
+                        waitForRunning(100);
+                        return;
+                    }
+                    size = (int) (currentEpochEndOffset - this.nextTransferFromWhere);
+                    changeTransferEpochToNext(epochEntry);
+                }

Review Comment:
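   This may need attention: is it possible that the connection between master and slave stays open while the master's epoch increases? (This should not happen normally, but manual operations could cause it.) In that case currentEpochEndOffset still belongs to the previous epoch, so it is still -1, which would corrupt the slave's epoch map.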
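
One possible way to avoid the stale `-1` described above, sketched under assumptions: instead of caching the end offset of the current epoch on the connection, the boundary is looked up from the epoch map on every transfer, so an epoch appended while the connection is open is picked up on the next batch. `EpochBoundarySketch`, `appendEpoch` and `clampToEpoch` are hypothetical names, the map holds only epoch to startOffset, and this is not how the PR handles the case.

```java
import java.util.Map;
import java.util.TreeMap;

class EpochBoundarySketch {
    // epoch -> startOffset; an epoch ends where the next epoch starts,
    // and the last epoch is open-ended.
    private final TreeMap<Integer, Long> epochStartOffsets = new TreeMap<>();
    private int currentTransferEpoch;

    EpochBoundarySketch(int initialEpoch, long startOffset) {
        this.epochStartOffsets.put(initialEpoch, startOffset);
        this.currentTransferEpoch = initialEpoch;
    }

    /** Simulates the master entering a new epoch while the connection is open. */
    void appendEpoch(int epoch, long startOffset) {
        epochStartOffsets.put(epoch, startOffset);
    }

    /**
     * Returns how many of the requested bytes may be sent starting at
     * nextOffset without crossing into the following epoch.
     */
    int clampToEpoch(long nextOffset, int size) {
        Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(currentTransferEpoch);
        // Move forward if nextOffset already lies in a later epoch.
        while (next != null && nextOffset >= next.getValue()) {
            currentTransferEpoch = next.getKey();
            next = epochStartOffsets.higherEntry(currentTransferEpoch);
        }
        if (next == null) {
            // Current epoch is the last one, so there is no upper bound yet.
            return size;
        }
        long remaining = next.getValue() - nextOffset;
        return (int) Math.min(size, remaining);
    }

    int currentTransferEpoch() {
        return currentTransferEpoch;
    }

    public static void main(String[] args) {
        EpochBoundarySketch sketch = new EpochBoundarySketch(1, 0L);
        System.out.println(sketch.clampToEpoch(0L, 100));
        sketch.appendEpoch(2, 150L);
        System.out.println(sketch.clampToEpoch(100L, 100));
        System.out.println(sketch.clampToEpoch(150L, 100));
        System.out.println(sketch.currentTransferEpoch());
    }
}
```

The trade-off is one extra map lookup per batch in exchange for not holding a cached boundary that can go stale; whether that fits the locking in `EpochFileCache` would need to be checked against the real code.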



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] RongtongJin merged pull request #4236: [Summer of Code] Support switch role for ha service

Posted by GitBox <gi...@apache.org>.
RongtongJin merged PR #4236:
URL: https://github.com/apache/rocketmq/pull/4236




[GitHub] [rocketmq] codecov-commenter commented on pull request #4236: [Summer of Code] Support switch role for ha service

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #4236:
URL: https://github.com/apache/rocketmq/pull/4236#issuecomment-1114694722

   # [Codecov](https://codecov.io/gh/apache/rocketmq/pull/4236?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#4236](https://codecov.io/gh/apache/rocketmq/pull/4236?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (a1eca58) into [5.0.0-beta-dledger-controller](https://codecov.io/gh/apache/rocketmq/commit/d26773a9ab181ce825dc465c900840f48e1ec6b8?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (d26773a) will **increase** coverage by `0.44%`.
   > The diff coverage is `69.00%`.
   
   > :exclamation: Current head a1eca58 differs from pull request most recent head 860c057. Consider uploading reports for the commit 860c057 to get more accurate results
   
   ```diff
   @@                         Coverage Diff                         @@
   ##             5.0.0-beta-dledger-controller    #4236      +/-   ##
   ===================================================================
   + Coverage                            43.27%   43.72%   +0.44%     
   - Complexity                            6137     6250     +113     
   ===================================================================
     Files                                  818      826       +8     
     Lines                                57559    58466     +907     
     Branches                              7852     7984     +132     
   ===================================================================
   + Hits                                 24910    25565     +655     
   - Misses                               29412    29605     +193     
   - Partials                              3237     3296      +59     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/rocketmq/pull/4236?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...in/java/org/apache/rocketmq/common/EpochEntry.java](https://codecov.io/gh/apache/rocketmq/pull/4236/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vRXBvY2hFbnRyeS5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...g/apache/rocketmq/common/utils/CheckpointFile.java](https://codecov.io/gh/apache/rocketmq/pull/4236/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vdXRpbHMvQ2hlY2twb2ludEZpbGUuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...e/rocketmq/namesrv/routeinfo/RouteInfoManager.java](https://codecov.io/gh/apache/rocketmq/pull/4236/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-bmFtZXNydi9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcm9ja2V0bXEvbmFtZXNydi9yb3V0ZWluZm8vUm91dGVJbmZvTWFuYWdlci5qYXZh) | `69.16% <ø> (+0.09%)` | :arrow_up: |
   | [...ketmq/store/ha/autoswitch/AutoSwitchHAService.java](https://codecov.io/gh/apache/rocketmq/pull/4236/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL2hhL2F1dG9zd2l0Y2gvQXV0b1N3aXRjaEhBU2VydmljZS5qYXZh) | `50.00% <50.00%> (ø)` | |
   | [...java/org/apache/rocketmq/store/ha/io/HAWriter.java](https://codecov.io/gh/apache/rocketmq/pull/4236/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL2hhL2lvL0hBV3JpdGVyLmphdmE=) | `54.16% <54.16%> (ø)` | |
   | [...cketmq/store/ha/autoswitch/AutoSwitchHAClient.java](https://codecov.io/gh/apache/rocketmq/pull/4236/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL2hhL2F1dG9zd2l0Y2gvQXV0b1N3aXRjaEhBQ2xpZW50LmphdmE=) | `71.66% <71.66%> (ø)` | |
   | [...mq/store/ha/autoswitch/AutoSwitchHAConnection.java](https://codecov.io/gh/apache/rocketmq/pull/4236/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL2hhL2F1dG9zd2l0Y2gvQXV0b1N3aXRjaEhBQ29ubmVjdGlvbi5qYXZh) | `76.47% <76.47%> (ø)` | |
   | [.../apache/rocketmq/store/ha/io/AbstractHAReader.java](https://codecov.io/gh/apache/rocketmq/pull/4236/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL2hhL2lvL0Fic3RyYWN0SEFSZWFkZXIuamF2YQ==) | `79.31% <79.31%> (ø)` | |
   | [...org/apache/rocketmq/store/ha/DefaultHAService.java](https://codecov.io/gh/apache/rocketmq/pull/4236/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL2hhL0RlZmF1bHRIQVNlcnZpY2UuamF2YQ==) | `67.46% <81.25%> (+1.43%)` | :arrow_up: |
   | [...e/rocketmq/store/ha/autoswitch/EpochFileCache.java](https://codecov.io/gh/apache/rocketmq/pull/4236/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL2hhL2F1dG9zd2l0Y2gvRXBvY2hGaWxlQ2FjaGUuamF2YQ==) | `82.96% <82.96%> (ø)` | |
   | ... and [19 more](https://codecov.io/gh/apache/rocketmq/pull/4236/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/rocketmq/pull/4236?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/rocketmq/pull/4236?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [d26773a...860c057](https://codecov.io/gh/apache/rocketmq/pull/4236?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [rocketmq] coveralls commented on pull request #4236: [Summer of Code] Support switch role for ha service

Posted by GitBox <gi...@apache.org>.
coveralls commented on PR #4236:
URL: https://github.com/apache/rocketmq/pull/4236#issuecomment-1114694455

   
   [![Coverage Status](https://coveralls.io/builds/48748948/badge)](https://coveralls.io/builds/48748948)
   
   Coverage increased (+0.5%) to 47.84% when pulling **860c0577e06b889be389723a9462221a7eba76ad on hzh0425:feature/auto-switch-ha** into **d26773a9ab181ce825dc465c900840f48e1ec6b8 on apache:5.0.0-beta-dledger-controller**.
   




[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #4236: [Summer of Code] Support switch role for ha service

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on code in PR #4236:
URL: https://github.com/apache/rocketmq/pull/4236#discussion_r866603589


##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.ha.DefaultHAService;
+import org.apache.rocketmq.store.ha.GroupTransferService;
+import org.apache.rocketmq.store.ha.HAConnection;
+import org.apache.rocketmq.store.ha.HAConnectionStateNotificationService;
+
+/**
+ * Switchable HA service that supports switching the broker role to master or slave.
+ */
+public class AutoSwitchHAService extends DefaultHAService {
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private EpochFileCache epochCache;
+    private AutoSwitchHAClient haClient;
+
+    public AutoSwitchHAService() {
+    }
+
+    @Override
+    public void init(final DefaultMessageStore defaultMessageStore) throws IOException {
+        this.epochCache = new EpochFileCache(defaultMessageStore.getMessageStoreConfig().getStorePathEpochFile());
+        this.epochCache.initCacheFromFile();
+        this.defaultMessageStore = defaultMessageStore;
+        this.acceptSocketService =
+            new AutoSwitchAcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
+        this.groupTransferService = new GroupTransferService(this, defaultMessageStore);
+        if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
+            this.haClient = new AutoSwitchHAClient(this, defaultMessageStore, this.epochCache);
+        }
+        this.haConnectionStateNotificationService = new HAConnectionStateNotificationService(this, defaultMessageStore);
+    }
+
+    @Override
+    public boolean changeToMaster(int masterEpoch) {
+        final int lastEpoch = this.epochCache.lastEpoch();
+        if (masterEpoch <= lastEpoch) {
+            return false;
+        }
+        destroyConnections();
+        // Stop ha client if needed
+        if (this.haClient != null) {
+            this.haClient.shutdown();
+        }
+
+        // Truncate dirty file
+        final long truncateOffset = truncateInvalidMsg();
+        if (truncateOffset >= 0) {
+            this.epochCache.truncateSuffixByOffset(truncateOffset);
+        }
+
+        // Append new epoch to epochFile
+        final EpochEntry newEpochEntry = new EpochEntry(masterEpoch, this.defaultMessageStore.getMaxPhyOffset());
+        if (this.epochCache.lastEpoch() >= masterEpoch) {
+            this.epochCache.truncateSuffixByEpoch(masterEpoch);
+        }
+        this.epochCache.appendEntry(newEpochEntry);
+
+        this.defaultMessageStore.recoverTopicQueueTable();
+        LOGGER.info("Change ha to master success, newMasterEpoch:{}, startOffset:{}", masterEpoch, newEpochEntry.getStartOffset());
+        return true;
+    }
+
+    @Override
+    public boolean changeToSlave(String newMasterAddr, String newHaMasterAddr, int newMasterEpoch) {
+        final int lastEpoch = this.epochCache.lastEpoch();
+        if (newMasterEpoch <= lastEpoch) {
+            return false;
+        }
+        try {
+            destroyConnections();
+            if (this.haClient == null) {
+                this.haClient = new AutoSwitchHAClient(this, defaultMessageStore, this.epochCache);
+            } else {
+                this.haClient.reOpen();
+            }
+            this.haClient.updateHaMasterAddress(newHaMasterAddr);
+            this.haClient.updateMasterAddress(newMasterAddr);
+            this.haClient.start();
+            LOGGER.info("Change ha to slave success, newMasterAddress:{}, newMasterEpoch:{}", newMasterAddr, newMasterEpoch);
+            return true;
+        } catch (final Exception e) {
+            LOGGER.error("Error happen when change ha to slave", e);
+            return false;
+        }
+    }
+
+    @Override
+    public Set<String> checkSyncStateSetChanged() {
+        final HashSet<String> newSyncStateSet = new HashSet<>(this.connectionList.size());
+        final long masterOffset = this.defaultMessageStore.getMaxPhyOffset();
+        for (HAConnection connection : this.connectionList) {
+            if (isInSyncSlave(masterOffset, connection)) {
+                newSyncStateSet.add(connection.getClientAddress());
+            }
+        }
+        return newSyncStateSet;
+    }
+
+    public void truncateEpochFilePrefix(final long offset) {
+        this.epochCache.truncatePrefixByOffset(offset);
+    }
+
+    public void truncateEpochFileSuffix(final long offset) {
+        this.epochCache.truncateSuffixByOffset(offset);
+    }
+
+    /**
+     * Try to truncate incomplete msg transferred from master.
+     */
+    public long truncateInvalidMsg() {
+        long dispatchBehind = this.defaultMessageStore.dispatchBehindBytes();
+        if (dispatchBehind <= 0) {
+            LOGGER.info("Dispatch complete, skip truncate");
+            return -1;
+        }
+
+        long reputFromOffset = this.defaultMessageStore.getMaxPhyOffset() - dispatchBehind;
+
+        boolean doNext = true;
+        while (reputFromOffset < this.defaultMessageStore.getMaxPhyOffset() && doNext) {
+            SelectMappedBufferResult result = this.defaultMessageStore.getCommitLog().getData(reputFromOffset);
+            if (result == null) {
+                break;
+            }
+
+            try {
+                reputFromOffset = result.getStartOffset();
+
+                int readSize = 0;
+                while (readSize < result.getSize()) {
+                    DispatchRequest dispatchRequest = this.defaultMessageStore.getCommitLog().checkMessageAndReturnSize(result.getByteBuffer(), false, false);
+
+                    int size = dispatchRequest.getMsgSize();
+
+                    if (dispatchRequest.isSuccess()) {
+                        if (size > 0) {
+                            reputFromOffset += size;
+                            readSize += size;
+                        } else {
+                            reputFromOffset = this.defaultMessageStore.getCommitLog().rollNextFile(reputFromOffset);
+                            break;
+                        }
+                    } else {
+                        doNext = false;
+                        break;
+                    }
+                }
+            } finally {
+                result.release();
+            }
+        }
+
+        LOGGER.info("AutoRecoverHAClient truncate commitLog to {}", reputFromOffset);
+        this.defaultMessageStore.truncateDirtyFiles(reputFromOffset);
+        return reputFromOffset;
+    }
+
+    /**
+     * Get confirm offset (min slaveAckOffset of all syncStateSet)
+     */
+    public long getConfirmOffset() {
+        long confirmOffset = this.defaultMessageStore.getMaxPhyOffset();
+        for (HAConnection connection : this.connectionList) {
+            confirmOffset = Math.min(confirmOffset, connection.getSlaveAckOffset());
+        }
+        return confirmOffset;
+    }

Review Comment:
   This will need to change later: we have to filter for the connections that are in the SyncStateSet and then compare their offsets.
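
   A minimal sketch of what that filtering could look like, not the change that was eventually made. `Conn` is a hypothetical stand-in for `HAConnection` that carries only the two getters used in the diff, and the sketch assumes the SyncStateSet holds client addresses, as `checkSyncStateSetChanged()` above suggests.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ConfirmOffsetSketch {

    /** Hypothetical stand-in for HAConnection; only the getters used in the diff. */
    static final class Conn {
        private final String clientAddress;
        private final long slaveAckOffset;

        Conn(String clientAddress, long slaveAckOffset) {
            this.clientAddress = clientAddress;
            this.slaveAckOffset = slaveAckOffset;
        }

        String getClientAddress() {
            return clientAddress;
        }

        long getSlaveAckOffset() {
            return slaveAckOffset;
        }
    }

    /**
     * Confirm offset = min(master max offset, ack offsets of in-sync connections).
     * Connections whose address is not in syncStateSet are skipped, so a lagging
     * out-of-sync slave cannot hold the confirm offset back.
     */
    static long confirmOffset(long masterMaxOffset, List<Conn> connections, Set<String> syncStateSet) {
        long confirmOffset = masterMaxOffset;
        for (Conn connection : connections) {
            if (syncStateSet.contains(connection.getClientAddress())) {
                confirmOffset = Math.min(confirmOffset, connection.getSlaveAckOffset());
            }
        }
        return confirmOffset;
    }

    public static void main(String[] args) {
        // Illustrative addresses and offsets only.
        List<Conn> connections = Arrays.asList(
            new Conn("10.0.0.2:10912", 90L),
            new Conn("10.0.0.3:10912", 80L),
            new Conn("10.0.0.4:10912", 10L)); // lagging slave, not in the set
        Set<String> syncStateSet = new HashSet<>(Arrays.asList("10.0.0.2:10912", "10.0.0.3:10912"));
        System.out.println(confirmOffset(100L, connections, syncStateSet)); // prints 80
    }
}
```

   With an empty SyncStateSet the sketch falls back to the master's own max offset, which matches the starting value in the current `getConfirmOffset()`.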



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java:
##########
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.ha.FlowMonitor;
+import org.apache.rocketmq.store.ha.HAClient;
+import org.apache.rocketmq.store.ha.HAConnectionState;
+import org.apache.rocketmq.store.ha.io.AbstractHAReader;
+import org.apache.rocketmq.store.ha.io.HAWriter;
+
+public class AutoSwitchHAClient extends ServiceThread implements HAClient {
+
+
+    /**
+     * Transfer header buffer size. Schema: state ordinal + additional info(maxOffset or flag)
+     * If in handshake state, we reuse additional info as the flag -- isSyncFromLastFile.
+     */
+    public static final int TRANSFER_HEADER_SIZE = 4 + 8;
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
+    private final AtomicReference<String> masterHaAddress = new AtomicReference<>();
+    private final AtomicReference<String> masterAddress = new AtomicReference<>();
+    private final ByteBuffer transferHeaderBuffer = ByteBuffer.allocate(TRANSFER_HEADER_SIZE);
+    private final AutoSwitchHAService haService;
+    private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+    private final DefaultMessageStore messageStore;
+    private final EpochFileCache epochCache;
+
+    private SocketChannel socketChannel;
+    private Selector selector;
+    private AbstractHAReader haReader;
+    private HAWriter haWriter;
+    private FlowMonitor flowMonitor;
+    /**
+     * last time that slave reads data from master.
+     */
+    private long lastReadTimestamp;
+    /**
+     * last time that slave reports offset to master.
+     */
+    private long lastWriteTimestamp;
+
+    private long currentReportedOffset;
+    private int processPosition;
+    private volatile HAConnectionState currentState;
+    /**
+     * Current epoch
+     */
+    private volatile long currentReceivedEpoch;
+
+    /**
+     * Confirm offset = min(localMaxOffset, master confirm offset).
+     */
+    private volatile long confirmOffset;
+
+    public static final int SYNC_FROM_LAST_FILE = -1;
+
+    public static final int SYNC_FROM_FIRST_FILE = -2;
+
+    public AutoSwitchHAClient(AutoSwitchHAService haService, DefaultMessageStore defaultMessageStore,
+        EpochFileCache epochCache) throws IOException {
+        this.haService = haService;
+        this.messageStore = defaultMessageStore;
+        this.epochCache = epochCache;
+        init();
+    }
+
+    public void init() throws IOException {
+        this.selector = RemotingUtil.openSelector();
+        this.flowMonitor = new FlowMonitor(this.messageStore.getMessageStoreConfig());
+        this.haReader = new HAClientReader();
+        haReader.registerHook(readSize -> {
+            if (readSize > 0) {
+                AutoSwitchHAClient.this.flowMonitor.addByteCountTransferred(readSize);
+                lastReadTimestamp = System.currentTimeMillis();
+            }
+        });
+        this.haWriter = new HAWriter();
+        haWriter.registerHook(writeSize -> {
+            if (writeSize > 0) {
+                lastWriteTimestamp = System.currentTimeMillis();
+            }
+        });
+        changeCurrentState(HAConnectionState.READY);
+        this.currentReceivedEpoch = -1;
+        this.currentReportedOffset = 0;
+        this.processPosition = 0;
+        this.confirmOffset = -1;
+        this.lastReadTimestamp = System.currentTimeMillis();
+        this.lastWriteTimestamp = System.currentTimeMillis();
+    }
+
+    public void reOpen() throws IOException {
+        shutdown();
+        init();
+    }
+
+    @Override public String getServiceName() {
+        return AutoSwitchHAClient.class.getSimpleName();
+    }
+
+    @Override public void updateMasterAddress(String newAddress) {
+        String currentAddr = this.masterAddress.get();
+        if (masterAddress.compareAndSet(currentAddr, newAddress)) {
+            LOGGER.info("update master address, OLD: " + currentAddr + " NEW: " + newAddress);
+        }
+    }
+
+    @Override public void updateHaMasterAddress(String newAddress) {
+        String currentAddr = this.masterHaAddress.get();
+        if (masterHaAddress.compareAndSet(currentAddr, newAddress)) {
+            LOGGER.info("update master ha address, OLD: " + currentAddr + " NEW: " + newAddress);
+        }
+    }
+
+    @Override public String getMasterAddress() {
+        return this.masterAddress.get();
+    }
+
+    @Override public String getHaMasterAddress() {
+        return this.masterHaAddress.get();
+    }
+
+    @Override public long getLastReadTimestamp() {
+        return this.lastReadTimestamp;
+    }
+
+    @Override public long getLastWriteTimestamp() {
+        return this.lastWriteTimestamp;
+    }
+
+    @Override public HAConnectionState getCurrentState() {
+        return this.currentState;
+    }
+
+    @Override public void changeCurrentState(HAConnectionState haConnectionState) {
+        LOGGER.info("change state to {}", haConnectionState);
+        this.currentState = haConnectionState;
+    }
+
+    public void closeMasterAndWait() {
+        this.closeMaster();
+        this.waitForRunning(1000 * 5);
+    }
+
+    @Override public void closeMaster() {
+        if (null != this.socketChannel) {
+            try {
+                SelectionKey sk = this.socketChannel.keyFor(this.selector);
+                if (sk != null) {
+                    sk.cancel();
+                }
+
+                this.socketChannel.close();
+                this.socketChannel = null;
+
+                LOGGER.info("AutoSwitchHAClient close connection with master {}", this.masterHaAddress.get());
+                this.changeCurrentState(HAConnectionState.READY);
+            } catch (IOException e) {
+                LOGGER.warn("CloseMaster exception. ", e);
+            }
+
+            this.lastReadTimestamp = 0;
+            this.processPosition = 0;
+
+            this.byteBufferRead.position(0);
+            this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
+        }
+    }
+
+    @Override public long getTransferredByteInSecond() {
+        return this.flowMonitor.getTransferredByteInSecond();
+    }
+
+    @Override public void shutdown() {
+        changeCurrentState(HAConnectionState.SHUTDOWN);
+        // Shutdown thread firstly
+        this.flowMonitor.shutdown();
+        super.shutdown();
+
+        closeMaster();
+        try {
+            this.selector.close();
+        } catch (IOException e) {
+            LOGGER.warn("Close the selector of AutoSwitchHAClient error, ", e);
+        }
+    }
+
+    private boolean isTimeToReportOffset() {
+        long interval = this.messageStore.now() - this.lastWriteTimestamp;
+        return interval > this.messageStore.getMessageStoreConfig().getHaSendHeartbeatInterval();
+    }
+
+    private boolean sendHandshakeHeader() {
+        this.transferHeaderBuffer.position(0);
+        this.transferHeaderBuffer.limit(TRANSFER_HEADER_SIZE);
+        this.transferHeaderBuffer.putInt(HAConnectionState.HANDSHAKE.ordinal());
+        if (this.haService.getDefaultMessageStore().getMessageStoreConfig().isSyncFromLastFile()) {
+            this.transferHeaderBuffer.putLong(SYNC_FROM_LAST_FILE);
+        } else {
+            this.transferHeaderBuffer.putLong(SYNC_FROM_FIRST_FILE);
+        }
+        this.transferHeaderBuffer.flip();
+        return this.haWriter.write(this.socketChannel, this.transferHeaderBuffer);
+    }
+
+    private void handshakeWithMaster() throws IOException {
+        // Send the handshake header exactly once; a second send would duplicate the request.
+        boolean result = this.sendHandshakeHeader();
+        if (!result) {
+            closeMasterAndWait();
+        }
+
+        this.selector.select(5000);
+
+        result = this.haReader.read(this.socketChannel, this.byteBufferRead);
+        if (!result) {
+            closeMasterAndWait();
+            return;
+        }
+    }
+
+    private boolean reportSlaveOffset(final long offsetToReport) {
+        this.transferHeaderBuffer.position(0);
+        this.transferHeaderBuffer.limit(TRANSFER_HEADER_SIZE);
+        this.transferHeaderBuffer.putInt(this.currentState.ordinal());
+        this.transferHeaderBuffer.putLong(offsetToReport);
+        this.transferHeaderBuffer.flip();
+        return this.haWriter.write(this.socketChannel, this.transferHeaderBuffer);
+    }
+
+    private boolean reportSlaveMaxOffset() {
+        boolean result = true;
+        final long maxPhyOffset = this.messageStore.getMaxPhyOffset();
+        if (maxPhyOffset > this.currentReportedOffset) {
+            this.currentReportedOffset = maxPhyOffset;
+            result = reportSlaveOffset(this.currentReportedOffset);
+        }
+        return result;
+    }
+
+    public boolean connectMaster() throws ClosedChannelException {
+        if (null == this.socketChannel) {
+            String addr = this.masterHaAddress.get();
+            if (addr != null) {
+                SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
+                this.socketChannel = RemotingUtil.connect(socketAddress);
+                if (this.socketChannel != null) {
+                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
+                    LOGGER.info("AutoSwitchHAClient connect to master {}", addr);
+                    changeCurrentState(HAConnectionState.HANDSHAKE);
+                }
+            }
+            this.currentReportedOffset = this.messageStore.getMaxPhyOffset();
+            this.lastReadTimestamp = System.currentTimeMillis();
+        }
+        return this.socketChannel != null;
+    }
+
+    private boolean transferFromMaster() throws IOException {
+        boolean result;
+        if (isTimeToReportOffset()) {
+            LOGGER.info("Slave report current offset {}", this.currentReportedOffset);
+            result = reportSlaveOffset(this.currentReportedOffset);
+            if (!result) {
+                return false;
+            }
+        }
+
+        this.selector.select(1000);
+
+        result = this.haReader.read(this.socketChannel, this.byteBufferRead);
+        if (!result) {
+            return false;
+        }
+
+        return this.reportSlaveMaxOffset();
+    }
+
+    @Override public void run() {
+        LOGGER.info(this.getServiceName() + " service started");
+
+        this.flowMonitor.start();
+        while (!this.isStopped()) {
+            try {
+                switch (this.currentState) {
+                    case SHUTDOWN:
+                        return;
+                    case READY:
+                        // Truncate invalid msg first
+                        final long truncateOffset = AutoSwitchHAClient.this.haService.truncateInvalidMsg();
+                        if (truncateOffset >= 0) {
+                            AutoSwitchHAClient.this.epochCache.truncateSuffixByOffset(truncateOffset);
+                        }
+                        if (!connectMaster()) {
+                            LOGGER.warn("AutoSwitchHAClient connect to master {} failed", this.masterHaAddress.get());
+                            waitForRunning(1000 * 5);
+                        }
+                        continue;
+                    case HANDSHAKE:
+                        handshakeWithMaster();
+                        continue;
+                    case TRANSFER:
+                        if (!transferFromMaster()) {
+                            closeMasterAndWait();
+                            continue;
+                        }
+                        break;
+                    case SUSPEND:
+                    default:
+                        waitForRunning(1000 * 5);
+                        continue;
+                }
+                long interval = this.messageStore.now() - this.lastReadTimestamp;
+                if (interval > this.messageStore.getMessageStoreConfig().getHaHousekeepingInterval()) {
+                    LOGGER.warn("AutoSwitchHAClient, housekeeping, found this connection[" + this.masterHaAddress
+                        + "] expired, " + interval);
+                    closeMaster();
+                    LOGGER.warn("AutoSwitchHAClient, master not response some time, so close connection");
+                }
+            } catch (Exception e) {
+                LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                closeMasterAndWait();
+            }
+        }
+
+    }
+
+    /**
+     * Compare the master and slave's epoch file, find consistent point, do truncate.
+     */
+    private boolean doTruncate(List<EpochEntry> masterEpochEntries, long masterEndOffset) {
+        if (this.epochCache.getEntrySize() == 0) {
+            // If the local epoch cache is empty, this broker is a new replica
+            LOGGER.info("Slave local epochCache is empty, skip truncate log");
+            changeCurrentState(HAConnectionState.TRANSFER);
+            this.currentReportedOffset = 0;
+        } else {
+            final EpochFileCache masterEpochCache = new EpochFileCache();
+            masterEpochCache.initCacheFromEntries(masterEpochEntries);
+            masterEpochCache.setLastEpochEntryEndOffset(masterEndOffset);
+            final List<EpochEntry> localEpochEntries = this.epochCache.getAllEntries();
+            final EpochFileCache localEpochCache = new EpochFileCache();
+            localEpochCache.initCacheFromEntries(localEpochEntries);
+            localEpochCache.setLastEpochEntryEndOffset(this.messageStore.getMaxPhyOffset());
+
+            final long truncateOffset = localEpochCache.findConsistentPoint(masterEpochCache);
+            if (truncateOffset < 0) {
+                // If truncateOffset < 0, means we can't find a consistent point
+                LOGGER.error("Failed to find a consistent point between masterEpoch:{} and slaveEpoch:{}", masterEpochEntries, localEpochEntries);
+                return false;
+            }
+            if (!this.messageStore.truncateFiles(truncateOffset)) {
+                LOGGER.error("Failed to truncate slave log to {}", truncateOffset);
+                return false;
+            }
+            this.epochCache.truncateSuffixByOffset(truncateOffset);
+            LOGGER.info("Truncate slave log to {} success, change to transfer state", truncateOffset);
+            changeCurrentState(HAConnectionState.TRANSFER);
+            this.currentReportedOffset = truncateOffset;
+        }
+        if (!reportSlaveMaxOffset()) {
+            LOGGER.error("AutoSwitchHAClient report max offset to master failed");
+            return false;
+        }
+        return true;
+    }
+
+    class HAClientReader extends AbstractHAReader {
+
+        @Override
+        protected boolean processReadResult(ByteBuffer byteBufferRead) {
+            int readSocketPos = byteBufferRead.position();
+
+            while (true) {
+                int diff = byteBufferRead.position() - AutoSwitchHAClient.this.processPosition;
+                if (diff >= AutoSwitchHAConnection.MSG_HEADER_SIZE) {
+                    int processPosition =  AutoSwitchHAClient.this.processPosition;
+                    int masterState = byteBufferRead.getInt(processPosition);
+                    int bodySize = byteBufferRead.getInt(processPosition + 4);
+                    long masterOffset = byteBufferRead.getLong(processPosition + 4 + 4);
+                    int masterEpoch = byteBufferRead.getInt(processPosition + 4 + 4 + 8);
+                    long masterEpochStartOffset = byteBufferRead.getLong(processPosition + 4 + 4 + 8 + 4);
+                    long confirmOffset = byteBufferRead.getLong(processPosition + 4 + 4 + 8 + 4 + 8);
+
+                    if (masterState != AutoSwitchHAClient.this.currentState.ordinal()) {
+                        AutoSwitchHAClient.this.processPosition += AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize;
+                        AutoSwitchHAClient.this.waitForRunning(1);
+                        LOGGER.error("State not matched, masterState:{}, slaveState:{}, bodySize:{}, offset:{}, masterEpoch:{}, masterEpochStartOffset:{}, confirmOffset:{}",
+                            masterState, AutoSwitchHAClient.this.currentState, bodySize, masterOffset, masterEpoch, masterEpochStartOffset, confirmOffset);
+                        return true;
+                    }
+                    LOGGER.info("Receive master msg, masterState:{}, bodySize:{}, offset:{}, masterEpoch:{}, masterEpochStartOffset:{}, confirmOffset:{}",
+                        HAConnectionState.values()[masterState], bodySize, masterOffset, masterEpoch, masterEpochStartOffset, confirmOffset);
+
+                    if (diff >= (AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize)) {
+                        switch (AutoSwitchHAClient.this.currentState) {
+                            case HANDSHAKE:
+                                AutoSwitchHAClient.this.processPosition += AutoSwitchHAConnection.MSG_HEADER_SIZE;
+                                // Truncate log
+                                int entrySize = AutoSwitchHAConnection.EPOCH_ENTRY_SIZE;
+                                final int entryNums = bodySize / entrySize;
+                                final ArrayList<EpochEntry> epochEntries = new ArrayList<>(entryNums);
+                                for (int i = 0; i < entryNums; i++) {
+                                    int epoch = byteBufferRead.getInt(AutoSwitchHAClient.this.processPosition + i * entrySize);
+                                    long startOffset = byteBufferRead.getLong(AutoSwitchHAClient.this.processPosition + i * entrySize + 4);
+                                    epochEntries.add(new EpochEntry(epoch, startOffset));
+                                }
+                                byteBufferRead.position(readSocketPos);
+                                AutoSwitchHAClient.this.processPosition += bodySize;
+                                LOGGER.info("Receive handshake, masterMaxPosition {}, masterEpochEntries:{}, try truncate log", masterOffset, epochEntries);
+                                if (!doTruncate(epochEntries, masterOffset)) {
+                                    waitForRunning(1000 * 2);
+                                    LOGGER.error("AutoSwitchHAClient truncate log failed in handshake state");
+                                    return false;
+                                }
+                                break;
+                            case TRANSFER:
+                                byte[] bodyData = new byte[bodySize];
+                                byteBufferRead.position(AutoSwitchHAClient.this.processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE);
+                                byteBufferRead.get(bodyData);
+                                byteBufferRead.position(readSocketPos);
+                                AutoSwitchHAClient.this.processPosition += AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize;
+
+                                long slavePhyOffset = AutoSwitchHAClient.this.messageStore.getMaxPhyOffset();
+                                if (slavePhyOffset != 0) {
+                                    if (slavePhyOffset != masterOffset) {
+                                        LOGGER.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
+                                            + slavePhyOffset + " MASTER: " + masterOffset);
+                                        return false;
+                                    }
+                                }
+
+                                // If epoch changed
+                                if (masterEpoch != AutoSwitchHAClient.this.currentReceivedEpoch) {
+                                    AutoSwitchHAClient.this.currentReceivedEpoch = masterEpoch;
+                                    AutoSwitchHAClient.this.epochCache.appendEntry(new EpochEntry(masterEpoch, masterEpochStartOffset));
+                                }
+                                AutoSwitchHAClient.this.confirmOffset = Math.min(confirmOffset, messageStore.getMaxPhyOffset());
+
+                                if (bodySize > 0) {
+                                    final DefaultMessageStore messageStore = AutoSwitchHAClient.this.messageStore;
+                                    if (messageStore.appendToCommitLog(masterOffset, bodyData, 0, bodyData.length)) {
+                                        LOGGER.info("Slave append master log success, from {}, size {}, epoch:{}", masterOffset, bodySize, masterEpoch);
+                                    }

Review Comment:
   I would not recommend logging here. This operation runs far too often and would produce a large volume of junk log entries.
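
   If some visibility into the append path is still wanted, one option is to sample the log line instead of emitting it for every append. The sketch below is only an illustration: `SampledLog` and `shouldLog()` are hypothetical names, not part of RocketMQ or of this PR, and the sampling interval is an arbitrary assumption.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical helper (not part of RocketMQ): lets a hot code path
 * emit a log line only once every N invocations.
 */
final class SampledLog {
    private final long everyN;
    private final AtomicLong counter = new AtomicLong();

    SampledLog(long everyN) {
        if (everyN <= 0) {
            throw new IllegalArgumentException("everyN must be positive");
        }
        this.everyN = everyN;
    }

    /** Returns true on the 1st, (N+1)th, (2N+1)th, ... call; false otherwise. */
    boolean shouldLog() {
        return counter.getAndIncrement() % everyN == 0;
    }
}
```

   In the TRANSFER branch the call site would then look like `if (appendLog.shouldLog()) { LOGGER.info(...); }`, where `appendLog` is an assumed field such as `new SampledLog(1000)`. Simply lowering the statement to debug level would be an even smaller change.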


