You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/12/01 01:16:07 UTC
[iotdb] 07/09: fix client in LogDispatcher
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 224002987d8f8bedefb3bfecb6730dcadf7ffba1
Author: jt <jt...@163.com>
AuthorDate: Mon Nov 8 18:25:16 2021 +0800
fix client in LogDispatcher
---
.../org/apache/iotdb/cluster/expr/ExprBench.java | 88 ++++++++--------
.../org/apache/iotdb/cluster/expr/ExprServer.java | 5 +
.../apache/iotdb/cluster/expr/VotingLogList.java | 6 ++
.../iotdb/cluster/log/IndirectLogDispatcher.java | 8 +-
.../java/org/apache/iotdb/cluster/log/Log.java | 7 +-
.../apache/iotdb/cluster/log/LogDispatcher.java | 87 ++++++++++------
.../cluster/log/logtypes/PhysicalPlanLog.java | 14 ++-
.../cluster/server/member/DataGroupMember.java | 7 +-
.../cluster/server/member/MetaGroupMember.java | 9 +-
.../iotdb/cluster/server/member/RaftMember.java | 111 +++++++++++++--------
.../apache/iotdb/cluster/server/monitor/Timer.java | 36 ++++++-
.../cluster/server/member/MetaGroupMemberTest.java | 4 +-
12 files changed, 251 insertions(+), 131 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
index e3f7b57..d0891a3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
@@ -32,6 +32,8 @@ import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
public class ExprBench {
@@ -44,6 +46,7 @@ public class ExprBench {
private SyncClientPool clientPool;
private Node target;
private int maxRequestNum;
+ private ExecutorService pool = Executors.newCachedThreadPool();
public ExprBench(Node target) {
this.target = target;
@@ -54,52 +57,51 @@ public class ExprBench {
public void benchmark() {
long startTime = System.currentTimeMillis();
for (int i = 0; i < threadNum; i++) {
- new Thread(
- () -> {
- Client client = clientPool.getClient(target);
- ExecutNonQueryReq request = new ExecutNonQueryReq();
- DummyPlan plan = new DummyPlan();
- plan.setWorkload(new byte[workloadSize]);
- plan.setNeedForward(true);
- ByteBuffer byteBuffer = ByteBuffer.allocate(workloadSize + 4096);
- plan.serialize(byteBuffer);
- byteBuffer.flip();
- request.setPlanBytes(byteBuffer);
- long currRequsetNum = -1;
- while (true) {
+ pool.submit(
+ () -> {
+ Client client = clientPool.getClient(target);
+ ExecutNonQueryReq request = new ExecutNonQueryReq();
+ DummyPlan plan = new DummyPlan();
+ plan.setWorkload(new byte[workloadSize]);
+ plan.setNeedForward(true);
+ ByteBuffer byteBuffer = ByteBuffer.allocate(workloadSize + 4096);
+ plan.serialize(byteBuffer);
+ byteBuffer.flip();
+ request.setPlanBytes(byteBuffer);
+ long currRequsetNum = -1;
+ while (true) {
- long reqLatency = System.nanoTime();
- try {
- client.executeNonQueryPlan(request);
- currRequsetNum = requestCounter.incrementAndGet();
- if (currRequsetNum > threadNum * 10) {
- reqLatency = System.nanoTime() - reqLatency;
- maxLatency = Math.max(maxLatency, reqLatency);
- latencySum.addAndGet(reqLatency);
- }
- } catch (TException e) {
- e.printStackTrace();
- }
+ long reqLatency = System.nanoTime();
+ try {
+ client.executeNonQueryPlan(request);
+ currRequsetNum = requestCounter.incrementAndGet();
+ if (currRequsetNum > threadNum * 10) {
+ reqLatency = System.nanoTime() - reqLatency;
+ maxLatency = Math.max(maxLatency, reqLatency);
+ latencySum.addAndGet(reqLatency);
+ }
+ } catch (TException e) {
+ e.printStackTrace();
+ }
- if (currRequsetNum % 1000 == 0) {
- long elapsedTime = System.currentTimeMillis() - startTime;
- System.out.println(
- String.format(
- "%d %d %f(%f) %f %f",
- elapsedTime,
- currRequsetNum,
- (currRequsetNum + 0.0) / elapsedTime,
- currRequsetNum * workloadSize / (1024.0 * 1024.0) / elapsedTime,
- maxLatency / 1000.0,
- (latencySum.get() + 0.0) / currRequsetNum));
- }
+ if (currRequsetNum % 1000 == 0) {
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ System.out.println(
+ String.format(
+ "%d %d %f(%f) %f %f",
+ elapsedTime,
+ currRequsetNum,
+ (currRequsetNum + 0.0) / elapsedTime,
+ currRequsetNum * workloadSize / (1024.0 * 1024.0) / elapsedTime,
+ maxLatency / 1000.0,
+ (latencySum.get() + 0.0) / currRequsetNum));
+ }
- if (currRequsetNum >= maxRequestNum) {
- break;
- }
- }
- })
- .start();
+ if (currRequsetNum >= maxRequestNum) {
+ break;
+ }
+ }
+ });
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java
index 4381746..6aac585 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.expr;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogDispatcher;
import org.apache.iotdb.cluster.server.MetaClusterServer;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
@@ -79,6 +80,8 @@ public class ExprServer extends MetaClusterServer {
boolean useSW = Boolean.parseBoolean(args[5]);
boolean enableWeakAcceptance = Boolean.parseBoolean(args[6]);
boolean enableCommitReturn = Boolean.parseBoolean(args[7]);
+ int maxBatchSize = Integer.parseInt(args[8]);
+ int defaultLogBufferSize = Integer.parseInt(args[9]);
ClusterDescriptor.getInstance().getConfig().setSeedNodeUrls(Arrays.asList(allNodeStr));
ClusterDescriptor.getInstance().getConfig().setInternalMetaPort(port);
@@ -89,10 +92,12 @@ public class ExprServer extends MetaClusterServer {
RaftMember.USE_LOG_DISPATCHER = true;
RaftMember.USE_INDIRECT_LOG_DISPATCHER = useIndirectDispatcher;
LogDispatcher.bindingThreadNum = dispatcherThreadNum;
+ LogDispatcher.maxBatchSize = maxBatchSize;
ExprMember.bypassRaft = bypassRaft;
ExprMember.useSlidingWindow = useSW;
ExprMember.ENABLE_WEAK_ACCEPTANCE = enableWeakAcceptance;
ExprMember.ENABLE_COMMIT_RETURN = enableCommitReturn;
+ Log.DEFAULT_BUFFER_SIZE = defaultLogBufferSize * 1024 + 512;
ExprServer server = new ExprServer();
server.start();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
index 28eb19d..7e13336 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
import java.util.List;
public class VotingLogList {
+
private static final Logger logger = LoggerFactory.getLogger(VotingLogList.class);
private List<VotingLog> logList = new ArrayList<>();
@@ -75,6 +76,11 @@ public class VotingLogList {
if (votingLog.getStronglyAcceptedNodeIds().size() >= quorumSize) {
lastEntryIndexToCommit = i;
}
+ if (votingLog.getStronglyAcceptedNodeIds().size()
+ + votingLog.getWeaklyAcceptedNodeIds().size()
+ >= quorumSize) {
+ votingLog.acceptedTime = System.nanoTime();
+ }
} else if (votingLog.getLog().getCurrLogIndex() > index) {
break;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
index 65496c3..767736d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
@@ -87,9 +87,9 @@ public class IndirectLogDispatcher extends LogDispatcher {
}
}
- for (Node node : directToIndirectFollowerMap.keySet()) {
- nodeLogQueues.add(createQueueAndBindingThread(node));
- }
+ // for (Node node : directToIndirectFollowerMap.keySet()) {
+ // nodeLogQueues.add(createQueueAndBindingThread(node));
+ // }
}
class DispatcherThread extends LogDispatcher.DispatcherThread {
@@ -110,7 +110,7 @@ public class IndirectLogDispatcher extends LogDispatcher {
logRequest.getAppendEntryRequest(),
logRequest.getQuorumSize(),
directToIndirectFollowerMap.get(receiver));
- Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_END.calOperationCostTimeFromStart(
+ Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENT.calOperationCostTimeFromStart(
logRequest.getVotingLog().getLog().getCreateTime());
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
index e70c326..4845bba 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
@@ -32,7 +32,8 @@ public abstract class Log implements Comparable<Log> {
private static final Comparator<Log> COMPARATOR =
Comparator.comparingLong(Log::getCurrLogIndex).thenComparing(Log::getCurrLogTerm);
- protected static final int DEFAULT_BUFFER_SIZE = 4096;
+ // make this configurable or adaptive
+ public static int DEFAULT_BUFFER_SIZE = 16 * 1024;
private long currLogIndex;
private long currLogTerm;
@@ -51,6 +52,10 @@ public abstract class Log implements Comparable<Log> {
public abstract void deserialize(ByteBuffer buffer);
+ public void serialize(ByteBuffer buffer) {
+ buffer.put(serialize());
+ }
+
public enum Types {
// DO CHECK LogParser when you add a new type of log
ADD_NODE,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 6c3d2073..9558714 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -31,10 +31,12 @@ import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.server.monitor.Peer;
import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.cluster.utils.ClientUtils;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.db.utils.TestOnly;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -68,13 +70,14 @@ public class LogDispatcher {
RaftMember member;
private ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig();
private boolean useBatchInLogCatchUp = clusterConfig.isUseBatchInLogCatchUp();
- List<BlockingQueue<SendLogRequest>> nodeLogQueues = new ArrayList<>();
+ List<BlockingQueue<SendLogRequest>> nodesLogQueues = new ArrayList<>();
ExecutorService executorService;
private static ExecutorService serializationService =
Executors.newFixedThreadPool(
- Runtime.getRuntime().availableProcessors() * 2,
+ CommonUtils.getCpuCores() * 2,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DispatcherEncoder-%d").build());
public static int bindingThreadNum = 1;
+ public static int maxBatchSize = 1;
public LogDispatcher(RaftMember member) {
this.member = member;
@@ -85,7 +88,7 @@ public class LogDispatcher {
void createQueueAndBindingThreads() {
for (Node node : member.getAllNodes()) {
if (!ClusterUtils.isNodeEquals(node, member.getThisNode())) {
- nodeLogQueues.add(createQueueAndBindingThread(node));
+ nodesLogQueues.add(createQueueAndBindingThread(node));
}
}
}
@@ -96,20 +99,22 @@ public class LogDispatcher {
executorService.awaitTermination(10, TimeUnit.SECONDS);
}
+ private ByteBuffer serializeTask(SendLogRequest request) {
+ ByteBuffer byteBuffer = request.getVotingLog().getLog().serialize();
+ request.getVotingLog().getLog().setByteSize(byteBuffer.capacity());
+ return byteBuffer;
+ }
+
public void offer(SendLogRequest request) {
// do serialization here to avoid taking LogManager for too long
- if (!nodeLogQueues.isEmpty()) {
- request.serializedLogFuture =
- serializationService.submit(
- () -> {
- ByteBuffer byteBuffer = request.getVotingLog().getLog().serialize();
- request.getVotingLog().getLog().setByteSize(byteBuffer.array().length);
- return byteBuffer;
- });
+ if (!nodesLogQueues.isEmpty()) {
+ request.serializedLogFuture = serializationService.submit(() -> serializeTask(request));
}
- for (int i = 0; i < nodeLogQueues.size(); i++) {
- BlockingQueue<SendLogRequest> nodeLogQueue = nodeLogQueues.get(i);
+ long startTime = Statistic.LOG_DISPATCHER_LOG_ENQUEUE.getOperationStartTime();
+ request.getVotingLog().getLog().setEnqueueTime(System.nanoTime());
+ for (int i = 0; i < nodesLogQueues.size(); i++) {
+ BlockingQueue<SendLogRequest> nodeLogQueue = nodesLogQueues.get(i);
try {
boolean addSucceeded;
if (ClusterDescriptor.getInstance().getConfig().isWaitForSlowNode()) {
@@ -135,10 +140,17 @@ public class LogDispatcher {
Thread.currentThread().interrupt();
}
}
+ Statistic.LOG_DISPATCHER_LOG_ENQUEUE.calOperationCostTimeFromStart(startTime);
+
+ if (Timer.ENABLE_INSTRUMENTING) {
+ Statistic.LOG_DISPATCHER_FROM_CREATE_TO_ENQUEUE.calOperationCostTimeFromStart(
+ request.getVotingLog().getLog().getCreateTime());
+ }
}
BlockingQueue<SendLogRequest> createQueueAndBindingThread(Node node) {
- BlockingQueue<SendLogRequest> logBlockingQueue =
+ BlockingQueue<SendLogRequest> logBlockingQueue;
+ logBlockingQueue =
new ArrayBlockingQueue<>(
ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
for (int i = 0; i < bindingThreadNum; i++) {
@@ -234,6 +246,7 @@ public class LogDispatcher {
private BlockingQueue<SendLogRequest> logBlockingDeque;
private List<SendLogRequest> currBatch = new ArrayList<>();
private Peer peer;
+ Client client;
DispatcherThread(Node receiver, BlockingQueue<SendLogRequest> logBlockingDeque) {
this.receiver = receiver;
@@ -242,6 +255,7 @@ public class LogDispatcher {
member
.getPeerMap()
.computeIfAbsent(receiver, r -> new Peer(member.getLogManager().getLastLogIndex()));
+ client = member.getSyncClient(receiver);
}
@Override
@@ -252,15 +266,20 @@ public class LogDispatcher {
synchronized (logBlockingDeque) {
SendLogRequest poll = logBlockingDeque.take();
currBatch.add(poll);
- if (useBatchInLogCatchUp) {
- logBlockingDeque.drainTo(currBatch);
+ if (maxBatchSize > 1) {
+ logBlockingDeque.drainTo(currBatch, maxBatchSize - 1);
}
}
if (logger.isDebugEnabled()) {
logger.debug("Sending {} logs to {}", currBatch.size(), receiver);
}
+ Statistic.LOG_DISPATCHER_LOG_BATCH_SIZE.add(currBatch.size());
for (SendLogRequest request : currBatch) {
+ Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
+ request.getVotingLog().getLog().getEnqueueTime());
+ long start = Statistic.RAFT_SENDER_SERIALIZE_LOG.getOperationStartTime();
request.getAppendEntryRequest().entry = request.serializedLogFuture.get();
+ Statistic.RAFT_SENDER_SERIALIZE_LOG.calOperationCostTimeFromStart(start);
}
sendBatchLogs(currBatch);
currBatch.clear();
@@ -364,8 +383,6 @@ public class LogDispatcher {
break;
}
logSize -= curSize;
- Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
- currBatch.get(logIndex).getVotingLog().getLog().getCreateTime());
logList.add(currBatch.get(logIndex).getAppendEntryRequest().entry);
}
@@ -376,7 +393,7 @@ public class LogDispatcher {
appendEntriesSync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
}
for (; prevIndex < logIndex; prevIndex++) {
- Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_END.calOperationCostTimeFromStart(
+ Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENT.calOperationCostTimeFromStart(
currBatch.get(prevIndex).getVotingLog().getLog().getCreateTime());
}
}
@@ -397,17 +414,27 @@ public class LogDispatcher {
}
void sendLog(SendLogRequest logRequest) {
- Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
- logRequest.getVotingLog().getLog().getCreateTime());
- member.sendLogToFollower(
- logRequest.getVotingLog(),
- receiver,
- logRequest.getLeaderShipStale(),
- logRequest.getNewLeaderTerm(),
- logRequest.getAppendEntryRequest(),
- logRequest.getQuorumSize());
- Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_END.calOperationCostTimeFromStart(
- logRequest.getVotingLog().getLog().getCreateTime());
+ AppendNodeEntryHandler handler =
+ member.getAppendNodeEntryHandler(
+ logRequest.getVotingLog(),
+ receiver,
+ logRequest.leaderShipStale,
+ logRequest.newLeaderTerm,
+ peer,
+ logRequest.quorumSize);
+ try {
+ long operationStartTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
+ AppendEntryResult result = client.appendEntry(logRequest.appendEntryRequest);
+ Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(operationStartTime);
+ handler.onComplete(result);
+ } catch (TException e) {
+ client.getInputProtocol().getTransport().close();
+ ClientUtils.putBackSyncClient(client);
+ client = member.getSyncClient(receiver);
+ handler.onError(e);
+ } catch (Exception e) {
+ handler.onError(e);
+ }
}
class AppendEntriesHandler implements AsyncMethodCallback<AppendEntryResult> {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
index ad24794..7ecdf5f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
@@ -22,11 +22,11 @@ package org.apache.iotdb.cluster.log.logtypes;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -48,7 +48,7 @@ public class PhysicalPlanLog extends Log {
@Override
public ByteBuffer serialize() {
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(DEFAULT_BUFFER_SIZE);
+ PublicBAOS byteArrayOutputStream = new PublicBAOS(DEFAULT_BUFFER_SIZE);
try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
dataOutputStream.writeByte((byte) PHYSICAL_PLAN.ordinal());
@@ -60,7 +60,15 @@ public class PhysicalPlanLog extends Log {
// unreachable
}
- return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
+ return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+ }
+
+ @Override
+ public void serialize(ByteBuffer buffer) {
+ buffer.put((byte) PHYSICAL_PLAN.ordinal());
+ buffer.putLong(getCurrLogIndex());
+ buffer.putLong(getCurrLogTerm());
+ plan.serialize(buffer);
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index eeea9d6..292dafe 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogApplier;
import org.apache.iotdb.cluster.log.LogParser;
import org.apache.iotdb.cluster.log.Snapshot;
+import org.apache.iotdb.cluster.log.VotingLog;
import org.apache.iotdb.cluster.log.applier.AsyncDataLogApplier;
import org.apache.iotdb.cluster.log.applier.DataLogApplier;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
@@ -617,16 +618,18 @@ public class DataGroupMember extends RaftMember {
return false;
}
CloseFileLog log = new CloseFileLog(storageGroupName, partitionId, isSeq);
+ VotingLog votingLog;
synchronized (logManager) {
log.setCurrLogTerm(getTerm().get());
log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
logManager.append(log);
-
+ votingLog = buildVotingLog(log);
+ votingLogList.insert(votingLog);
logger.info("Send the close file request of {} to other nodes", log);
}
try {
- return appendLogInGroup(log);
+ return appendLogInGroup(votingLog);
} catch (LogExecutionException e) {
logger.error("Cannot close partition {}#{} seq:{}", storageGroupName, partitionId, isSeq, e);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 9dafebe..ad0f55c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
import org.apache.iotdb.cluster.exception.SnapshotInstallationException;
import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
import org.apache.iotdb.cluster.log.LogApplier;
+import org.apache.iotdb.cluster.log.VotingLog;
import org.apache.iotdb.cluster.log.applier.MetaLogApplier;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
@@ -938,6 +939,7 @@ public class MetaGroupMember extends RaftMember {
}
AddNodeLog addNodeLog = new AddNodeLog();
+ VotingLog votingLog = buildVotingLog(addNodeLog);
// node adding is serialized to reduce potential concurrency problem
synchronized (logManager) {
// update partition table
@@ -954,6 +956,7 @@ public class MetaGroupMember extends RaftMember {
addNodeLog.setNewNode(newNode);
logManager.append(addNodeLog);
+ votingLogList.insert(votingLog);
}
int retryTime = 0;
@@ -963,7 +966,7 @@ public class MetaGroupMember extends RaftMember {
name,
newNode,
retryTime);
- AppendLogResult result = sendLogToFollowers(addNodeLog);
+ AppendLogResult result = sendLogToFollowers(votingLog);
switch (result) {
case OK:
commitLog(addNodeLog);
@@ -1693,6 +1696,7 @@ public class MetaGroupMember extends RaftMember {
}
RemoveNodeLog removeNodeLog = new RemoveNodeLog();
+ VotingLog votingLog = buildVotingLog(removeNodeLog);
// node removal must be serialized to reduce potential concurrency problem
synchronized (logManager) {
// update partition table
@@ -1708,6 +1712,7 @@ public class MetaGroupMember extends RaftMember {
removeNodeLog.setRemovedNode(target);
logManager.append(removeNodeLog);
+ votingLogList.insert(votingLog);
}
int retryTime = 0;
@@ -1717,7 +1722,7 @@ public class MetaGroupMember extends RaftMember {
name,
target,
retryTime);
- AppendLogResult result = sendLogToFollowers(removeNodeLog);
+ AppendLogResult result = sendLogToFollowers(votingLog);
switch (result) {
case OK:
commitLog(removeNodeLog);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 829e76b..81d4ff3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -1130,38 +1130,43 @@ public abstract class RaftMember {
long startTime = Timer.Statistic.RAFT_SENDER_APPEND_LOG.getOperationStartTime();
Log log;
+ if (plan instanceof LogPlan) {
+ try {
+ log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
+ } catch (UnknownLogTypeException e) {
+ logger.error("Can not parse LogPlan {}", plan, e);
+ return StatusUtils.PARSE_LOG_ERROR;
+ }
+ } else {
+ log = new PhysicalPlanLog();
+ ((PhysicalPlanLog) log).setPlan(plan);
+ plan.setIndex(logManager.getLastLogIndex() + 1);
+ }
+ // if a single log exceeds the threshold
+ // we need to return error code to the client as in server mode
+ // if (log.serialize().capacity() + Integer.BYTES
+ // >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
+ // logger.error(
+ // "Log cannot fit into buffer, please increase raft_log_buffer_size;"
+ // + "or reduce the size of requests you send.");
+ // return StatusUtils.INTERNAL_ERROR;
+ // }
+
// assign term and index to the new log and append it
+ VotingLog votingLog;
synchronized (logManager) {
- if (plan instanceof LogPlan) {
- try {
- log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
- } catch (UnknownLogTypeException e) {
- logger.error("Can not parse LogPlan {}", plan, e);
- return StatusUtils.PARSE_LOG_ERROR;
- }
- } else {
- log = new PhysicalPlanLog();
- ((PhysicalPlanLog) log).setPlan(plan);
- plan.setIndex(logManager.getLastLogIndex() + 1);
- }
log.setCurrLogTerm(getTerm().get());
log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
-
- // if a single log exceeds the threshold
- // we need to return error code to the client as in server mode
- if (log.serialize().capacity() + Integer.BYTES
- >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
- logger.error(
- "Log cannot fit into buffer, please increase raft_log_buffer_size;"
- + "or reduce the size of requests you send.");
- return StatusUtils.INTERNAL_ERROR;
- }
logManager.append(log);
+ votingLog = buildVotingLog(log);
+ votingLogList.insert(votingLog);
}
+ log.setCreateTime(System.nanoTime());
+
Timer.Statistic.RAFT_SENDER_APPEND_LOG.calOperationCostTimeFromStart(startTime);
try {
- if (appendLogInGroup(log)) {
+ if (appendLogInGroup(votingLog)) {
return StatusUtils.OK;
}
} catch (LogExecutionException e) {
@@ -1171,6 +1176,7 @@ public abstract class RaftMember {
}
protected TSStatus processPlanLocallyV2(PhysicalPlan plan) {
+ long totalStartTime = System.nanoTime();
logger.debug("{}: Processing plan {}", name, plan);
if (readOnly) {
return StatusUtils.NODE_READ_ONLY;
@@ -1190,13 +1196,13 @@ public abstract class RaftMember {
}
// just like processPlanLocally,we need to check the size of log
- if (log.serialize().capacity() + Integer.BYTES
- >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
- logger.error(
- "Log cannot fit into buffer, please increase raft_log_buffer_size;"
- + "or reduce the size of requests you send.");
- return StatusUtils.INTERNAL_ERROR;
- }
+ // if (log.serialize().capacity() + Integer.BYTES
+ // >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
+ // logger.error(
+ // "Log cannot fit into buffer, please increase raft_log_buffer_size;"
+ // + "or reduce the size of requests you send.");
+ // return StatusUtils.INTERNAL_ERROR;
+ // }
// assign term and index to the new log and append it
SendLogRequest sendLogRequest = logSequencer.sequence(log);
@@ -1215,12 +1221,18 @@ public abstract class RaftMember {
case WEAK_ACCEPT:
// TODO: change to weak
Statistic.RAFT_WEAK_ACCEPT.add(1);
+ Statistic.LOG_DISPATCHER_FROM_CREATE_TO_OK.calOperationCostTimeFromStart(
+ log.getCreateTime());
+ Statistic.LOG_DISPATCHER_TOTAL.calOperationCostTimeFromStart(totalStartTime);
return StatusUtils.OK;
case OK:
logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
startTime = Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
commitLog(log);
Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
+ Statistic.LOG_DISPATCHER_FROM_CREATE_TO_OK.calOperationCostTimeFromStart(
+ log.getCreateTime());
+ Statistic.LOG_DISPATCHER_TOTAL.calOperationCostTimeFromStart(totalStartTime);
return StatusUtils.OK;
case TIME_OUT:
logger.debug("{}: log {} timed out...", name, log);
@@ -1610,11 +1622,12 @@ public abstract class RaftMember {
VotingLog log, AtomicBoolean leaderShipStale, AtomicLong newLeaderTerm, int quorumSize) {
// wait for the followers to vote
long startTime = Timer.Statistic.RAFT_SENDER_VOTE_COUNTER.getOperationStartTime();
- long nextTimeToPrint = 3000;
+ long nextTimeToPrint = 15000;
int stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
int weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
int totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
+
synchronized (log) {
long waitStart = System.currentTimeMillis();
long alreadyWait = 0;
@@ -1644,7 +1657,7 @@ public abstract class RaftMember {
totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
}
- if (alreadyWait > 3000) {
+ if (alreadyWait > 15000) {
logger.info(
"Slow entry {}, strongly accepted {}, weakly " + "accepted {}, waited time {}ms",
log,
@@ -1683,9 +1696,9 @@ public abstract class RaftMember {
@SuppressWarnings("java:S2445")
protected void commitLog(Log log) throws LogExecutionException {
- long startTime =
- Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.getOperationStartTime();
+ long startTime;
if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
+ startTime = Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.getOperationStartTime();
synchronized (logManager) {
if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.calOperationCostTimeFromStart(
@@ -1693,9 +1706,12 @@ public abstract class RaftMember {
startTime = Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.getOperationStartTime();
logManager.commitTo(log.getCurrLogIndex());
+ Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.calOperationCostTimeFromStart(startTime);
+
+ startTime = Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.getOperationStartTime();
}
}
- Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.calOperationCostTimeFromStart(startTime);
+ Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.calOperationCostTimeFromStart(startTime);
}
if (ENABLE_COMMIT_RETURN) {
return;
@@ -1752,9 +1768,11 @@ public abstract class RaftMember {
AppendEntryRequest request = new AppendEntryRequest();
request.setTerm(term.get());
if (serializeNow) {
+ long start = Statistic.RAFT_SENDER_SERIALIZE_LOG.getOperationStartTime();
ByteBuffer byteBuffer = log.serialize();
log.setByteSize(byteBuffer.array().length);
- request.setEntry(byteBuffer);
+ request.entry = byteBuffer;
+ Statistic.RAFT_SENDER_SERIALIZE_LOG.calOperationCostTimeFromStart(start);
}
request.setLeader(getThisNode());
// don't need lock because even if it's larger than the commitIndex when appending this log to
@@ -1840,12 +1858,13 @@ public abstract class RaftMember {
*
* @return true if the log is accepted by the quorum of the group, false otherwise
*/
- boolean appendLogInGroup(Log log) throws LogExecutionException {
+ boolean appendLogInGroup(VotingLog log) throws LogExecutionException {
+ long totalStartTime = Statistic.LOG_DISPATCHER_TOTAL.getOperationStartTime();
if (allNodes.size() == 1) {
// single node group, no followers
long startTime = Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
- commitLog(log);
+ commitLog(log.getLog());
Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
return true;
}
@@ -1861,11 +1880,19 @@ public abstract class RaftMember {
AppendLogResult result = sendLogToFollowers(log);
Timer.Statistic.RAFT_SENDER_SEND_LOG_TO_FOLLOWERS.calOperationCostTimeFromStart(startTime);
switch (result) {
+ case WEAK_ACCEPT:
+ // TODO: change to weak
+ Statistic.RAFT_WEAK_ACCEPT.add(1);
+ Statistic.LOG_DISPATCHER_FROM_CREATE_TO_OK.calOperationCostTimeFromStart(
+ log.getLog().getCreateTime());
+ Statistic.LOG_DISPATCHER_TOTAL.calOperationCostTimeFromStart(totalStartTime);
+ return true;
case OK:
startTime = Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
- commitLog(log);
+ commitLog(log.getLog());
Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
+ Statistic.LOG_DISPATCHER_TOTAL.calOperationCostTimeFromStart(totalStartTime);
return true;
case TIME_OUT:
logger.debug("{}: log {} timed out, retrying...", name, log);
@@ -1893,14 +1920,14 @@ public abstract class RaftMember {
*
* @return an AppendLogResult
*/
- protected AppendLogResult sendLogToFollowers(Log log) {
+ protected AppendLogResult sendLogToFollowers(VotingLog log) {
int requiredQuorum = allNodes.size() / 2;
if (requiredQuorum <= 0) {
// use half of the members' size as the quorum
- return sendLogToFollowers(buildVotingLog(log), allNodes.size() / 2);
+ return sendLogToFollowers(log, allNodes.size() / 2);
} else {
// make sure quorum does not exceed the number of members - 1
- return sendLogToFollowers(buildVotingLog(log), Math.min(requiredQuorum, allNodes.size() - 1));
+ return sendLogToFollowers(log, Math.min(requiredQuorum, allNodes.size() - 1));
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index c0afaa7..c794211 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -170,6 +170,12 @@ public class Timer {
TIME_SCALE,
RaftMember.USE_LOG_DISPATCHER,
RAFT_SENDER_COMMIT_LOG),
+ RAFT_SENDER_EXIT_LOG_MANAGER(
+ RAFT_MEMBER_SENDER,
+ "exiting log manager synchronizer",
+ TIME_SCALE,
+ RaftMember.USE_LOG_DISPATCHER,
+ RAFT_SENDER_COMMIT_LOG),
RAFT_SENDER_COMMIT_GET_LOGS(
RAFT_MEMBER_SENDER,
"get logs to be committed",
@@ -231,15 +237,41 @@ public class Timer {
RAFT_MEMBER_RECEIVER, "append entrys", TIME_SCALE, true, RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
RAFT_RECEIVER_INDEX_DIFF(RAFT_MEMBER_RECEIVER, "index diff", 1.0, true, ROOT),
// log dispatcher
+ LOG_DISPATCHER_FROM_CREATE_TO_ENQUEUE(
+ LOG_DISPATCHER,
+ "from create to queue",
+ TIME_SCALE,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ LOG_DISPATCHER_LOG_ENQUEUE(
+ LOG_DISPATCHER,
+ "enqueue",
+ TIME_SCALE,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
LOG_DISPATCHER_LOG_IN_QUEUE(
LOG_DISPATCHER,
"in queue",
TIME_SCALE,
true,
META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
- LOG_DISPATCHER_FROM_CREATE_TO_END(
+ LOG_DISPATCHER_LOG_BATCH_SIZE(
+ LOG_DISPATCHER, "batch size", 1, true, META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ LOG_DISPATCHER_FROM_CREATE_TO_SENT(
+ LOG_DISPATCHER,
+ "from create to sent",
+ TIME_SCALE,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ LOG_DISPATCHER_FROM_CREATE_TO_OK(
+ LOG_DISPATCHER,
+ "from create to OK",
+ TIME_SCALE,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ LOG_DISPATCHER_TOTAL(
LOG_DISPATCHER,
- "from create to end",
+ "total process time",
TIME_SCALE,
true,
META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 58e51e6..9c5c1ab 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -35,7 +35,7 @@ import org.apache.iotdb.cluster.exception.EmptyIntervalException;
import org.apache.iotdb.cluster.exception.LogExecutionException;
import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
-import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.VotingLog;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
@@ -230,7 +230,7 @@ public class MetaGroupMemberTest extends BaseMember {
}
@Override
- protected AppendLogResult sendLogToFollowers(Log log) {
+ protected AppendLogResult sendLogToFollowers(VotingLog log) {
return AppendLogResult.OK;
}