You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/04/20 02:20:41 UTC

[iotdb] 01/01: add dynamic thread group

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch ISSUE_5792
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 83d6cb5ac72e563204b61a7ee9054a7195e0d33e
Author: Tian Jiang <jt...@163.com>
AuthorDate: Thu Apr 20 10:23:07 2023 +0800

    add dynamic thread group
---
 .../commons/concurrent/dynamic/DynamicThread.java  | 97 ++++++++++++++++++++++
 .../concurrent/dynamic/DynamicThreadGroup.java     | 77 +++++++++++++++++
 2 files changed, 174 insertions(+)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThread.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThread.java
new file mode 100644
index 0000000000..4620de5159
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThread.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.concurrent.dynamic;
+
+/**
+ * A worker that records its own idle time and running time and uses the resulting idle ratio to
+ * trigger addThread() in its DynamicThreadGroup (when it is too busy) or to decide that it should
+ * exit (when it is too idle), so the number of threads in the group changes dynamically. When the
+ * worker exits, the group is notified through onThreadExit().
+ *
+ * <p>IMPORTANT: the implementation must call idleToRunning(), runningToIdle(), and shouldExit()
+ * properly in runInternal().
+ */
+public abstract class DynamicThread implements Runnable {
+
+  // group to report to; null means this worker is not managed by any group
+  private final DynamicThreadGroup threadGroup;
+  // System.nanoTime() at the start of the current idle period, 0 when no idle period is open
+  private long idleStart;
+  // System.nanoTime() at the start of the current running period, 0 when no running period is open
+  private long runningStart;
+  // accumulated idle time in nanoseconds
+  private long idleTimeSum;
+  // accumulated running time in nanoseconds
+  private long runningTimeSum;
+  // TODO: add configuration for the two values
+  private double maximumIdleRatio = 0.5;
+  private double minimumIdleRatio = 0.1;
+
+  /**
+   * Creates a worker that is not attached to any group. Such a worker never asks for more threads,
+   * never asks to exit, and notifies nobody when it finishes.
+   */
+  protected DynamicThread() {
+    this(null);
+  }
+
+  /**
+   * Creates a worker attached to the given group.
+   *
+   * @param threadGroup the group to grow/shrink and to notify on exit, may be null
+   */
+  protected DynamicThread(DynamicThreadGroup threadGroup) {
+    this.threadGroup = threadGroup;
+  }
+
+  /**
+   * The actual work of this thread. The implementation must call idleToRunning() and
+   * runningToIdle() properly, e.g.:
+   *
+   * <pre>{@code
+   * while (!Thread.interrupted()) {
+   *   Object obj = blockingQueue.poll();
+   *   idleToRunning();
+   *   process(obj);
+   *   runningToIdle();
+   *   if (shouldExit()) {
+   *     return;
+   *   }
+   * }
+   * }</pre>
+   */
+  public abstract void runInternal();
+
+  /** Marks the end of an idle period (if one is open) and the start of a running period. */
+  protected void idleToRunning() {
+    long now = System.nanoTime();
+    if (idleStart != 0) {
+      idleTimeSum += now - idleStart;
+      // close the idle period so that a repeated call cannot count it twice
+      idleStart = 0;
+    }
+    runningStart = now;
+  }
+
+  /** Marks the end of a running period (if one is open) and the start of an idle period. */
+  protected void runningToIdle() {
+    long now = System.nanoTime();
+    if (runningStart != 0) {
+      runningTimeSum += now - runningStart;
+      // close the running period so that a repeated call cannot count it twice
+      runningStart = 0;
+    }
+    idleStart = now;
+  }
+
+  /**
+   * @return the fraction of the recorded time that was spent idle, in [0, 1]; 0 if no time has
+   *     been recorded yet
+   */
+  protected double idleRatio() {
+    long total = idleTimeSum + runningTimeSum;
+    if (total == 0) {
+      // nothing recorded yet, avoid 0/0 (NaN)
+      return 0.0;
+    }
+    return idleTimeSum * 1.0 / total;
+  }
+
+  /**
+   * Checks the idle ratio and reacts to it: when the ratio is below the minimum, the group is asked
+   * to add a thread; when it is above the maximum, this thread should exit provided the group
+   * would still have at least its minimum number of threads.
+   *
+   * @return true if the caller should return from runInternal(), false otherwise
+   */
+  protected boolean shouldExit() {
+    if (threadGroup == null) {
+      // not managed by a group, nothing to grow or shrink
+      return false;
+    }
+    if (idleTimeSum + runningTimeSum == 0) {
+      // no statistics yet, make no decision
+      return false;
+    }
+
+    double idleRatio = idleRatio();
+    if (idleRatio < minimumIdleRatio) {
+      // Thread too busy, try adding a new thread
+      threadGroup.addThread();
+      return false;
+    }
+    if (idleRatio > maximumIdleRatio) {
+      // Thread too idle, exit if there is still enough threads
+      int afterCnt = threadGroup.getThreadCnt().decrementAndGet();
+      // notice that onThreadExit() will also decrease the counter, so we add it back here to avoid
+      // the counter being decreased twice
+      // NOTE(review): the counter is restored before this thread actually exits, so concurrent
+      // callers may each still observe a count at or above the minimum
+      threadGroup.getThreadCnt().incrementAndGet();
+      return afterCnt >= threadGroup.getMinThreadCnt();
+    }
+    return false;
+  }
+
+  /** Runs runInternal() and notifies the group, if any, when it returns or throws. */
+  @Override
+  public void run() {
+    try {
+      runInternal();
+    } finally {
+      if (threadGroup != null) {
+        threadGroup.onThreadExit(this);
+      }
+    }
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
new file mode 100644
index 0000000000..7216437ea9
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.concurrent.dynamic;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+/**
+ * Keeps track of the DynamicThreads that were submitted to a hosting ExecutorService on behalf of
+ * this group. The group refuses to add threads beyond maxThreadCnt; minThreadCnt is only exposed
+ * here and is consulted by the threads themselves when they decide whether to exit.
+ */
+public class DynamicThreadGroup {
+
+  // pool that actually executes the threads of this group
+  private final ExecutorService hostingPool;
+  private final String name;
+  // creates a new worker each time the group grows
+  private final Supplier<DynamicThread> threadFactory;
+  // number of threads currently counted as members of this group
+  private final AtomicInteger threadCnt = new AtomicInteger();
+  private final int minThreadCnt;
+  private final int maxThreadCnt;
+  // future of every submitted thread, removed again in onThreadExit()
+  private final Map<DynamicThread, Future<?>> threadFutureMap = new ConcurrentHashMap<>();
+
+  /**
+   * Creates an empty group; no thread is started until addThread() is called.
+   *
+   * @param name name of the group
+   * @param hostingPool pool the threads of this group are submitted to
+   * @param threadFactory supplies a new DynamicThread for each addThread() that is accepted
+   * @param minThreadCnt minimum number of threads, values below 1 are raised to 1
+   * @param maxThreadCnt maximum number of threads, values below the minimum are raised to it
+   */
+  public DynamicThreadGroup(String name, ExecutorService hostingPool,
+      Supplier<DynamicThread> threadFactory, int minThreadCnt, int maxThreadCnt) {
+    this.name = name;
+    this.hostingPool = hostingPool;
+    this.threadFactory = threadFactory;
+    this.minThreadCnt = Math.max(1, minThreadCnt);
+    this.maxThreadCnt = Math.max(this.minThreadCnt, maxThreadCnt);
+  }
+
+  /**
+   * Add a thread to this group if the number of threads does not reach the maximum. If the thread
+   * cannot be created or submitted, the counter is restored and the failure is propagated.
+   */
+  public void addThread() {
+    int afterCnt = threadCnt.incrementAndGet();
+    if (afterCnt > maxThreadCnt) {
+      // the group is full, give the slot back
+      threadCnt.decrementAndGet();
+      return;
+    }
+
+    boolean submitted = false;
+    try {
+      DynamicThread dynamicThread = threadFactory.get();
+      Future<?> future = hostingPool.submit(dynamicThread);
+      // from here on the thread itself releases the slot through onThreadExit()
+      submitted = true;
+      threadFutureMap.put(dynamicThread, future);
+    } finally {
+      if (!submitted) {
+        // creation or submission failed (e.g. the pool rejected the task), so no thread will
+        // ever call onThreadExit() for this slot; release it here
+        threadCnt.decrementAndGet();
+      }
+    }
+  }
+
+  /**
+   * @return the live counter of threads in this group; changes made to it affect the group
+   */
+  public AtomicInteger getThreadCnt() {
+    return threadCnt;
+  }
+
+  /**
+   * @return the minimum number of threads of this group, always at least 1
+   */
+  public int getMinThreadCnt() {
+    return minThreadCnt;
+  }
+
+  /**
+   * Remove a thread from the group when it exits.
+   *
+   * @param dynamicThread exiting thread
+   */
+  public void onThreadExit(DynamicThread dynamicThread) {
+    threadCnt.decrementAndGet();
+    threadFutureMap.remove(dynamicThread);
+  }
+}