You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/12/14 12:54:02 UTC

[iotdb] branch rel/1.0 updated: [To rel/1.0][IOTDB-5183]Fix CI OOM (#8413)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.0 by this push:
     new c8dcb58b04 [To rel/1.0][IOTDB-5183]Fix CI OOM (#8413)
c8dcb58b04 is described below

commit c8dcb58b048a89b51b800c2e149c19e8338caac9
Author: 周沛辰 <45...@users.noreply.github.com>
AuthorDate: Wed Dec 14 20:53:57 2022 +0800

    [To rel/1.0][IOTDB-5183]Fix CI OOM (#8413)
---
 .../engine/compaction/CompactionTaskManager.java   | 28 +++++++++-----------
 .../engine/compaction/CompactionSchedulerTest.java | 30 +++++++++-------------
 .../CompactionSchedulerWithFastPerformerTest.java  | 30 ----------------------
 .../compaction/utils/CompactionConfigRestorer.java | 12 +++++++++
 4 files changed, 36 insertions(+), 64 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index edd79a608c..ea7e70805f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -113,7 +113,7 @@ public class CompactionTaskManager implements IService {
     this.subCompactionTaskExecutionPool =
         (WrappedThreadPoolExecutor)
             IoTDBThreadPoolFactory.newFixedThreadPool(
-                IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount()
+                compactionThreadNum
                     * IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum(),
                 ThreadName.COMPACTION_SUB_SERVICE.getName());
     for (int i = 0; i < compactionThreadNum; ++i) {
@@ -124,6 +124,7 @@ public class CompactionTaskManager implements IService {
   @Override
   public void stop() {
     if (taskExecutionPool != null) {
+      subCompactionTaskExecutionPool.shutdownNow();
       taskExecutionPool.shutdownNow();
       logger.info("Waiting for task taskExecutionPool to shut down");
       waitTermination();
@@ -135,6 +136,7 @@ public class CompactionTaskManager implements IService {
   @Override
   public void waitAndStop(long milliseconds) {
     if (taskExecutionPool != null) {
+      awaitTermination(subCompactionTaskExecutionPool, milliseconds);
       awaitTermination(taskExecutionPool, milliseconds);
       logger.info("Waiting for task taskExecutionPool to shut down in {} ms", milliseconds);
       waitTermination();
@@ -175,7 +177,7 @@ public class CompactionTaskManager implements IService {
   private void waitTermination() {
     long startTime = System.currentTimeMillis();
     int timeMillis = 0;
-    while (!taskExecutionPool.isTerminated()) {
+    while (!subCompactionTaskExecutionPool.isTerminated() || !taskExecutionPool.isTerminated()) {
       try {
         Thread.sleep(200);
       } catch (InterruptedException e) {
@@ -188,6 +190,7 @@ public class CompactionTaskManager implements IService {
       }
     }
     taskExecutionPool = null;
+    subCompactionTaskExecutionPool = null;
     storageGroupTasks.clear();
     logger.info("CompactionManager stopped");
   }
@@ -352,15 +355,6 @@ public class CompactionTaskManager implements IService {
   @TestOnly
   public void restart() throws InterruptedException {
     if (IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount() > 0) {
-      if (taskExecutionPool != null) {
-        this.taskExecutionPool.shutdownNow();
-        if (!this.taskExecutionPool.awaitTermination(MAX_WAITING_TIME, TimeUnit.MILLISECONDS)) {
-          throw new InterruptedException(
-              "Has been waiting over "
-                  + MAX_WAITING_TIME / 1000
-                  + " seconds for all compaction tasks to finish.");
-        }
-      }
       if (subCompactionTaskExecutionPool != null) {
         this.subCompactionTaskExecutionPool.shutdownNow();
         if (!this.subCompactionTaskExecutionPool.awaitTermination(
@@ -371,11 +365,13 @@ public class CompactionTaskManager implements IService {
                   + " seconds for all sub compaction tasks to finish.");
         }
       }
-      if (this.subCompactionTaskExecutionPool != null) {
-        subCompactionTaskExecutionPool.shutdownNow();
-        if (!this.subCompactionTaskExecutionPool.awaitTermination(
-            MAX_WAITING_TIME, TimeUnit.MILLISECONDS)) {
-          throw new RuntimeException("Failed to shutdown subCompactionTaskExecutionPool");
+      if (taskExecutionPool != null) {
+        this.taskExecutionPool.shutdownNow();
+        if (!this.taskExecutionPool.awaitTermination(MAX_WAITING_TIME, TimeUnit.MILLISECONDS)) {
+          throw new InterruptedException(
+              "Has been waiting over "
+                  + MAX_WAITING_TIME / 1000
+                  + " seconds for all compaction tasks to finish.");
         }
       }
       initThreadPool();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
index e51d57cde9..59a3ece46a 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
@@ -26,6 +26,9 @@ import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
 import org.apache.iotdb.db.engine.compaction.constant.CompactionPriority;
+import org.apache.iotdb.db.engine.compaction.constant.CrossCompactionPerformer;
+import org.apache.iotdb.db.engine.compaction.constant.InnerSeqCompactionPerformer;
+import org.apache.iotdb.db.engine.compaction.constant.InnerUnseqCompactionPerformer;
 import org.apache.iotdb.db.engine.compaction.utils.CompactionClearUtils;
 import org.apache.iotdb.db.engine.compaction.utils.CompactionConfigRestorer;
 import org.apache.iotdb.db.engine.compaction.utils.CompactionFileGeneratorUtils;
@@ -55,12 +58,6 @@ import static org.junit.Assert.fail;
 public class CompactionSchedulerTest {
   private static final Logger logger = LoggerFactory.getLogger(CompactionSchedulerTest.class);
   static final String COMPACTION_TEST_SG = "root.compactionSchedulerTest_";
-  private static final boolean oldEnableInnerSeqCompaction =
-      IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
-  private static final boolean oldEnableInnerUnseqCompaction =
-      IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
-  private static final boolean oldEnableCrossCompaction =
-      IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction();
   static final long MAX_WAITING_TIME = 60_000;
   static final long SCHEDULE_AGAIN_TIME = 30_000;
   static final String[] fullPaths =
@@ -91,9 +88,15 @@ public class CompactionSchedulerTest {
     if (!basicOutputDir.exists()) {
       assertTrue(basicOutputDir.mkdirs());
     }
-    IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(true);
-    IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(true);
-    IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(true);
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setCrossCompactionPerformer(CrossCompactionPerformer.READ_POINT);
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setInnerSeqCompactionPerformer(InnerSeqCompactionPerformer.READ_CHUNK);
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setInnerUnseqCompactionPerformer(InnerUnseqCompactionPerformer.READ_POINT);
     CompactionTaskManager.getInstance().start();
     while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0) {
       try {
@@ -113,15 +116,6 @@ public class CompactionSchedulerTest {
     EnvironmentUtils.cleanAllDir();
     CompactionClearUtils.deleteEmptyDir(new File("target"));
     CompactionTaskManager.getInstance().stop();
-    IoTDBDescriptor.getInstance()
-        .getConfig()
-        .setEnableSeqSpaceCompaction(oldEnableInnerSeqCompaction);
-    IoTDBDescriptor.getInstance()
-        .getConfig()
-        .setEnableUnseqSpaceCompaction(oldEnableInnerUnseqCompaction);
-    IoTDBDescriptor.getInstance()
-        .getConfig()
-        .setEnableCrossSpaceCompaction(oldEnableCrossCompaction);
   }
 
   /**
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerWithFastPerformerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerWithFastPerformerTest.java
index 2f70060287..3d1b3b00e1 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerWithFastPerformerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerWithFastPerformerTest.java
@@ -58,12 +58,6 @@ import static org.junit.Assert.fail;
 public class CompactionSchedulerWithFastPerformerTest {
   private static final Logger logger = LoggerFactory.getLogger(CompactionSchedulerTest.class);
   static final String COMPACTION_TEST_SG = "root.compactionSchedulerTest_";
-  private static final boolean oldEnableInnerSeqCompaction =
-      IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
-  private static final boolean oldEnableInnerUnseqCompaction =
-      IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
-  private static final boolean oldEnableCrossCompaction =
-      IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction();
   static final long MAX_WAITING_TIME = 60_000;
   static final long SCHEDULE_AGAIN_TIME = 30_000;
   static final String[] fullPaths =
@@ -85,13 +79,6 @@ public class CompactionSchedulerWithFastPerformerTest {
         ".device1.sensor4",
       };
 
-  private CrossCompactionPerformer oldCrossPerformer =
-      IoTDBDescriptor.getInstance().getConfig().getCrossCompactionPerformer();
-  private InnerSeqCompactionPerformer oldInnerSeqPerformer =
-      IoTDBDescriptor.getInstance().getConfig().getInnerSeqCompactionPerformer();
-  private InnerUnseqCompactionPerformer oldInnerUnseqPerformer =
-      IoTDBDescriptor.getInstance().getConfig().getInnerUnseqCompactionPerformer();
-
   @Before
   public void setUp() throws MetadataException, IOException {
     CompactionClearUtils.clearAllCompactionFiles();
@@ -101,9 +88,6 @@ public class CompactionSchedulerWithFastPerformerTest {
     if (!basicOutputDir.exists()) {
       assertTrue(basicOutputDir.mkdirs());
     }
-    IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(true);
-    IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(true);
-    IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(true);
     IoTDBDescriptor.getInstance()
         .getConfig()
         .setCrossCompactionPerformer(CrossCompactionPerformer.FAST);
@@ -132,20 +116,6 @@ public class CompactionSchedulerWithFastPerformerTest {
     EnvironmentUtils.cleanAllDir();
     CompactionClearUtils.deleteEmptyDir(new File("target"));
     CompactionTaskManager.getInstance().stop();
-    IoTDBDescriptor.getInstance()
-        .getConfig()
-        .setEnableSeqSpaceCompaction(oldEnableInnerSeqCompaction);
-    IoTDBDescriptor.getInstance()
-        .getConfig()
-        .setEnableUnseqSpaceCompaction(oldEnableInnerUnseqCompaction);
-    IoTDBDescriptor.getInstance()
-        .getConfig()
-        .setEnableCrossSpaceCompaction(oldEnableCrossCompaction);
-    IoTDBDescriptor.getInstance().getConfig().setCrossCompactionPerformer(oldCrossPerformer);
-    IoTDBDescriptor.getInstance().getConfig().setInnerSeqCompactionPerformer(oldInnerSeqPerformer);
-    IoTDBDescriptor.getInstance()
-        .getConfig()
-        .setInnerUnseqCompactionPerformer(oldInnerUnseqPerformer);
   }
 
   /**
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
index a809d10262..613308c5d7 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
@@ -22,8 +22,11 @@ package org.apache.iotdb.db.engine.compaction.utils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.compaction.constant.CompactionPriority;
+import org.apache.iotdb.db.engine.compaction.constant.CrossCompactionPerformer;
 import org.apache.iotdb.db.engine.compaction.constant.CrossCompactionSelector;
+import org.apache.iotdb.db.engine.compaction.constant.InnerSeqCompactionPerformer;
 import org.apache.iotdb.db.engine.compaction.constant.InnerSequenceCompactionSelector;
+import org.apache.iotdb.db.engine.compaction.constant.InnerUnseqCompactionPerformer;
 
 public class CompactionConfigRestorer {
   private boolean enableSeqSpaceCompaction = true;
@@ -44,6 +47,12 @@ public class CompactionConfigRestorer {
   private long compactionScheduleIntervalInMs = 60000L;
   private long compactionSubmissionIntervalInMs = 60000L;
   private int compactionWriteThroughputMbPerSec = 8;
+  private CrossCompactionPerformer oldCrossPerformer =
+      IoTDBDescriptor.getInstance().getConfig().getCrossCompactionPerformer();
+  private InnerSeqCompactionPerformer oldInnerSeqPerformer =
+      IoTDBDescriptor.getInstance().getConfig().getInnerSeqCompactionPerformer();
+  private InnerUnseqCompactionPerformer oldInnerUnseqPerformer =
+      IoTDBDescriptor.getInstance().getConfig().getInnerUnseqCompactionPerformer();
 
   public CompactionConfigRestorer() {}
 
@@ -66,5 +75,8 @@ public class CompactionConfigRestorer {
     config.setCompactionScheduleIntervalInMs(compactionScheduleIntervalInMs);
     config.setCompactionSubmissionIntervalInMs(compactionSubmissionIntervalInMs);
     config.setCompactionWriteThroughputMbPerSec(compactionWriteThroughputMbPerSec);
+    config.setCrossCompactionPerformer(oldCrossPerformer);
+    config.setInnerSeqCompactionPerformer(oldInnerSeqPerformer);
+    config.setInnerUnseqCompactionPerformer(oldInnerUnseqPerformer);
   }
 }