You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/04/21 03:55:39 UTC

[iotdb] branch ISSUE_5792 updated: add comment

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch ISSUE_5792
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ISSUE_5792 by this push:
     new a0dc6c9619 add comment
a0dc6c9619 is described below

commit a0dc6c9619b7be7a0e1e941b9797eacc4e6aedf2
Author: Tian Jiang <jt...@163.com>
AuthorDate: Fri Apr 21 11:58:07 2023 +0800

    add comment
---
 .../concurrent/dynamic/DynamicThreadGroup.java     |  7 ++
 .../iotdb/commons/concurrent/pipeline}/Task.java   | 13 ++-
 .../commons/concurrent/pipeline/TaskRunner.java    | 73 +++++++++++++++++
 .../iotdb/db/engine/flush/MemTableFlushTaskV2.java | 92 ++++------------------
 .../iotdb/db/engine/flush/tasks/DeviceIOTask.java  | 16 +++-
 .../db/engine/flush/tasks/EncodeSeriesTask.java    |  6 ++
 .../iotdb/db/engine/flush/tasks/FinalTask.java     |  5 ++
 .../iotdb/db/engine/flush/tasks/FlushContext.java  |  4 +
 .../db/engine/flush/tasks/SortSeriesTask.java      |  4 +
 .../db/engine/storagegroup/TsFileProcessor.java    |  6 +-
 10 files changed, 145 insertions(+), 81 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
index 2266d5c579..240917d076 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.commons.concurrent.dynamic;
 
+import java.util.concurrent.ExecutionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -98,4 +99,10 @@ public class DynamicThreadGroup {
     threadFutureMap.clear();
     threadCnt.set(0);
   }
