You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/12/01 01:16:03 UTC

[iotdb] 03/09: add async sequencer

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6df34d6e339a6a70481e54470b618e8995e5d16e
Author: jt <jt...@163.com>
AuthorDate: Mon Nov 1 15:08:44 2021 +0800

    add async sequencer
---
 .../log/sequencing/AsynchronousSequencer.java      | 92 +++++++++++++++++++++-
 1 file changed, 91 insertions(+), 1 deletion(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
index 1e089c6..eb33bea 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
@@ -19,20 +19,110 @@
 
 package org.apache.iotdb.cluster.log.sequencing;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
+import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
 import org.apache.iotdb.cluster.log.manage.RaftLogManager;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class AsynchronousSequencer implements LogSequencer{
+public class AsynchronousSequencer implements LogSequencer {
+
+  private static final Logger logger = LoggerFactory.getLogger(AsynchronousSequencer.class);
+  private static final ExecutorService SEQUENCER_POOL =
+      IoTDBThreadPoolFactory.newCachedThreadPool("SequencerPool");
+  private static final int SEQUENCER_PARALLELISM = 4;
 
   private RaftMember member;
   private RaftLogManager logManager;
 
+  private BlockingQueue<SendLogRequest> unsequencedLogQueue;
+
   public AsynchronousSequencer(RaftMember member,
       RaftLogManager logManager) {
+    // Collaborators are stored before any worker is submitted, because the workers read them.
     this.member = member;
     this.logManager = logManager;
+    // Bounded hand-off buffer (capacity 4096) between callers and the sequencing workers.
+    this.unsequencedLogQueue = new ArrayBlockingQueue<>(4096);
+    // Submit SEQUENCER_PARALLELISM copies of the sequencing loop to the shared static pool.
+    int submitted = 0;
+    while (submitted < SEQUENCER_PARALLELISM) {
+      SEQUENCER_POOL.submit(this::sequenceTask);
+      submitted++;
+    }
+  }
+
+  /**
+   * Wraps {@code log} in a {@link SendLogRequest} and places it on the queue of unsequenced
+   * requests for the sequencing workers to pick up.
+   *
+   * <p>The request is created with a vote counter initialised to half the size of
+   * {@code member.getAllNodes()} (integer division), a stale-leadership flag initialised to
+   * {@code false}, a new-leader term initialised to the member's current term, and {@code null}
+   * as its last constructor argument (presumably the append-entry request, which
+   * {@code sequenceLogs} sets later - confirm against {@code SendLogRequest}).
+   *
+   * <p>This call blocks while the queue is full. If the calling thread is interrupted while
+   * waiting, the interrupt flag is restored, a warning is logged, and the request is still
+   * returned even though it was not placed on the queue.
+   *
+   * @param log the log to be sequenced
+   * @return the request created for {@code log}
+   */
+  public SendLogRequest enqueueSendLogRequest(Log log) {
+    SendLogRequest pending =
+        new SendLogRequest(
+            log,
+            new AtomicInteger(member.getAllNodes().size() / 2),
+            new AtomicBoolean(false),
+            new AtomicLong(member.getTerm().get()),
+            null);
+    try {
+      unsequencedLogQueue.put(pending);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      logger.warn("Interrupted while putting {}", log);
+    }
+    return pending;
+  }
+
+  /**
+   * Sequences a batch of requests while holding the {@code logManager} monitor.
+   *
+   * <p>For each request, in list order: the log receives the member's current term and an index
+   * one past the log manager's last index (also copied onto the plan when the log is a
+   * {@link PhysicalPlanLog}); the log is appended to the log manager; an append-entry request is
+   * built and attached to the request; the log's create time is stamped; and the request is
+   * offered to the member's log dispatcher. The append and the offer are each timed with their
+   * own {@link Statistic}.
+   *
+   * @param sendLogRequests the batch to sequence; it is cleared after the whole batch is handled
+   */
+  private void sequenceLogs(List<SendLogRequest> sendLogRequests) {
+    synchronized (logManager) {
+      for (SendLogRequest pending : sendLogRequests) {
+        Log entry = pending.getLog();
+        entry.setCurrLogTerm(member.getTerm().get());
+        entry.setCurrLogIndex(logManager.getLastLogIndex() + 1);
+        if (entry instanceof PhysicalPlanLog) {
+          PhysicalPlanLog planLog = (PhysicalPlanLog) entry;
+          planLog.getPlan().setIndex(entry.getCurrLogIndex());
+        }
+
+        // NOTE(review): earlier comments here said the log size should be checked "just like
+        // processPlanLocally" and that the dispatcher sets the size when it serializes the log;
+        // no size check is performed in this method - confirm whether one is still required.
+        long appendStart = Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime();
+        logManager.append(entry);
+        Statistic.RAFT_SENDER_APPEND_LOG_V2.calOperationCostTimeFromStart(appendStart);
+
+        pending.setAppendEntryRequest(member.buildAppendEntryRequest(entry, false));
+
+        long offerStart = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
+        entry.setCreateTime(System.nanoTime());
+        member.getLogDispatcher().offer(pending);
+        Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(offerStart);
+      }
+    }
+    sendLogRequests.clear();
+  }
+
+  /**
+   * Worker loop executed by each task submitted to the sequencer pool.
+   *
+   * <p>Each iteration blocks until one request is available, drains whatever else is already
+   * queued into the same batch, and passes the batch to {@code sequenceLogs}, which clears it on
+   * success. The loop ends when the thread is interrupted.
+   *
+   * <p>A {@link RuntimeException} raised while sequencing is logged and the failed batch is
+   * discarded so that the worker keeps serving later requests. Without this, the exception would
+   * only be stored in the {@code Future} returned by {@code submit}, which nobody reads, and the
+   * worker would stop without leaving any trace in the log.
+   */
+  private void sequenceTask() {
+    List<SendLogRequest> sendLogRequests = new ArrayList<>();
+    while (!Thread.interrupted()) {
+      try {
+        // take() and drainTo() share one monitor so only one worker assembles a batch at a time
+        synchronized (unsequencedLogQueue) {
+          SendLogRequest request = unsequencedLogQueue.take();
+          sendLogRequests.add(request);
+          unsequencedLogQueue.drainTo(sendLogRequests);
+        }
+
+        sequenceLogs(sendLogRequests);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return;
+      } catch (RuntimeException e) {
+        // the throwable is passed last so SLF4J records its stack trace
+        logger.error("Unexpected error when sequencing {} logs", sendLogRequests.size(), e);
+        // drop the failed batch; otherwise it would be sequenced again with the next one
+        sendLogRequests.clear();
+      }
+    }
   }
 
   @Override