You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/12/14 12:54:02 UTC
[iotdb] branch rel/1.0 updated: [To rel/1.0][IOTDB-5183]Fix CI OOM (#8413)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.0 by this push:
new c8dcb58b04 [To rel/1.0][IOTDB-5183]Fix CI OOM (#8413)
c8dcb58b04 is described below
commit c8dcb58b048a89b51b800c2e149c19e8338caac9
Author: 周沛辰 <45...@users.noreply.github.com>
AuthorDate: Wed Dec 14 20:53:57 2022 +0800
[To rel/1.0][IOTDB-5183]Fix CI OOM (#8413)
---
.../engine/compaction/CompactionTaskManager.java | 28 +++++++++-----------
.../engine/compaction/CompactionSchedulerTest.java | 30 +++++++++-------------
.../CompactionSchedulerWithFastPerformerTest.java | 30 ----------------------
.../compaction/utils/CompactionConfigRestorer.java | 12 +++++++++
4 files changed, 36 insertions(+), 64 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index edd79a608c..ea7e70805f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -113,7 +113,7 @@ public class CompactionTaskManager implements IService {
this.subCompactionTaskExecutionPool =
(WrappedThreadPoolExecutor)
IoTDBThreadPoolFactory.newFixedThreadPool(
- IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount()
+ compactionThreadNum
* IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum(),
ThreadName.COMPACTION_SUB_SERVICE.getName());
for (int i = 0; i < compactionThreadNum; ++i) {
@@ -124,6 +124,7 @@ public class CompactionTaskManager implements IService {
@Override
public void stop() {
if (taskExecutionPool != null) {
+ subCompactionTaskExecutionPool.shutdownNow();
taskExecutionPool.shutdownNow();
logger.info("Waiting for task taskExecutionPool to shut down");
waitTermination();
@@ -135,6 +136,7 @@ public class CompactionTaskManager implements IService {
@Override
public void waitAndStop(long milliseconds) {
if (taskExecutionPool != null) {
+ awaitTermination(subCompactionTaskExecutionPool, milliseconds);
awaitTermination(taskExecutionPool, milliseconds);
logger.info("Waiting for task taskExecutionPool to shut down in {} ms", milliseconds);
waitTermination();
@@ -175,7 +177,7 @@ public class CompactionTaskManager implements IService {
private void waitTermination() {
long startTime = System.currentTimeMillis();
int timeMillis = 0;
- while (!taskExecutionPool.isTerminated()) {
+ while (!subCompactionTaskExecutionPool.isTerminated() || !taskExecutionPool.isTerminated()) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
@@ -188,6 +190,7 @@ public class CompactionTaskManager implements IService {
}
}
taskExecutionPool = null;
+ subCompactionTaskExecutionPool = null;
storageGroupTasks.clear();
logger.info("CompactionManager stopped");
}
@@ -352,15 +355,6 @@ public class CompactionTaskManager implements IService {
@TestOnly
public void restart() throws InterruptedException {
if (IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount() > 0) {
- if (taskExecutionPool != null) {
- this.taskExecutionPool.shutdownNow();
- if (!this.taskExecutionPool.awaitTermination(MAX_WAITING_TIME, TimeUnit.MILLISECONDS)) {
- throw new InterruptedException(
- "Has been waiting over "
- + MAX_WAITING_TIME / 1000
- + " seconds for all compaction tasks to finish.");
- }
- }
if (subCompactionTaskExecutionPool != null) {
this.subCompactionTaskExecutionPool.shutdownNow();
if (!this.subCompactionTaskExecutionPool.awaitTermination(
@@ -371,11 +365,13 @@ public class CompactionTaskManager implements IService {
+ " seconds for all sub compaction tasks to finish.");
}
}
- if (this.subCompactionTaskExecutionPool != null) {
- subCompactionTaskExecutionPool.shutdownNow();
- if (!this.subCompactionTaskExecutionPool.awaitTermination(
- MAX_WAITING_TIME, TimeUnit.MILLISECONDS)) {
- throw new RuntimeException("Failed to shutdown subCompactionTaskExecutionPool");
+ if (taskExecutionPool != null) {
+ this.taskExecutionPool.shutdownNow();
+ if (!this.taskExecutionPool.awaitTermination(MAX_WAITING_TIME, TimeUnit.MILLISECONDS)) {
+ throw new InterruptedException(
+ "Has been waiting over "
+ + MAX_WAITING_TIME / 1000
+ + " seconds for all compaction tasks to finish.");
}
}
initThreadPool();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
index e51d57cde9..59a3ece46a 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
@@ -26,6 +26,9 @@ import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.engine.compaction.constant.CompactionPriority;
+import org.apache.iotdb.db.engine.compaction.constant.CrossCompactionPerformer;
+import org.apache.iotdb.db.engine.compaction.constant.InnerSeqCompactionPerformer;
+import org.apache.iotdb.db.engine.compaction.constant.InnerUnseqCompactionPerformer;
import org.apache.iotdb.db.engine.compaction.utils.CompactionClearUtils;
import org.apache.iotdb.db.engine.compaction.utils.CompactionConfigRestorer;
import org.apache.iotdb.db.engine.compaction.utils.CompactionFileGeneratorUtils;
@@ -55,12 +58,6 @@ import static org.junit.Assert.fail;
public class CompactionSchedulerTest {
private static final Logger logger = LoggerFactory.getLogger(CompactionSchedulerTest.class);
static final String COMPACTION_TEST_SG = "root.compactionSchedulerTest_";
- private static final boolean oldEnableInnerSeqCompaction =
- IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
- private static final boolean oldEnableInnerUnseqCompaction =
- IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
- private static final boolean oldEnableCrossCompaction =
- IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction();
static final long MAX_WAITING_TIME = 60_000;
static final long SCHEDULE_AGAIN_TIME = 30_000;
static final String[] fullPaths =
@@ -91,9 +88,15 @@ public class CompactionSchedulerTest {
if (!basicOutputDir.exists()) {
assertTrue(basicOutputDir.mkdirs());
}
- IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(true);
- IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(true);
- IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(true);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setCrossCompactionPerformer(CrossCompactionPerformer.READ_POINT);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setInnerSeqCompactionPerformer(InnerSeqCompactionPerformer.READ_CHUNK);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setInnerUnseqCompactionPerformer(InnerUnseqCompactionPerformer.READ_POINT);
CompactionTaskManager.getInstance().start();
while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0) {
try {
@@ -113,15 +116,6 @@ public class CompactionSchedulerTest {
EnvironmentUtils.cleanAllDir();
CompactionClearUtils.deleteEmptyDir(new File("target"));
CompactionTaskManager.getInstance().stop();
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setEnableSeqSpaceCompaction(oldEnableInnerSeqCompaction);
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setEnableUnseqSpaceCompaction(oldEnableInnerUnseqCompaction);
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setEnableCrossSpaceCompaction(oldEnableCrossCompaction);
}
/**
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerWithFastPerformerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerWithFastPerformerTest.java
index 2f70060287..3d1b3b00e1 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerWithFastPerformerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerWithFastPerformerTest.java
@@ -58,12 +58,6 @@ import static org.junit.Assert.fail;
public class CompactionSchedulerWithFastPerformerTest {
private static final Logger logger = LoggerFactory.getLogger(CompactionSchedulerTest.class);
static final String COMPACTION_TEST_SG = "root.compactionSchedulerTest_";
- private static final boolean oldEnableInnerSeqCompaction =
- IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
- private static final boolean oldEnableInnerUnseqCompaction =
- IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
- private static final boolean oldEnableCrossCompaction =
- IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction();
static final long MAX_WAITING_TIME = 60_000;
static final long SCHEDULE_AGAIN_TIME = 30_000;
static final String[] fullPaths =
@@ -85,13 +79,6 @@ public class CompactionSchedulerWithFastPerformerTest {
".device1.sensor4",
};
- private CrossCompactionPerformer oldCrossPerformer =
- IoTDBDescriptor.getInstance().getConfig().getCrossCompactionPerformer();
- private InnerSeqCompactionPerformer oldInnerSeqPerformer =
- IoTDBDescriptor.getInstance().getConfig().getInnerSeqCompactionPerformer();
- private InnerUnseqCompactionPerformer oldInnerUnseqPerformer =
- IoTDBDescriptor.getInstance().getConfig().getInnerUnseqCompactionPerformer();
-
@Before
public void setUp() throws MetadataException, IOException {
CompactionClearUtils.clearAllCompactionFiles();
@@ -101,9 +88,6 @@ public class CompactionSchedulerWithFastPerformerTest {
if (!basicOutputDir.exists()) {
assertTrue(basicOutputDir.mkdirs());
}
- IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(true);
- IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(true);
- IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(true);
IoTDBDescriptor.getInstance()
.getConfig()
.setCrossCompactionPerformer(CrossCompactionPerformer.FAST);
@@ -132,20 +116,6 @@ public class CompactionSchedulerWithFastPerformerTest {
EnvironmentUtils.cleanAllDir();
CompactionClearUtils.deleteEmptyDir(new File("target"));
CompactionTaskManager.getInstance().stop();
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setEnableSeqSpaceCompaction(oldEnableInnerSeqCompaction);
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setEnableUnseqSpaceCompaction(oldEnableInnerUnseqCompaction);
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setEnableCrossSpaceCompaction(oldEnableCrossCompaction);
- IoTDBDescriptor.getInstance().getConfig().setCrossCompactionPerformer(oldCrossPerformer);
- IoTDBDescriptor.getInstance().getConfig().setInnerSeqCompactionPerformer(oldInnerSeqPerformer);
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setInnerUnseqCompactionPerformer(oldInnerUnseqPerformer);
}
/**
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
index a809d10262..613308c5d7 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
@@ -22,8 +22,11 @@ package org.apache.iotdb.db.engine.compaction.utils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.constant.CompactionPriority;
+import org.apache.iotdb.db.engine.compaction.constant.CrossCompactionPerformer;
import org.apache.iotdb.db.engine.compaction.constant.CrossCompactionSelector;
+import org.apache.iotdb.db.engine.compaction.constant.InnerSeqCompactionPerformer;
import org.apache.iotdb.db.engine.compaction.constant.InnerSequenceCompactionSelector;
+import org.apache.iotdb.db.engine.compaction.constant.InnerUnseqCompactionPerformer;
public class CompactionConfigRestorer {
private boolean enableSeqSpaceCompaction = true;
@@ -44,6 +47,12 @@ public class CompactionConfigRestorer {
private long compactionScheduleIntervalInMs = 60000L;
private long compactionSubmissionIntervalInMs = 60000L;
private int compactionWriteThroughputMbPerSec = 8;
+ private CrossCompactionPerformer oldCrossPerformer =
+ IoTDBDescriptor.getInstance().getConfig().getCrossCompactionPerformer();
+ private InnerSeqCompactionPerformer oldInnerSeqPerformer =
+ IoTDBDescriptor.getInstance().getConfig().getInnerSeqCompactionPerformer();
+ private InnerUnseqCompactionPerformer oldInnerUnseqPerformer =
+ IoTDBDescriptor.getInstance().getConfig().getInnerUnseqCompactionPerformer();
public CompactionConfigRestorer() {}
@@ -66,5 +75,8 @@ public class CompactionConfigRestorer {
config.setCompactionScheduleIntervalInMs(compactionScheduleIntervalInMs);
config.setCompactionSubmissionIntervalInMs(compactionSubmissionIntervalInMs);
config.setCompactionWriteThroughputMbPerSec(compactionWriteThroughputMbPerSec);
+ config.setCrossCompactionPerformer(oldCrossPerformer);
+ config.setInnerSeqCompactionPerformer(oldInnerSeqPerformer);
+ config.setInnerUnseqCompactionPerformer(oldInnerUnseqPerformer);
}
}