+
+  public void join() throws ExecutionException, InterruptedException {
+    for (Future<?> future : threadFutureMap.values()) {
+      future.get();
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/Task.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/pipeline/Task.java
similarity index 68%
rename from server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/Task.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/concurrent/pipeline/Task.java
index ee524be535..b528f48025 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/Task.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/pipeline/Task.java
@@ -16,10 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.engine.flush.tasks;
+package org.apache.iotdb.commons.concurrent.pipeline;
 
+/**
+ * A pipeline task that will generate the next task that should be sent to the next stage in the
+ * pipeline after being run.
+ */
 public interface Task {
+
+  /** Run the task. Must be called before nextTask(). */
   void run();
 
+  /**
+   * Generate the next task (if any). Must be called after run().
+   *
+   * @return the task for the next stage or null if there is no further tasks.
+   */
   Task nextTask();
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/pipeline/TaskRunner.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/pipeline/TaskRunner.java
new file mode 100644
index 0000000000..b9b308c677
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/pipeline/TaskRunner.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.concurrent.pipeline;
+
+import org.apache.iotdb.commons.concurrent.dynamic.DynamicThread;
+import org.apache.iotdb.commons.concurrent.dynamic.DynamicThreadGroup;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * A thread that will continuously take tasks from an input queue, run the task, and insert the next
+ * task to the output queue. By connecting multiple TaskRunners with input queues and output queues,
+ * a pipeline is formed.
+ */
+public class TaskRunner extends DynamicThread {
+
+  private Runnable cleanUp;
+  private BlockingQueue<Task> input;
+  private BlockingQueue<Task> output;
+
+  public TaskRunner(
+      DynamicThreadGroup threadGroup,
+      Runnable cleanUp,
+      BlockingQueue<Task> input,
+      BlockingQueue<Task> output) {
+    super(threadGroup);
+    this.cleanUp = cleanUp;
+    this.input = input;
+    this.output = output;
+  }
+
+  @Override
+  public void runInternal() {
+    while (!Thread.interrupted()) {
+      Task task;
+      try {
+        task = input.take();
+        idleToRunning();
+        task.run();
+        Task nextTask = task.nextTask();
+        if (nextTask != null) {
+          output.put(nextTask);
+        }
+        runningToIdle();
+
+        if (shouldExit()) {
+          break;
+        }
+      } catch (InterruptedException e1) {
+        Thread.currentThread().interrupt();
+        break;
+      }
+    }
+
+    cleanUp.run();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskV2.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskV2.java
index 1329fed663..6a4b95eaa6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskV2.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskV2.java
@@ -20,6 +20,8 @@ package org.apache.iotdb.db.engine.flush;
 
 import org.apache.iotdb.commons.concurrent.dynamic.DynamicThread;
 import org.apache.iotdb.commons.concurrent.dynamic.DynamicThreadGroup;
+import org.apache.iotdb.commons.concurrent.pipeline.Task;
+import org.apache.iotdb.commons.concurrent.pipeline.TaskRunner;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
@@ -29,7 +31,6 @@ import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
 import org.apache.iotdb.db.engine.flush.tasks.FlushContext;
 import org.apache.iotdb.db.engine.flush.tasks.FlushDeviceContext;
 import org.apache.iotdb.db.engine.flush.tasks.SortSeriesTask;
-import org.apache.iotdb.db.engine.flush.tasks.Task;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
 import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup;
@@ -51,9 +52,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -70,7 +69,7 @@ public class MemTableFlushTaskV2 {
   private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private final DynamicThreadGroup sortTasks;
   private final DynamicThreadGroup encodingTasks;
-  private final Future<?> ioTaskFuture;
+  private final DynamicThreadGroup ioTask;
 
   private final LinkedBlockingQueue<Task> sortTaskQueue = new LinkedBlockingQueue<>();
   private final LinkedBlockingQueue<Task> encodingTaskQueue = new LinkedBlockingQueue<>();
@@ -116,7 +115,12 @@ public class MemTableFlushTaskV2 {
             this::newEncodingThread,
             config.getFlushMemTableMinSubThread(),
             config.getFlushMemTableMaxSubThread());
-    this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(newIOThread());
+    this.ioTask = new DynamicThreadGroup(
+        storageGroup + "-" + dataRegionId + "-" + memTable,
+        SUB_TASK_POOL_MANAGER::submit,
+        this::newIOThread,
+        1,
+        1);
     LOGGER.debug(
         "flush task of database {} memtable is created, flushing to file {}.",
         storageGroup,
@@ -164,6 +168,7 @@ public class MemTableFlushTaskV2 {
     allContext.setDeviceContexts(new ArrayList<>());
 
     for (IDeviceID deviceID : deviceIDList) {
+      // create a context for each device
       FlushDeviceContext flushDeviceContext = new FlushDeviceContext();
       flushDeviceContext.setDeviceID(deviceID);
       allContext.getDeviceContexts().add(flushDeviceContext);
@@ -173,6 +178,7 @@ public class MemTableFlushTaskV2 {
       // skip the empty device/chunk group
       seriesInOrder.removeIf(s -> memChunkMap.get(s).count() == 0);
       seriesInOrder.sort((String::compareTo));
+      // record the series order in the device context
       flushDeviceContext.setMeasurementIds(seriesInOrder);
       flushDeviceContext.setChunkWriters(new IChunkWriter[seriesInOrder.size()]);
       flushDeviceContext.setSeriesIndexMap(new HashMap<>());
@@ -195,9 +201,9 @@ public class MemTableFlushTaskV2 {
     }
 
     try {
-      ioTaskFuture.get();
+      ioTask.join();
     } catch (InterruptedException | ExecutionException e) {
-      ioTaskFuture.cancel(true);
+      ioTask.cancelAll();
       encodingTasks.cancelAll();
       sortTasks.cancelAll();
       if (e instanceof InterruptedException) {
@@ -263,82 +269,16 @@ public class MemTableFlushTaskV2 {
     }
   }
 
-  protected class TaskRunner extends DynamicThread {
-
-    private static final String TASK_NAME_SORT = "sort data";
-    private static final String TASK_NAME_ENCODING = "encode data";
-    private static final String TASK_NAME_IO = "write file";
-    private Runnable cleanUp;
-    private String taskName;
-    private BlockingQueue<Task> input;
-    private BlockingQueue<Task> output;
-
-    public TaskRunner(
-        DynamicThreadGroup threadGroup,
-        Runnable cleanUp,
-        String taskName,
-        BlockingQueue<Task> input,
-        BlockingQueue<Task> output) {
-      super(threadGroup);
-      this.cleanUp = cleanUp;
-      this.taskName = taskName;
-      this.input = input;
-      this.output = output;
-    }
-
-    @Override
-    public void runInternal() {
-      LOGGER.debug(
-          "Database {} memtable flushing to file {} starts to {}.",
-          storageGroup,
-          allContext.getWriter().getFile().getName(),
-          taskName);
-      while (!Thread.interrupted()) {
-        Task task;
-        try {
-          task = input.take();
-          idleToRunning();
-          task.run();
-          Task nextTask = task.nextTask();
-          if (nextTask != null) {
-            output.put(nextTask);
-          }
-          runningToIdle();
-
-          if (shouldExit()) {
-            break;
-          }
-        } catch (InterruptedException e1) {
-          Thread.currentThread().interrupt();
-          break;
-        }
-      }
-
-      cleanUp.run();
-    }
-  }
-
   private DynamicThread newSortThread() {
-    return new TaskRunner(
-        sortTasks,
-        this::cleanSortThread,
-        TaskRunner.TASK_NAME_SORT,
-        sortTaskQueue,
-        encodingTaskQueue);
+    return new TaskRunner(sortTasks, this::cleanSortThread, sortTaskQueue, encodingTaskQueue);
   }
 
   private DynamicThread newEncodingThread() {
-    return new TaskRunner(
-        encodingTasks,
-        this::cleanEncodingThread,
-        TaskRunner.TASK_NAME_ENCODING,
-        encodingTaskQueue,
-        ioTaskQueue);
+    return new TaskRunner(encodingTasks, this::cleanEncodingThread, encodingTaskQueue, ioTaskQueue);
   }
 
   private DynamicThread newIOThread() {
-    return new TaskRunner(
-        null, this::cleanIOThread, TaskRunner.TASK_NAME_IO, ioTaskQueue, ioTaskQueue);
+    return new TaskRunner(null, this::cleanIOThread, ioTaskQueue, ioTaskQueue);
   }
 
   private void cleanSortThread() {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/DeviceIOTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/DeviceIOTask.java
index 161853117c..cc3c9ba3c6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/DeviceIOTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/DeviceIOTask.java
@@ -18,13 +18,24 @@
  */
 package org.apache.iotdb.db.engine.flush.tasks;
 
+import org.apache.iotdb.commons.concurrent.pipeline.Task;
 import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
 import org.apache.iotdb.db.service.metrics.WritingMetrics;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 
 import java.io.IOException;
 
+/**
+ * A DeviceIOTask will check if the current device in the FlushContext (indicated by
+ * FlushContext.cursor) is fully encoded. If it is fully encoded, it will be flushed to the
+ * underlying file and the cursor will be moved to the next device.
+ * <p>
+ * The above procedure repeats until the current device is still being encoded or all devices are
+ * flushed. If all devices are flushed, a FinalTask will be generated. Otherwise, no new task will
+ * be generated.
+ */
 public class DeviceIOTask implements Task {
+
   private FlushContext flushContext;
 
   public DeviceIOTask(FlushContext flushContext) {
@@ -39,12 +50,15 @@ public class DeviceIOTask implements Task {
       if (cursor < flushContext.getDeviceContexts().size()) {
         FlushDeviceContext flushDeviceContext = flushContext.getDeviceContexts().get(cursor);
         if (flushDeviceContext.isFullyEncoded()) {
+          // the current device is ready, flush it and move to the next device
           flushOneDevice(flushDeviceContext);
           flushContext.setCursor(cursor + 1);
         } else {
+          // the current device is still being flushed
           hasNext = false;
         }
       } else {
+        // all devices are flushed
         hasNext = false;
       }
     }
@@ -70,7 +84,7 @@ public class DeviceIOTask implements Task {
 
   @Override
   public FinalTask nextTask() {
-    if (flushContext.getCursor() == flushContext.getDeviceContexts().size()) {
+    if (flushContext.allFlushed()) {
       // all devices have been flushed
       return new FinalTask();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/EncodeSeriesTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/EncodeSeriesTask.java
index 083db4ba5d..b03010b1d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/EncodeSeriesTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/EncodeSeriesTask.java
@@ -18,12 +18,18 @@
  */
 package org.apache.iotdb.db.engine.flush.tasks;
 
+import org.apache.iotdb.commons.concurrent.pipeline.Task;
 import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
 import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
 import org.apache.iotdb.db.service.metrics.WritingMetrics;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 
+/**
+ * EncodeSeriesTask encodes a timeseries to its byte form. If all timeseries of the device are
+ * encoded, a DeviceIOTask will be generated. Otherwise, no new task will be generated.
+ */
 public class EncodeSeriesTask implements Task {
+
   private IDeviceID deviceId;
   private String seriesId;
   private FlushDeviceContext deviceContext;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FinalTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FinalTask.java
index 7b4b4f0bfc..632f9b4ef3 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FinalTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FinalTask.java
@@ -18,6 +18,11 @@
  */
 package org.apache.iotdb.db.engine.flush.tasks;
 
+import org.apache.iotdb.commons.concurrent.pipeline.Task;
+
+/**
+ * FinalTask ends the current TaskRunner by interruption.
+ */
 public class FinalTask implements Task {
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FlushContext.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FlushContext.java
index 6e4e2acb5e..87523c8bc3 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FlushContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FlushContext.java
@@ -83,4 +83,8 @@ public class FlushContext {
   public void setMemTable(IMemTable memTable) {
     this.memTable = memTable;
   }
+
+  public boolean allFlushed() {
+    return cursor.get() == deviceContexts.size();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/SortSeriesTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/SortSeriesTask.java
index e9d9f73711..a5c529f6ed 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/SortSeriesTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/SortSeriesTask.java
@@ -18,10 +18,14 @@
  */
 package org.apache.iotdb.db.engine.flush.tasks;
 
+import org.apache.iotdb.commons.concurrent.pipeline.Task;
 import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
 import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
 import org.apache.iotdb.db.service.metrics.WritingMetrics;
 
+/**
+ * SortSeriesTask sorts a timeseries and generates the associated encoding task.
+ */
 public class SortSeriesTask implements Task {
   private IDeviceID deviceId;
   private String seriesId;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 395c372ac3..6e0a248270 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -32,7 +32,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.flush.CloseFileListener;
 import org.apache.iotdb.db.engine.flush.FlushListener;
 import org.apache.iotdb.db.engine.flush.FlushManager;
-import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
+import org.apache.iotdb.db.engine.flush.MemTableFlushTaskV2;
 import org.apache.iotdb.db.engine.flush.NotifyFlushMemTable;
 import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunk;
 import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunkGroup;
@@ -1077,8 +1077,8 @@ public class TsFileProcessor {
       } else {
         try {
           writer.mark();
-          MemTableFlushTask flushTask =
-              new MemTableFlushTask(
+          MemTableFlushTaskV2 flushTask =
+              new MemTableFlushTaskV2(
                   memTableToFlush,
                   writer,
                   storageGroupName,