You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/05/04 00:48:42 UTC

[GitHub] [rocketmq] hzh0425 commented on a diff in pull request #4236: [Summer of Code] Support switch role for ha service

hzh0425 commented on code in PR #4236:
URL: https://github.com/apache/rocketmq/pull/4236#discussion_r864394714


##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java:
##########
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.List;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.ha.FlowMonitor;
+import org.apache.rocketmq.store.ha.HAConnection;
+import org.apache.rocketmq.store.ha.HAConnectionState;
+import org.apache.rocketmq.store.ha.io.AbstractHAReader;
+import org.apache.rocketmq.store.ha.io.HAWriter;
+
+public class AutoSwitchHAConnection implements HAConnection {
+    /**
+     * Header protocol in syncing msg from master.
+     * Format: current state + body size + offset + epoch + additionalInfo(confirmOffset).
+     * If the msg is a handshake msg, the body size = EpochEntrySize * EpochEntryNums, and the offset is the maxOffset in master.
+     */
+    public static final int MSG_HEADER_SIZE = 4 + 4 + 8 + 4 + 8;
+    public static final int EPOCH_ENTRY_SIZE = 12;
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private final AutoSwitchHAService haService;
+    private final SocketChannel socketChannel;
+    private final String clientAddress;
+    private final EpochFileCache epochCache;
+    private final AbstractWriteSocketService writeSocketService;
+    private final ReadSocketService readSocketService;
+    private volatile HAConnectionState currentState = HAConnectionState.HANDSHAKE;
+    private volatile long slaveRequestOffset = -1;
+    private volatile long slaveAckOffset = -1;
+    /**
+     * Whether the slave has already sent a handshake message
+     */
+    private volatile boolean isSlaveSendHandshake = false;
+    private volatile int currentTransferEpoch = -1;
+    private volatile long currentTransferEpochEndOffset = 0;
+    private final FlowMonitor flowMonitor;
+
+    /**
+     * Creates the master-side HA connection for one accepted slave socket.
+     * The channel is switched to non-blocking mode, its socket options are tuned, the read and
+     * write services are created (not started), and the service-wide connection counter is incremented.
+     *
+     * @param haService     owning HA service
+     * @param socketChannel accepted channel of the slave
+     * @param epochCache    epoch cache used to build handshake and transfer headers
+     * @throws IOException if configuring the channel or opening a selector fails
+     */
+    public AutoSwitchHAConnection(AutoSwitchHAService haService, SocketChannel socketChannel,
+        EpochFileCache epochCache) throws IOException {
+        this.haService = haService;
+        this.socketChannel = socketChannel;
+        this.epochCache = epochCache;
+        this.clientAddress = socketChannel.socket().getRemoteSocketAddress().toString();
+
+        // Socket tuning: non-blocking, no linger, no Nagle, 64 KiB buffers in both directions.
+        socketChannel.configureBlocking(false);
+        socketChannel.socket().setSoLinger(false, -1);
+        socketChannel.socket().setTcpNoDelay(true);
+        socketChannel.socket().setReceiveBufferSize(1024 * 64);
+        socketChannel.socket().setSendBufferSize(1024 * 64);
+
+        this.writeSocketService = new WriteSocketService(socketChannel);
+        this.readSocketService = new ReadSocketService(socketChannel);
+        haService.getConnectionCount().incrementAndGet();
+        this.flowMonitor = new FlowMonitor(haService.getDefaultMessageStore().getMessageStoreConfig());
+    }
+
+    /**
+     * Puts the connection into the HANDSHAKE state and starts the flow monitor,
+     * the read service and the write service, in that order.
+     */
+    @Override
+    public void start() {
+        changeCurrentState(HAConnectionState.HANDSHAKE);
+        flowMonitor.start();
+        readSocketService.start();
+        writeSocketService.start();
+    }
+
+    /**
+     * Marks the connection as SHUTDOWN, shuts down the flow monitor, the write service and the
+     * read service (each with {@code true} passed to its shutdown call), then closes the channel.
+     */
+    @Override
+    public void shutdown() {
+        changeCurrentState(HAConnectionState.SHUTDOWN);
+        flowMonitor.shutdown(true);
+        writeSocketService.shutdown(true);
+        readSocketService.shutdown(true);
+        close();
+    }
+
+    /**
+     * Closes the underlying socket channel, if there is one.
+     * An {@link IOException} raised by the close is logged and not rethrown.
+     */
+    @Override
+    public void close() {
+        if (socketChannel == null) {
+            return;
+        }
+        try {
+            socketChannel.close();
+        } catch (final IOException e) {
+            LOGGER.error("", e);
+        }
+    }
+
+    /**
+     * Logs the requested state and then makes it the current state of this connection.
+     *
+     * @param connectionState the state to switch to
+     */
+    public void changeCurrentState(HAConnectionState connectionState) {
+        LOGGER.info("change state to {}", connectionState);
+        currentState = connectionState;
+    }
+
+    /**
+     * @return the state this connection is currently in
+     */
+    @Override
+    public HAConnectionState getCurrentState() {
+        return this.currentState;
+    }
+
+    /**
+     * @return the socket channel this connection was created with
+     */
+    @Override
+    public SocketChannel getSocketChannel() {
+        return this.socketChannel;
+    }
+
+    /**
+     * @return the remote socket address of the slave, captured as a string at construction time
+     */
+    @Override
+    public String getClientAddress() {
+        return this.clientAddress;
+    }
+
+    /**
+     * @return the max offset most recently reported by the slave in a TRANSFER message,
+     *         or -1 if no such report has been processed yet
+     */
+    @Override
+    public long getSlaveAckOffset() {
+        return this.slaveAckOffset;
+    }
+
+    /**
+     * @return the transferred-bytes-per-second figure reported by this connection's flow monitor
+     */
+    @Override
+    public long getTransferredByteInSecond() {
+        return this.flowMonitor.getTransferredByteInSecond();
+    }
+
+    /**
+     * @return the next transfer offset tracked by the write service
+     */
+    @Override
+    public long getTransferFromWhere() {
+        return writeSocketService.getNextTransferFromWhere();
+    }
+
+    private void changeTransferEpochToNext(final EpochEntry entry) {
+        this.currentTransferEpoch = entry.getEpoch();
+        this.currentTransferEpochEndOffset = entry.getEndOffset();
+        if (entry.getEpoch() == this.epochCache.lastEpoch()) {
+            // Use -1 to stand for +∞
+            this.currentTransferEpochEndOffset = -1;
+        }
+    }
+
+    /**
+     * Service thread that reads the slave's report headers from the HA socket.
+     * <p>
+     * Each loop iteration waits on the selector for up to one second, lets the reader drain the
+     * channel, and then checks the housekeeping interval. When the loop ends (read failure,
+     * housekeeping timeout or exception) the thread marks the connection as SHUTDOWN, stops the
+     * write service, unregisters the connection from the HA service and closes selector and channel.
+     */
+    class ReadSocketService extends ServiceThread {
+        private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;
+        private final Selector selector;
+        private final SocketChannel socketChannel;
+        private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+        private final AbstractHAReader haReader;
+        // Index in byteBufferRead up to which received bytes have already been interpreted.
+        private int processPosition = 0;
+        private volatile long lastReadTimestamp = System.currentTimeMillis();
+
+        /**
+         * Opens a dedicated selector, registers the channel for OP_READ and installs a read hook
+         * that refreshes {@code lastReadTimestamp} whenever a positive number of bytes was read.
+         *
+         * @param socketChannel channel of the slave
+         * @throws IOException if the selector cannot be opened or the channel cannot be registered
+         */
+        public ReadSocketService(final SocketChannel socketChannel) throws IOException {
+            this.selector = RemotingUtil.openSelector();
+            this.socketChannel = socketChannel;
+            this.socketChannel.register(this.selector, SelectionKey.OP_READ);
+            this.setDaemon(true);
+            haReader = new HAServerReader();
+            haReader.registerHook(readSize -> {
+                if (readSize > 0) {
+                    ReadSocketService.this.lastReadTimestamp =
+                        haService.getDefaultMessageStore().getSystemClock().now();
+                }
+            });
+        }
+
+        @Override
+        public void run() {
+            LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.selector.select(1000);
+                    boolean ok = this.haReader.read(this.socketChannel, this.byteBufferRead);
+                    if (!ok) {
+                        AutoSwitchHAConnection.LOGGER.error("processReadEvent error");
+                        break;
+                    }
+
+                    // Drop the connection when nothing has been read for longer than the housekeeping interval.
+                    long interval = haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
+                    if (interval > haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
+                        LOGGER.warn("ha housekeeping, found this connection[" + clientAddress + "] expired, " + interval);
+                        break;
+                    }
+                } catch (Exception e) {
+                    AutoSwitchHAConnection.LOGGER.error(this.getServiceName() + " service has exception.", e);
+                    break;
+                }
+            }
+
+            this.makeStop();
+
+            changeCurrentState(HAConnectionState.SHUTDOWN);
+
+            writeSocketService.makeStop();
+
+            haService.removeConnection(AutoSwitchHAConnection.this);
+
+            haService.getConnectionCount().decrementAndGet();
+
+            SelectionKey sk = this.socketChannel.keyFor(this.selector);
+            if (sk != null) {
+                sk.cancel();
+            }
+
+            // Close the two resources independently so that a failure while closing the selector
+            // cannot leave the socket channel open.
+            try {
+                this.selector.close();
+            } catch (IOException e) {
+                AutoSwitchHAConnection.LOGGER.error("", e);
+            }
+            try {
+                this.socketChannel.close();
+            } catch (IOException e) {
+                AutoSwitchHAConnection.LOGGER.error("", e);
+            }
+
+            AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            return ReadSocketService.class.getSimpleName();
+        }
+
+        /**
+         * Interprets the fixed-size headers sent by the slave: an int state at the start of the
+         * header and, for TRANSFER, a long max offset 4 bytes further.
+         */
+        class HAServerReader extends AbstractHAReader {
+            /**
+             * Consumes every complete header currently in the buffer.
+             *
+             * @param byteBufferRead buffer holding the bytes received so far
+             * @return {@code true} when all complete headers were handled; {@code false} when the
+             *         slave sent a state that is unknown or not expected here
+             *         (NOTE(review): presumably makes {@code AbstractHAReader.read} fail so that
+             *         {@code run()} closes the connection; confirm against AbstractHAReader)
+             */
+            @Override
+            protected boolean processReadResult(ByteBuffer byteBufferRead) {
+                while (true) {
+                    int diff = byteBufferRead.position() - ReadSocketService.this.processPosition;
+                    if (diff >= AutoSwitchHAClient.TRANSFER_HEADER_SIZE) {
+                        int readPosition = ReadSocketService.this.processPosition;
+                        // The ordinal comes from the wire: validate it before indexing the enum values.
+                        int stateOrdinal = byteBufferRead.getInt(readPosition);
+                        HAConnectionState[] allStates = HAConnectionState.values();
+                        if (stateOrdinal < 0 || stateOrdinal >= allStates.length) {
+                            LOGGER.error("slave[" + clientAddress + "] sent unknown state ordinal {}", stateOrdinal);
+                            return false;
+                        }
+                        HAConnectionState slaveState = allStates[stateOrdinal];
+
+                        switch (slaveState) {
+                            case HANDSHAKE:
+                                isSlaveSendHandshake = true;
+                                ReadSocketService.this.processPosition += AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
+                                break;
+                            case TRANSFER:
+                                long slaveMaxOffset = byteBufferRead.getLong(readPosition + 4);
+                                ReadSocketService.this.processPosition += AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
+
+                                slaveAckOffset = slaveMaxOffset;
+                                // The first reported offset is remembered as the slave's request offset.
+                                if (slaveRequestOffset < 0) {
+                                    slaveRequestOffset = slaveMaxOffset;
+                                }
+                                LOGGER.info("slave[" + clientAddress + "] request offset " + slaveMaxOffset);
+                                AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset);
+                                break;
+                            default:
+                                // processPosition is not advanced for an unexpected state, so looping
+                                // again would re-read the same header forever; fail the read instead.
+                                LOGGER.error("Current state illegal {}", slaveState);
+                                return false;
+                        }
+
+                        if (!slaveState.equals(currentState)) {
+                            LOGGER.warn("Master change state from {} to {}", currentState, slaveState);
+                            changeCurrentState(slaveState);
+                        }
+                        continue;
+                    }
+
+                    // Buffer is full: move the unprocessed tail to the front to make room for more data.
+                    if (!byteBufferRead.hasRemaining()) {
+                        byteBufferRead.position(ReadSocketService.this.processPosition);
+                        byteBufferRead.compact();
+                        ReadSocketService.this.processPosition = 0;
+                    }
+                    break;
+                }
+
+                return true;
+            }
+        }
+    }
+
+    /**
+     * Write service that takes the data to replicate from the commit log of the default message store.
+     * The commit-log slice selected for the current transfer is kept in {@code selectMappedBufferResult}
+     * until it has been written completely, and is then released.
+     */
+    class WriteSocketService extends AbstractWriteSocketService {
+        // Commit-log slice of the in-flight transfer; null when nothing is pending.
+        private SelectMappedBufferResult selectMappedBufferResult;
+
+        public WriteSocketService(final SocketChannel socketChannel) throws IOException {
+            super(socketChannel);
+        }
+
+        /**
+         * Selects the commit-log data starting at {@code nextTransferFromWhere}.
+         *
+         * @return the size of the selected data, or 0 when there is nothing to transfer
+         */
+        @Override
+        protected int getNextTransferDataSize() {
+            SelectMappedBufferResult selectResult = haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
+            if (selectResult == null) {
+                return 0;
+            }
+            if (selectResult.getSize() <= 0) {
+                // An empty result is never handed to transferData, so release it here
+                // instead of dropping the reference unreleased.
+                selectResult.release();
+                return 0;
+            }
+            this.selectMappedBufferResult = selectResult;
+            return selectResult.getSize();
+        }
+
+        /**
+         * Releases the pending commit-log slice, if any, and forgets it.
+         * Safe to call when nothing is pending.
+         */
+        @Override
+        protected void releaseData() {
+            if (this.selectMappedBufferResult != null) {
+                this.selectMappedBufferResult.release();
+                this.selectMappedBufferResult = null;
+            }
+        }
+
+        /**
+         * Writes the prepared header and, when a commit-log slice is pending, its body.
+         *
+         * @param maxTransferSize new limit for the pending body buffer; a negative value keeps the
+         *                        current limit (used to continue a partially written transfer)
+         * @return {@code true} when everything was written, {@code false} when data is still outstanding
+         */
+        @Override
+        protected boolean transferData(int maxTransferSize) {
+
+            if (null != this.selectMappedBufferResult && maxTransferSize >= 0) {
+                this.selectMappedBufferResult.getByteBuffer().limit(maxTransferSize);
+            }
+
+            // Write Header
+            boolean result = haWriter.write(this.socketChannel, this.byteBufferHeader);
+
+            if (!result) {
+                return false;
+            }
+
+            // Header-only message (e.g. heartbeat): nothing more to send.
+            if (null == this.selectMappedBufferResult) {
+                return true;
+            }
+
+            // Write Body
+            result = haWriter.write(this.socketChannel, this.selectMappedBufferResult.getByteBuffer());
+
+            if (result) {
+                releaseData();
+            }
+            return result;
+        }
+
+        /**
+         * Releases any slice still pending when the service stops. Going through
+         * {@link #releaseData()} also clears the reference, so it cannot be released twice.
+         */
+        @Override
+        protected void onStop() {
+            releaseData();
+        }
+
+        @Override
+        public String getServiceName() {
+            if (haService.getDefaultMessageStore().getBrokerConfig().isInBrokerContainer()) {
+                return haService.getDefaultMessageStore().getBrokerConfig().getLoggerIdentifier() + WriteSocketService.class.getSimpleName();
+            }
+            return WriteSocketService.class.getSimpleName();
+        }
+    }
+
+    abstract class AbstractWriteSocketService extends ServiceThread {
+        protected final Selector selector;
+        protected final SocketChannel socketChannel;
+        protected final HAWriter haWriter;
+
+        protected final ByteBuffer byteBufferHeader = ByteBuffer.allocate(MSG_HEADER_SIZE);
+        // Store master epochFileCache: (Epoch + startOffset) * 1000
+        private final ByteBuffer handShakeBuffer = ByteBuffer.allocate(EPOCH_ENTRY_SIZE * 1000);
+        protected long nextTransferFromWhere = -1;
+        protected boolean lastWriteOver = true;
+        protected long lastWriteTimestamp = System.currentTimeMillis();
+        protected long lastPrintTimestamp = System.currentTimeMillis();
+        protected long transferOffset = 0;
+
+        /**
+         * Opens a dedicated selector, registers the channel for OP_WRITE and installs a write hook.
+         * The hook reports every write size to the flow monitor and refreshes
+         * {@code lastWriteTimestamp} whenever a positive number of bytes was written.
+         *
+         * @param socketChannel channel of the slave
+         * @throws IOException if the selector cannot be opened or the channel cannot be registered
+         */
+        public AbstractWriteSocketService(final SocketChannel socketChannel) throws IOException {
+            this.selector = RemotingUtil.openSelector();
+            this.socketChannel = socketChannel;
+            socketChannel.register(this.selector, SelectionKey.OP_WRITE);
+            setDaemon(true);
+
+            this.haWriter = new HAWriter();
+            this.haWriter.registerHook(bytesWritten -> {
+                flowMonitor.addByteCountTransferred(bytesWritten);
+                if (bytesWritten <= 0) {
+                    return;
+                }
+                AbstractWriteSocketService.this.lastWriteTimestamp =
+                    haService.getDefaultMessageStore().getSystemClock().now();
+            });
+        }
+
+        /**
+         * @return the offset from which the next transfer will start (initially -1)
+         */
+        public long getNextTransferFromWhere() {
+            return nextTransferFromWhere;
+        }
+
+        // Handshake methods
+        /**
+         * Fills the header buffer and the handshake body buffer for a handshake message.
+         * <p>
+         * Header: current state, body size, master max physical offset, last epoch, and 0 as
+         * additional info. Body: one (epoch, startOffset) pair per non-null epoch entry.
+         * The advertised body size is computed from the entries that are actually written, so the
+         * header and the body always agree.
+         *
+         * @return always {@code true}
+         */
+        private boolean buildHandshakeBuffer() {
+            final List<EpochEntry> epochEntries = AutoSwitchHAConnection.this.epochCache.getAllEntries();
+            final int lastEpoch = AutoSwitchHAConnection.this.epochCache.lastEpoch();
+            final long maxPhyOffset = AutoSwitchHAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset();
+
+            // Null entries are skipped when the body is written, so they must not be counted
+            // in the body size announced to the slave.
+            int validEntryNums = 0;
+            for (final EpochEntry entry : epochEntries) {
+                if (entry != null) {
+                    validEntryNums++;
+                }
+            }
+            final int bodySize = validEntryNums * EPOCH_ENTRY_SIZE;
+
+            this.byteBufferHeader.position(0);
+            this.byteBufferHeader.limit(MSG_HEADER_SIZE);
+            // State
+            this.byteBufferHeader.putInt(currentState.ordinal());
+            // Body size
+            this.byteBufferHeader.putInt(bodySize);
+            // Offset
+            this.byteBufferHeader.putLong(maxPhyOffset);
+            // Epoch
+            this.byteBufferHeader.putInt(lastEpoch);
+            // Additional info
+            this.byteBufferHeader.putLong(0L);
+            this.byteBufferHeader.flip();
+
+            // EpochEntries
+            // NOTE(review): limit() throws IllegalArgumentException when bodySize exceeds the
+            // capacity of handShakeBuffer (EPOCH_ENTRY_SIZE * 1000), as it did before this change.
+            this.handShakeBuffer.position(0);
+            this.handShakeBuffer.limit(bodySize);
+            for (final EpochEntry entry : epochEntries) {
+                if (entry != null) {
+                    this.handShakeBuffer.putInt(entry.getEpoch());
+                    this.handShakeBuffer.putLong(entry.getStartOffset());
+                }
+            }
+            this.handShakeBuffer.flip();
+            LOGGER.info("Master build handshake header: maxEpoch:{}, maxOffset:{}, epochEntries:{}", lastEpoch, maxPhyOffset, epochEntries);
+            return true;
+        }
+
+        private boolean handshakeWithSlave() {
+            // Write Header
+            boolean result = this.haWriter.write(this.socketChannel, this.byteBufferHeader);
+
+            if (!result) {
+                return false;
+            }
+
+            // Write Body
+            return this.haWriter.write(this.socketChannel, this.handShakeBuffer);
+        }
+
+        // Normal transfer method
+        /**
+         * Fills the header buffer for a normal transfer message and leaves it ready to be written.
+         * Layout: state (int), body size (int), offset (long), current transfer epoch (int),
+         * confirm offset (long).
+         *
+         * @param nextOffset offset of the first byte of the body
+         * @param bodySize   number of body bytes that follow the header
+         */
+        private void buildTransferHeaderBuffer(long nextOffset, int bodySize) {
+            final ByteBuffer header = this.byteBufferHeader;
+            header.position(0);
+            header.limit(MSG_HEADER_SIZE);
+
+            // state | body size | offset | epoch
+            header.putInt(currentState.ordinal())
+                .putInt(bodySize)
+                .putLong(nextOffset)
+                .putInt(AutoSwitchHAConnection.this.currentTransferEpoch);
+
+            // additional info: confirm offset
+            final long confirmOffset = AutoSwitchHAConnection.this.haService.getConfirmOffset();
+            header.putLong(confirmOffset);
+            header.flip();
+
+            LOGGER.info("Master send msg, state:{}, size:{}, offset:{}, epoch:{}, confirmOffset:{}",
+                currentState, bodySize, nextOffset, AutoSwitchHAConnection.this.currentTransferEpoch, confirmOffset);
+        }
+
+        private void transferToSlave() throws Exception {
+            if (this.lastWriteOver) {
+                long interval =
+                    haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
+
+                if (interval > haService.getDefaultMessageStore().getMessageStoreConfig()
+                    .getHaSendHeartbeatInterval()) {
+
+                    buildTransferHeaderBuffer(this.nextTransferFromWhere, 0);
+
+                    this.lastWriteOver = this.transferData(0);
+                    if (!this.lastWriteOver) {
+                        return;
+                    }
+                }
+            } else {
+                // maxTransferSize == -1 means to continue transfer remaining data.
+                this.lastWriteOver = this.transferData(-1);
+                if (!this.lastWriteOver) {
+                    return;
+                }
+            }
+
+            int size = this.getNextTransferDataSize();
+            if (size > 0) {
+                if (size > haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
+                    size = haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
+                }
+                int canTransferMaxBytes = flowMonitor.canTransferMaxByteNum();
+                if (size > canTransferMaxBytes) {
+                    if (System.currentTimeMillis() - lastPrintTimestamp > 1000) {
+                        LOGGER.warn("Trigger HA flow control, max transfer speed {}KB/s, current speed: {}KB/s",
+                            String.format("%.2f", flowMonitor.maxTransferByteInSecond() / 1024.0),
+                            String.format("%.2f", flowMonitor.getTransferredByteInSecond() / 1024.0));
+                        lastPrintTimestamp = System.currentTimeMillis();
+                    }
+                    size = canTransferMaxBytes;
+                }
+                if (size <= 0) {
+                    this.releaseData();
+                    this.waitForRunning(100);
+                    return;
+                }
+
+                // We must ensure that the transmitted logs are within the same epoch
+                // If currentEpochEndOffset == -1, means that currentTransferEpoch = last epoch, so the endOffset = +∞.
+                final long currentEpochEndOffset = AutoSwitchHAConnection.this.currentTransferEpochEndOffset;
+                if (currentEpochEndOffset != -1 && this.nextTransferFromWhere + size > currentEpochEndOffset) {
+                    final EpochEntry epochEntry = AutoSwitchHAConnection.this.epochCache.nextEntry(AutoSwitchHAConnection.this.currentTransferEpoch);
+                    if (epochEntry == null) {
+                        LOGGER.error("Can't find a bigger epochEntry than epoch {}", AutoSwitchHAConnection.this.currentTransferEpoch);
+                        waitForRunning(100);
+                        return;
+                    }
+                    size = (int) (currentEpochEndOffset - this.nextTransferFromWhere);
+                    changeTransferEpochToNext(epochEntry);
+                }

Review Comment:
   master epoch 变化的话, 可以走一个统一的接口. master 会把所有 haconnection 清除重连, 所以 {currentTransferEpoch} 和
   {currentTransferEpochEndOffset} 都会动态变化的



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org