Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/04/24 04:46:50 UTC

[iotdb] branch native_raft updated: use DynamicThread

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new e348f39cda use DynamicThread
e348f39cda is described below

commit e348f39cdac7b20771dd587e826f2b0def0e34ce
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon Apr 24 12:49:16 2023 +0800

    use DynamicThread
---
 .../protocol/log/dispatch/DispatcherGroup.java     | 44 ++++-------
 .../protocol/log/dispatch/DispatcherThread.java    | 52 +++----------
 .../manager/serialization/StableEntryManager.java  |  1 +
 .../serialization/SyncLogDequeSerializer.java      | 90 ++++++++--------------
 4 files changed, 54 insertions(+), 133 deletions(-)
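
For readers unfamiliar with DynamicThread: the DispatcherThread changes below replace the hand-rolled idleTimeSum/runningTimeSum bookkeeping with the DynamicThread template from org.apache.iotdb.commons.concurrent.dynamic. A minimal sketch of a worker written against that template follows, assuming super(group), runInternal(), idleToRunning(), runningToIdle() and shouldExit() behave as their names and the removed code suggest; everything other than those calls is hypothetical and not part of this commit.

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    import org.apache.iotdb.commons.concurrent.dynamic.DynamicThread;
    import org.apache.iotdb.commons.concurrent.dynamic.DynamicThreadGroup;

    // Hypothetical worker illustrating the DynamicThread template; only the
    // DynamicThread API visible in the diff is assumed, the rest is made up.
    class ExampleWorker extends DynamicThread {

      private final BlockingQueue<Runnable> tasks;

      ExampleWorker(DynamicThreadGroup group, BlockingQueue<Runnable> tasks) {
        super(group); // register this worker with its group
        this.tasks = tasks;
      }

      @Override
      public void runInternal() {
        try {
          while (!Thread.interrupted()) {
            Runnable task = tasks.poll(10, TimeUnit.MILLISECONDS);
            if (task == null) {
              continue; // still waiting; this time counts as idle
            }
            idleToRunning();  // idle -> busy transition, feeds the group's statistics
            task.run();
            runningToIdle();  // busy -> idle transition
            if (shouldExit()) {
              break;          // the group decided this worker is redundant
            }
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    }

Presumably the group then grows the worker pool when the workers report being mostly busy and lets one exit when they report being mostly idle, which is exactly the accounting the removed per-thread code in DispatcherThread used to do by hand.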

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
index e953d1bc0a..1fef54f95d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
@@ -19,21 +19,20 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
 
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.dynamic.DynamicThreadGroup;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
-
 import org.apache.ratis.thirdparty.com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
 public class DispatcherGroup {
+
   private static final Logger logger = LoggerFactory.getLogger(DispatcherGroup.class);
   private final Peer peer;
   private final BlockingQueue<VotingEntry> entryQueue;
@@ -41,9 +40,8 @@ public class DispatcherGroup {
   private final RateLimiter rateLimiter;
   private final ExecutorService dispatcherThreadPool;
   private final LogDispatcher logDispatcher;
-  private final AtomicInteger groupThreadNum = new AtomicInteger();
-  private int maxBindingThreadNum;
   private boolean delayed;
+  private DynamicThreadGroup dynamicThreadGroup;
 
   public DispatcherGroup(Peer peer, LogDispatcher logDispatcher, int maxBindingThreadNum) {
     this.logDispatcher = logDispatcher;
@@ -51,11 +49,10 @@ public class DispatcherGroup {
     this.entryQueue = new ArrayBlockingQueue<>(logDispatcher.getConfig().getMaxNumOfLogsInMem());
     this.nodeEnabled = true;
     this.rateLimiter = RateLimiter.create(Double.MAX_VALUE);
-    this.maxBindingThreadNum = maxBindingThreadNum;
     this.dispatcherThreadPool = createPool(peer, logDispatcher.getMember().getName());
-    for (int i = 0; i < maxBindingThreadNum; i++) {
-      addThread();
-    }
+    this.dynamicThreadGroup = new DynamicThreadGroup(logDispatcher.member.getName() + "-" + peer,
+        dispatcherThreadPool::submit, () -> newDispatcherThread(peer, entryQueue, rateLimiter), 1,
+        maxBindingThreadNum);
   }
 
   public void close() {
@@ -72,15 +69,6 @@ public class DispatcherGroup {
     }
   }
 
-  public void addThread() {
-    int threadNum = groupThreadNum.incrementAndGet();
-    if (threadNum <= maxBindingThreadNum) {
-      dispatcherThreadPool.submit(newDispatcherThread(peer, entryQueue, rateLimiter));
-    } else {
-      groupThreadNum.decrementAndGet();
-    }
-  }
-
   DispatcherThread newDispatcherThread(
       Peer node, BlockingQueue<VotingEntry> logBlockingQueue, RateLimiter rateLimiter) {
     return new DispatcherThread(logDispatcher, node, logBlockingQueue, rateLimiter, this);
@@ -115,14 +103,6 @@ public class DispatcherGroup {
     return entryQueue;
   }
 
-  public AtomicInteger getGroupThreadNum() {
-    return groupThreadNum;
-  }
-
-  public int getMaxBindingThreadNum() {
-    return maxBindingThreadNum;
-  }
-
   public boolean isDelayed() {
     return delayed;
   }
@@ -130,4 +110,8 @@ public class DispatcherGroup {
   public void setDelayed(boolean delayed) {
     this.delayed = delayed;
   }
+
+  public DynamicThreadGroup getDynamicThreadGroup() {
+    return dynamicThreadGroup;
+  }
 }
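
The DynamicThreadGroup constructor call above suggests roughly the following parameter shape. This is inferred from the arguments only (dispatcherThreadPool::submit matches ExecutorService.submit(Runnable), the lambda supplies new workers) and is not the actual declaration in org.apache.iotdb.commons.concurrent.dynamic:

    // Inferred shape only; the real class may differ.
    public DynamicThreadGroup(
        String name,                             // group/thread name, "<member>-<peer>" above
        Function<Runnable, Future<?>> submitter, // starts a worker (dispatcherThreadPool::submit)
        Supplier<DynamicThread> threadFactory,   // builds a worker bound to this group
        int minThreadNum,                        // lower bound on live workers, 1 here
        int maxThreadNum)                        // upper bound, the former maxBindingThreadNum
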
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
index e571d57cb4..31f71707cc 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
 
+import org.apache.iotdb.commons.concurrent.dynamic.DynamicThread;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient;
@@ -42,7 +43,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 
-class DispatcherThread implements Runnable {
+class DispatcherThread extends DynamicThread {
 
   private static final Logger logger = LoggerFactory.getLogger(DispatcherThread.class);
 
@@ -53,8 +54,6 @@ class DispatcherThread implements Runnable {
   private final String baseName;
   private final RateLimiter rateLimiter;
   private final DispatcherGroup group;
-  private long idleTimeSum;
-  private long runningTimeSum;
   private long lastDispatchTime;
 
   protected DispatcherThread(
@@ -63,6 +62,7 @@ class DispatcherThread implements Runnable {
       BlockingQueue<VotingEntry> logBlockingDeque,
       RateLimiter rateLimiter,
       DispatcherGroup group) {
+    super(group.getDynamicThreadGroup());
     this.logDispatcher = logDispatcher;
     this.receiver = receiver;
     this.logBlockingDeque = logBlockingDeque;
@@ -72,13 +72,11 @@ class DispatcherThread implements Runnable {
   }
 
   @Override
-  public void run() {
+  public void runInternal() {
     if (logger.isDebugEnabled()) {
       Thread.currentThread().setName(baseName);
     }
     try {
-      long idleStart = System.nanoTime();
-      long runningStart = 0;
       while (!Thread.interrupted()) {
         if (group.isDelayed()) {
           if (logBlockingDeque.size() < logDispatcher.maxBatchSize
@@ -100,9 +98,7 @@ class DispatcherThread implements Runnable {
             continue;
           }
         }
-        long currTime = System.nanoTime();
-        idleTimeSum += currTime - idleStart;
-        runningStart = currTime;
+       idleToRunning();
         if (logger.isDebugEnabled()) {
           logger.debug("Sending {} logs to {}", currBatch.size(), receiver);
         }
@@ -113,33 +109,11 @@ class DispatcherThread implements Runnable {
         }
         sendLogs(currBatch);
         currBatch.clear();
+        lastDispatchTime = System.nanoTime();
+        runningToIdle();
 
-        currTime = System.nanoTime();
-        lastDispatchTime = currTime;
-        runningTimeSum += currTime - runningStart;
-        idleStart = currTime;
-
-        // thread too idle
-        if (idleTimeSum * 1.0 / (idleTimeSum + runningTimeSum) > 0.5
-            && runningTimeSum > 10_000_000_000L) {
-          int remaining = group.getGroupThreadNum().decrementAndGet();
-          if (remaining > 1) {
-            logger.info("Dispatcher thread too idle");
-            group.getGroupThreadNum().incrementAndGet();
-            break;
-          } else {
-            group.getGroupThreadNum().incrementAndGet();
-          }
-          // thread too busy
-        } else if (idleTimeSum * 1.0 / (idleTimeSum + runningTimeSum) < 0.1
-            && runningTimeSum > 10_000_000_000L) {
-          int groupThreadNum = group.getGroupThreadNum().get();
-          if (groupThreadNum < group.getMaxBindingThreadNum()) {
-            group.addThread();
-          }
-          // avoid frequent change
-          runningTimeSum = 0;
-          idleTimeSum = 0;
+        if (shouldExit()) {
+          break;
         }
       }
     } catch (InterruptedException e) {
@@ -147,14 +121,6 @@ class DispatcherThread implements Runnable {
     } catch (Exception e) {
       logger.error("Unexpected error in log dispatcher", e);
     }
-    if (runningTimeSum > 0) {
-      logger.info(
-          "Dispatcher exits, idle ratio: {}, running time: {}ms, idle time: {}ms, remaining threads: {}",
-          idleTimeSum * 1.0 / (idleTimeSum + runningTimeSum),
-          runningTimeSum / 1_000_000L,
-          idleTimeSum / 1_000_000L,
-          group.getGroupThreadNum().decrementAndGet());
-    }
   }
 
   protected void serializeEntries() throws InterruptedException {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
index 92e7603036..a57a8a0774 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
@@ -45,6 +45,7 @@ public interface StableEntryManager {
   HardState getHardState();
 
   void updateMeta(long commitIndex, long applyIndex);
+
   LogManagerMeta getMeta();
 
   /**
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
index 90ef700867..751e0ed510 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
@@ -71,14 +71,10 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   private static final String LOG_DATA_FILE_SUFFIX = "data";
   private static final String LOG_INDEX_FILE_SUFFIX = "idx";
 
-  /**
-   * the log data files
-   */
+  /** the log data files */
   private List<File> logDataFileList;
 
-  /**
-   * the log index files
-   */
+  /** the log index files */
   private List<IndexFileDescriptor> logIndexFileList;
 
   private LogParser parser = LogParser.getINSTANCE();
@@ -88,14 +84,10 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   private LogManagerMeta meta;
   private HardState state;
 
-  /**
-   * min version of available log
-   */
+  /** min version of available log */
   private long minAvailableVersion = 0;
 
-  /**
-   * max version of available log
-   */
+  /** max version of available log */
   private long maxAvailableVersion = Long.MAX_VALUE;
 
   private String logDir;
@@ -150,9 +142,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
 
   private static final int LOG_DELETE_CHECK_INTERVAL_SECOND = 5;
 
-  /**
-   * the lock uses when change the log data files or log index files
-   */
+  /** the lock uses when change the log data files or log index files */
   private final Lock lock = new ReentrantLock();
 
   private volatile boolean isClosed = false;
@@ -211,9 +201,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     return systemDir + File.separator + groupId + File.separator + "raftLog" + File.separator;
   }
 
-  /**
-   * for log tools
-   */
+  /** for log tools */
   @Override
   public LogManagerMeta getMeta() {
     return meta;
@@ -225,9 +213,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     meta.setLastAppliedIndex(applyIndex);
   }
 
-  /**
-   * Recover all the logs in disk. This function will be called once this instance is created.
-   */
+  /** Recover all the logs in disk. This function will be called once this instance is created. */
   @Override
   public List<Entry> getAllEntriesAfterAppliedIndex() {
     logger.debug(
@@ -241,8 +227,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
    * is uncommitted for persistent LogManagerMeta(meta's info is stale).We need to recover these
    * already persistent logs.
    *
-   * <p>For example,commitIndex is 5 in persistent LogManagerMeta,But the log file has actually
-   * been
+   * <p>For example,commitIndex is 5 in persistent LogManagerMeta,But the log file has actually been
    * flushed to 7,when we restart cluster,we need to recover 6 and 7.
    *
    * <p>Maybe,we can extract getAllEntriesAfterAppliedIndex and getAllEntriesAfterCommittedIndex
@@ -393,11 +378,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
       try {
         checkStream();
         // 1. write to the log data file
-        byte[] compressed =
-            compressor.compress(
-                logDataBuffer.array(),
-                0,
-                logDataBuffer.position());
+        byte[] compressed = compressor.compress(logDataBuffer.array(), 0, logDataBuffer.position());
         ReadWriteIOUtils.write(compressed.length, currentLogDataOutputStream);
         currentLogDataOutputStream.write(compressed);
         logIndexOffsetList.add(new Pair<>(lastLogIndex, offsetOfTheCurrentLogDataOutputStream));
@@ -445,9 +426,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     }
   }
 
-  /**
-   * flush the log buffer and check if the file needs to be closed
-   */
+  /** flush the log buffer and check if the file needs to be closed */
   @Override
   public void forceFlushLogBuffer() {
     lock.lock();
@@ -491,9 +470,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     }
   }
 
-  /**
-   * The file name rules are as follows: ${startLogIndex}-${endLogIndex}-${version}.data
-   */
+  /** The file name rules are as follows: ${startLogIndex}-${endLogIndex}-${version}.data */
   private void recoverLogFiles() {
     // 1. first we should recover the log index file
     recoverLogFiles(LOG_INDEX_FILE_SUFFIX);
@@ -540,9 +517,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   /**
    * Check that the file is legal or not
    *
-   * @param file     file needs to be check
-   * @param fileType {@link SyncLogDequeSerializer#LOG_DATA_FILE_SUFFIX} or
-   *                 {@link SyncLogDequeSerializer#LOG_INDEX_FILE_SUFFIX}
+   * @param file file needs to be check
+   * @param fileType {@link SyncLogDequeSerializer#LOG_DATA_FILE_SUFFIX} or {@link
+   *     SyncLogDequeSerializer#LOG_INDEX_FILE_SUFFIX}
    * @return true if the file legal otherwise false
    */
   private boolean checkLogFile(File file, String fileType) {
@@ -770,9 +747,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     }
   }
 
-  /**
-   * for unclosed file, the file name is ${startIndex}-${Long.MAX_VALUE}-{version}
-   */
+  /** for unclosed file, the file name is ${startIndex}-${Long.MAX_VALUE}-{version} */
   private void createNewLogFile(String dirName, long startLogIndex) throws IOException {
     lock.lock();
     try {
@@ -1117,7 +1092,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
 
   /**
    * @param startIndex the log start index
-   * @param endIndex   the log end index
+   * @param endIndex the log end index
    * @return the raft log which index between [startIndex, endIndex] or empty if not found
    */
   @Override
@@ -1222,9 +1197,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
 
   /**
    * @param startIndex the log start index
-   * @param endIndex   the log end index
+   * @param endIndex the log end index
    * @return first value-> the log data file, second value-> the left value is the start offset of
-   * the file, the right is the end offset of the file
+   *     the file, the right is the end offset of the file
    */
   private List<Pair<File, Pair<Long, Long>>> getLogDataFileAndOffset(
       long startIndex, long endIndex) {
@@ -1272,8 +1247,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     if (logDataFileWithStartAndEndLogIndex != null) {
       // this means the endIndex's offset can not be found in the file
       // logDataFileWithStartAndEndLogIndex.left
-      long endOffset = getOffsetAccordingToLogIndex(
-          Math.min(endIndex, logDataFileWithStartAndEndLogIndex.right.right));
+      long endOffset =
+          getOffsetAccordingToLogIndex(
+              Math.min(endIndex, logDataFileWithStartAndEndLogIndex.right.right));
       fileNameWithStartAndEndOffset.add(
           new Pair<>(logDataFileWithStartAndEndLogIndex.left, new Pair<>(startOffset, endOffset)));
       logger.debug(
@@ -1290,8 +1266,8 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   /**
    * @param startIndex the start log index
    * @return the first value of the pair is the log index file which contains the start index; the
-   * second pair's first value is the file's start log index. the second pair's second value is the
-   * file's end log index. null if not found
+   *     second pair's first value is the file's start log index. the second pair's second value is
+   *     the file's end log index. null if not found
    */
   public IndexFileDescriptor getLogIndexFile(long startIndex) {
     for (IndexFileDescriptor descriptor : logIndexFileList) {
@@ -1306,8 +1282,8 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   /**
    * @param startIndex the start log index
    * @return the first value of the pair is the log data file which contains the start index; the
-   * second pair's first value is the file's start log index. the second pair's second value is the
-   * file's end log index. null if not found
+   *     second pair's first value is the file's start log index. the second pair's second value is
+   *     the file's end log index. null if not found
    */
   public Pair<File, Pair<Long, Long>> getLogDataFile(long startIndex) {
     for (File file : logDataFileList) {
@@ -1326,9 +1302,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   }
 
   /**
-   * @param file              the log data file
+   * @param file the log data file
    * @param startAndEndOffset the left value is the start offset of the file, the right is the end
-   *                          offset of the file
+   *     offset of the file
    * @return the logs between start offset and end offset
    */
   private List<Entry> getLogsFromOneLogDataFile(File file, Pair<Long, Long> startAndEndOffset) {
@@ -1421,9 +1397,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
        * automatically increased by saveInterval to avoid conflicts.
        */
       private static long saveInterval = 100;
-      /**
-       * time partition id to dividing time series into different storage group
-       */
+      /** time partition id to dividing time series into different storage group */
       private long timePartitionId;
 
       private long prevVersion;
@@ -1437,9 +1411,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
         restore();
       }
 
-      /**
-       * only used for upgrading
-       */
+      /** only used for upgrading */
       public SimpleFileVersionController(String directoryPath) throws IOException {
         this.directoryPath = directoryPath + File.separator + UPGRADE_DIR;
         restore();
@@ -1502,9 +1474,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
         prevVersion = currVersion;
       }
 
-      /**
-       * recovery from disk
-       */
+      /** recovery from disk */
       @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
       private void restore() throws IOException {
         File directory = SystemFileFactory.INSTANCE.getFile(directoryPath);