You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/04/26 02:14:08 UTC
[iotdb] branch native_raft updated: refactor log flush with dual buffer
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new 2c2787fd85 refactor log flush with dual buffer
2c2787fd85 is described below
commit 2c2787fd8532b0d8992fee07fe2ffdfcc98b64f7
Author: Tian Jiang <jt...@163.com>
AuthorDate: Wed Apr 26 10:16:34 2023 +0800
refactor log flush with dual buffer
---
.../consensus/natraft/protocol/RaftMember.java | 1 +
.../protocol/log/manager/RaftLogManager.java | 4 +
.../manager/serialization/StableEntryManager.java | 2 +
.../serialization/SyncLogDequeSerializer.java | 209 ++++++++++++++-------
.../iotdb/consensus/natraft/utils/NodeReport.java | 5 +
.../commons/concurrent/dynamic/DynamicThread.java | 4 +-
6 files changed, 158 insertions(+), 67 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index 1c2711e184..f7f58b5721 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -1384,6 +1384,7 @@ public class RaftMember {
lastReportIndex,
logManager.getCommitLogIndex(),
logManager.getCommitLogTerm(),
+ logManager.getPersistedLogIndex(),
readOnly,
heartbeatThread.getLastHeartbeatReceivedTime(),
prevLastLogIndex,
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
index d73613c7a3..1aa12cbe6b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
@@ -973,4 +973,8 @@ public abstract class RaftLogManager {
public ReentrantReadWriteLock getLock() {
return lock;
}
+
+ public long getPersistedLogIndex() {
+ return stableEntryManager.getPersistedLogIndex();
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
index a57a8a0774..f1a7e6ebfb 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
@@ -62,4 +62,6 @@ public interface StableEntryManager {
* should be cleaned
*/
void clearAllLogs(long commitIndex);
+
+ long getPersistedLogIndex();
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
index d4b3240491..59947f3ff7 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
@@ -18,6 +18,10 @@
*/
package org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.file.SystemFileFactory;
@@ -71,10 +75,14 @@ public class SyncLogDequeSerializer implements StableEntryManager {
private static final String LOG_DATA_FILE_SUFFIX = "data";
private static final String LOG_INDEX_FILE_SUFFIX = "idx";
- /** the log data files */
+ /**
+ * the log data files
+ */
private List<File> logDataFileList;
- /** the log index files */
+ /**
+ * the log index files
+ */
private List<IndexFileDescriptor> logIndexFileList;
private LogParser parser = LogParser.getINSTANCE();
@@ -83,11 +91,16 @@ public class SyncLogDequeSerializer implements StableEntryManager {
private FileOutputStream currentLogIndexOutputStream;
private LogManagerMeta meta;
private HardState state;
+ private String name;
- /** min version of available log */
+ /**
+ * min version of available log
+ */
private long minAvailableVersion = 0;
- /** max version of available log */
+ /**
+ * max version of available log
+ */
private long maxAvailableVersion = Long.MAX_VALUE;
private String logDir;
@@ -96,6 +109,8 @@ public class SyncLogDequeSerializer implements StableEntryManager {
private ByteBuffer logDataBuffer;
private ByteBuffer logIndexBuffer;
+ private ByteBuffer flushingLogDataBuffer;
+ private ByteBuffer flushingLogIndexBuffer;
private long offsetOfTheCurrentLogDataOutputStream = 0;
@@ -121,8 +136,10 @@ public class SyncLogDequeSerializer implements StableEntryManager {
private int maxPersistRaftLogNumberOnDisk;
- private ScheduledExecutorService persistLogDeleteExecutorService;
+ private ScheduledExecutorService persistLogExecutorService;
private ScheduledFuture<?> persistLogDeleteLogFuture;
+ private ExecutorService flushingLogExecutorService;
+ private volatile Future<?> flushingLogFuture;
/**
* indicate the first raft log's index of {@link SyncLogDequeSerializer#logIndexOffsetList}, for
@@ -133,6 +150,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
private long firstLogIndex = 0;
private long lastLogIndex = 0;
+ private long persistedLogIndex = 0;
/**
* the index and file offset of the log, for example, the first pair is the offset of index
@@ -142,7 +160,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
private static final int LOG_DELETE_CHECK_INTERVAL_SECOND = 5;
- /** the lock uses when change the log data files or log index files */
+ /**
+ * the lock uses when change the log data files or log index files
+ */
private final Lock lock = new ReentrantLock();
private volatile boolean isClosed = false;
@@ -153,6 +173,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
private void initCommonProperties() {
logDataBuffer = ByteBuffer.allocate(config.getRaftLogBufferSize());
logIndexBuffer = ByteBuffer.allocate(config.getRaftLogBufferSize());
+ flushingLogDataBuffer = ByteBuffer.allocate(config.getRaftLogBufferSize());
+ flushingLogIndexBuffer = ByteBuffer.allocate(config.getRaftLogBufferSize());
+
maxNumberOfLogsPerFetchOnDisk = config.getMaxNumberOfLogsPerFetchOnDisk();
maxRaftLogIndexSizeInMemory = config.getMaxRaftLogIndexSizeInMemory();
maxNumberOfPersistRaftLogFiles = config.getMaxNumberOfPersistRaftLogFiles();
@@ -170,18 +193,20 @@ public class SyncLogDequeSerializer implements StableEntryManager {
} catch (IOException e) {
logger.error("log serializer build version controller failed", e);
}
- this.persistLogDeleteExecutorService =
+ this.persistLogExecutorService =
new ScheduledThreadPoolExecutor(
1,
new BasicThreadFactory.Builder()
.namingPattern("persist-log-delete-" + logDir)
.daemon(true)
.build());
+ this.flushingLogExecutorService = Executors.newSingleThreadExecutor(
+ (r) -> new Thread(r, name + "-flushRaftLog"));
this.persistLogDeleteLogFuture =
ScheduledExecutorUtil.safelyScheduleAtFixedRate(
- persistLogDeleteExecutorService,
- this::checkDeletePersistRaftLog,
+ persistLogExecutorService,
+ this::checkPersistRaftLog,
LOG_DELETE_CHECK_INTERVAL_SECOND,
LOG_DELETE_CHECK_INTERVAL_SECOND,
TimeUnit.SECONDS);
@@ -194,6 +219,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
*/
public SyncLogDequeSerializer(ConsensusGroupId groupId, RaftConfig config) {
this.config = config;
+ name = groupId.toString();
logDir = getLogDir(groupId);
initCommonProperties();
initMetaAndLogFiles();
@@ -204,7 +230,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
return systemDir + File.separator + groupId + File.separator + "raftLog" + File.separator;
}
- /** for log tools */
+ /**
+ * for log tools
+ */
@Override
public LogManagerMeta getMeta() {
return meta;
@@ -216,7 +244,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
meta.setLastAppliedIndex(applyIndex);
}
- /** Recover all the logs in disk. This function will be called once this instance is created. */
+ /**
+ * Recover all the logs in disk. This function will be called once this instance is created.
+ */
@Override
public List<Entry> getAllEntriesAfterAppliedIndex() {
logger.debug(
@@ -230,8 +260,8 @@ public class SyncLogDequeSerializer implements StableEntryManager {
* is uncommitted for persistent LogManagerMeta(meta's info is stale).We need to recover these
* already persistent logs.
*
- * <p>For example,commitIndex is 5 in persistent LogManagerMeta,But the log file has actually been
- * flushed to 7,when we restart cluster,we need to recover 6 and 7.
+ * <p>For example,commitIndex is 5 in persistent LogManagerMeta,But the log file has actually
+ * been flushed to 7,when we restart cluster,we need to recover 6 and 7.
*
* <p>Maybe,we can extract getAllEntriesAfterAppliedIndex and getAllEntriesAfterCommittedIndex
* into getAllEntriesByIndex,but now there are too many test cases using it.
@@ -375,37 +405,63 @@ public class SyncLogDequeSerializer implements StableEntryManager {
if (isClosed || logDataBuffer.position() == 0) {
return;
}
- lock.lock();
- try {
- // write into disk
+ if (flushingLogFuture != null) {
try {
- checkStream();
- // 1. write to the log data file
- byte[] compressed = compressor.compress(logDataBuffer.array(), 0, logDataBuffer.position());
- ReadWriteIOUtils.write(compressed.length, currentLogDataOutputStream);
- currentLogDataOutputStream.write(compressed);
- logIndexOffsetList.add(new Pair<>(lastLogIndex, offsetOfTheCurrentLogDataOutputStream));
- logIndexBuffer.putLong(lastLogIndex);
- logIndexBuffer.putLong(offsetOfTheCurrentLogDataOutputStream);
- ReadWriteIOUtils.writeWithoutSize(
- logIndexBuffer, 0, logIndexBuffer.position(), currentLogIndexOutputStream);
- offsetOfTheCurrentLogDataOutputStream += Integer.BYTES + compressed.length;
- if (config.getFlushRaftLogThreshold() == 0) {
- currentLogDataOutputStream.getChannel().force(true);
- currentLogIndexOutputStream.getChannel().force(true);
- }
- } catch (IOException e) {
- logger.error("Error in logs serialization: ", e);
- return;
+ flushingLogFuture.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ logger.error("Unexpected exception when flushing log in {}", name);
+ throw new RuntimeException(e);
}
- logDataBuffer.clear();
- logIndexBuffer.clear();
- logger.debug("End flushing log buffer.");
- } finally {
- lock.unlock();
+ } else {
+ switchBuffer();
+ flushingLogFuture = flushingLogExecutorService.submit(() -> flushLogBufferTask(lastLogIndex));
}
}
+ private void switchBuffer() {
+ ByteBuffer temp = logDataBuffer;
+ logDataBuffer = flushingLogIndexBuffer;
+ flushingLogDataBuffer = temp;
+ temp = logIndexBuffer;
+ logIndexBuffer = flushingLogIndexBuffer;
+ flushingLogIndexBuffer = temp;
+ }
+
+ private void flushLogBufferTask(long currentLastIndex) {
+ // write into disk
+ try {
+ checkStream();
+ // 1. write to the log data file
+ byte[] compressed = compressor.compress(flushingLogDataBuffer.array(), 0,
+ flushingLogDataBuffer.position());
+ ReadWriteIOUtils.write(compressed.length, currentLogDataOutputStream);
+ logIndexOffsetList.add(new Pair<>(lastLogIndex, offsetOfTheCurrentLogDataOutputStream));
+ flushingLogIndexBuffer.putLong(lastLogIndex);
+ flushingLogIndexBuffer.putLong(offsetOfTheCurrentLogDataOutputStream);
+ offsetOfTheCurrentLogDataOutputStream += Integer.BYTES + compressed.length;
+
+ currentLogDataOutputStream.write(compressed);
+ ReadWriteIOUtils.writeWithoutSize(
+ logIndexBuffer, 0, logIndexBuffer.position(), currentLogIndexOutputStream);
+ if (config.getFlushRaftLogThreshold() == 0) {
+ currentLogDataOutputStream.getChannel().force(true);
+ currentLogIndexOutputStream.getChannel().force(true);
+ }
+ persistedLogIndex = currentLastIndex;
+ } catch (IOException e) {
+ logger.error("Error in logs serialization: ", e);
+ return;
+ }
+
+ flushingLogDataBuffer.clear();
+ flushingLogIndexBuffer.clear();
+
+ switchBuffer();
+ logger.debug("End flushing log buffer.");
+ }
+
private void forceFlushLogBufferWithoutCloseFile() {
if (isClosed) {
return;
@@ -429,7 +485,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
}
}
- /** flush the log buffer and check if the file needs to be closed */
+ /**
+ * flush the log buffer and check if the file needs to be closed
+ */
@Override
public void forceFlushLogBuffer() {
lock.lock();
@@ -473,7 +531,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
}
}
- /** The file name rules are as follows: ${startLogIndex}-${endLogIndex}-${version}.data */
+ /**
+ * The file name rules are as follows: ${startLogIndex}-${endLogIndex}-${version}.data
+ */
private void recoverLogFiles() {
// 1. first we should recover the log index file
recoverLogFiles(LOG_INDEX_FILE_SUFFIX);
@@ -520,9 +580,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
/**
* Check that the file is legal or not
*
- * @param file file needs to be check
- * @param fileType {@link SyncLogDequeSerializer#LOG_DATA_FILE_SUFFIX} or {@link
- * SyncLogDequeSerializer#LOG_INDEX_FILE_SUFFIX}
+ * @param file file needs to be check
+ * @param fileType {@link SyncLogDequeSerializer#LOG_DATA_FILE_SUFFIX} or
+ * {@link SyncLogDequeSerializer#LOG_INDEX_FILE_SUFFIX}
* @return true if the file legal otherwise false
*/
private boolean checkLogFile(File file, String fileType) {
@@ -750,7 +810,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
}
}
- /** for unclosed file, the file name is ${startIndex}-${Long.MAX_VALUE}-{version} */
+ /**
+ * for unclosed file, the file name is ${startIndex}-${Long.MAX_VALUE}-{version}
+ */
private void createNewLogFile(String dirName, long startLogIndex) throws IOException {
lock.lock();
try {
@@ -826,6 +888,8 @@ public class SyncLogDequeSerializer implements StableEntryManager {
meta = new LogManagerMeta();
state = new HardState();
}
+
+ persistedLogIndex = meta.getLastLogIndex();
logger.info(
"Recovered log meta: {}, availableVersion: [{},{}], state: {}",
meta,
@@ -877,15 +941,15 @@ public class SyncLogDequeSerializer implements StableEntryManager {
lock.unlock();
}
- if (persistLogDeleteExecutorService != null) {
- persistLogDeleteExecutorService.shutdownNow();
+ if (persistLogExecutorService != null) {
+ persistLogExecutorService.shutdownNow();
persistLogDeleteLogFuture.cancel(true);
try {
- persistLogDeleteExecutorService.awaitTermination(20, TimeUnit.SECONDS);
+ persistLogExecutorService.awaitTermination(20, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
- persistLogDeleteExecutorService = null;
+ persistLogExecutorService = null;
}
}
@@ -945,10 +1009,14 @@ public class SyncLogDequeSerializer implements StableEntryManager {
return Long.parseLong(file.getName().split(FILE_NAME_SEPARATOR)[2]);
}
- public void checkDeletePersistRaftLog() {
- // 1. check the log index offset list size
+ public void checkPersistRaftLog() {
+
lock.lock();
try {
+ // 1. flush logs in buffer
+ flushLogBuffer();
+
+ // 2. check the log index offset list size
if (logIndexOffsetList.size() > maxRaftLogIndexSizeInMemory) {
int compactIndex = logIndexOffsetList.size() - maxRaftLogIndexSizeInMemory;
logIndexOffsetList.subList(0, compactIndex).clear();
@@ -958,7 +1026,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
lock.unlock();
}
- // 2. check the persist log file number
+ // 3. check the persist log file number
lock.lock();
try {
while (logDataFileList.size() > maxNumberOfPersistRaftLogFiles) {
@@ -968,7 +1036,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
lock.unlock();
}
- // 3. check the persist log index number
+ // 4. check the persisted log index number
lock.lock();
try {
while (logDataFileList.size() > 1) {
@@ -1095,7 +1163,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
/**
* @param startIndex the log start index
- * @param endIndex the log end index
+ * @param endIndex the log end index
* @return the raft log which index between [startIndex, endIndex] or empty if not found
*/
@Override
@@ -1200,9 +1268,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
/**
* @param startIndex the log start index
- * @param endIndex the log end index
+ * @param endIndex the log end index
* @return first value-> the log data file, second value-> the left value is the start offset of
- * the file, the right is the end offset of the file
+ * the file, the right is the end offset of the file
*/
private List<Pair<File, Pair<Long, Long>>> getLogDataFileAndOffset(
long startIndex, long endIndex) {
@@ -1269,8 +1337,8 @@ public class SyncLogDequeSerializer implements StableEntryManager {
/**
* @param startIndex the start log index
* @return the first value of the pair is the log index file which contains the start index; the
- * second pair's first value is the file's start log index. the second pair's second value is
- * the file's end log index. null if not found
+ * second pair's first value is the file's start log index. the second pair's second value is the
+ * file's end log index. null if not found
*/
public IndexFileDescriptor getLogIndexFile(long startIndex) {
for (IndexFileDescriptor descriptor : logIndexFileList) {
@@ -1285,8 +1353,8 @@ public class SyncLogDequeSerializer implements StableEntryManager {
/**
* @param startIndex the start log index
* @return the first value of the pair is the log data file which contains the start index; the
- * second pair's first value is the file's start log index. the second pair's second value is
- * the file's end log index. null if not found
+ * second pair's first value is the file's start log index. the second pair's second value is the
+ * file's end log index. null if not found
*/
public Pair<File, Pair<Long, Long>> getLogDataFile(long startIndex) {
for (File file : logDataFileList) {
@@ -1305,9 +1373,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
}
/**
- * @param file the log data file
+ * @param file the log data file
* @param startAndEndOffset the left value is the start offset of the file, the right is the end
- * offset of the file
+ * offset of the file
* @return the logs between start offset and end offset
*/
private List<Entry> getLogsFromOneLogDataFile(File file, Pair<Long, Long> startAndEndOffset) {
@@ -1364,6 +1432,11 @@ public class SyncLogDequeSerializer implements StableEntryManager {
return result;
}
+ @Override
+ public long getPersistedLogIndex() {
+ return persistedLogIndex;
+ }
+
/**
* VersionController manages the version(a monotonically increasing long) of a storage group. We
* define that each memtable flush, data deletion, or data update will generate a new version of
@@ -1400,7 +1473,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
* automatically increased by saveInterval to avoid conflicts.
*/
private static long saveInterval = 100;
- /** time partition id to dividing time series into different storage group */
+ /**
+ * time partition id to dividing time series into different storage group
+ */
private long timePartitionId;
private long prevVersion;
@@ -1414,7 +1489,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
restore();
}
- /** only used for upgrading */
+ /**
+ * only used for upgrading
+ */
public SimpleFileVersionController(String directoryPath) throws IOException {
this.directoryPath = directoryPath + File.separator + UPGRADE_DIR;
restore();
@@ -1477,7 +1554,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
prevVersion = currVersion;
}
- /** recovery from disk */
+ /**
+ * recovery from disk
+ */
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
private void restore() throws IOException {
File directory = SystemFileFactory.INSTANCE.getFile(directoryPath);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeReport.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeReport.java
index e6220a188d..ebbd5a6188 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeReport.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeReport.java
@@ -66,6 +66,7 @@ public class NodeReport {
long lastLogIndex;
long commitIndex;
long commitTerm;
+ long persistedIndex;
boolean isReadOnly;
long lastHeartbeatReceivedTime;
long prevLastLogIndex;
@@ -79,6 +80,7 @@ public class NodeReport {
long lastLogIndex,
long commitIndex,
long commitTerm,
+ long persistedIndex,
boolean isReadOnly,
long lastHeartbeatReceivedTime,
long prevLastLogIndex,
@@ -90,6 +92,7 @@ public class NodeReport {
this.lastLogIndex = lastLogIndex;
this.commitIndex = commitIndex;
this.commitTerm = commitTerm;
+ this.persistedIndex = persistedIndex;
this.isReadOnly = isReadOnly;
this.lastHeartbeatReceivedTime = lastHeartbeatReceivedTime;
this.prevLastLogIndex = prevLastLogIndex;
@@ -114,6 +117,8 @@ public class NodeReport {
+ commitIndex
+ ", commitTerm="
+ commitTerm
+ + ", persistedIndex="
+ + persistedIndex
+ ", appliedLogIndex="
+ maxAppliedLogIndex
+ ", readOnly="
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThread.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThread.java
index 9fcb16ec8a..e301ba4400 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThread.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThread.java
@@ -36,8 +36,8 @@ public abstract class DynamicThread implements Runnable {
private long idleTimeSum;
private long runningTimeSum;
// TODO: add configuration for the values
- private double maximumIdleRatio = 0.5;
- private double minimumIdleRatio = 0.1;
+ private double maximumIdleRatio = 0.8;
+ private double minimumIdleRatio = 0.2;
private long minimumRunningTime = 10_000_000_000L;
public DynamicThread(DynamicThreadGroup threadGroup) {