You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/12/01 01:16:03 UTC
[iotdb] 03/09: add async sequencer
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6df34d6e339a6a70481e54470b618e8995e5d16e
Author: jt <jt...@163.com>
AuthorDate: Mon Nov 1 15:08:44 2021 +0800
add async sequencer
---
.../log/sequencing/AsynchronousSequencer.java | 92 +++++++++++++++++++++-
1 file changed, 91 insertions(+), 1 deletion(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
index 1e089c6..eb33bea 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
@@ -19,20 +19,110 @@
package org.apache.iotdb.cluster.log.sequencing;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
+import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
import org.apache.iotdb.cluster.log.manage.RaftLogManager;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class AsynchronousSequencer implements LogSequencer{
+public class AsynchronousSequencer implements LogSequencer {
+
+ private static final Logger logger = LoggerFactory.getLogger(AsynchronousSequencer.class);
+ private static final ExecutorService SEQUENCER_POOL =
+ IoTDBThreadPoolFactory.newCachedThreadPool("SequencerPool");
+ private static final int SEQUENCER_PARALLELISM = 4;
private RaftMember member;
private RaftLogManager logManager;
+ private BlockingQueue<SendLogRequest> unsequencedLogQueue;
+
public AsynchronousSequencer(RaftMember member,
RaftLogManager logManager) {
this.member = member;
this.logManager = logManager;
+ unsequencedLogQueue = new ArrayBlockingQueue<>(4096); // bounded queue: enqueueSendLogRequest blocks in put() once 4096 requests are pending
+ for (int i = 0; i < SEQUENCER_PARALLELISM; i++) { // start SEQUENCER_PARALLELISM worker loops on the shared static pool
+ SEQUENCER_POOL.submit(this::sequenceTask); // NOTE(review): `this` reaches another thread before the constructor returns; the fields sequenceTask reads are all assigned above. No shutdown of these tasks is visible in this chunk.
+ }
+ }
+
+ public SendLogRequest enqueueSendLogRequest(Log log) {
+ AtomicInteger voteCounter = new AtomicInteger(member.getAllNodes().size() / 2);
+ AtomicBoolean leaderShipStale = new AtomicBoolean(false);
+ AtomicLong newLeaderTerm = new AtomicLong(member.getTerm().get());
+
+ SendLogRequest request = new SendLogRequest(log, voteCounter, leaderShipStale, newLeaderTerm,
+ null);
+ try {
+ unsequencedLogQueue.put(request);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.warn("Interrupted while putting {}", log);
+ }
+ return request;
+ }
+
+ /**
+  * Sequences a batch of requests while holding the monitor of {@code logManager}.
+  *
+  * <p>For each request, in list order: the log receives the member's current term and the index
+  * following the manager's last log index, is appended to the log manager, gets an append-entry
+  * request built for it, and is offered to the member's log dispatcher. The append and the offer
+  * are each timed with their own statistic. The list is emptied once the whole batch is done.
+  *
+  * @param sendLogRequests the requests to sequence; cleared before returning normally
+  */
+ private void sequenceLogs(List<SendLogRequest> sendLogRequests) {
+   synchronized (logManager) {
+     for (SendLogRequest pending : sendLogRequests) {
+       Log entry = pending.getLog();
+       entry.setCurrLogTerm(member.getTerm().get());
+       entry.setCurrLogIndex(logManager.getLastLogIndex() + 1);
+       if (entry instanceof PhysicalPlanLog) {
+         // the wrapped plan carries the same index as its log
+         PhysicalPlanLog planLog = (PhysicalPlanLog) entry;
+         planLog.getPlan().setIndex(entry.getCurrLogIndex());
+       }
+
+       // just like processPlanLocally, we need to check the size of log
+       // logDispatcher will serialize log, and set log size, and we will use the size after it
+       long appendStart = Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime();
+       logManager.append(entry);
+       Statistic.RAFT_SENDER_APPEND_LOG_V2.calOperationCostTimeFromStart(appendStart);
+
+       pending.setAppendEntryRequest(member.buildAppendEntryRequest(entry, false));
+
+       long offerStart = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
+       entry.setCreateTime(System.nanoTime());
+       member.getLogDispatcher().offer(pending);
+       Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(offerStart);
+     }
+   }
+   sendLogRequests.clear();
+ }
+
+ /**
+  * Worker loop run by each sequencer task.
+  *
+  * <p>Each round takes one request from the queue (blocking until one is available), drains
+  * whatever else is queued into the same batch, and passes the batch to {@code sequenceLogs}.
+  * Taking and draining happen under the queue's monitor, so only one worker assembles a batch
+  * at a time.
+  *
+  * <p>The loop ends when the thread is interrupted; the interrupt flag is left set. A runtime
+  * failure while sequencing is logged and the current batch is discarded, so the worker keeps
+  * serving the queue instead of terminating silently.
+  */
+ private void sequenceTask() {
+   List<SendLogRequest> sendLogRequests = new ArrayList<>();
+   // isInterrupted() does not clear the flag, unlike Thread.interrupted(), so the interruption
+   // is still visible to the pool after the loop exits
+   while (!Thread.currentThread().isInterrupted()) {
+     try {
+       synchronized (unsequencedLogQueue) {
+         sendLogRequests.add(unsequencedLogQueue.take());
+         unsequencedLogQueue.drainTo(sendLogRequests);
+       }
+
+       sequenceLogs(sendLogRequests);
+     } catch (InterruptedException e) {
+       Thread.currentThread().interrupt();
+       return;
+     } catch (RuntimeException e) {
+       // the task is started with submit(), so an escaping exception would only be stored in a
+       // Future nobody reads and this worker would stop without any trace
+       logger.error(
+           "Sequencer task failed on a batch of {} requests", sendLogRequests.size(), e);
+       // sequenceLogs only clears the batch on success; clear it so it is not sequenced twice
+       sendLogRequests.clear();
+     }
+   }
}
@Override