You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/04/20 02:20:41 UTC
[iotdb] 01/01: add dynamic thread group
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch ISSUE_5792
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 83d6cb5ac72e563204b61a7ee9054a7195e0d33e
Author: Tian Jiang <jt...@163.com>
AuthorDate: Thu Apr 20 10:23:07 2023 +0800
add dynamic thread group
---
.../commons/concurrent/dynamic/DynamicThread.java | 97 ++++++++++++++++++++++
.../concurrent/dynamic/DynamicThreadGroup.java | 77 +++++++++++++++++
2 files changed, 174 insertions(+)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThread.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThread.java
new file mode 100644
index 0000000000..4620de5159
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThread.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.concurrent.dynamic;
+
+/**
+ * DynamicThread record the idle time and running time of the thread and trigger addThread() or
+ * onThreadExit() in DynamicThreadGroup to change the number of threads in a thread group
+ * dynamically. IMPORTANT: the implementation must call idleToRunning(), runningToIdle(), and
+ * shouldExit() properly in runInternal().
+ */
+public abstract class DynamicThread implements Runnable {
+
+ private DynamicThreadGroup threadGroup;
+ private long idleStart;
+ private long runningStart;
+ private long idleTimeSum;
+ private long runningTimeSum;
+ // TODO: add configuration for the two values
+ private double maximumIdleRatio = 0.5;
+ private double minimumIdleRatio = 0.1;
+
+ /**
+ * The implementation must call idleToRunning() and runningToIdle() properly. E.g., {
+ * {@code {while(! Thread.interrupted ()) { Object obj = blockingQueue.poll(); idleToRunning();
+ * process(obj); RunningToIdle(); if (shouldExit()) { return; } }
+ *
+ * @<code>}
+ */
+ public abstract void runInternal();
+
+ protected void idleToRunning() {
+ if (idleStart != 0) {
+ long idleTime = System.nanoTime() - idleStart;
+ idleTimeSum += idleTime;
+ }
+ runningStart = System.nanoTime();
+ }
+
+ protected void runningToIdle() {
+ if (runningStart != 0) {
+ long runningTime = System.nanoTime() - runningStart;
+ runningTimeSum += runningTime;
+ }
+ idleStart = System.nanoTime();
+ }
+
+ protected double idleRatio() {
+ return idleTimeSum * 1.0 / (idleTimeSum + runningTimeSum);
+ }
+
+ protected boolean shouldExit() {
+ double idleRatio = idleRatio();
+ if (idleRatio < minimumIdleRatio) {
+ // Thread too busy, try adding a new thread
+ threadGroup.addThread();
+ return false;
+ } else if (idleRatio > maximumIdleRatio) {
+ // Thread too idle, exit if there is still enough threads
+ int afterCnt = threadGroup.getThreadCnt().decrementAndGet();
+ if (afterCnt >= threadGroup.getMinThreadCnt()) {
+ // notice that onThreadExit() will also decrease the counter, so we add it back here to avoid
+ // the counter being decreased twice
+ threadGroup.getThreadCnt().incrementAndGet();
+ return true;
+ } else {
+ threadGroup.getThreadCnt().incrementAndGet();
+ return false;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public void run() {
+ try {
+ runInternal();
+ } finally {
+ threadGroup.onThreadExit(this);
+ }
+ }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
new file mode 100644
index 0000000000..7216437ea9
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.concurrent.dynamic;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+public class DynamicThreadGroup {
+
+ private ExecutorService hostingPool;
+ private String name;
+ private Supplier<DynamicThread> threadFactory;
+ private AtomicInteger threadCnt = new AtomicInteger();
+ private int minThreadCnt = 1;
+ private int maxThreadCnt = 8;
+ private Map<DynamicThread, Future<?>> threadFutureMap = new ConcurrentHashMap<>();
+
+ public DynamicThreadGroup(String name, ExecutorService hostingPool,
+ Supplier<DynamicThread> threadFactory, int minThreadCnt, int maxThreadCnt) {
+ this.name = name;
+ this.hostingPool = hostingPool;
+ this.threadFactory = threadFactory;
+ this.minThreadCnt = Math.max(1, minThreadCnt);
+ this.maxThreadCnt = Math.max(this.minThreadCnt, maxThreadCnt);
+ }
+
+ /**
+ * Add a thread to this group if the number of threads does not reach the maximum.
+ */
+ public void addThread() {
+ int afterCnt = threadCnt.incrementAndGet();
+ if (afterCnt <= maxThreadCnt) {
+ DynamicThread dynamicThread = threadFactory.get();
+ Future<?> submit = hostingPool.submit(dynamicThread);
+ threadFutureMap.put(dynamicThread, submit);
+ } else {
+ threadCnt.decrementAndGet();
+ }
+ }
+
+ public AtomicInteger getThreadCnt() {
+ return threadCnt;
+ }
+
+ public int getMinThreadCnt() {
+ return minThreadCnt;
+ }
+
+ /**
+ * Remove a thread from the group when it exits.
+ * @param dynamicThread exiting thread
+ */
+ public void onThreadExit(DynamicThread dynamicThread) {
+ threadCnt.decrementAndGet();
+ threadFutureMap.remove(dynamicThread);
+ }
+}