You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/11/27 05:24:18 UTC

[iotdb] branch master_performance updated (1489035 -> 92bb0b4)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a change to branch master_performance
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


    from 1489035  add info logs (#4464)
     new 5cef83f  [IOTDB-2027] Ignore too many WAL BufferOverflow log (#4467)
     new 3865ef6  catch throwable in sub query threads (#4462)
     new 89d5c48  [IOTDB-1673] CLI refactor and upgrade to JLine3 (#4458)
     new 0d0363a  [IOTDB-2010] fix incomplete show timeseries result (#4388)
     new 5ceecfd  [IOTDB-2062] UDF Framework: Potential Memory Leak in `SingleInputColumnSingleReferenceIntermediateLayer` (#4472)
     new e697afb  [IOTDB-2064] fix the NPE caused by map serde (#4473)
     new 451bb21  [IOTDB-2063] Fix MinTimeDescAggregationResult implementation (#4471)
     new c0233e6  [IOTDB-2065] TsFileSequenceReader will be cached for 100s even no longer used (#4478)
     new 0224027  [IOTDB-1899] Fix stream closed exception during compaction (#4457)
     new 92bb0b4  add split log timer

The 10 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 LICENSE-binary                                     |   6 +-
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4    |   4 +-
 cli/pom.xml                                        |  11 +-
 cli/src/assembly/resources/sbin/start-cli.bat      |   2 +-
 cli/src/main/java/org/apache/iotdb/cli/Cli.java    |  40 ++--
 .../apache/iotdb/cli/IoTDBSyntaxHighlighter.java   |  76 +++++++
 cli/src/main/java/org/apache/iotdb/cli/WinCli.java | 175 --------------
 .../org/apache/iotdb/cli/utils/JlineUtils.java     | 107 +++++++++
 .../org/apache/iotdb/tool/AbstractCsvTool.java     |   3 +-
 .../main/java/org/apache/iotdb/tool/ExportCsv.java |  19 +-
 .../iotdb/cluster/coordinator/Coordinator.java     |   4 +-
 .../apache/iotdb/cluster/metadata/CMManager.java   |  68 +++---
 .../handlers/caller/ShowTimeSeriesHandler.java     | 133 +++++++++++
 .../apache/iotdb/cluster/server/monitor/Timer.java |   1 +
 pom.xml                                            |   6 +-
 .../db/engine/compaction/CompactionScheduler.java  |   3 +-
 .../engine/compaction/CompactionTaskManager.java   |  36 ++-
 .../inner/AbstractInnerSpaceCompactionTask.java    |   2 +
 .../sizetiered/SizeTieredCompactionSelector.java   |   2 +-
 .../inner/sizetiered/SizeTieredCompactionTask.java | 105 ++++-----
 .../compaction/task/AbstractCompactionTask.java    |   2 +
 .../compaction/task/CompactionRecoverTask.java     |   2 +-
 .../db/exception/query/QueryProcessException.java  |   4 +
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |   1 -
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |   4 +
 .../aggregation/impl/MinTimeDescAggrResult.java    |   5 +
 .../iotdb/db/query/control/FileReaderManager.java  | 110 +++------
 .../dataset/RawQueryDataSetWithoutValueFilter.java |  14 +-
 .../db/query/udf/core/executor/UDTFExecutor.java   |   6 +
 ...nputColumnSingleReferenceIntermediateLayer.java |   4 +
 .../db/writelog/node/ExclusiveWriteLogNode.java    |   8 +
 .../iotdb/db/engine/cache/ChunkCacheTest.java      |   1 -
 .../engine/compaction/CompactionSchedulerTest.java |  18 +-
 .../compaction/CompactionTaskManagerTest.java      | 252 +++++++++++++++++++++
 .../db/engine/compaction/cross/MergeTest.java      |   1 -
 .../compaction/inner/InnerCompactionChunkTest.java |   4 +-
 .../compaction/inner/InnerCompactionLogTest.java   |   4 +-
 .../inner/InnerCompactionMoreDataTest.java         |   2 +-
 .../inner/InnerCompactionSchedulerTest.java        |   4 +-
 .../compaction/inner/InnerCompactionTest.java      |   3 +-
 .../inner/InnerSpaceCompactionUtilsTest.java       |   4 +-
 .../SizeTieredCompactionRecoverTest.java           |   1 -
 .../inner/sizetiered/SizeTieredCompactionTest.java |   1 -
 .../task/FakedInnerSpaceCompactionTask.java        |  44 ++--
 .../compaction/utils/CompactionClearUtils.java     |   1 -
 .../engine/storagegroup/FakedTsFileResource.java   |  11 +
 .../storagegroup/StorageGroupProcessorTest.java    |   2 +-
 .../db/integration/IoTDBNewTsFileCompactionIT.java |   2 +-
 .../db/query/control/FileReaderManagerTest.java    |  19 +-
 .../query/reader/series/SeriesReaderTestUtil.java  |   1 -
 .../iotdb/db/rescon/ResourceManagerTest.java       |   1 -
 .../apache/iotdb/db/writelog/WriteLogNodeTest.java |  49 ++++
 .../tsfile/read/common/ExceptionBatchData.java     |  10 +-
 .../iotdb/tsfile/utils/ReadWriteIOUtils.java       |  11 +
 .../iotdb/tsfile/utils/ReadWriteIOUtilsTest.java   |  30 ++-
 55 files changed, 969 insertions(+), 470 deletions(-)
 create mode 100644 cli/src/main/java/org/apache/iotdb/cli/IoTDBSyntaxHighlighter.java
 delete mode 100644 cli/src/main/java/org/apache/iotdb/cli/WinCli.java
 create mode 100644 cli/src/main/java/org/apache/iotdb/cli/utils/JlineUtils.java
 create mode 100644 cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ShowTimeSeriesHandler.java
 create mode 100644 server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java

[iotdb] 07/10: [IOTDB-2063] Fix MinTimeDescAggregationResult implementation (#4471)

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master_performance
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 451bb2133a653949553a12884f5ebe02c86790a7
Author: BaiJian <er...@hotmail.com>
AuthorDate: Fri Nov 26 15:06:22 2021 +0800

    [IOTDB-2063] Fix MinTimeDescAggregationResult implementation (#4471)
---
 .../iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java       | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
index 9abceb5..071d3d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
@@ -67,4 +67,9 @@ public class MinTimeDescAggrResult extends MinTimeAggrResult {
   public boolean hasFinalResult() {
     return false;
   }
+
+  @Override
+  public boolean isAscending() {
+    return false;
+  }
 }

[iotdb] 06/10: [IOTDB-2064] fix the NPE caused by map serde (#4473)

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master_performance
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e697afbe0d5f8f0f6b66f4f4f5096486366c59bb
Author: Zhong Wang <wa...@alibaba-inc.com>
AuthorDate: Fri Nov 26 15:02:21 2021 +0800

    [IOTDB-2064] fix the NPE caused by map serde (#4473)
---
 .../iotdb/tsfile/utils/ReadWriteIOUtils.java       | 11 ++++++++
 .../iotdb/tsfile/utils/ReadWriteIOUtilsTest.java   | 30 ++++++++++++++++++++--
 2 files changed, 39 insertions(+), 2 deletions(-)

diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
index a54ad6e..6744c24 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
@@ -111,6 +111,10 @@ public class ReadWriteIOUtils {
   }
 
   public static int write(Map<String, String> map, DataOutputStream stream) throws IOException {
+    if (map == null) {
+      return write(-1, stream);
+    }
+
     int length = 0;
     stream.writeInt(map.size());
     length += 4;
@@ -129,6 +133,10 @@ public class ReadWriteIOUtils {
   }
 
   public static int write(Map<String, String> map, ByteBuffer buffer) {
+    if (map == null) {
+      return write(-1, buffer);
+    }
+
     int length = 0;
     byte[] bytes;
     buffer.putInt(map.size());
@@ -649,6 +657,9 @@ public class ReadWriteIOUtils {
 
   public static Map<String, String> readMap(ByteBuffer buffer) {
     int length = readInt(buffer);
+    if (length == -1) {
+      return null;
+    }
     Map<String, String> map = new HashMap<>(length);
     for (int i = 0; i < length; i++) {
       // key
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtilsTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtilsTest.java
index 48a89c0..7662508 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtilsTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtilsTest.java
@@ -25,6 +25,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -35,7 +36,7 @@ public class ReadWriteIOUtilsTest {
   protected static final int DEFAULT_BUFFER_SIZE = 4096;
 
   @Test
-  public void readStringBufferTest() {
+  public void stringSerdeTest() {
     // 1. not null value
     String str = "string";
     ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(DEFAULT_BUFFER_SIZE);
@@ -67,7 +68,7 @@ public class ReadWriteIOUtilsTest {
   }
 
   @Test
-  public void readMapTest() {
+  public void mapSerdeTest() {
     // 1. key: not null; value: not null
     String key = "string";
     String value = "string";
@@ -136,5 +137,30 @@ public class ReadWriteIOUtilsTest {
     result = ReadWriteIOUtils.readMap(ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
     Assert.assertNotNull(result);
     Assert.assertEquals(map, result);
+
+    // 5. empty map
+    map = Collections.emptyMap();
+    byteArrayOutputStream = new ByteArrayOutputStream(DEFAULT_BUFFER_SIZE);
+    stream = new DataOutputStream(byteArrayOutputStream);
+    try {
+      ReadWriteIOUtils.write(map, stream);
+    } catch (IOException e) {
+      fail(e.toString());
+    }
+    result = ReadWriteIOUtils.readMap(ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
+    Assert.assertNotNull(result);
+    Assert.assertTrue(result.isEmpty());
+
+    // 6. null
+    map = null;
+    byteArrayOutputStream = new ByteArrayOutputStream(DEFAULT_BUFFER_SIZE);
+    stream = new DataOutputStream(byteArrayOutputStream);
+    try {
+      ReadWriteIOUtils.write(map, stream);
+    } catch (IOException e) {
+      fail(e.toString());
+    }
+    result = ReadWriteIOUtils.readMap(ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
+    Assert.assertNull(result);
   }
 }

[iotdb] 09/10: [IOTDB-1899] Fix stream closed exception during compaction (#4457)

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master_performance
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0224027c50ecca51ae3f40f2f9abc9710b1fe7ac
Author: liuxuxin <37...@users.noreply.github.com>
AuthorDate: Sat Nov 27 09:59:38 2021 +0800

    [IOTDB-1899] Fix stream closed exception during compaction (#4457)
---
 .../db/engine/compaction/CompactionScheduler.java  |   3 +-
 .../engine/compaction/CompactionTaskManager.java   |  36 ++-
 .../inner/AbstractInnerSpaceCompactionTask.java    |   2 +
 .../sizetiered/SizeTieredCompactionSelector.java   |   2 +-
 .../inner/sizetiered/SizeTieredCompactionTask.java | 105 ++++-----
 .../compaction/task/AbstractCompactionTask.java    |   2 +
 .../compaction/task/CompactionRecoverTask.java     |   2 +-
 .../engine/compaction/CompactionSchedulerTest.java |  18 +-
 .../compaction/CompactionTaskManagerTest.java      | 252 +++++++++++++++++++++
 .../compaction/inner/InnerCompactionChunkTest.java |   4 +-
 .../compaction/inner/InnerCompactionLogTest.java   |   4 +-
 .../inner/InnerCompactionMoreDataTest.java         |   2 +-
 .../inner/InnerCompactionSchedulerTest.java        |   4 +-
 .../compaction/inner/InnerCompactionTest.java      |   2 +-
 .../inner/InnerSpaceCompactionUtilsTest.java       |   4 +-
 .../task/FakedInnerSpaceCompactionTask.java        |  44 ++--
 .../engine/storagegroup/FakedTsFileResource.java   |  11 +
 .../storagegroup/StorageGroupProcessorTest.java    |   2 +-
 .../db/integration/IoTDBNewTsFileCompactionIT.java |   2 +-
 19 files changed, 394 insertions(+), 107 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java
index fb2ecf5..6bccf4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java
@@ -106,7 +106,8 @@ public class CompactionScheduler {
     boolean taskSubmitted = true;
     int concurrentCompactionThread = config.getConcurrentCompactionThread();
     while (taskSubmitted
-        && CompactionTaskManager.getInstance().getTaskCount() < concurrentCompactionThread) {
+        && CompactionTaskManager.getInstance().getExecutingTaskCount()
+            < concurrentCompactionThread) {
       taskSubmitted =
           tryToSubmitInnerSpaceCompactionTask(
               logicalStorageGroupName,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index 8f85d2c..959b9fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -32,9 +32,11 @@ import com.google.common.collect.MinMaxPriorityQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
@@ -53,11 +55,12 @@ public class CompactionTaskManager implements IService {
   private WrappedScheduledExecutorService taskExecutionPool;
   public static volatile AtomicInteger currentTaskNum = new AtomicInteger(0);
   // TODO: record the task in time partition
-  private MinMaxPriorityQueue<AbstractCompactionTask> compactionTaskQueue =
+  private MinMaxPriorityQueue<AbstractCompactionTask> candidateCompactionTaskQueue =
       MinMaxPriorityQueue.orderedBy(new CompactionTaskComparator()).maximumSize(1000).create();
   private Map<String, Set<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>();
   private Map<String, Map<Long, Set<Future<Void>>>> compactionTaskFutures =
       new ConcurrentHashMap<>();
+  private List<AbstractCompactionTask> runningCompactionTaskList = new ArrayList<>();
   private ScheduledExecutorService compactionTaskSubmissionThreadPool;
   private final long TASK_SUBMIT_INTERVAL =
       IoTDBDescriptor.getInstance().getConfig().getCompactionSubmissionInterval();
@@ -172,13 +175,9 @@ public class CompactionTaskManager implements IService {
    * with last priority will be removed from the task.
    */
   public synchronized boolean addTaskToWaitingQueue(AbstractCompactionTask compactionTask) {
-    if (!compactionTaskQueue.contains(compactionTask)) {
-      logger.debug(
-          "Add a compaction task {} to queue, current queue size is {}, current task num is {}",
-          compactionTask,
-          compactionTaskQueue.size(),
-          currentTaskNum.get());
-      compactionTaskQueue.add(compactionTask);
+    if (!candidateCompactionTaskQueue.contains(compactionTask)
+        && !runningCompactionTaskList.contains(compactionTask)) {
+      candidateCompactionTaskQueue.add(compactionTask);
       return true;
     }
     return false;
@@ -191,14 +190,19 @@ public class CompactionTaskManager implements IService {
   public synchronized void submitTaskFromTaskQueue() {
     while (currentTaskNum.get()
             < IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread()
-        && compactionTaskQueue.size() > 0) {
-      AbstractCompactionTask task = compactionTaskQueue.poll();
-      if (task.checkValidAndSetMerging()) {
+        && candidateCompactionTaskQueue.size() > 0) {
+      AbstractCompactionTask task = candidateCompactionTaskQueue.poll();
+      if (task != null && task.checkValidAndSetMerging()) {
         submitTask(task.getFullStorageGroupName(), task.getTimePartition(), task);
+        runningCompactionTaskList.add(task);
       }
     }
   }
 
+  public synchronized void removeRunningTaskFromList(AbstractCompactionTask task) {
+    runningCompactionTaskList.remove(task);
+  }
+
   /**
    * This method will directly submit a task to thread pool if there is available thread.
    *
@@ -240,10 +244,18 @@ public class CompactionTaskManager implements IService {
     }
   }
 
-  public int getTaskCount() {
+  public int getExecutingTaskCount() {
     return taskExecutionPool.getActiveCount() + taskExecutionPool.getQueue().size();
   }
 
+  public int getTotalTaskCount() {
+    return getExecutingTaskCount() + candidateCompactionTaskQueue.size();
+  }
+
+  public synchronized List<AbstractCompactionTask> getRunningCompactionTaskList() {
+    return new ArrayList<>(runningCompactionTaskList);
+  }
+
   public long getFinishTaskNum() {
     return taskExecutionPool.getCompletedTaskCount();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/AbstractInnerSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/AbstractInnerSpaceCompactionTask.java
index 38d2883..fa1e830 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/AbstractInnerSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/AbstractInnerSpaceCompactionTask.java
@@ -124,6 +124,8 @@ public abstract class AbstractInnerSpaceCompactionTask extends AbstractCompactio
         .append(timePartition)
         .append(" task file num is ")
         .append(selectedTsFileResourceList.size())
+        .append(", files is ")
+        .append(selectedTsFileResourceList)
         .append(", total compaction count is ")
         .append(sumOfCompactionCount)
         .toString();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
index c3c416a..db665bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
@@ -82,7 +82,7 @@ public class SizeTieredCompactionSelector extends AbstractInnerSpaceCompactionSe
         IoTDBDescriptor.getInstance().getConfig().getTargetCompactionFileSize(),
         IoTDBDescriptor.getInstance().getConfig().getMaxCompactionCandidateFileNum(),
         CompactionTaskManager.currentTaskNum.get(),
-        CompactionTaskManager.getInstance().getTaskCount(),
+        CompactionTaskManager.getInstance().getExecutingTaskCount(),
         IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread());
     tsFileResources.readLock();
     PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue =
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
index 6f85f21..0661da8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
@@ -137,62 +137,65 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
       LOGGER.info(
           "{} [SizeTiredCompactionTask] compact finish, close the logger", fullStorageGroupName);
       sizeTieredCompactionLogger.close();
-    } finally {
-      for (TsFileResource resource : selectedTsFileResourceList) {
-        resource.setMerging(false);
+
+      LOGGER.info(
+          "{} [Compaction] compaction finish, start to delete old files", fullStorageGroupName);
+      if (Thread.currentThread().isInterrupted()) {
+        throw new InterruptedException(
+            String.format("%s [Compaction] abort", fullStorageGroupName));
       }
-    }
-    LOGGER.info(
-        "{} [Compaction] compaction finish, start to delete old files", fullStorageGroupName);
-    if (Thread.currentThread().isInterrupted()) {
-      throw new InterruptedException(String.format("%s [Compaction] abort", fullStorageGroupName));
-    }
-    // get write lock for TsFileResource list with timeout
-    try {
-      tsFileManager.writeLockWithTimeout("size-tired compaction", 60_000);
-    } catch (WriteLockFailedException e) {
-      // if current compaction thread couldn't get writelock
-      // a WriteLockFailException will be thrown, then terminate the thread itself
-      LOGGER.warn(
-          "{} [SizeTiredCompactionTask] failed to get write lock, abort the task and delete the target file {}",
+      // get write lock for TsFileResource list with timeout
+      try {
+        tsFileManager.writeLockWithTimeout("size-tired compaction", 60_000);
+      } catch (WriteLockFailedException e) {
+        // if current compaction thread couldn't get writelock
+        // a WriteLockFailException will be thrown, then terminate the thread itself
+        LOGGER.warn(
+            "{} [SizeTiredCompactionTask] failed to get write lock, abort the task and delete the target file {}",
+            fullStorageGroupName,
+            targetTsFileResource.getTsFile(),
+            e);
+        targetTsFileResource.getTsFile().delete();
+        logFile.delete();
+        throw new InterruptedException(
+            String.format(
+                "%s [Compaction] compaction abort because cannot acquire write lock",
+                fullStorageGroupName));
+      }
+      try {
+        // replace the old files with new file, the new is in same position as the old
+        for (TsFileResource resource : selectedTsFileResourceList) {
+          TsFileResourceManager.getInstance().removeTsFileResource(resource);
+        }
+        tsFileResourceList.insertBefore(selectedTsFileResourceList.get(0), targetTsFileResource);
+        TsFileResourceManager.getInstance().registerSealedTsFileResource(targetTsFileResource);
+        for (TsFileResource resource : selectedTsFileResourceList) {
+          tsFileResourceList.remove(resource);
+        }
+      } finally {
+        tsFileManager.writeUnlock();
+      }
+      // delete the old files
+      InnerSpaceCompactionUtils.deleteTsFilesInDisk(
+          selectedTsFileResourceList, fullStorageGroupName);
+      LOGGER.info(
+          "{} [SizeTiredCompactionTask] old file deleted, start to rename mods file",
+          fullStorageGroupName);
+      combineModsInCompaction(selectedTsFileResourceList, targetTsFileResource);
+      long costTime = System.currentTimeMillis() - startTime;
+      LOGGER.info(
+          "{} [SizeTiredCompactionTask] all compaction task finish, target file is {},"
+              + "time cost is {} s",
           fullStorageGroupName,
-          targetTsFileResource.getTsFile(),
-          e);
-      targetTsFileResource.getTsFile().delete();
-      logFile.delete();
-      throw new InterruptedException(
-          String.format(
-              "%s [Compaction] compaction abort because cannot acquire write lock",
-              fullStorageGroupName));
-    }
-    try {
-      // replace the old files with new file, the new is in same position as the old
-      for (TsFileResource resource : selectedTsFileResourceList) {
-        TsFileResourceManager.getInstance().removeTsFileResource(resource);
+          targetFileName,
+          costTime / 1000);
+      if (logFile.exists()) {
+        logFile.delete();
       }
-      tsFileResourceList.insertBefore(selectedTsFileResourceList.get(0), targetTsFileResource);
-      TsFileResourceManager.getInstance().registerSealedTsFileResource(targetTsFileResource);
+    } finally {
       for (TsFileResource resource : selectedTsFileResourceList) {
-        tsFileResourceList.remove(resource);
+        resource.setMerging(false);
       }
-    } finally {
-      tsFileManager.writeUnlock();
-    }
-    // delete the old files
-    InnerSpaceCompactionUtils.deleteTsFilesInDisk(selectedTsFileResourceList, fullStorageGroupName);
-    LOGGER.info(
-        "{} [SizeTiredCompactionTask] old file deleted, start to rename mods file",
-        fullStorageGroupName);
-    combineModsInCompaction(selectedTsFileResourceList, targetTsFileResource);
-    long costTime = System.currentTimeMillis() - startTime;
-    LOGGER.info(
-        "{} [SizeTiredCompactionTask] all compaction task finish, target file is {},"
-            + "time cost is {} s",
-        fullStorageGroupName,
-        targetFileName,
-        costTime / 1000);
-    if (logFile.exists()) {
-      logFile.delete();
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
index b1d9128..4bf4d46 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.engine.compaction.task;
 
 import org.apache.iotdb.db.engine.compaction.CompactionScheduler;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.cross.inplace.InplaceCompactionRecoverTask;
 import org.apache.iotdb.db.engine.compaction.inner.sizetiered.SizeTieredCompactionRecoverTask;
 
@@ -61,6 +62,7 @@ public abstract class AbstractCompactionTask implements Callable<Void> {
       if (!(this instanceof InplaceCompactionRecoverTask)
           && !(this instanceof SizeTieredCompactionRecoverTask)) {
         CompactionScheduler.decPartitionCompaction(fullStorageGroupName, timePartition);
+        CompactionTaskManager.getInstance().removeRunningTaskFromList(this);
       }
       this.currentTaskNum.decrementAndGet();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionRecoverTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionRecoverTask.java
index 948e80f..63526d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionRecoverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionRecoverTask.java
@@ -66,7 +66,7 @@ public class CompactionRecoverTask implements Callable<Void> {
     compactionRecoverCallBack.call();
     logger.info(
         "recover task finish, current compaction thread is {}",
-        CompactionTaskManager.getInstance().getTaskCount());
+        CompactionTaskManager.getInstance().getExecutingTaskCount());
     return null;
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
index b871f15..8082e19 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
@@ -94,7 +94,7 @@ public class CompactionSchedulerTest {
     }
     MergeManager.getINSTANCE().start();
     CompactionTaskManager.getInstance().start();
-    while (CompactionTaskManager.getInstance().getTaskCount() > 0) {
+    while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0) {
       try {
         Thread.sleep(1000);
       } catch (InterruptedException e) {
@@ -216,7 +216,7 @@ public class CompactionSchedulerTest {
         e.printStackTrace();
       }
     }
-    while (CompactionTaskManager.getInstance().getTaskCount() > 0) {
+    while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0) {
       try {
         Thread.sleep(10);
       } catch (InterruptedException e) {
@@ -338,14 +338,14 @@ public class CompactionSchedulerTest {
         e.printStackTrace();
       }
     }
-    while (CompactionTaskManager.getInstance().getTaskCount() > 0) {
+    while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0) {
       try {
         Thread.sleep(10);
       } catch (InterruptedException e) {
 
       }
     }
-    while (CompactionTaskManager.getInstance().getTaskCount() > 0) {
+    while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0) {
       try {
         Thread.sleep(10);
       } catch (InterruptedException e) {
@@ -468,7 +468,7 @@ public class CompactionSchedulerTest {
       }
     }
 
-    while (CompactionTaskManager.getInstance().getTaskCount() > 0) {
+    while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0) {
       try {
         Thread.sleep(10);
       } catch (InterruptedException e) {
@@ -566,7 +566,7 @@ public class CompactionSchedulerTest {
           fullPath, chunkPagePointsNum, 100 * i + 50, tsFileResource);
       tsFileManager.add(tsFileResource, false);
     }
-    while (CompactionTaskManager.getInstance().getTaskCount() > 0) {
+    while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0) {
       try {
         Thread.sleep(10);
       } catch (InterruptedException e) {
@@ -723,7 +723,7 @@ public class CompactionSchedulerTest {
       }
     }
 
-    while (CompactionTaskManager.getInstance().getTaskCount() > 0) {
+    while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0) {
       try {
         Thread.sleep(10);
       } catch (InterruptedException e) {
@@ -968,7 +968,7 @@ public class CompactionSchedulerTest {
         e.printStackTrace();
       }
     }
-    while (CompactionTaskManager.getInstance().getTaskCount() > 0) {
+    while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0) {
       try {
         Thread.sleep(10);
       } catch (InterruptedException e) {
@@ -1064,7 +1064,7 @@ public class CompactionSchedulerTest {
       tsFileManager.add(tsFileResource, false);
     }
 
-    while (CompactionTaskManager.getInstance().getTaskCount() > 0) {
+    while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0) {
       try {
         Thread.sleep(10);
       } catch (InterruptedException e) {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
new file mode 100644
index 0000000..46098c4
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction;
+
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.compaction.inner.InnerCompactionTest;
+import org.apache.iotdb.db.engine.compaction.inner.sizetiered.SizeTieredCompactionTask;
+import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask;
+import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class CompactionTaskManagerTest extends InnerCompactionTest {
+  static final Logger logger = LoggerFactory.getLogger(CompactionTaskManagerTest.class);
+  File tempSGDir;
+  final long MAX_WAITING_TIME = 120_000;
+
+  @Before
+  public void setUp() throws Exception {
+    tempSGDir = new File(TestConstant.getTestTsFileDir("root.compactionTest", 0, 0));
+    if (tempSGDir.exists()) {
+      FileUtils.deleteDirectory(tempSGDir);
+    }
+    Assert.assertTrue(tempSGDir.mkdirs());
+    super.setUp();
+  }
+
+  @Test
+  public void testRepeatedSubmitBeforeExecution() throws Exception {
+    logger.warn("testRepeatedSubmitBeforeExecution");
+    TsFileManager tsFileManager =
+        new TsFileManager("root.compactionTest", "0", tempSGDir.getAbsolutePath());
+    tsFileManager.addAll(seqResources, true);
+    SizeTieredCompactionTask task1 =
+        new SizeTieredCompactionTask(
+            "root.compactionTest",
+            "0",
+            0,
+            tsFileManager,
+            tsFileManager.getSequenceListByTimePartition(0),
+            seqResources,
+            true,
+            new AtomicInteger(0));
+    SizeTieredCompactionTask task2 =
+        new SizeTieredCompactionTask(
+            "root.compactionTest",
+            "0",
+            0,
+            tsFileManager,
+            tsFileManager.getSequenceListByTimePartition(0),
+            seqResources,
+            true,
+            new AtomicInteger(0));
+    tsFileManager.writeLock("test");
+    CompactionTaskManager manager = CompactionTaskManager.getInstance();
+    try {
+      Assert.assertTrue(manager.addTaskToWaitingQueue(task1));
+      Assert.assertEquals(manager.getTotalTaskCount(), 1);
+      // a same task should not be submitted compaction task manager
+      Assert.assertFalse(manager.addTaskToWaitingQueue(task2));
+      Assert.assertEquals(manager.getTotalTaskCount(), 1);
+      manager.submitTaskFromTaskQueue();
+    } finally {
+      tsFileManager.writeUnlock();
+    }
+    Thread.sleep(5000);
+    Assert.assertEquals(0, manager.getTotalTaskCount());
+    long waitingTime = 0;
+    while (manager.getRunningCompactionTaskList().size() > 0) {
+      Thread.sleep(100);
+      waitingTime += 100;
+      if (waitingTime % 10000 == 0) {
+        logger.warn("{}", manager.getRunningCompactionTaskList());
+      }
+      if (waitingTime > MAX_WAITING_TIME) {
+        Assert.fail();
+      }
+    }
+  }
+
+  @Test
+  public void testRepeatedSubmitWhenExecuting() throws Exception {
+    logger.warn("testRepeatedSubmitWhenExecuting");
+    TsFileManager tsFileManager =
+        new TsFileManager("root.compactionTest", "0", tempSGDir.getAbsolutePath());
+    tsFileManager.addAll(seqResources, true);
+    SizeTieredCompactionTask task1 =
+        new SizeTieredCompactionTask(
+            "root.compactionTest",
+            "0",
+            0,
+            tsFileManager,
+            tsFileManager.getSequenceListByTimePartition(0),
+            seqResources,
+            true,
+            new AtomicInteger(0));
+    SizeTieredCompactionTask task2 =
+        new SizeTieredCompactionTask(
+            "root.compactionTest",
+            "0",
+            0,
+            tsFileManager,
+            tsFileManager.getSequenceListByTimePartition(0),
+            seqResources,
+            true,
+            new AtomicInteger(0));
+    tsFileManager.writeLock("test");
+    try {
+      CompactionTaskManager manager = CompactionTaskManager.getInstance();
+      manager.addTaskToWaitingQueue(task1);
+      manager.submitTaskFromTaskQueue();
+      Thread.sleep(2000);
+      // When a same compaction task is executing, the compaction task should not be submitted!
+      Assert.assertEquals(manager.getExecutingTaskCount(), 1);
+      Assert.assertFalse(manager.addTaskToWaitingQueue(task2));
+    } finally {
+      tsFileManager.writeUnlock();
+    }
+    long waitingTime = 0;
+    while (CompactionTaskManager.getInstance().getRunningCompactionTaskList().size() > 0) {
+      Thread.sleep(100);
+      waitingTime += 100;
+      if (waitingTime % 10000 == 0) {
+        logger.warn("{}", CompactionTaskManager.getInstance().getRunningCompactionTaskList());
+      }
+      if (waitingTime > MAX_WAITING_TIME) {
+        Assert.fail();
+      }
+    }
+  }
+
+  @Test
+  public void testRepeatedSubmitAfterExecution() throws Exception {
+    logger.warn("testRepeatedSubmitAfterExecution");
+    TsFileManager tsFileManager =
+        new TsFileManager("root.compactionTest", "0", tempSGDir.getAbsolutePath());
+    tsFileManager.addAll(seqResources, true);
+    SizeTieredCompactionTask task1 =
+        new SizeTieredCompactionTask(
+            "root.compactionTest",
+            "0",
+            0,
+            tsFileManager,
+            tsFileManager.getSequenceListByTimePartition(0),
+            seqResources,
+            true,
+            new AtomicInteger(0));
+    SizeTieredCompactionTask task2 =
+        new SizeTieredCompactionTask(
+            "root.compactionTest",
+            "0",
+            0,
+            tsFileManager,
+            tsFileManager.getSequenceListByTimePartition(0),
+            seqResources,
+            true,
+            new AtomicInteger(0));
+    CompactionTaskManager manager = CompactionTaskManager.getInstance();
+    manager.addTaskToWaitingQueue(task1);
+    manager.submitTaskFromTaskQueue();
+    while (manager.getTotalTaskCount() > 0) {
+      Thread.sleep(10);
+    }
+    tsFileManager.writeLock("test");
+    // an invalid task can be submitted to waiting queue, but should not be submitted to thread pool
+    Assert.assertTrue(manager.addTaskToWaitingQueue(task2));
+    manager.submitTaskFromTaskQueue();
+    Assert.assertEquals(manager.getExecutingTaskCount(), 0);
+    long waitingTime = 0;
+    while (manager.getRunningCompactionTaskList().size() > 0) {
+      Thread.sleep(100);
+      waitingTime += 100;
+      if (waitingTime % 10000 == 0) {
+        logger.warn("{}", manager.getRunningCompactionTaskList());
+      }
+      if (waitingTime > MAX_WAITING_TIME) {
+        Assert.fail();
+      }
+    }
+  }
+
+  @Test
+  public void testRemoveSelfFromRunningList() throws Exception {
+    logger.warn("testRemoveSelfFromRunningList");
+    TsFileManager tsFileManager =
+        new TsFileManager("root.compactionTest", "0", tempSGDir.getAbsolutePath());
+    tsFileManager.addAll(seqResources, true);
+    SizeTieredCompactionTask task1 =
+        new SizeTieredCompactionTask(
+            "root.compactionTest",
+            "0",
+            0,
+            tsFileManager,
+            tsFileManager.getSequenceListByTimePartition(0),
+            seqResources,
+            true,
+            new AtomicInteger(0));
+    CompactionTaskManager manager = CompactionTaskManager.getInstance();
+    tsFileManager.writeLock("test");
+    try {
+      manager.addTaskToWaitingQueue(task1);
+      manager.submitTaskFromTaskQueue();
+      Thread.sleep(5000);
+      List<AbstractCompactionTask> runningList = manager.getRunningCompactionTaskList();
+      // compaction task should add itself to running list
+      Assert.assertEquals(1, runningList.size());
+      Assert.assertTrue(runningList.contains(task1));
+    } finally {
+      tsFileManager.writeUnlock();
+    }
+    // after execution, task should remove itself from running list
+    Thread.sleep(5000);
+    List<AbstractCompactionTask> runningList = manager.getRunningCompactionTaskList();
+    Assert.assertEquals(0, runningList.size());
+    long waitingTime = 0;
+    while (manager.getRunningCompactionTaskList().size() > 0) {
+      Thread.sleep(100);
+      waitingTime += 100;
+      if (waitingTime % 10000 == 0) {
+        logger.warn("{}", manager.getRunningCompactionTaskList());
+      }
+      if (waitingTime > MAX_WAITING_TIME) {
+        Assert.fail();
+      }
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionChunkTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionChunkTest.java
index cad4215..3f29a44 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionChunkTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionChunkTest.java
@@ -26,8 +26,6 @@ import org.apache.iotdb.db.engine.compaction.inner.utils.InnerSpaceCompactionUti
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -62,7 +60,7 @@ public class InnerCompactionChunkTest extends InnerCompactionTest {
   File tempSGDir;
 
   @Before
-  public void setUp() throws IOException, WriteProcessException, MetadataException {
+  public void setUp() throws Exception {
     tempSGDir = new File(TestConstant.getTestTsFileDir("root.compactionTest", 0, 0));
     if (tempSGDir.exists()) {
       FileUtils.deleteDirectory(tempSGDir);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionLogTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionLogTest.java
index 1f5bc24..35229e2 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionLogTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionLogTest.java
@@ -23,8 +23,6 @@ import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.compaction.CompactionScheduler;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
 import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 
 import org.apache.commons.io.FileUtils;
@@ -45,7 +43,7 @@ public class InnerCompactionLogTest extends InnerCompactionTest {
 
   @Override
   @Before
-  public void setUp() throws IOException, WriteProcessException, MetadataException {
+  public void setUp() throws Exception {
     tempSGDir = new File(TestConstant.getTestTsFileDir("root.compactionTest", 0, 0));
     if (!tempSGDir.exists()) {
       Assert.assertTrue(tempSGDir.mkdirs());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionMoreDataTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionMoreDataTest.java
index 572e7de..256327a 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionMoreDataTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionMoreDataTest.java
@@ -182,7 +182,7 @@ public class InnerCompactionMoreDataTest extends InnerCompactionTest {
   }
 
   @Before
-  public void setUp() throws IOException, WriteProcessException, MetadataException {
+  public void setUp() throws Exception {
     tempSGDir = new File(TestConstant.getTestTsFileDir("root.compactionTest", 0, 0));
     if (!tempSGDir.exists()) {
       Assert.assertTrue(tempSGDir.mkdirs());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
index f935720..e1544fa 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
@@ -82,7 +82,7 @@ public class InnerCompactionSchedulerTest {
     CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
 
     try {
-      Thread.sleep(1000);
+      Thread.sleep(5000);
     } catch (Exception e) {
 
     }
@@ -113,7 +113,7 @@ public class InnerCompactionSchedulerTest {
     CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
 
     long waitingTime = 0;
-    while (CompactionTaskManager.getInstance().getTaskCount() != 0) {
+    while (CompactionTaskManager.getInstance().getExecutingTaskCount() != 0) {
       try {
         Thread.sleep(100);
         waitingTime += 100;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionTest.java
index c7aca33..df023c1 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionTest.java
@@ -76,7 +76,7 @@ public abstract class InnerCompactionTest {
   private int prevMergeChunkThreshold;
 
   @Before
-  public void setUp() throws IOException, WriteProcessException, MetadataException {
+  public void setUp() throws IOException, WriteProcessException, MetadataException, Exception {
     EnvironmentUtils.envSetUp();
     IoTDB.metaManager.init();
     prevMergeChunkThreshold =
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionUtilsTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionUtilsTest.java
index eb5b69c..4191a40 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionUtilsTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionUtilsTest.java
@@ -26,8 +26,6 @@ import org.apache.iotdb.db.engine.compaction.inner.utils.SizeTieredCompactionLog
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.read.TsFileReader;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.Path;
@@ -54,7 +52,7 @@ public class InnerSpaceCompactionUtilsTest extends InnerCompactionTest {
 
   @Override
   @Before
-  public void setUp() throws IOException, WriteProcessException, MetadataException {
+  public void setUp() throws Exception {
     tempSGDir = new File(TestConstant.getTestTsFileDir("root.compactionTest", 0, 0));
     if (!tempSGDir.exists()) {
       assertTrue(tempSGDir.mkdirs());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/task/FakedInnerSpaceCompactionTask.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/task/FakedInnerSpaceCompactionTask.java
index 50af7a5..b43fcc1 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/task/FakedInnerSpaceCompactionTask.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/task/FakedInnerSpaceCompactionTask.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.engine.compaction.task;
 
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.inner.sizetiered.SizeTieredCompactionTask;
 import org.apache.iotdb.db.engine.storagegroup.FakedTsFileResource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
@@ -53,28 +54,37 @@ public class FakedInnerSpaceCompactionTask extends SizeTieredCompactionTask {
 
   @Override
   protected void doCompaction() throws IOException {
-    TsFileNameGenerator.TsFileName name =
-        TsFileNameGenerator.getTsFileName(selectedTsFileResourceList.get(0).getTsFile().getName());
-    String newName =
-        TsFileNameGenerator.generateNewTsFileName(
-            name.getTime(),
-            name.getVersion(),
-            name.getInnerCompactionCnt() + 1,
-            name.getCrossCompactionCnt());
-    FakedTsFileResource targetTsFileResource = new FakedTsFileResource(0, newName);
-    long targetFileSize = 0;
-    for (TsFileResource resource : selectedTsFileResourceList) {
-      targetFileSize += resource.getTsFileSize();
-    }
-    targetTsFileResource.setTsFileSize(targetFileSize);
-    this.tsFileResourceList.insertBefore(selectedTsFileResourceList.get(0), targetTsFileResource);
-    for (TsFileResource tsFileResource : selectedTsFileResourceList) {
-      this.tsFileResourceList.remove(tsFileResource);
+    try {
+      TsFileNameGenerator.TsFileName name =
+          TsFileNameGenerator.getTsFileName(
+              selectedTsFileResourceList.get(0).getTsFile().getName());
+      String newName =
+          TsFileNameGenerator.generateNewTsFileName(
+              name.getTime(),
+              name.getVersion(),
+              name.getInnerCompactionCnt() + 1,
+              name.getCrossCompactionCnt());
+      FakedTsFileResource targetTsFileResource = new FakedTsFileResource(0, newName);
+      long targetFileSize = 0;
+      for (TsFileResource resource : selectedTsFileResourceList) {
+        targetFileSize += resource.getTsFileSize();
+      }
+      targetTsFileResource.setTsFileSize(targetFileSize);
+      this.tsFileResourceList.insertBefore(selectedTsFileResourceList.get(0), targetTsFileResource);
+      for (TsFileResource tsFileResource : selectedTsFileResourceList) {
+        this.tsFileResourceList.remove(tsFileResource);
+      }
+    } finally {
+      CompactionTaskManager.getInstance().removeRunningTaskFromList(this);
     }
   }
 
   @Override
   public boolean equalsOtherTask(AbstractCompactionTask otherTask) {
+    if (otherTask instanceof FakedInnerSpaceCompactionTask) {
+      FakedInnerSpaceCompactionTask fakedOtherTask = (FakedInnerSpaceCompactionTask) otherTask;
+      return this.selectedTsFileResourceList.equals(fakedOtherTask.selectedTsFileResourceList);
+    }
     return false;
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FakedTsFileResource.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FakedTsFileResource.java
index 7e3dede..d1d8ee6 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FakedTsFileResource.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FakedTsFileResource.java
@@ -73,4 +73,15 @@ public class FakedTsFileResource extends TsFileResource {
   public File getTsFile() {
     return new File(fakeTsfileName);
   }
+
+  @Override
+  public boolean equals(Object otherObject) {
+    if (otherObject instanceof FakedTsFileResource) {
+      FakedTsFileResource otherResource = (FakedTsFileResource) otherObject;
+      return this.fakeTsfileName.equals(otherResource.fakeTsfileName)
+          && this.tsFileSize == otherResource.tsFileSize;
+    }
+
+    return false;
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index f5fec46..40e96fc 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -620,7 +620,7 @@ public class StorageGroupProcessorTest {
     processor.syncCloseAllWorkingTsFileProcessors();
     processor.merge(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
     long totalWaitingTime = 0;
-    while (CompactionTaskManager.getInstance().getTaskCount() > 0) {
+    while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0) {
       // wait
       try {
         Thread.sleep(100);
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBNewTsFileCompactionIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBNewTsFileCompactionIT.java
index 46f5309..cdf6c27 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBNewTsFileCompactionIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBNewTsFileCompactionIT.java
@@ -1052,7 +1052,7 @@ public class IoTDBNewTsFileCompactionIT {
 
     long startTime = System.nanoTime();
     // get the size of level 1's tsfile list to judge whether merge is finished
-    while (CompactionTaskManager.getInstance().getTaskCount() != 0) {
+    while (CompactionTaskManager.getInstance().getExecutingTaskCount() != 0) {
       TimeUnit.MILLISECONDS.sleep(100);
       // wait too long, just break
       if ((System.nanoTime() - startTime) >= MAX_WAIT_TIME_FOR_MERGE) {

[iotdb] 04/10: [IOTDB-2010] fix incomplete show timeseries result (#4388)

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master_performance
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0d0363ad1a3db3e15e8f00ee6eb79c3c0292a8d0
Author: Zhong Wang <wa...@alibaba-inc.com>
AuthorDate: Fri Nov 26 10:41:12 2021 +0800

    [IOTDB-2010] fix incomplete show timeseries result (#4388)
    
    * [IOTDB-2010] fix incomplete show timeseries result
    
    * Self-review
    
    1. drop duplicated show timeseries results
    2. fix coding style issue
---
 .../apache/iotdb/cluster/metadata/CMManager.java   |  68 +++++------
 .../handlers/caller/ShowTimeSeriesHandler.java     | 133 +++++++++++++++++++++
 2 files changed, 163 insertions(+), 38 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index e2d9280..ab81bd4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
 import org.apache.iotdb.cluster.rpc.thrift.GetAllPathsResult;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+import org.apache.iotdb.cluster.server.handlers.caller.ShowTimeSeriesHandler;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.utils.ClusterQueryUtils;
@@ -1392,19 +1393,18 @@ public class CMManager extends MManager {
   @Override
   public List<ShowTimeSeriesResult> showTimeseries(ShowTimeSeriesPlan plan, QueryContext context)
       throws MetadataException {
-    ConcurrentSkipListSet<ShowTimeSeriesResult> resultSet = new ConcurrentSkipListSet<>();
     ExecutorService pool =
         new ThreadPoolExecutor(
             THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
 
-    List<PartitionGroup> globalGroups = new ArrayList<>();
+    List<PartitionGroup> groups = new ArrayList<>();
     try {
       PartitionGroup partitionGroup =
           metaGroupMember.getPartitionTable().partitionByPathTime(plan.getPath(), 0);
-      globalGroups.add(partitionGroup);
+      groups.add(partitionGroup);
     } catch (MetadataException e) {
       // if the path location is not find, obtain the path location from all groups.
-      globalGroups = metaGroupMember.getPartitionTable().getGlobalGroups();
+      groups = metaGroupMember.getPartitionTable().getGlobalGroups();
     }
 
     int limit = plan.getLimit() == 0 ? Integer.MAX_VALUE : plan.getLimit();
@@ -1421,35 +1421,31 @@ public class CMManager extends MManager {
     }
 
     if (logger.isDebugEnabled()) {
-      logger.debug(
-          "Fetch timeseries schemas of {} from {} groups", plan.getPath(), globalGroups.size());
+      logger.debug("Fetch timeseries schemas of {} from {} groups", plan.getPath(), groups.size());
     }
 
+    ShowTimeSeriesHandler handler = new ShowTimeSeriesHandler(groups.size(), plan.getPath());
     List<Future<Void>> futureList = new ArrayList<>();
-    for (PartitionGroup group : globalGroups) {
+    for (PartitionGroup group : groups) {
       futureList.add(
           pool.submit(
               () -> {
-                try {
-                  showTimeseries(group, plan, resultSet, context);
-                } catch (CheckConsistencyException | MetadataException e) {
-                  logger.error("Cannot get show timeseries result of {} from {}", plan, group);
-                }
+                showTimeseries(group, plan, context, handler);
                 return null;
               }));
     }
 
     waitForThreadPool(futureList, pool, "showTimeseries()");
     List<ShowTimeSeriesResult> showTimeSeriesResults =
-        applyShowTimeseriesLimitOffset(resultSet, limit, offset);
+        applyShowTimeseriesLimitOffset(handler.getResult(), limit, offset);
     logger.debug("Show {} has {} results", plan.getPath(), showTimeSeriesResults.size());
     return showTimeSeriesResults;
   }
 
   private List<ShowTimeSeriesResult> applyShowTimeseriesLimitOffset(
-      ConcurrentSkipListSet<ShowTimeSeriesResult> resultSet, int limit, int offset) {
+      List<ShowTimeSeriesResult> results, int limit, int offset) {
     List<ShowTimeSeriesResult> showTimeSeriesResults = new ArrayList<>();
-    Iterator<ShowTimeSeriesResult> iterator = resultSet.iterator();
+    Iterator<ShowTimeSeriesResult> iterator = results.iterator();
     while (iterator.hasNext() && limit > 0) {
       if (offset > 0) {
         offset--;
@@ -1482,13 +1478,12 @@ public class CMManager extends MManager {
   private void showTimeseries(
       PartitionGroup group,
       ShowTimeSeriesPlan plan,
-      Set<ShowTimeSeriesResult> resultSet,
-      QueryContext context)
-      throws CheckConsistencyException, MetadataException {
+      QueryContext context,
+      ShowTimeSeriesHandler handler) {
     if (group.contains(metaGroupMember.getThisNode())) {
-      showLocalTimeseries(group, plan, resultSet, context);
+      showLocalTimeseries(group, plan, context, handler);
     } else {
-      showRemoteTimeseries(group, plan, resultSet);
+      showRemoteTimeseries(group, plan, handler);
     }
   }
 
@@ -1521,28 +1516,21 @@ public class CMManager extends MManager {
   private void showLocalTimeseries(
       PartitionGroup group,
       ShowTimeSeriesPlan plan,
-      Set<ShowTimeSeriesResult> resultSet,
-      QueryContext context)
-      throws CheckConsistencyException, MetadataException {
-    DataGroupMember localDataMember =
-        metaGroupMember.getLocalDataMember(group.getHeader(), group.getRaftId());
-    localDataMember.syncLeaderWithConsistencyCheck(false);
+      QueryContext context,
+      ShowTimeSeriesHandler handler) {
     try {
+      DataGroupMember localDataMember =
+          metaGroupMember.getLocalDataMember(group.getHeader(), group.getRaftId());
+      localDataMember.syncLeaderWithConsistencyCheck(false);
       List<ShowTimeSeriesResult> localResult = super.showTimeseries(plan, context);
-      resultSet.addAll(localResult);
-      logger.debug(
-          "Fetched local timeseries {} schemas of {} from {}",
-          localResult.size(),
-          plan.getPath(),
-          group);
-    } catch (MetadataException e) {
-      logger.error("Cannot execute show timeseries plan  {} from {} locally.", plan, group);
-      throw e;
+      handler.onComplete(localResult);
+    } catch (MetadataException | CheckConsistencyException e) {
+      handler.onError(e);
     }
   }
 
   private void showRemoteTimeseries(
-      PartitionGroup group, ShowTimeSeriesPlan plan, Set<ShowTimeSeriesResult> resultSet) {
+      PartitionGroup group, ShowTimeSeriesPlan plan, ShowTimeSeriesHandler handler) {
     ByteBuffer resultBinary = null;
     for (Node node : group) {
       try {
@@ -1560,13 +1548,17 @@ public class CMManager extends MManager {
 
     if (resultBinary != null) {
       int size = resultBinary.getInt();
+      List<ShowTimeSeriesResult> results = new ArrayList<>();
       logger.debug(
           "Fetched remote timeseries {} schemas of {} from {}", size, plan.getPath(), group);
       for (int i = 0; i < size; i++) {
-        resultSet.add(ShowTimeSeriesResult.deserialize(resultBinary));
+        results.add(ShowTimeSeriesResult.deserialize(resultBinary));
       }
+      handler.onComplete(results);
     } else {
-      logger.error("Failed to execute show timeseries {} in group: {}.", plan, group);
+      String errMsg =
+          String.format("Failed to get timeseries in path %s from group %s", plan.getPath(), group);
+      handler.onError(new MetadataException(errMsg));
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ShowTimeSeriesHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ShowTimeSeriesHandler.java
new file mode 100644
index 0000000..4d38687
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ShowTimeSeriesHandler.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.server.handlers.caller;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/** Handler for getting the schemas from each data group concurrently. */
+public class ShowTimeSeriesHandler implements AsyncMethodCallback<List<ShowTimeSeriesResult>> {
+
+  private static class ShowTimeSeriesResultComparator implements Comparator<ShowTimeSeriesResult> {
+
+    @Override
+    public int compare(ShowTimeSeriesResult o1, ShowTimeSeriesResult o2) {
+      if (o1 == null && o2 == null) {
+        return 0;
+      } else if (o1 == null) {
+        return -1;
+      } else if (o2 == null) {
+        return 1;
+      }
+      return o1.getName().compareTo(o2.getName());
+    }
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(ShowTimeSeriesHandler.class);
+
+  /** String representation of a partial path for logging */
+  private final String path;
+
+  private final CountDownLatch countDownLatch;
+  private final long startTimeInMs;
+
+  private final Map<String, ShowTimeSeriesResult> timeSeriesNameToResult = new HashMap<>();
+  private final List<Exception> exceptions = new ArrayList<>();
+
+  public ShowTimeSeriesHandler(int numGroup, PartialPath path) {
+    this.countDownLatch = new CountDownLatch(numGroup);
+    this.path = path.toString();
+    this.startTimeInMs = System.currentTimeMillis();
+  }
+
+  @Override
+  public synchronized void onComplete(List<ShowTimeSeriesResult> response) {
+    for (ShowTimeSeriesResult r : response) {
+      timeSeriesNameToResult.put(r.getName(), r);
+    }
+    countDownLatch.countDown();
+    logger.debug(
+        "Got {} timeseries in path {}. Remaining count: {}",
+        response.size(),
+        path,
+        countDownLatch.getCount());
+  }
+
+  @Override
+  public synchronized void onError(Exception exception) {
+    exceptions.add(exception);
+    countDownLatch.countDown();
+    logger.error("Failed to get timeseries in path {} because of {}", path, exception.getMessage());
+  }
+
+  public List<ShowTimeSeriesResult> getResult() throws MetadataException {
+    if (!exceptions.isEmpty()) {
+      MetadataException e =
+          new MetadataException(
+              "Exception happened when getting the result."
+                  + " See the suppressed exceptions for causes.");
+      for (Exception exception : exceptions) {
+        e.addSuppressed(exception);
+      }
+      throw e;
+    }
+
+    // Wait for the results and ignore the interruptions.
+    long timeout = IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold();
+    while (System.currentTimeMillis() - startTimeInMs < timeout) {
+      try {
+        if (countDownLatch.await(
+            System.currentTimeMillis() - startTimeInMs, TimeUnit.MILLISECONDS)) {
+          break;
+        }
+      } catch (InterruptedException ignored) {
+      }
+    }
+
+    if (countDownLatch.getCount() != 0) {
+      String errMsg =
+          String.format(
+              "Failed to get the show timeseries result"
+                  + " since %d nodes didn't respond after %d ms",
+              countDownLatch.getCount(), timeout);
+      logger.error(errMsg);
+      throw new MetadataException(errMsg);
+    }
+
+    return timeSeriesNameToResult.values().stream()
+        .sorted(new ShowTimeSeriesResultComparator())
+        .collect(Collectors.toList());
+  }
+}

[iotdb] 01/10: [IOTDB-2027] Ignore too many WAL BufferOverflow log (#4467)

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master_performance
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5cef83ff3cb07579c4116d2c8557c1280f5d2a82
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Thu Nov 25 21:32:09 2021 +0800

    [IOTDB-2027] Ignore too many WAL BufferOverflow log (#4467)
---
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |  4 ++
 .../db/writelog/node/ExclusiveWriteLogNode.java    |  8 ++++
 .../apache/iotdb/db/writelog/WriteLogNodeTest.java | 49 ++++++++++++++++++++++
 3 files changed, 61 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index 1951ba0..f77021a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -78,6 +78,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -198,6 +199,9 @@ public abstract class PhysicalPlan {
     } catch (UnsupportedOperationException e) {
       // ignore and throw
       throw e;
+    } catch (BufferOverflowException e) {
+      buffer.reset();
+      throw e;
     } catch (Exception e) {
       logger.error(
           "Rollback buffer entry because error occurs when serializing this physical plan.", e);
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
index 5b281c6..a64d0f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
@@ -76,6 +76,8 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
 
   private final AtomicBoolean deleted = new AtomicBoolean(false);
 
+  private int bufferOverflowNum = 0;
+
   /**
    * constructor of ExclusiveWriteLogNode.
    *
@@ -129,6 +131,12 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
     try {
       plan.serialize(logBufferWorking);
     } catch (BufferOverflowException e) {
+      bufferOverflowNum++;
+      if (bufferOverflowNum > 200) {
+        logger.info(
+            "WAL bytebuffer overflows too many times. If this occurs frequently, please increase wal_buffer_size.");
+        bufferOverflowNum = 0;
+      }
       sync();
       plan.serialize(logBufferWorking);
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
index 1b24a05..dc6904e 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
@@ -399,4 +399,53 @@ public class WriteLogNodeTest {
       MmapUtil.clean((MappedByteBuffer) byteBuffer);
     }
   }
+
+  @Test
+  public void testBufferOverflowAndRewrite() throws IOException, IllegalPathException {
+    String identifier = "root.logTestDevice";
+
+    InsertRowPlan insertPlan =
+        new InsertRowPlan(
+            new PartialPath(identifier),
+            100,
+            new String[] {"s1", "s2", "s3", "s4"},
+            new TSDataType[] {
+              TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN
+            },
+            new String[] {"1.0", "15", "str", "false"});
+
+    // get InsertRowPlan byte size
+    ByteBuffer tmpBuffer =
+        ByteBuffer.allocate(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+    insertPlan.serialize(tmpBuffer);
+    int size = tmpBuffer.position();
+    // allocate buffers
+    ByteBuffer[] byteBuffers = new ByteBuffer[2];
+    byteBuffers[0] = ByteBuffer.allocateDirect(size + 1);
+    byteBuffers[1] = ByteBuffer.allocateDirect(size + 1);
+    WriteLogNode logNode = new ExclusiveWriteLogNode(identifier);
+    logNode.initBuffer(byteBuffers);
+    // write InsertRowPlan to WAL buffer
+    logNode.write(insertPlan);
+    insertPlan.setTime(200);
+    logNode.write(insertPlan);
+
+    logNode.close();
+
+    File walFile =
+        new File(config.getWalDir() + File.separator + identifier + File.separator + "wal1");
+    assertTrue(walFile.exists());
+
+    ILogReader reader = logNode.getLogReader();
+    insertPlan.setTime(100);
+    assertEquals(insertPlan, reader.next());
+    insertPlan.setTime(200);
+    assertEquals(insertPlan, reader.next());
+    reader.close();
+
+    ByteBuffer[] array = logNode.delete();
+    for (ByteBuffer byteBuffer : array) {
+      MmapUtil.clean((MappedByteBuffer) byteBuffer);
+    }
+  }
 }

[iotdb] 03/10: [IOTDB-1673] CLI refactor and upgrade to JLine3 (#4458)

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master_performance
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 89d5c48e9978d38463579d462a5be9e8f2e8b869
Author: Zhong Wang <wa...@alibaba-inc.com>
AuthorDate: Thu Nov 25 22:58:15 2021 +0800

    [IOTDB-1673] CLI refactor and upgrade to JLine3 (#4458)
    
    * [IOTDB-1673] CLI refactor
    
    1. Upgrade to Jline3.
    2. Remove the CLI implmentation for Windows since Jline3 is platform independent.
    3. Support persisted command history.
    4. Support multi-line edition.
    5. Exit by pressing CTRL+D or CTRL+C twice.
    6. Add auto pair and auto suggestion widgets.
    7. Add keyword highlighter and completer.
    
    * Resolve comment
    
    1. Update LICENSE-binary
---
 LICENSE-binary                                     |   6 +-
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4    |   4 +-
 cli/pom.xml                                        |  11 +-
 cli/src/assembly/resources/sbin/start-cli.bat      |   2 +-
 cli/src/main/java/org/apache/iotdb/cli/Cli.java    |  40 +++--
 .../apache/iotdb/cli/IoTDBSyntaxHighlighter.java   |  76 +++++++++
 cli/src/main/java/org/apache/iotdb/cli/WinCli.java | 175 ---------------------
 .../org/apache/iotdb/cli/utils/JlineUtils.java     | 107 +++++++++++++
 .../org/apache/iotdb/tool/AbstractCsvTool.java     |   3 +-
 .../main/java/org/apache/iotdb/tool/ExportCsv.java |  19 ++-
 pom.xml                                            |   6 +-
 11 files changed, 240 insertions(+), 209 deletions(-)

diff --git a/LICENSE-binary b/LICENSE-binary
index ce5c2aa..de449d9 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -271,16 +271,12 @@ org.xerial.snappy:snappy-java:1.1.8.4
 io.airlift.airline:0.8
 net.minidev:accessors-smart:1.2
 
-BSD 2-Clause
-------------
-jline:jline:2.14.5
-
 
 BSD 3-Clause
 ------------
 org.antlr:antlr-runtime:4.8-1
 org.ow2.asm:asm:5.0.4
-org.jline:jline:3.10.0
+org.jline:jline:3.21.0
 
 
 MIT License
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4
index fdcd67f..be310fb 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4
@@ -23,9 +23,11 @@ lexer grammar IoTDBSqlLexer;
  * 1. Whitespace
  */
 
+// Instead of discarding whitespace completely, send them to a channel invisable to the parser, so
+// that the lexer could still produce WS tokens for the CLI's highlighter.
 WS
     :
-    [ \u000B\t\r\n]+ -> skip
+    [ \u000B\t\r\n]+ -> channel(HIDDEN)
     ;
 
 
diff --git a/cli/pom.xml b/cli/pom.xml
index 9bcb861..741ff6d 100644
--- a/cli/pom.xml
+++ b/cli/pom.xml
@@ -66,10 +66,19 @@
             <version>0.9.2</version>
         </dependency>
         <dependency>
-            <groupId>jline</groupId>
+            <groupId>org.jline</groupId>
             <artifactId>jline</artifactId>
         </dependency>
         <dependency>
+            <groupId>net.java.dev.jna</groupId>
+            <artifactId>jna</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-antlr</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.iotdb</groupId>
             <artifactId>iotdb-server</artifactId>
             <version>${project.version}</version>
diff --git a/cli/src/assembly/resources/sbin/start-cli.bat b/cli/src/assembly/resources/sbin/start-cli.bat
index 99da3fd..3868786 100644
--- a/cli/src/assembly/resources/sbin/start-cli.bat
+++ b/cli/src/assembly/resources/sbin/start-cli.bat
@@ -28,7 +28,7 @@ pushd %~dp0..
 if NOT DEFINED IOTDB_CLI_HOME set IOTDB_CLI_HOME=%CD%
 popd
 
-if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.cli.WinCli
+if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.cli.Cli
 if NOT DEFINED JAVA_HOME goto :err
 
 @REM -----------------------------------------------------------------------------
diff --git a/cli/src/main/java/org/apache/iotdb/cli/Cli.java b/cli/src/main/java/org/apache/iotdb/cli/Cli.java
index 6e57d27..6d7675f 100644
--- a/cli/src/main/java/org/apache/iotdb/cli/Cli.java
+++ b/cli/src/main/java/org/apache/iotdb/cli/Cli.java
@@ -18,12 +18,12 @@
  */
 package org.apache.iotdb.cli;
 
+import org.apache.iotdb.cli.utils.JlineUtils;
 import org.apache.iotdb.exception.ArgsErrorException;
 import org.apache.iotdb.jdbc.Config;
 import org.apache.iotdb.jdbc.IoTDBConnection;
 import org.apache.iotdb.rpc.RpcUtils;
 
-import jline.console.ConsoleReader;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.DefaultParser;
@@ -31,6 +31,9 @@ import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.thrift.TException;
+import org.jline.reader.EndOfFileException;
+import org.jline.reader.LineReader;
+import org.jline.reader.UserInterruptException;
 
 import java.io.IOException;
 import java.sql.DriverManager;
@@ -42,6 +45,7 @@ import static org.apache.iotdb.cli.utils.IoTPrinter.println;
 public class Cli extends AbstractCli {
 
   private static CommandLine commandLine;
+  private static LineReader lineReader;
 
   /**
    * IoTDB Client main function.
@@ -49,7 +53,7 @@ public class Cli extends AbstractCli {
    * @param args launch arguments
    * @throws ClassNotFoundException ClassNotFoundException
    */
-  public static void main(String[] args) throws ClassNotFoundException {
+  public static void main(String[] args) throws ClassNotFoundException, IOException {
     Class.forName(Config.JDBC_DRIVER_NAME);
     Options options = createOptions();
     HelpFormatter hf = new HelpFormatter();
@@ -72,6 +76,7 @@ public class Cli extends AbstractCli {
       return;
     }
 
+    lineReader = JlineUtils.getLineReader();
     serve();
   }
 
@@ -129,13 +134,10 @@ public class Cli extends AbstractCli {
           println(IOTDB_CLI_PREFIX + "> can't execute sql because" + e.getMessage());
         }
       }
-      try (ConsoleReader reader = new ConsoleReader()) {
-        reader.setExpandEvents(false);
-        if (password == null) {
-          password = reader.readLine("please input your password:", '\0');
-        }
-        receiveCommands(reader);
+      if (password == null) {
+        password = lineReader.readLine("please input your password:", '\0');
       }
+      receiveCommands(lineReader);
     } catch (ArgsErrorException e) {
       println(IOTDB_CLI_PREFIX + "> input params error because" + e.getMessage());
     } catch (Exception e) {
@@ -143,7 +145,7 @@ public class Cli extends AbstractCli {
     }
   }
 
-  private static void receiveCommands(ConsoleReader reader) throws TException, IOException {
+  private static void receiveCommands(LineReader reader) throws TException {
     try (IoTDBConnection connection =
         (IoTDBConnection)
             DriverManager.getConnection(
@@ -157,10 +159,22 @@ public class Cli extends AbstractCli {
       displayLogo(properties.getVersion());
       println(IOTDB_CLI_PREFIX + "> login successfully");
       while (true) {
-        s = reader.readLine(IOTDB_CLI_PREFIX + "> ", null);
-        boolean continues = processCommand(s, connection);
-        if (!continues) {
-          break;
+        try {
+          s = reader.readLine(IOTDB_CLI_PREFIX + "> ", null);
+          boolean continues = processCommand(s, connection);
+          if (!continues) {
+            break;
+          }
+        } catch (UserInterruptException e) {
+          // Exit on signal INT requires confirmation.
+          try {
+            reader.readLine("Press CTRL+C again to exit, or press ENTER to continue", '\0');
+          } catch (UserInterruptException | EndOfFileException e2) {
+            System.exit(0);
+          }
+        } catch (EndOfFileException e) {
+          // Exit on EOF (usually by pressing CTRL+D).
+          System.exit(0);
         }
       }
     } catch (SQLException e) {
diff --git a/cli/src/main/java/org/apache/iotdb/cli/IoTDBSyntaxHighlighter.java b/cli/src/main/java/org/apache/iotdb/cli/IoTDBSyntaxHighlighter.java
new file mode 100644
index 0000000..82e9866
--- /dev/null
+++ b/cli/src/main/java/org/apache/iotdb/cli/IoTDBSyntaxHighlighter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cli;
+
+import org.apache.iotdb.cli.utils.JlineUtils;
+import org.apache.iotdb.db.qp.sql.IoTDBSqlLexer;
+
+import org.antlr.v4.runtime.CharStream;
+import org.antlr.v4.runtime.CharStreams;
+import org.antlr.v4.runtime.Token;
+import org.jline.reader.Highlighter;
+import org.jline.reader.LineReader;
+import org.jline.utils.AttributedString;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+
+import java.util.regex.Pattern;
+
+import static org.jline.utils.AttributedStyle.DEFAULT;
+import static org.jline.utils.AttributedStyle.GREEN;
+
+public class IoTDBSyntaxHighlighter implements Highlighter {
+
+  private static final AttributedStyle KEYWORD_STYLE = DEFAULT.foreground(GREEN);
+
+  @Override
+  public AttributedString highlight(LineReader reader, String buffer) {
+    CharStream stream = CharStreams.fromString(buffer);
+    IoTDBSqlLexer tokenSource = new IoTDBSqlLexer(stream);
+    tokenSource.removeErrorListeners();
+    AttributedStringBuilder builder = new AttributedStringBuilder();
+    while (true) {
+      Token token = tokenSource.nextToken();
+      int type = token.getType();
+      if (type == Token.EOF) {
+        break;
+      }
+      String text = token.getText();
+
+      if (isKeyword(text)) {
+        builder.styled(KEYWORD_STYLE, text);
+      } else {
+        builder.append(text);
+      }
+    }
+
+    return builder.toAttributedString();
+  }
+
+  @Override
+  public void setErrorPattern(Pattern errorPattern) {}
+
+  @Override
+  public void setErrorIndex(int errorIndex) {}
+
+  private boolean isKeyword(String token) {
+    return JlineUtils.SQL_KEYWORDS.contains(token.toUpperCase());
+  }
+}
diff --git a/cli/src/main/java/org/apache/iotdb/cli/WinCli.java b/cli/src/main/java/org/apache/iotdb/cli/WinCli.java
deleted file mode 100644
index 7d9e938..0000000
--- a/cli/src/main/java/org/apache/iotdb/cli/WinCli.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.cli;
-
-import org.apache.iotdb.exception.ArgsErrorException;
-import org.apache.iotdb.jdbc.Config;
-import org.apache.iotdb.jdbc.IoTDBConnection;
-import org.apache.iotdb.rpc.RpcUtils;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.DefaultParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.thrift.TException;
-
-import java.io.Console;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.util.Scanner;
-
-import static org.apache.iotdb.cli.utils.IoTPrinter.print;
-import static org.apache.iotdb.cli.utils.IoTPrinter.println;
-
-/** args[]: -h 127.0.0.1 -p 6667 -u root -pw root */
-public class WinCli extends AbstractCli {
-
-  private static CommandLine commandLine;
-
-  /**
-   * main function.
-   *
-   * @param args -console args
-   */
-  public static void main(String[] args) throws ClassNotFoundException {
-    Class.forName(Config.JDBC_DRIVER_NAME);
-    Options options = createOptions();
-    HelpFormatter hf = new HelpFormatter();
-    hf.setWidth(MAX_HELP_CONSOLE_WIDTH);
-    commandLine = null;
-
-    if (args == null || args.length == 0) {
-      println("Require more params input, please check the following hint.");
-      hf.printHelp(IOTDB_CLI_PREFIX, options, true);
-      return;
-    }
-
-    init();
-    String[] newArgs = removePasswordArgs(args);
-    String[] newArgs2 = processExecuteArgs(newArgs);
-    boolean continues = parseCommandLine(options, newArgs2, hf);
-    if (!continues) {
-      return;
-    }
-
-    serve();
-  }
-
-  private static String readPassword() {
-    Console c = System.console();
-    if (c == null) { // IN ECLIENTPSE IDE
-      print(IOTDB_CLI_PREFIX + "> please input password: ");
-      Scanner scanner = new Scanner(System.in);
-      return scanner.nextLine();
-    } else { // Outside Eclipse IDE
-      return new String(c.readPassword(IOTDB_CLI_PREFIX + "> please input password: "));
-    }
-  }
-
-  private static boolean parseCommandLine(Options options, String[] newArgs, HelpFormatter hf) {
-    try {
-      CommandLineParser parser = new DefaultParser();
-      commandLine = parser.parse(options, newArgs);
-      if (commandLine.hasOption(HELP_ARGS)) {
-        hf.printHelp(IOTDB_CLI_PREFIX, options, true);
-        return false;
-      }
-      if (commandLine.hasOption(RPC_COMPRESS_ARGS)) {
-        Config.rpcThriftCompressionEnable = true;
-      }
-      if (commandLine.hasOption(ISO8601_ARGS)) {
-        timeFormat = RpcUtils.setTimeFormat("long");
-      }
-      if (commandLine.hasOption(MAX_PRINT_ROW_COUNT_ARGS)) {
-        setMaxDisplayNumber(commandLine.getOptionValue(MAX_PRINT_ROW_COUNT_ARGS));
-      }
-    } catch (ParseException e) {
-      println("Require more params input, please check the following hint.");
-      hf.printHelp(IOTDB_CLI_PREFIX, options, true);
-      return false;
-    } catch (NumberFormatException e) {
-      println(
-          IOTDB_CLI_PREFIX
-              + "> error format of max print row count, it should be an integer number");
-      return false;
-    }
-    return true;
-  }
-
-  private static void serve() {
-    try (Scanner scanner = new Scanner(System.in)) {
-      host = checkRequiredArg(HOST_ARGS, HOST_NAME, commandLine, false, host);
-      port = checkRequiredArg(PORT_ARGS, PORT_NAME, commandLine, false, port);
-      username = checkRequiredArg(USERNAME_ARGS, USERNAME_NAME, commandLine, true, null);
-      password = commandLine.getOptionValue(PASSWORD_ARGS);
-      if (password == null) {
-        password = readPassword();
-      }
-      if (hasExecuteSQL) {
-        try (IoTDBConnection connection =
-            (IoTDBConnection)
-                DriverManager.getConnection(
-                    Config.IOTDB_URL_PREFIX + host + ":" + port + "/", username, password)) {
-          properties = connection.getServerProperties();
-          timestampPrecision = properties.getTimestampPrecision();
-          AGGREGRATE_TIME_LIST.addAll(properties.getSupportedTimeAggregationOperations());
-          processCommand(execute, connection);
-          return;
-        } catch (SQLException e) {
-          println(IOTDB_CLI_PREFIX + "> can't execute sql because" + e.getMessage());
-        }
-      }
-
-      receiveCommands(scanner);
-    } catch (ArgsErrorException e) {
-      println(IOTDB_CLI_PREFIX + "> input params error because" + e.getMessage());
-    } catch (Exception e) {
-      println(IOTDB_CLI_PREFIX + "> exit cli with error " + e.getMessage());
-    }
-  }
-
-  private static void receiveCommands(Scanner scanner) throws TException {
-    try (IoTDBConnection connection =
-        (IoTDBConnection)
-            DriverManager.getConnection(
-                Config.IOTDB_URL_PREFIX + host + ":" + port + "/", username, password)) {
-      properties = connection.getServerProperties();
-      AGGREGRATE_TIME_LIST.addAll(properties.getSupportedTimeAggregationOperations());
-      timestampPrecision = properties.getTimestampPrecision();
-
-      echoStarting();
-      displayLogo(properties.getVersion());
-      println(IOTDB_CLI_PREFIX + "> login successfully");
-      while (true) {
-        print(IOTDB_CLI_PREFIX + "> ");
-        String s = scanner.nextLine();
-        boolean continues = processCommand(s, connection);
-        if (!continues) {
-          break;
-        }
-      }
-    } catch (SQLException e) {
-      println(
-          String.format(
-              "%s> %s Host is %s, port is %s.", IOTDB_CLI_PREFIX, e.getMessage(), host, port));
-    }
-  }
-}
diff --git a/cli/src/main/java/org/apache/iotdb/cli/utils/JlineUtils.java b/cli/src/main/java/org/apache/iotdb/cli/utils/JlineUtils.java
new file mode 100644
index 0000000..e2f5c7d
--- /dev/null
+++ b/cli/src/main/java/org/apache/iotdb/cli/utils/JlineUtils.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cli.utils;
+
+import org.apache.iotdb.cli.IoTDBSyntaxHighlighter;
+import org.apache.iotdb.db.qp.sql.IoTDBSqlLexer;
+
+import org.jline.reader.LineReader;
+import org.jline.reader.LineReader.Option;
+import org.jline.reader.LineReaderBuilder;
+import org.jline.reader.impl.DefaultParser.Bracket;
+import org.jline.reader.impl.completer.StringsCompleter;
+import org.jline.terminal.Size;
+import org.jline.terminal.Terminal;
+import org.jline.terminal.Terminal.Signal;
+import org.jline.terminal.TerminalBuilder;
+import org.jline.utils.OSUtils;
+import org.jline.widget.AutopairWidgets;
+import org.jline.widget.AutosuggestionWidgets;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class JlineUtils {
+
+  public static final Pattern SQL_KEYWORD_PATTERN = Pattern.compile("([A-Z_]+)");
+  public static final Set<String> SQL_KEYWORDS =
+      IntStream.range(0, IoTDBSqlLexer.VOCABULARY.getMaxTokenType())
+          .mapToObj(IoTDBSqlLexer.VOCABULARY::getDisplayName)
+          .filter(Objects::nonNull)
+          .filter(w -> SQL_KEYWORD_PATTERN.matcher(w).matches())
+          .collect(Collectors.toSet());
+
+  public static LineReader getLineReader() throws IOException {
+    Terminal terminal = TerminalBuilder.builder().build();
+    if (terminal.getWidth() == 0 || terminal.getHeight() == 0) {
+      // Hard coded terminal size when redirecting.
+      terminal.setSize(new Size(120, 40));
+    }
+    Thread executeThread = Thread.currentThread();
+    // Register signal handler. Instead of shutting down the process, interrupt the current thread
+    // when signal INT is received (usually by pressing CTRL+C).
+    terminal.handle(Signal.INT, signal -> executeThread.interrupt());
+
+    LineReaderBuilder builder = LineReaderBuilder.builder();
+    builder.terminal(terminal);
+
+    // Handle the command history. By default, the number of commands will not exceed 500 and the
+    // size of the history fill will be less than 10 KB. See:
+    // org.jline.reader.impl.history#DefaultHistory
+    String historyFile = ".iotdb.history";
+    String historyFilePath = System.getProperty("user.home") + File.separator + historyFile;
+    builder.variable(LineReader.HISTORY_FILE, new File(historyFilePath));
+
+    builder.highlighter(new IoTDBSyntaxHighlighter());
+
+    builder.completer(new StringsCompleter(SQL_KEYWORDS));
+
+    builder.option(Option.CASE_INSENSITIVE_SEARCH, true);
+    builder.option(Option.CASE_INSENSITIVE, true);
+    // See: https://www.gnu.org/software/bash/manual/html_node/Event-Designators.html
+    builder.option(Option.DISABLE_EVENT_EXPANSION, true);
+
+    org.jline.reader.impl.DefaultParser parser = new org.jline.reader.impl.DefaultParser();
+    // Make multi-line edition be triggered by unclosed brackets and unclosed quotes.
+    parser.setEofOnUnclosedBracket(Bracket.CURLY, Bracket.SQUARE, Bracket.ROUND);
+    parser.setEofOnUnclosedQuote(true);
+    builder.parser(parser);
+    LineReader lineReader = builder.build();
+    if (OSUtils.IS_WINDOWS) {
+      // If enabled cursor remains in begin parenthesis (gitbash).
+      lineReader.setVariable(LineReader.BLINK_MATCHING_PAREN, 0);
+    }
+
+    // Create auto-pair widgets
+    AutopairWidgets autopairWidgets = new AutopairWidgets(lineReader);
+    // Enable auto-pair
+    autopairWidgets.enable();
+    // Create autosuggestion widgets
+    AutosuggestionWidgets autosuggestionWidgets = new AutosuggestionWidgets(lineReader);
+    // Enable autosuggestions
+    autosuggestionWidgets.enable();
+    return lineReader;
+  }
+}
diff --git a/cli/src/main/java/org/apache/iotdb/tool/AbstractCsvTool.java b/cli/src/main/java/org/apache/iotdb/tool/AbstractCsvTool.java
index f03275d..3294406 100644
--- a/cli/src/main/java/org/apache/iotdb/tool/AbstractCsvTool.java
+++ b/cli/src/main/java/org/apache/iotdb/tool/AbstractCsvTool.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.Session;
 
-import jline.internal.Nullable;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
@@ -210,7 +209,7 @@ public abstract class AbstractCsvTool {
    * @param filePath the directory to save the file
    */
   public static Boolean writeCsvFile(
-      @Nullable List<String> headerNames, List<List<Object>> records, String filePath) {
+      List<String> headerNames, List<List<Object>> records, String filePath) {
     try {
       CSVPrinter printer =
           CSVFormat.DEFAULT
diff --git a/cli/src/main/java/org/apache/iotdb/tool/ExportCsv.java b/cli/src/main/java/org/apache/iotdb/tool/ExportCsv.java
index 3450121..6d77a36 100644
--- a/cli/src/main/java/org/apache/iotdb/tool/ExportCsv.java
+++ b/cli/src/main/java/org/apache/iotdb/tool/ExportCsv.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.tool;
 
+import org.apache.iotdb.cli.utils.JlineUtils;
 import org.apache.iotdb.exception.ArgsErrorException;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -28,7 +29,6 @@ import org.apache.iotdb.session.SessionDataSet;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 
-import jline.console.ConsoleReader;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.DefaultParser;
@@ -39,8 +39,13 @@ import org.apache.commons.cli.ParseException;
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVPrinter;
 import org.apache.commons.csv.QuoteMode;
+import org.jline.reader.LineReader;
 
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.PrintWriter;
 import java.time.Instant;
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
@@ -83,7 +88,7 @@ public class ExportCsv extends AbstractCsvTool {
   private static final int EXPORT_PER_LINE_COUNT = 10000;
 
   /** main function of export csv tool. */
-  public static void main(String[] args) throws IOException {
+  public static void main(String[] args) {
     Options options = createOptions();
     HelpFormatter hf = new HelpFormatter();
     CommandLine commandLine;
@@ -124,15 +129,13 @@ public class ExportCsv extends AbstractCsvTool {
         String sql;
 
         if (sqlFile == null) {
-          ConsoleReader reader = new ConsoleReader();
-          reader.setExpandEvents(false);
-          sql = reader.readLine(TSFILEDB_CLI_PREFIX + "> please input query: ");
+          LineReader lineReader = JlineUtils.getLineReader();
+          sql = lineReader.readLine(TSFILEDB_CLI_PREFIX + "> please input query: ");
           System.out.println(sql);
           String[] values = sql.trim().split(";");
           for (int i = 0; i < values.length; i++) {
             dumpResult(values[i], i);
           }
-          reader.close();
         } else {
           dumpFromSqlFile(sqlFile);
         }
@@ -335,7 +338,7 @@ public class ExportCsv extends AbstractCsvTool {
         }
       }
     } else {
-      names.forEach(name -> headers.add(name));
+      headers.addAll(names);
     }
     printer.printRecord(headers);
 
diff --git a/pom.xml b/pom.xml
index 6286eef..37f2f27 100644
--- a/pom.xml
+++ b/pom.xml
@@ -137,7 +137,7 @@
         <common.pool2.version>2.11.1</common.pool2.version>
         <org.slf4j.version>1.7.32</org.slf4j.version>
         <guava.version>24.1.1</guava.version>
-        <jline.version>2.14.6</jline.version>
+        <jline.version>3.21.0</jline.version>
         <jetty.version>9.4.35.v20201120</jetty.version>
         <metrics.version>4.2.4</metrics.version>
         <javax.xml.bind.version>2.4.0-b180830.0359</javax.xml.bind.version>
@@ -269,7 +269,7 @@
                 <version>${javax.xml.bind.version}</version>
             </dependency>
             <dependency>
-                <groupId>jline</groupId>
+                <groupId>org.jline</groupId>
                 <artifactId>jline</artifactId>
                 <version>${jline.version}</version>
             </dependency>
@@ -508,7 +508,7 @@
                 <artifactId>gson</artifactId>
                 <version>${gson.version}</version>
             </dependency>
-            <!-- for test container -->
+            <!-- for cli and test container -->
             <dependency>
                 <groupId>net.java.dev.jna</groupId>
                 <artifactId>jna</artifactId>

[iotdb] 08/10: [IOTDB-2065] TsFileSequenceReader will be cached for 100s even no longer used (#4478)

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master_performance
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c0233e63077404607baa54e47f361f76ebefa5e9
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Fri Nov 26 21:00:40 2021 +0800

    [IOTDB-2065] TsFileSequenceReader will be cached for 100s even no longer used (#4478)
---
 .../iotdb/db/query/control/FileReaderManager.java  | 110 +++++++--------------
 .../iotdb/db/engine/cache/ChunkCacheTest.java      |   1 -
 .../db/engine/compaction/cross/MergeTest.java      |   1 -
 .../compaction/inner/InnerCompactionTest.java      |   1 -
 .../SizeTieredCompactionRecoverTest.java           |   1 -
 .../inner/sizetiered/SizeTieredCompactionTest.java |   1 -
 .../compaction/utils/CompactionClearUtils.java     |   1 -
 .../db/query/control/FileReaderManagerTest.java    |  19 +---
 .../query/reader/series/SeriesReaderTestUtil.java  |   1 -
 .../iotdb/db/rescon/ResourceManagerTest.java       |   1 -
 10 files changed, 37 insertions(+), 100 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
index 01cde88..0046bf0 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
@@ -18,11 +18,7 @@
  */
 package org.apache.iotdb.db.query.control;
 
-import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.service.IService;
-import org.apache.iotdb.db.service.ServiceType;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.UnClosedTsFileReader;
@@ -35,15 +31,13 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * FileReaderManager is a singleton, which is used to manage all file readers(opened file streams)
  * to ensure that each file is opened at most once.
  */
-public class FileReaderManager implements IService {
+public class FileReaderManager {
 
   private static final Logger logger = LoggerFactory.getLogger(FileReaderManager.class);
   private static final Logger resourceLogger = LoggerFactory.getLogger("FileMonitor");
@@ -73,16 +67,11 @@ public class FileReaderManager implements IService {
    */
   private Map<String, AtomicInteger> unclosedReferenceMap;
 
-  private ScheduledExecutorService executorService;
-
   private FileReaderManager() {
     closedFileReaderMap = new ConcurrentHashMap<>();
     unclosedFileReaderMap = new ConcurrentHashMap<>();
     closedReferenceMap = new ConcurrentHashMap<>();
     unclosedReferenceMap = new ConcurrentHashMap<>();
-    executorService = IoTDBThreadPoolFactory.newScheduledThreadPool(1, "open-files-manager");
-
-    clearUnUsedFilesInFixTime();
   }
 
   public static FileReaderManager getInstance() {
@@ -102,44 +91,6 @@ public class FileReaderManager implements IService {
     }
   }
 
-  private void clearUnUsedFilesInFixTime() {
-    long examinePeriod = IoTDBDescriptor.getInstance().getConfig().getCacheFileReaderClearPeriod();
-    executorService.scheduleAtFixedRate(
-        () -> {
-          synchronized (this) {
-            clearMap(closedFileReaderMap, closedReferenceMap);
-            clearMap(unclosedFileReaderMap, unclosedReferenceMap);
-          }
-        },
-        0,
-        examinePeriod,
-        TimeUnit.MILLISECONDS);
-  }
-
-  private void clearMap(
-      Map<String, TsFileSequenceReader> readerMap, Map<String, AtomicInteger> refMap) {
-    Iterator<Map.Entry<String, TsFileSequenceReader>> iterator = readerMap.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Map.Entry<String, TsFileSequenceReader> entry = iterator.next();
-      TsFileSequenceReader reader = entry.getValue();
-      AtomicInteger refAtom = refMap.get(entry.getKey());
-
-      if (refAtom != null && refAtom.get() == 0) {
-        try {
-          reader.close();
-        } catch (IOException e) {
-          logger.error("Can not close TsFileSequenceReader {} !", reader.getFileName(), e);
-        }
-        iterator.remove();
-        refMap.remove(entry.getKey());
-        if (resourceLogger.isDebugEnabled()) {
-          resourceLogger.debug(
-              "{} TsFileReader is closed because of no reference.", entry.getKey());
-        }
-      }
-    }
-  }
-
   /**
    * Get the reader of the file(tsfile or unseq tsfile) indicated by filePath. If the reader already
    * exists, just get it from closedFileReaderMap or unclosedFileReaderMap depending on isClosing .
@@ -211,14 +162,44 @@ public class FileReaderManager implements IService {
   void decreaseFileReaderReference(TsFileResource tsFile, boolean isClosed) {
     synchronized (this) {
       if (!isClosed && unclosedReferenceMap.containsKey(tsFile.getTsFilePath())) {
-        unclosedReferenceMap.get(tsFile.getTsFilePath()).decrementAndGet();
+        if (unclosedReferenceMap.get(tsFile.getTsFilePath()).decrementAndGet() == 0) {
+          closeUnUsedReaderAndRemoveRef(tsFile.getTsFilePath(), false);
+        }
       } else if (closedReferenceMap.containsKey(tsFile.getTsFilePath())) {
-        closedReferenceMap.get(tsFile.getTsFilePath()).decrementAndGet();
+        if (closedReferenceMap.get(tsFile.getTsFilePath()).decrementAndGet() == 0) {
+          closeUnUsedReaderAndRemoveRef(tsFile.getTsFilePath(), true);
+        }
       }
     }
     tsFile.readUnlock();
   }
 
+  private void closeUnUsedReaderAndRemoveRef(String tsFilePath, boolean isClosed) {
+    Map<String, TsFileSequenceReader> readerMap =
+        isClosed ? closedFileReaderMap : unclosedFileReaderMap;
+    Map<String, AtomicInteger> refMap = isClosed ? closedReferenceMap : unclosedReferenceMap;
+    synchronized (this) {
+      // check ref num again
+      if (refMap.get(tsFilePath).get() != 0) {
+        return;
+      }
+
+      TsFileSequenceReader reader = readerMap.get(tsFilePath);
+      if (reader != null) {
+        try {
+          reader.close();
+        } catch (IOException e) {
+          logger.error("Can not close TsFileSequenceReader {} !", reader.getFileName(), e);
+        }
+      }
+      readerMap.remove(tsFilePath);
+      refMap.remove(tsFilePath);
+      if (resourceLogger.isDebugEnabled()) {
+        resourceLogger.debug("{} TsFileReader is closed because of no reference.", tsFilePath);
+      }
+    }
+  }
+
   /**
    * Only for <code>EnvironmentUtils.cleanEnv</code> method. To make sure that unit tests and
    * integration tests will not conflict with each other.
@@ -253,31 +234,6 @@ public class FileReaderManager implements IService {
         || (!isClosed && unclosedFileReaderMap.containsKey(tsFile.getTsFilePath()));
   }
 
-  @Override
-  public void start() {
-    // Do nothing
-  }
-
-  @Override
-  public void stop() {
-    if (executorService == null || executorService.isShutdown()) {
-      return;
-    }
-
-    executorService.shutdown();
-    try {
-      executorService.awaitTermination(10, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-      logger.error("StatMonitor timing service could not be shutdown.", e);
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  @Override
-  public ServiceType getID() {
-    return ServiceType.FILE_READER_MANAGER_SERVICE;
-  }
-
   private static class FileReaderManagerHelper {
 
     private static final FileReaderManager INSTANCE = new FileReaderManager();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/cache/ChunkCacheTest.java b/server/src/test/java/org/apache/iotdb/db/engine/cache/ChunkCacheTest.java
index 33eab71..52fca3f 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/cache/ChunkCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/cache/ChunkCacheTest.java
@@ -234,6 +234,5 @@ public class ChunkCacheTest {
       resourceFile.delete();
     }
     FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
-    FileReaderManager.getInstance().stop();
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/MergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/MergeTest.java
index fe53e61..5882ddd 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/MergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/MergeTest.java
@@ -173,7 +173,6 @@ abstract class MergeTest {
     }
 
     FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
-    FileReaderManager.getInstance().stop();
   }
 
   void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum, long valueOffset)
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionTest.java
index 7559da5..c7aca33 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionTest.java
@@ -205,7 +205,6 @@ public abstract class InnerCompactionTest {
       resourceFile.delete();
     }
     FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
-    FileReaderManager.getInstance().stop();
   }
 
   void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum, long valueOffset)
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTest.java
index c573611..1862654 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTest.java
@@ -310,7 +310,6 @@ public class SizeTieredCompactionRecoverTest {
       resourceFile.delete();
     }
     FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
-    FileReaderManager.getInstance().stop();
   }
 
   /** Target file uncompleted, source files and log exists */
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTest.java
index ad4b53d..d5a50ff 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTest.java
@@ -202,7 +202,6 @@ public class SizeTieredCompactionTest {
       resourceFile.delete();
     }
     FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
-    FileReaderManager.getInstance().stop();
   }
 
   void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum, long valueOffset)
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionClearUtils.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionClearUtils.java
index 19bfd7e..4e3af9e 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionClearUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionClearUtils.java
@@ -54,6 +54,5 @@ public class CompactionClearUtils {
       modsFile.delete();
     }
     FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
-    FileReaderManager.getInstance().stop();
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java b/server/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
index 0febbdd..20aad4d 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
@@ -117,25 +117,14 @@ public class FileReaderManagerTest {
     t1.join();
     t2.join();
 
+    Thread.sleep(1000);
+    // Since we have closed the reader after reading the file, it should be false that the file is
+    // still contained by manager
     for (int i = 1; i <= MAX_FILE_SIZE; i++) {
       TsFileResource tsFile = new TsFileResource(SystemFileFactory.INSTANCE.getFile(filePath + i));
-      Assert.assertTrue(manager.contains(tsFile, false));
+      Assert.assertFalse(manager.contains(tsFile, false));
     }
 
-    // the code below is not valid because the cacheFileReaderClearPeriod config in this class is
-    // not valid
-
-    // TimeUnit.SECONDS.sleep(5);
-    //
-    // for (int i = 1; i <= MAX_FILE_SIZE; i++) {
-    //
-    // if (i == 4 || i == 5 || i == 6) {
-    // Assert.assertTrue(manager.contains(filePath + i));
-    // } else {
-    // Assert.assertFalse(manager.contains(filePath + i));
-    // }
-    // }
-
     FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
     for (int i = 1; i < MAX_FILE_SIZE; i++) {
       File file = SystemFileFactory.INSTANCE.getFile(filePath + i);
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
index 9b50d67..c862482 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
@@ -205,6 +205,5 @@ public class SeriesReaderTestUtil {
     }
 
     FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
-    FileReaderManager.getInstance().stop();
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/rescon/ResourceManagerTest.java b/server/src/test/java/org/apache/iotdb/db/rescon/ResourceManagerTest.java
index 4c04342..27dbfe3 100644
--- a/server/src/test/java/org/apache/iotdb/db/rescon/ResourceManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/rescon/ResourceManagerTest.java
@@ -149,7 +149,6 @@ public class ResourceManagerTest {
       resourceFile.delete();
     }
     FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
-    FileReaderManager.getInstance().stop();
   }
 
   void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum, long valueOffset)

[iotdb] 10/10: add split log timer

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master_performance
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 92bb0b4553c6390105e1a35de67ef367bedd8c27
Author: LebronAl <TX...@gmail.com>
AuthorDate: Sat Nov 27 13:22:55 2021 +0800

    add split log timer
---
 .../main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java   | 4 +++-
 .../src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java  | 1 +
 .../src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java   | 1 -
 3 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
index 098b127..a59041a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.utils.PartitionUtils;
 import org.apache.iotdb.cluster.utils.StatusUtils;
 import org.apache.iotdb.db.conf.IoTDBConstant;
@@ -235,7 +236,7 @@ public class Coordinator {
       return concludeFinalStatus(
           plan, plan.getPaths().size(), true, false, false, null, Collections.emptyList());
     }
-
+    long startTime = Statistic.SPLIT_PLAN.getOperationStartTime();
     // split the plan into sub-plans that each only involve one data group
     Map<PhysicalPlan, PartitionGroup> planGroupMap;
     try {
@@ -244,6 +245,7 @@ public class Coordinator {
       return StatusUtils.getStatus(
           StatusUtils.CONSISTENCY_FAILURE, checkConsistencyException.getMessage());
     }
+    Timer.Statistic.SPLIT_PLAN.calOperationCostTimeFromStart(startTime);
 
     // the storage group is not found locally
     if (planGroupMap == null || planGroupMap.isEmpty()) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index f4ac980..562e8cb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -49,6 +49,7 @@ public class Timer {
     // meta group member
     META_GROUP_MEMBER_EXECUTE_NON_QUERY(
         META_GROUP_MEMBER, "execute non query", TIME_SCALE, true, COORDINATOR_EXECUTE_NON_QUERY),
+    SPLIT_PLAN(META_GROUP_MEMBER, "split plan", TIME_SCALE, true, COORDINATOR_EXECUTE_NON_QUERY),
     META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP(
         META_GROUP_MEMBER,
         "execute in local group",
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 5abed89..a7052fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -1530,7 +1530,6 @@ public class PlanExecutor implements IPlanExecutor {
       return;
     }
     try {
-      logger.info("Execute insert tablet {}", insertTabletPlan.getRowCount());
       insertTabletPlan.setMeasurementMNodes(
           new IMeasurementMNode[insertTabletPlan.getMeasurements().length]);
       getSeriesSchemas(insertTabletPlan);

[iotdb] 02/10: catch throwable in sub query threads (#4462)

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master_performance
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3865ef6355590e4b0882a8e9510d93953df7d2e0
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Thu Nov 25 21:53:29 2021 +0800

    catch throwable in sub query threads (#4462)
---
 .../query/dataset/RawQueryDataSetWithoutValueFilter.java   | 14 +++++++-------
 .../iotdb/tsfile/read/common/ExceptionBatchData.java       | 10 +++++-----
 2 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index 806321f..5f83c74 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -116,12 +116,12 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
             e,
             String.format(
                 "Something gets wrong while reading from the series reader %s: ", pathName));
-      } catch (Exception e) {
+      } catch (Throwable e) {
         putExceptionBatchData(e, "Something gets wrong: ");
       }
     }
 
-    private void putExceptionBatchData(Exception e, String logMessage) {
+    private void putExceptionBatchData(Throwable e, String logMessage) {
       try {
         LOGGER.error(logMessage, e);
         reader.setHasRemaining(false);
@@ -517,11 +517,11 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
     } else if (batchData instanceof ExceptionBatchData) {
       // exception happened in producer thread
       ExceptionBatchData exceptionBatchData = (ExceptionBatchData) batchData;
-      LOGGER.error("exception happened in producer thread", exceptionBatchData.getException());
-      if (exceptionBatchData.getException() instanceof IOException) {
-        throw (IOException) exceptionBatchData.getException();
-      } else if (exceptionBatchData.getException() instanceof RuntimeException) {
-        throw (RuntimeException) exceptionBatchData.getException();
+      LOGGER.error("exception happened in producer thread", exceptionBatchData.getThrowable());
+      if (exceptionBatchData.getThrowable() instanceof IOException) {
+        throw (IOException) exceptionBatchData.getThrowable();
+      } else if (exceptionBatchData.getThrowable() instanceof RuntimeException) {
+        throw (RuntimeException) exceptionBatchData.getThrowable();
       }
 
     } else { // there are more batch data in this time series queue
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ExceptionBatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ExceptionBatchData.java
index d71d39b..b26284b 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ExceptionBatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ExceptionBatchData.java
@@ -20,10 +20,10 @@ package org.apache.iotdb.tsfile.read.common;
 
 public class ExceptionBatchData extends BatchData {
 
-  private Exception exception;
+  private Throwable throwable;
 
-  public ExceptionBatchData(Exception exception) {
-    this.exception = exception;
+  public ExceptionBatchData(Throwable throwable) {
+    this.throwable = throwable;
   }
 
   @Override
@@ -31,7 +31,7 @@ public class ExceptionBatchData extends BatchData {
     throw new UnsupportedOperationException("hasCurrent is not supported for ExceptionBatchData");
   }
 
-  public Exception getException() {
-    return exception;
+  public Throwable getThrowable() {
+    return throwable;
   }
 }

[iotdb] 05/10: [IOTDB-2062] UDF Framework: Potential Memory Leak in `SingleInputColumnSingleReferenceIntermediateLayer` (#4472)

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master_performance
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5ceecfd85aa17f7757feb59e030630d58d9d2ccd
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Fri Nov 26 13:00:33 2021 +0800

    [IOTDB-2062] UDF Framework: Potential Memory Leak in `SingleInputColumnSingleReferenceIntermediateLayer` (#4472)
---
 .../org/apache/iotdb/db/exception/query/QueryProcessException.java  | 4 ++++
 .../org/apache/iotdb/db/query/udf/core/executor/UDTFExecutor.java   | 6 ++++++
 .../layer/SingleInputColumnSingleReferenceIntermediateLayer.java    | 4 ++++
 3 files changed, 14 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/exception/query/QueryProcessException.java b/server/src/main/java/org/apache/iotdb/db/exception/query/QueryProcessException.java
index e36b67f..6ea4ad3 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/query/QueryProcessException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/query/QueryProcessException.java
@@ -38,6 +38,10 @@ public class QueryProcessException extends IoTDBException {
     super(message, errorCode);
   }
 
+  public QueryProcessException(String message, Throwable cause) {
+    super(message, cause, TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
+  }
+
   public QueryProcessException(IoTDBException e) {
     super(e, e.getErrorCode(), e.isUserException());
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFExecutor.java
index 48b94f9..37269d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFExecutor.java
@@ -32,11 +32,16 @@ import org.apache.iotdb.db.query.udf.datastructure.tv.ElasticSerializableTVList;
 import org.apache.iotdb.db.query.udf.service.UDFRegistrationService;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.time.ZoneId;
 import java.util.Map;
 
 public class UDTFExecutor {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(UDTFExecutor.class);
+
   protected final FunctionExpression expression;
   protected final UDTFConfigurations configurations;
 
@@ -104,6 +109,7 @@ public class UDTFExecutor {
   }
 
   private void onError(String methodName, Exception e) throws QueryProcessException {
+    LOGGER.warn("Error occurred during executing UDTF", e);
     throw new QueryProcessException(
         String.format(
                 "Error occurred during executing UDTF#%s: %s", methodName, System.lineSeparator())
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnSingleReferenceIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnSingleReferenceIntermediateLayer.java
index 3dc3e34..be5d41c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnSingleReferenceIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnSingleReferenceIntermediateLayer.java
@@ -144,6 +144,8 @@ public class SingleInputColumnSingleReferenceIntermediateLayer extends Intermedi
       @Override
       public void readyForNext() {
         hasCached = false;
+
+        tvList.setEvictionUpperBound(beginIndex + 1);
       }
 
       @Override
@@ -234,6 +236,8 @@ public class SingleInputColumnSingleReferenceIntermediateLayer extends Intermedi
       public void readyForNext() {
         hasCached = false;
         nextWindowTimeBegin += slidingStep;
+
+        tvList.setEvictionUpperBound(nextIndexBegin + 1);
       }
 
       @Override