You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/04/13 09:12:46 UTC
[iotdb] branch native_raft updated: refactor log dispatcher
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new f86efa19e7 refactor log dispatcher
f86efa19e7 is described below
commit f86efa19e76d8d2a8c01fc2d973fea34417c1387
Author: jt2594838 <jt...@163.com>
AuthorDate: Thu Apr 13 17:12:31 2023 +0800
refactor log dispatcher
---
.../protocol/log/dispatch/DispatcherGroup.java | 111 +++++++
.../protocol/log/dispatch/DispatcherThread.java | 252 ++++++++++++++++
.../protocol/log/dispatch/LogDispatcher.java | 319 +++------------------
.../log/dispatch/flowcontrol/FlowBalancer.java | 23 +-
4 files changed, 410 insertions(+), 295 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
new file mode 100644
index 0000000000..0104695c23
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
+import org.apache.ratis.thirdparty.com.google.common.util.concurrent.RateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DispatcherGroup {
+ private static final Logger logger = LoggerFactory.getLogger(DispatcherGroup.class);
+ private final Peer peer;
+ private final BlockingQueue<VotingEntry> entryQueue;
+ private boolean nodeEnabled;
+ private final RateLimiter rateLimiter;
+ private final ExecutorService dispatcherThreadPool;
+ private final LogDispatcher logDispatcher;
+ private final AtomicInteger groupThreadNum = new AtomicInteger();
+
+ public DispatcherGroup(Peer peer, LogDispatcher logDispatcher, int bindingThreadNum) {
+ this.logDispatcher = logDispatcher;
+ this.peer = peer;
+ this.entryQueue = new ArrayBlockingQueue<>(logDispatcher.getConfig().getMaxNumOfLogsInMem());
+ this.nodeEnabled = true;
+ this.rateLimiter = RateLimiter.create(Double.MAX_VALUE);
+ this.dispatcherThreadPool = createPool(peer, logDispatcher.getMember().getName());
+ for (int i = 0; i < bindingThreadNum; i++) {
+ addThread();
+ }
+ }
+
+ public void close() {
+ dispatcherThreadPool.shutdownNow();
+ boolean closeSucceeded = false;
+ try {
+ closeSucceeded = dispatcherThreadPool.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ if (!closeSucceeded) {
+ logger.warn("Cannot shut down dispatcher pool of {}-{}", logDispatcher.member.getName(),
+ peer);
+ }
+ }
+ public void addThread() {
+ dispatcherThreadPool
+ .submit(newDispatcherThread(peer, entryQueue, rateLimiter));
+ groupThreadNum.incrementAndGet();
+ }
+
+ DispatcherThread newDispatcherThread(Peer node, BlockingQueue<VotingEntry> logBlockingQueue,
+ RateLimiter rateLimiter) {
+ return new DispatcherThread(logDispatcher, node, logBlockingQueue, rateLimiter, this);
+ }
+
+ public void updateRate(double rate) {
+ rateLimiter.setRate(rate);
+ }
+
+ ExecutorService createPool(Peer node, String name) {
+ return IoTDBThreadPoolFactory.newCachedThreadPool(
+ "LogDispatcher-"
+ + name
+ + "-"
+ + node.getEndpoint().getIp()
+ + "-"
+ + node.getEndpoint().getPort()
+ + "-"
+ + node.getNodeId());
+ }
+
+ public int getQueueSize() {
+ return entryQueue.size();
+ }
+
+ public boolean isNodeEnabled() {
+ return nodeEnabled;
+ }
+
+ public BlockingQueue<VotingEntry> getEntryQueue() {
+ return entryQueue;
+ }
+
+ public AtomicInteger getGroupThreadNum() {
+ return groupThreadNum;
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
new file mode 100644
index 0000000000..8808479b45
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient;
+import org.apache.iotdb.consensus.natraft.client.SyncClientAdaptor;
+import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
+import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol.FlowMonitorManager;
+import org.apache.iotdb.consensus.natraft.utils.LogUtils;
+import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
+import org.apache.iotdb.consensus.raft.thrift.AppendCompressedEntriesRequest;
+import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
+import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
+import org.apache.ratis.thirdparty.com.google.common.util.concurrent.RateLimiter;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
 * A DispatcherThread repeatedly takes batches of {@link VotingEntry}s from its group's blocking
 * queue, serializes them, and ships them to a single follower peer as (optionally compressed)
 * AppendEntries requests. Several threads of the same {@link DispatcherGroup} may share one
 * queue; the thread exits when interrupted and decrements the group's live-thread counter.
 */
class DispatcherThread implements Runnable {
  private static final Logger logger = LoggerFactory.getLogger(DispatcherThread.class);

  private final LogDispatcher logDispatcher;
  // the follower this thread sends entries to
  Peer receiver;
  // shared with sibling threads of the same DispatcherGroup
  private final BlockingQueue<VotingEntry> logBlockingDeque;
  // batch under construction; reused (cleared) across iterations
  protected List<VotingEntry> currBatch = new ArrayList<>();
  private final String baseName;
  // throttles outgoing bytes; configured through DispatcherGroup.updateRate()
  private final RateLimiter rateLimiter;
  private final DispatcherGroup group;

  protected DispatcherThread(LogDispatcher logDispatcher, Peer receiver,
      BlockingQueue<VotingEntry> logBlockingDeque, RateLimiter rateLimiter,
      DispatcherGroup group) {
    this.logDispatcher = logDispatcher;
    this.receiver = receiver;
    this.logBlockingDeque = logBlockingDeque;
    this.rateLimiter = rateLimiter;
    this.group = group;
    this.baseName = "LogDispatcher-" + logDispatcher.member.getName() + "-" + receiver;
  }

  /**
   * Main loop: poll one entry, drain up to maxBatchSize-1 more, serialize, (re)sort when the
   * queue is not kept ordered, and send. Exits on interruption or unexpected error.
   */
  @Override
  public void run() {
    // renaming the thread is itself debug-only diagnostics
    if (logger.isDebugEnabled()) {
      Thread.currentThread().setName(baseName);
    }
    try {
      while (!Thread.interrupted()) {
        // lock the queue so poll+drainTo form one atomic batch grab w.r.t. sibling threads
        synchronized (logBlockingDeque) {
          VotingEntry poll = logBlockingDeque.poll();
          if (poll != null) {
            currBatch.add(poll);
            logBlockingDeque.drainTo(currBatch, logDispatcher.maxBatchSize - 1);
          } else {
            // nothing queued: wait briefly (10 ms) instead of busy-spinning, then retry
            logBlockingDeque.wait(10);
            continue;
          }
        }
        if (logger.isDebugEnabled()) {
          logger.debug("Sending {} logs to {}", currBatch.size(), receiver);
        }
        serializeEntries();
        // with multiple consumer threads the drain order may interleave; restore index order
        if (!logDispatcher.queueOrdered) {
          currBatch.sort(Comparator.comparingLong(s -> s.getEntry().getCurrLogIndex()));
        }
        sendLogs(currBatch);
        currBatch.clear();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } catch (Exception e) {
      logger.error("Unexpected error in log dispatcher", e);
    }
    logger.info("Dispatcher exits");
    group.getGroupThreadNum().decrementAndGet();
  }

  /**
   * Serializes every entry in the current batch into its request buffer and records the
   * serialized size (taken from the buffer's limit()) on the entry.
   */
  // NOTE(review): nothing in this body throws InterruptedException; the declaration appears to
  // be kept for overriding subclasses — confirm before removing.
  protected void serializeEntries() throws InterruptedException {
    for (VotingEntry request : currBatch) {

      request.getAppendEntryRequest().entry = request.getEntry().serialize();
      request.getEntry().setByteSize(request.getAppendEntryRequest().entry.limit());
    }
  }

  /**
   * Sends an uncompressed AppendEntries request. Despite the name, the RPC itself is performed
   * synchronously via SyncClientAdaptor; the handler is invoked inline with the result or error.
   */
  private void appendEntriesAsync(
      List<ByteBuffer> logList, AppendEntriesRequest request, List<VotingEntry> currBatch) {
    AsyncMethodCallback<AppendEntryResult> handler = new AppendEntriesHandler(currBatch);
    AsyncRaftServiceClient client = logDispatcher.member.getClient(receiver.getEndpoint());
    try {
      long startTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
      AppendEntryResult appendEntryResult = SyncClientAdaptor.appendEntries(client, request);
      Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
      handler.onComplete(appendEntryResult);
    } catch (Exception e) {
      handler.onError(e);
    }
    if (logger.isDebugEnabled()) {
      logger.debug(
          "{}: append entries {} with {} logs", logDispatcher.member.getName(), receiver,
          logList.size());
    }
  }

  /**
   * Compressed variant of {@link #appendEntriesAsync(List, AppendEntriesRequest, List)}; same
   * synchronous-call-with-callback structure.
   */
  private void appendEntriesAsync(
      List<ByteBuffer> logList,
      AppendCompressedEntriesRequest request,
      List<VotingEntry> currBatch) {
    AsyncMethodCallback<AppendEntryResult> handler = new AppendEntriesHandler(currBatch);
    AsyncRaftServiceClient client = logDispatcher.member.getClient(receiver.getEndpoint());
    try {
      long startTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
      AppendEntryResult appendEntryResult =
          SyncClientAdaptor.appendCompressedEntries(client, request);
      Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
      handler.onComplete(appendEntryResult);
    } catch (Exception e) {
      handler.onError(e);
    }
    if (logger.isDebugEnabled()) {
      logger.debug(
          "{}: append entries {} with {} logs", logDispatcher.member.getName(), receiver,
          logList.size());
    }
  }

  /** Builds an uncompressed AppendEntries request carrying the leader's current state. */
  protected AppendEntriesRequest prepareRequest(List<ByteBuffer> logList) {
    AppendEntriesRequest request = new AppendEntriesRequest();

    request.setGroupId(logDispatcher.member.getRaftGroupId().convertToTConsensusGroupId());
    request.setLeader(logDispatcher.member.getThisNode().getEndpoint());
    request.setLeaderId(logDispatcher.member.getThisNode().getNodeId());
    request.setLeaderCommit(logDispatcher.member.getLogManager().getCommitLogIndex());
    request.setTerm(logDispatcher.member.getStatus().getTerm().get());
    request.setEntries(logList);
    return request;
  }

  /** Builds a compressed AppendEntries request using the dispatcher's configured compressor. */
  protected AppendCompressedEntriesRequest prepareCompressedRequest(List<ByteBuffer> logList) {
    AppendCompressedEntriesRequest request = new AppendCompressedEntriesRequest();

    request.setGroupId(logDispatcher.member.getRaftGroupId().convertToTConsensusGroupId());
    request.setLeader(logDispatcher.member.getThisNode().getEndpoint());
    request.setLeaderId(logDispatcher.member.getThisNode().getNodeId());
    request.setLeaderCommit(logDispatcher.member.getLogManager().getCommitLogIndex());
    request.setTerm(logDispatcher.member.getStatus().getTerm().get());
    request.setEntryBytes(LogUtils.compressEntries(logList, logDispatcher.compressor));
    request.setCompressionType((byte) logDispatcher.compressor.getType().ordinal());
    return request;
  }

  /**
   * Splits the batch into sub-batches that fit within the thrift frame size (leaving
   * LEFT_SIZE_IN_REQUEST headroom), sends each, reports flow when follower load balancing is
   * on, and acquires rate-limiter permits proportional to the bytes sent.
   */
  private void sendLogs(List<VotingEntry> currBatch) {
    if (currBatch.isEmpty()) {
      return;
    }

    int logIndex = 0;
    logger.debug(
        "send logs from index {} to {}",
        currBatch.get(0).getEntry().getCurrLogIndex(),
        currBatch.get(currBatch.size() - 1).getEntry().getCurrLogIndex());
    while (logIndex < currBatch.size()) {
      long logSize = 0;
      long logSizeLimit = logDispatcher.getConfig().getThriftMaxFrameSize();
      List<ByteBuffer> logList = new ArrayList<>();
      int prevIndex = logIndex;

      for (; logIndex < currBatch.size(); logIndex++) {
        VotingEntry entry = currBatch.get(logIndex);
        // NOTE(review): size is taken from array().length while serializeEntries() records
        // limit(); these agree only if the buffer's backing array is exactly full — confirm.
        long curSize = entry.getAppendEntryRequest().entry.array().length;
        if (logSizeLimit - curSize - logSize <= IoTDBConstant.LEFT_SIZE_IN_REQUEST) {
          break;
        }
        logSize += curSize;
        logList.add(entry.getAppendEntryRequest().entry);
        Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENDING.calOperationCostTimeFromStart(
            entry.getEntry().createTime);
      }

      if (!logDispatcher.enableCompressedDispatching) {
        AppendEntriesRequest appendEntriesRequest = prepareRequest(logList);
        appendEntriesAsync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
      } else {
        AppendCompressedEntriesRequest appendEntriesRequest = prepareCompressedRequest(logList);
        appendEntriesAsync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
      }

      if (logDispatcher.getConfig().isUseFollowerLoadBalance()) {
        FlowMonitorManager.INSTANCE.report(receiver, logSize);
      }
      // throttle after sending: permits are bytes, so large sub-batches delay the next send
      rateLimiter.acquire((int) logSize);
    }
  }

  /** Creates a per-entry result handler bound to this member and the given receiver. */
  public AppendNodeEntryHandler getAppendNodeEntryHandler(VotingEntry log, Peer node) {
    AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
    handler.setDirectReceiver(node);
    handler.setVotingEntry(log);
    handler.setMember(logDispatcher.member);
    return handler;
  }

  /**
   * Fans one batch-level AppendEntries result (or error) out to a per-entry
   * AppendNodeEntryHandler for every entry in the batch.
   */
  class AppendEntriesHandler implements AsyncMethodCallback<AppendEntryResult> {

    private final List<AsyncMethodCallback<AppendEntryResult>> singleEntryHandlers;

    private AppendEntriesHandler(List<VotingEntry> batch) {
      singleEntryHandlers = new ArrayList<>(batch.size());
      for (VotingEntry sendLogRequest : batch) {
        AppendNodeEntryHandler handler = getAppendNodeEntryHandler(sendLogRequest, receiver);
        singleEntryHandlers.add(handler);
      }
    }

    @Override
    public void onComplete(AppendEntryResult aLong) {
      for (AsyncMethodCallback<AppendEntryResult> singleEntryHandler : singleEntryHandlers) {
        singleEntryHandler.onComplete(aLong);
      }
    }

    @Override
    public void onError(Exception e) {
      for (AsyncMethodCallback<AppendEntryResult> singleEntryHandler : singleEntryHandlers) {
        singleEntryHandler.onError(e);
      }
    }
  }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
index 463f9f0748..5ab499da20 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
@@ -19,42 +19,23 @@
package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient;
-import org.apache.iotdb.consensus.natraft.client.SyncClientAdaptor;
-import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
-import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
-import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
-import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol.FlowMonitorManager;
-import org.apache.iotdb.consensus.natraft.utils.LogUtils;
-import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
-import org.apache.iotdb.consensus.raft.thrift.AppendCompressedEntriesRequest;
-import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
-import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
-import org.apache.iotdb.tsfile.compress.ICompressor;
-
-import com.google.common.util.concurrent.RateLimiter;
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.iotdb.consensus.natraft.utils.NodeUtils.unionNodes;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.iotdb.consensus.natraft.utils.NodeUtils.unionNodes;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
+import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
+import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A LogDispatcher serves a raft leader by queuing logs that the leader wants to send to its
@@ -67,18 +48,14 @@ public class LogDispatcher {
private static final Logger logger = LoggerFactory.getLogger(LogDispatcher.class);
protected RaftMember member;
- private RaftConfig config;
+ protected RaftConfig config;
protected List<Peer> allNodes;
protected List<Peer> newNodes;
- protected Map<Peer, BlockingQueue<VotingEntry>> nodesLogQueuesMap = new HashMap<>();
- protected Map<Peer, Boolean> nodesEnabled;
- protected Map<Peer, RateLimiter> nodesRateLimiter = new HashMap<>();
+ protected Map<Peer, DispatcherGroup> dispatcherGroupMap = new HashMap<>();
protected Map<Peer, Double> nodesRate = new HashMap<>();
- protected Map<Peer, ExecutorService> executorServices = new HashMap<>();
protected boolean queueOrdered;
protected boolean enableCompressedDispatching;
protected ICompressor compressor;
-
public int bindingThreadNum;
public int maxBatchSize = 10;
@@ -98,39 +75,18 @@ public class LogDispatcher {
public void updateRateLimiter() {
logger.info("TEndPoint rates: {}", nodesRate);
for (Entry<Peer, Double> nodeDoubleEntry : nodesRate.entrySet()) {
- nodesRateLimiter.get(nodeDoubleEntry.getKey()).setRate(nodeDoubleEntry.getValue());
+ dispatcherGroupMap.get(nodeDoubleEntry.getKey()).updateRate(nodeDoubleEntry.getValue());
}
}
- void createQueue(Peer node) {
- BlockingQueue<VotingEntry> logBlockingQueue;
- logBlockingQueue = new ArrayBlockingQueue<>(config.getMaxNumOfLogsInMem());
- nodesLogQueuesMap.put(node, logBlockingQueue);
- nodesRateLimiter.put(node, RateLimiter.create(Double.MAX_VALUE));
-
- for (int i = 0; i < bindingThreadNum; i++) {
- executorServices
- .computeIfAbsent(node, n -> createPool(node))
- .submit(newDispatcherThread(node, logBlockingQueue));
- }
- }
-
- ExecutorService createPool(Peer node) {
- return IoTDBThreadPoolFactory.newCachedThreadPool(
- "LogDispatcher-"
- + member.getName()
- + "-"
- + node.getEndpoint().getIp()
- + "-"
- + node.getEndpoint().getPort()
- + "-"
- + node.getNodeId());
+ void createDispatcherGroup(Peer node) {
+ dispatcherGroupMap.computeIfAbsent(node, n -> new DispatcherGroup(n, this, bindingThreadNum));
}
void createQueueAndBindingThreads(Collection<Peer> peers) {
for (Peer node : peers) {
if (!node.equals(member.getThisNode())) {
- createQueue(node);
+ createDispatcherGroup(node);
}
}
updateRateLimiter();
@@ -138,13 +94,9 @@ public class LogDispatcher {
@TestOnly
public void close() throws InterruptedException {
- for (Entry<Peer, ExecutorService> entry : executorServices.entrySet()) {
- ExecutorService pool = entry.getValue();
- pool.shutdownNow();
- boolean closeSucceeded = pool.awaitTermination(10, TimeUnit.SECONDS);
- if (!closeSucceeded) {
- logger.warn("Cannot shut down dispatcher pool of {}-{}", member.getName(), entry.getKey());
- }
+ for (Entry<Peer, DispatcherGroup> entry : dispatcherGroupMap.entrySet()) {
+ DispatcherGroup group = entry.getValue();
+ group.close();
}
}
@@ -160,14 +112,14 @@ public class LogDispatcher {
public void offer(VotingEntry request) {
- for (Entry<Peer, BlockingQueue<VotingEntry>> entry : nodesLogQueuesMap.entrySet()) {
- if (nodesEnabled != null && !this.nodesEnabled.getOrDefault(entry.getKey(), false)) {
+ for (Entry<Peer, DispatcherGroup> entry : dispatcherGroupMap.entrySet()) {
+ DispatcherGroup dispatcherGroup = entry.getValue();
+ if (!dispatcherGroup.isNodeEnabled()) {
continue;
}
- BlockingQueue<VotingEntry> nodeLogQueue = entry.getValue();
try {
- boolean addSucceeded = addToQueue(nodeLogQueue, request);
+ boolean addSucceeded = addToQueue(dispatcherGroup.getEntryQueue(), request);
if (!addSucceeded) {
logger.debug(
@@ -184,226 +136,21 @@ public class LogDispatcher {
}
}
- DispatcherThread newDispatcherThread(Peer node, BlockingQueue<VotingEntry> logBlockingQueue) {
- return new DispatcherThread(node, logBlockingQueue);
- }
+
public void applyNewNodes() {
allNodes = newNodes;
newNodes = null;
List<Peer> nodesToRemove = new ArrayList<>();
- for (Entry<Peer, ExecutorService> entry : executorServices.entrySet()) {
+ for (Entry<Peer, DispatcherGroup> entry : dispatcherGroupMap.entrySet()) {
if (!allNodes.contains(entry.getKey())) {
nodesToRemove.add(entry.getKey());
}
}
for (Peer peer : nodesToRemove) {
- ExecutorService executorService = executorServices.remove(peer);
- executorService.shutdownNow();
- nodesRate.remove(peer);
- nodesRateLimiter.remove(peer);
- nodesEnabled.remove(peer);
- nodesLogQueuesMap.remove(peer);
- }
- }
-
- protected class DispatcherThread implements Runnable {
-
- Peer receiver;
- private final BlockingQueue<VotingEntry> logBlockingDeque;
- protected List<VotingEntry> currBatch = new ArrayList<>();
- private final String baseName;
-
- protected DispatcherThread(Peer receiver, BlockingQueue<VotingEntry> logBlockingDeque) {
- this.receiver = receiver;
- this.logBlockingDeque = logBlockingDeque;
- baseName = "LogDispatcher-" + member.getName() + "-" + receiver;
- }
-
- @Override
- public void run() {
- if (logger.isDebugEnabled()) {
- Thread.currentThread().setName(baseName);
- }
- try {
- while (!Thread.interrupted()) {
- synchronized (logBlockingDeque) {
- VotingEntry poll = logBlockingDeque.poll();
- if (poll != null) {
- currBatch.add(poll);
- logBlockingDeque.drainTo(currBatch, maxBatchSize - 1);
- } else {
- logBlockingDeque.wait(10);
- continue;
- }
- }
- if (logger.isDebugEnabled()) {
- logger.debug("Sending {} logs to {}", currBatch.size(), receiver);
- }
- serializeEntries();
- if (!queueOrdered) {
- currBatch.sort(Comparator.comparingLong(s -> s.getEntry().getCurrLogIndex()));
- }
- sendLogs(currBatch);
- currBatch.clear();
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (Exception e) {
- logger.error("Unexpected error in log dispatcher", e);
- }
- logger.info("Dispatcher exits");
- }
-
- protected void serializeEntries() throws InterruptedException {
- for (VotingEntry request : currBatch) {
-
- request.getAppendEntryRequest().entry = request.getEntry().serialize();
- request.getEntry().setByteSize(request.getAppendEntryRequest().entry.limit());
- }
- }
-
- private void appendEntriesAsync(
- List<ByteBuffer> logList, AppendEntriesRequest request, List<VotingEntry> currBatch) {
- AsyncMethodCallback<AppendEntryResult> handler = new AppendEntriesHandler(currBatch);
- AsyncRaftServiceClient client = member.getClient(receiver.getEndpoint());
- try {
- long startTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
- AppendEntryResult appendEntryResult = SyncClientAdaptor.appendEntries(client, request);
- Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
- handler.onComplete(appendEntryResult);
- } catch (Exception e) {
- handler.onError(e);
- }
- if (logger.isDebugEnabled()) {
- logger.debug(
- "{}: append entries {} with {} logs", member.getName(), receiver, logList.size());
- }
- }
-
- private void appendEntriesAsync(
- List<ByteBuffer> logList,
- AppendCompressedEntriesRequest request,
- List<VotingEntry> currBatch) {
- AsyncMethodCallback<AppendEntryResult> handler = new AppendEntriesHandler(currBatch);
- AsyncRaftServiceClient client = member.getClient(receiver.getEndpoint());
- try {
- long startTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
- AppendEntryResult appendEntryResult =
- SyncClientAdaptor.appendCompressedEntries(client, request);
- Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
- handler.onComplete(appendEntryResult);
- } catch (Exception e) {
- handler.onError(e);
- }
- if (logger.isDebugEnabled()) {
- logger.debug(
- "{}: append entries {} with {} logs", member.getName(), receiver, logList.size());
- }
- }
-
- protected AppendEntriesRequest prepareRequest(List<ByteBuffer> logList) {
- AppendEntriesRequest request = new AppendEntriesRequest();
-
- request.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
- request.setLeader(member.getThisNode().getEndpoint());
- request.setLeaderId(member.getThisNode().getNodeId());
- request.setLeaderCommit(member.getLogManager().getCommitLogIndex());
- request.setTerm(member.getStatus().getTerm().get());
- request.setEntries(logList);
- return request;
- }
-
- protected AppendCompressedEntriesRequest prepareCompressedRequest(List<ByteBuffer> logList) {
- AppendCompressedEntriesRequest request = new AppendCompressedEntriesRequest();
-
- request.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
- request.setLeader(member.getThisNode().getEndpoint());
- request.setLeaderId(member.getThisNode().getNodeId());
- request.setLeaderCommit(member.getLogManager().getCommitLogIndex());
- request.setTerm(member.getStatus().getTerm().get());
- request.setEntryBytes(LogUtils.compressEntries(logList, compressor));
- request.setCompressionType((byte) compressor.getType().ordinal());
- return request;
- }
-
- private void sendLogs(List<VotingEntry> currBatch) {
- if (currBatch.isEmpty()) {
- return;
- }
-
- int logIndex = 0;
- logger.debug(
- "send logs from index {} to {}",
- currBatch.get(0).getEntry().getCurrLogIndex(),
- currBatch.get(currBatch.size() - 1).getEntry().getCurrLogIndex());
- while (logIndex < currBatch.size()) {
- long logSize = 0;
- long logSizeLimit = config.getThriftMaxFrameSize();
- List<ByteBuffer> logList = new ArrayList<>();
- int prevIndex = logIndex;
-
- for (; logIndex < currBatch.size(); logIndex++) {
- VotingEntry entry = currBatch.get(logIndex);
- long curSize = entry.getAppendEntryRequest().entry.array().length;
- if (logSizeLimit - curSize - logSize <= IoTDBConstant.LEFT_SIZE_IN_REQUEST) {
- break;
- }
- logSize += curSize;
- logList.add(entry.getAppendEntryRequest().entry);
- Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENDING.calOperationCostTimeFromStart(
- entry.getEntry().createTime);
- }
-
- if (!enableCompressedDispatching) {
- AppendEntriesRequest appendEntriesRequest = prepareRequest(logList);
- appendEntriesAsync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
- } else {
- AppendCompressedEntriesRequest appendEntriesRequest = prepareCompressedRequest(logList);
- appendEntriesAsync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
- }
-
- if (config.isUseFollowerLoadBalance()) {
- FlowMonitorManager.INSTANCE.report(receiver, logSize);
- }
- nodesRateLimiter.get(receiver).acquire((int) logSize);
- }
- }
-
- public AppendNodeEntryHandler getAppendNodeEntryHandler(VotingEntry log, Peer node) {
- AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
- handler.setDirectReceiver(node);
- handler.setVotingEntry(log);
- handler.setMember(member);
- return handler;
- }
-
- class AppendEntriesHandler implements AsyncMethodCallback<AppendEntryResult> {
-
- private final List<AsyncMethodCallback<AppendEntryResult>> singleEntryHandlers;
-
- private AppendEntriesHandler(List<VotingEntry> batch) {
- singleEntryHandlers = new ArrayList<>(batch.size());
- for (VotingEntry sendLogRequest : batch) {
- AppendNodeEntryHandler handler = getAppendNodeEntryHandler(sendLogRequest, receiver);
- singleEntryHandlers.add(handler);
- }
- }
-
- @Override
- public void onComplete(AppendEntryResult aLong) {
- for (AsyncMethodCallback<AppendEntryResult> singleEntryHandler : singleEntryHandlers) {
- singleEntryHandler.onComplete(aLong);
- }
- }
-
- @Override
- public void onError(Exception e) {
- for (AsyncMethodCallback<AppendEntryResult> singleEntryHandler : singleEntryHandlers) {
- singleEntryHandler.onError(e);
- }
- }
+ DispatcherGroup removed = dispatcherGroupMap.remove(peer);
+ removed.close();
}
}
@@ -411,16 +158,24 @@ public class LogDispatcher {
return nodesRate;
}
- public Map<Peer, BlockingQueue<VotingEntry>> getNodesLogQueuesMap() {
- return nodesLogQueuesMap;
+ public Map<Peer, DispatcherGroup> getDispatcherGroupMap() {
+ return dispatcherGroupMap;
}
public void setNewNodes(List<Peer> newNodes) {
this.newNodes = newNodes;
for (Peer newNode : newNodes) {
if (!allNodes.contains(newNode)) {
- createQueue(newNode);
+ createDispatcherGroup(newNode);
}
}
}
+
+ public RaftConfig getConfig() {
+ return config;
+ }
+
+ public RaftMember getMember() {
+ return member;
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
index 3c838f81b7..27ffe6b86d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
@@ -19,26 +19,23 @@
package org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.consensus.natraft.protocol.RaftRole;
-import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
+import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.DispatcherGroup;
import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.LogDispatcher;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
public class FlowBalancer {
private static final Logger logger = LoggerFactory.getLogger(FlowBalancer.class);
@@ -88,11 +85,11 @@ public class FlowBalancer {
double thisNodeFlow = flowMonitorManager.averageFlow(member.getThisNode(), windowsToUse);
double assumedFlow = thisNodeFlow * overestimateFactor;
logger.info("Flow of this node: {}", thisNodeFlow);
- Map<Peer, BlockingQueue<VotingEntry>> nodesLogQueuesMap = logDispatcher.getNodesLogQueuesMap();
+ Map<Peer, DispatcherGroup> dispatcherGroupMap = logDispatcher.getDispatcherGroupMap();
Map<Peer, Double> nodesRate = logDispatcher.getNodesRate();
// sort followers according to their queue length
- followers.sort(Comparator.comparing(node -> nodesLogQueuesMap.get(node).size()));
+ followers.sort(Comparator.comparing(node -> dispatcherGroupMap.get(node).getQueueSize()));
if (assumedFlow * followerNum > maxFlow) {
enterBurst(nodesRate, nodeNum, assumedFlow, followers);
} else {