You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/04/20 06:04:07 UTC

[iotdb] branch ISSUE_5792 updated: add MemTableFlushTaskv2

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch ISSUE_5792
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ISSUE_5792 by this push:
     new ab618b7495 add MemTableFlushTaskv2
ab618b7495 is described below

commit ab618b7495653fa8a25d1c3bf6188b44bd8635f0
Author: Tian Jiang <jt...@163.com>
AuthorDate: Thu Apr 20 14:06:34 2023 +0800

    add MemTableFlushTaskv2
---
 .../commons/concurrent/dynamic/DynamicThread.java  |  12 +-
 .../concurrent/dynamic/DynamicThreadGroup.java     |  18 +-
 .../iotdb/db/engine/flush/MemTableFlushTaskV2.java | 355 +++++++++++++++++++++
 .../iotdb/db/engine/flush/tasks/DeviceIOTask.java  |  78 +++++
 .../db/engine/flush/tasks/EncodeSeriesTask.java    |  85 +++++
 .../iotdb/db/engine/flush/tasks/FinalTask.java     |  32 ++
 .../iotdb/db/engine/flush/tasks/FlushContext.java  |  86 +++++
 .../db/engine/flush/tasks/FlushDeviceContext.java  |  74 +++++
 .../db/engine/flush/tasks/SortSeriesTask.java      |  74 +++++
 .../apache/iotdb/db/engine/flush/tasks/Task.java   |  24 ++
 10 files changed, 833 insertions(+), 5 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThread.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThread.java
index 4620de5159..1bbb6d16c7 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThread.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThread.java
@@ -35,6 +35,10 @@ public abstract class DynamicThread implements Runnable {
   private double maximumIdleRatio = 0.5;
   private double minimumIdleRatio = 0.1;
 
+  public DynamicThread(DynamicThreadGroup threadGroup) {
+    this.threadGroup = threadGroup;
+  }
+
   /**
    * The implementation must call idleToRunning() and runningToIdle() properly. E.g., {
    * {@code {while(! Thread.interrupted ()) { Object obj = blockingQueue.poll(); idleToRunning();
@@ -65,6 +69,10 @@ public abstract class DynamicThread implements Runnable {
   }
 
   protected boolean shouldExit() {
+    if (threadGroup == null) {
+      return false;
+    }
+
     double idleRatio = idleRatio();
     if (idleRatio < minimumIdleRatio) {
       // Thread too busy, try adding a new thread
@@ -91,7 +99,9 @@ public abstract class DynamicThread implements Runnable {
     try {
       runInternal();
     } finally {
-      threadGroup.onThreadExit(this);
+      if (threadGroup != null) {
+        threadGroup.onThreadExit(this);
+      }
     }
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
index 7216437ea9..9d3199d0bb 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
@@ -23,11 +23,12 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 public class DynamicThreadGroup {
 
-  private ExecutorService hostingPool;
+  private Function<Runnable, Future<?>> poolSubmitter;
   private String name;
   private Supplier<DynamicThread> threadFactory;
   private AtomicInteger threadCnt = new AtomicInteger();
@@ -35,13 +36,16 @@ public class DynamicThreadGroup {
   private int maxThreadCnt = 8;
   private Map<DynamicThread, Future<?>> threadFutureMap = new ConcurrentHashMap<>();
 
-  public DynamicThreadGroup(String name, ExecutorService hostingPool,
+  public DynamicThreadGroup(String name, Function<Runnable, Future<?>> poolSubmitter,
       Supplier<DynamicThread> threadFactory, int minThreadCnt, int maxThreadCnt) {
     this.name = name;
-    this.hostingPool = hostingPool;
+    this.poolSubmitter = poolSubmitter;
     this.threadFactory = threadFactory;
     this.minThreadCnt = Math.max(1, minThreadCnt);
     this.maxThreadCnt = Math.max(this.minThreadCnt, maxThreadCnt);
+    for (int i = 0; i < this.minThreadCnt; i++) {
+      addThread();
+    }
   }
 
   /**
@@ -51,7 +55,7 @@ public class DynamicThreadGroup {
     int afterCnt = threadCnt.incrementAndGet();
     if (afterCnt <= maxThreadCnt) {
       DynamicThread dynamicThread = threadFactory.get();
-      Future<?> submit = hostingPool.submit(dynamicThread);
+      Future<?> submit = poolSubmitter.apply(dynamicThread);
       threadFutureMap.put(dynamicThread, submit);
     } else {
       threadCnt.decrementAndGet();
@@ -74,4 +78,10 @@ public class DynamicThreadGroup {
     threadCnt.decrementAndGet();
     threadFutureMap.remove(dynamicThread);
   }
+
+  public void cancelAll() {
+    threadFutureMap.forEach((t,f) -> f.cancel(true));
+    threadFutureMap.clear();
+    threadCnt.set(0);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskV2.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskV2.java
new file mode 100644
index 0000000000..75cc4d03cc
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskV2.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.flush;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.iotdb.commons.concurrent.dynamic.DynamicThread;
+import org.apache.iotdb.commons.concurrent.dynamic.DynamicThreadGroup;
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
+import org.apache.iotdb.db.engine.flush.tasks.FlushContext;
+import org.apache.iotdb.db.engine.flush.tasks.FlushDeviceContext;
+import org.apache.iotdb.db.engine.flush.tasks.SortSeriesTask;
+import org.apache.iotdb.db.engine.flush.tasks.Task;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup;
+import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
+import org.apache.iotdb.db.rescon.SystemInfo;
+import org.apache.iotdb.db.service.metrics.WritingMetrics;
+import org.apache.iotdb.db.service.metrics.recorder.WritingMetricsManager;
+import org.apache.iotdb.metrics.utils.IoTDBMetricsUtils;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * flush task to flush one memtable using a pipeline model to flush, which is sort memtable ->
+ * encoding -> write to disk (io task)
+ */
+public class MemTableFlushTaskV2 {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(MemTableFlushTaskV2.class);
+  private static final FlushSubTaskPoolManager SUB_TASK_POOL_MANAGER =
+      FlushSubTaskPoolManager.getInstance();
+  private static final WritingMetricsManager WRITING_METRICS = WritingMetricsManager.getInstance();
+  private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private final DynamicThreadGroup sortTasks;
+  private final DynamicThreadGroup encodingTasks;
+  private final Future<?> ioTaskFuture;
+
+  private final LinkedBlockingQueue<Task> sortTaskQueue = new LinkedBlockingQueue<>();
+  private final LinkedBlockingQueue<Task> encodingTaskQueue = new LinkedBlockingQueue<>();
+  private final LinkedBlockingQueue<Task> ioTaskQueue =
+      (config.isEnableMemControl() && SystemInfo.getInstance().isEncodingFasterThanIo())
+          ? new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing())
+          : new LinkedBlockingQueue<>();
+
+  private String storageGroup;
+  private String dataRegionId;
+
+  private IMemTable memTable;
+  private FlushContext allContext;
+
+  /**
+   * @param memTable     the memTable to flush
+   * @param writer       the writer where memTable will be flushed to (current tsfile writer or vm
+   *                     writer)
+   * @param storageGroup current database
+   */
+  public MemTableFlushTaskV2(
+      IMemTable memTable,
+      RestorableTsFileIOWriter writer,
+      String storageGroup,
+      String dataRegionId) {
+    this.memTable = memTable;
+    this.storageGroup = storageGroup;
+    this.dataRegionId = dataRegionId;
+    this.allContext = new FlushContext();
+    this.allContext.setWriter(writer);
+    this.allContext.setMemTable(memTable);
+
+    this.sortTasks = new DynamicThreadGroup(storageGroup + "-" + dataRegionId + "-" + memTable,
+        SUB_TASK_POOL_MANAGER::submit, this::newSortThread, 1, 8);
+    this.encodingTasks = new DynamicThreadGroup(storageGroup + "-" + dataRegionId + "-" + memTable,
+        SUB_TASK_POOL_MANAGER::submit, this::newEncodingThread, 1, 8);
+    this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(newIOThread());
+    LOGGER.debug(
+        "flush task of database {} memtable is created, flushing to file {}.",
+        storageGroup,
+        allContext.getWriter().getFile().getName());
+  }
+
+
+  /**
+   * the function for flushing memtable.
+   */
+  public void syncFlushMemTable() throws ExecutionException, InterruptedException {
+    long avgSeriesPointsNum =
+        memTable.getSeriesNumber() == 0
+            ? 0
+            : memTable.getTotalPointsNum() / memTable.getSeriesNumber();
+    LOGGER.info(
+        "The memTable size of SG {} is {}, the avg series points num in chunk is {}, total timeseries number is {}",
+        storageGroup,
+        memTable.memSize(),
+        avgSeriesPointsNum,
+        memTable.getSeriesNumber());
+    WRITING_METRICS.recordFlushingMemTableStatus(
+        storageGroup,
+        memTable.memSize(),
+        memTable.getSeriesNumber(),
+        memTable.getTotalPointsNum(),
+        avgSeriesPointsNum);
+
+    long estimatedTemporaryMemSize = 0L;
+    if (config.isEnableMemControl() && SystemInfo.getInstance().isEncodingFasterThanIo()) {
+      estimatedTemporaryMemSize =
+          memTable.getSeriesNumber() == 0
+              ? 0
+              : memTable.memSize()
+                  / memTable.getSeriesNumber()
+                  * config.getIoTaskQueueSizeForFlushing();
+      SystemInfo.getInstance().applyTemporaryMemoryForFlushing(estimatedTemporaryMemSize);
+    }
+    long start = System.currentTimeMillis();
+
+    // for map do not use get(key) to iterate
+    Map<IDeviceID, IWritableMemChunkGroup> memTableMap = memTable.getMemTableMap();
+    List<IDeviceID> deviceIDList = new ArrayList<>(memTableMap.keySet());
+    // sort the IDeviceID in lexicographical order
+    deviceIDList.sort(Comparator.comparing(IDeviceID::toStringID));
+    deviceIDList.removeIf(d -> memTableMap.get(d).count() == 0 ||
+        memTableMap.get(d).getMemChunkMap().isEmpty());
+
+    allContext = new FlushContext();
+    allContext.setDeviceContexts(new ArrayList<>());
+
+    for (IDeviceID deviceID : deviceIDList) {
+      FlushDeviceContext flushDeviceContext = new FlushDeviceContext();
+      allContext.getDeviceContexts().add(flushDeviceContext);
+      flushDeviceContext.setDeviceID(deviceID);
+      final Map<String, IWritableMemChunk> value = memTableMap.get(deviceID).getMemChunkMap();
+      List<String> seriesInOrder = new ArrayList<>(value.keySet());
+      // skip the empty device/chunk group
+      seriesInOrder.removeIf(s -> value.get(s).count() == 0);
+      seriesInOrder.sort((String::compareTo));
+      flushDeviceContext.setMeasurementIds(seriesInOrder);
+      flushDeviceContext.setChunkWriters(new IChunkWriter[seriesInOrder.size()]);
+      flushDeviceContext.setSeriesIndexMap(new HashMap<>());
+
+      for (int j = 0; j < seriesInOrder.size(); j++) {
+        // starting from sorting each series
+        String seriesId = seriesInOrder.get(j);
+        flushDeviceContext.getSeriesIndexMap().put(seriesId, j);
+        IWritableMemChunk series = value.get(seriesId);
+
+        SortSeriesTask sortSeriesTask = new SortSeriesTask();
+        sortSeriesTask.setSeriesId(seriesId);
+        sortSeriesTask.setAllContext(allContext);
+        sortSeriesTask.setDeviceId(deviceID);
+        sortSeriesTask.setSeries(series);
+        sortSeriesTask.setDeviceContext(flushDeviceContext);
+
+        sortTaskQueue.put(sortSeriesTask);
+      }
+    }
+
+    try {
+      ioTaskFuture.get();
+    } catch (InterruptedException | ExecutionException e) {
+      ioTaskFuture.cancel(true);
+      encodingTasks.cancelAll();
+      sortTasks.cancelAll();
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      throw e;
+    }
+    encodingTasks.cancelAll();
+    sortTasks.cancelAll();
+
+    try {
+      long writePlanIndicesStartTime = System.currentTimeMillis();
+      allContext.getWriter().writePlanIndices();
+      WRITING_METRICS.recordFlushCost(
+          WritingMetrics.WRITE_PLAN_INDICES,
+          System.currentTimeMillis() - writePlanIndicesStartTime);
+    } catch (IOException e) {
+      throw new ExecutionException(e);
+    }
+
+    if (config.isEnableMemControl()) {
+      if (estimatedTemporaryMemSize != 0) {
+        SystemInfo.getInstance().releaseTemporaryMemoryForFlushing(estimatedTemporaryMemSize);
+      }
+      SystemInfo.getInstance().setEncodingFasterThanIo(
+          allContext.getIoTime().get() >= allContext.getEncodingTime().get());
+    }
+
+    MetricService.getInstance()
+        .timer(
+            System.currentTimeMillis() - start,
+            TimeUnit.MILLISECONDS,
+            Metric.COST_TASK.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            "flush");
+
+    LOGGER.info(
+        "Database {} memtable {} flushing a memtable has finished! Time consumption: {}ms",
+        storageGroup,
+        memTable,
+        System.currentTimeMillis() - start);
+  }
+
+  protected void metricFlush() {
+    if (!storageGroup.startsWith(IoTDBMetricsUtils.DATABASE)) {
+      int lastIndex = storageGroup.lastIndexOf("-");
+      if (lastIndex == -1) {
+        lastIndex = storageGroup.length();
+      }
+      MetricService.getInstance()
+          .gaugeWithInternalReportAsync(
+              memTable.getTotalPointsNum(),
+              Metric.POINTS.toString(),
+              MetricLevel.CORE,
+              Tag.DATABASE.toString(),
+              storageGroup.substring(0, lastIndex),
+              Tag.TYPE.toString(),
+              "flush",
+              Tag.REGION.toString(),
+              dataRegionId);
+    }
+  }
+
+
+  protected class TaskRunner extends DynamicThread {
+
+    private static final String TASK_NAME_SORT = "sort data";
+    private static final String TASK_NAME_ENCODING = "encode data";
+    private static final String TASK_NAME_IO = "write file";
+    private Runnable cleanUp;
+    private String taskName;
+    private BlockingQueue<Task> input;
+    private BlockingQueue<Task> output;
+
+    public TaskRunner(DynamicThreadGroup threadGroup, Runnable cleanUp, String taskName,
+        BlockingQueue<Task> input, BlockingQueue<Task> output) {
+      super(threadGroup);
+      this.cleanUp = cleanUp;
+      this.taskName = taskName;
+      this.input = input;
+      this.output = output;
+    }
+
+    @Override
+    public void runInternal() {
+      LOGGER.debug(
+          "Database {} memtable flushing to file {} starts to {}.",
+          storageGroup,
+          allContext.getWriter().getFile().getName(),
+          taskName);
+      while (!Thread.interrupted()) {
+        Task task;
+        try {
+          task = input.take();
+          idleToRunning();
+          task.run();
+          Task nextTask = task.nextTask();
+          if (nextTask != null) {
+            output.put(nextTask);
+          }
+          runningToIdle();
+        } catch (InterruptedException e1) {
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+
+      cleanUp.run();
+    }
+  }
+
+  private DynamicThread newSortThread() {
+    return new TaskRunner(sortTasks, this::cleanSortThread, TaskRunner.TASK_NAME_SORT,
+        sortTaskQueue, encodingTaskQueue);
+  }
+
+  private DynamicThread newEncodingThread() {
+    return new TaskRunner(encodingTasks, this::cleanEncodingThread, TaskRunner.TASK_NAME_ENCODING,
+        encodingTaskQueue, ioTaskQueue);
+  }
+
+  private DynamicThread newIOThread() {
+    return new TaskRunner(null, this::cleanIOThread, TaskRunner.TASK_NAME_IO,
+        ioTaskQueue, ioTaskQueue);
+  }
+
+
+  private void cleanSortThread() {
+    metricFlush();
+    LOGGER.info(
+        "Database {}, flushing memtable {} into disk: Sort data cost " + "{} ms.",
+        storageGroup,
+        allContext.getWriter().getFile().getName(),
+        allContext.getSortTime().get());
+    WRITING_METRICS.recordFlushCost(WritingMetrics.FLUSH_STAGE_SORT,
+        allContext.getSortTime().get());
+  }
+
+  private void cleanEncodingThread() {
+    metricFlush();
+    LOGGER.info(
+        "Database {}, flushing memtable {} into disk: Encoding data cost " + "{} ms.",
+        storageGroup,
+        allContext.getWriter().getFile().getName(),
+        allContext.getEncodingTime().get());
+    WRITING_METRICS.recordFlushCost(WritingMetrics.FLUSH_STAGE_ENCODING,
+        allContext.getEncodingTime().get());
+  }
+  private void cleanIOThread() {
+    metricFlush();
+    LOGGER.info(
+        "Database {}, flushing memtable {} into disk: IO cost " + "{} ms.",
+        storageGroup,
+        allContext.getWriter().getFile().getName(),
+        allContext.getIoTime().get());
+    WRITING_METRICS.recordFlushCost(WritingMetrics.FLUSH_STAGE_IO, allContext.getIoTime().get());
+    WRITING_METRICS.recordFlushTsFileSize(storageGroup,
+        allContext.getWriter().getFile().length());
+  }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/DeviceIOTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/DeviceIOTask.java
new file mode 100644
index 0000000000..e7f0012efd
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/DeviceIOTask.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.flush.tasks;
+
+import java.io.IOException;
+import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
+import org.apache.iotdb.db.service.metrics.WritingMetrics;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+
+public class DeviceIOTask implements Task {
+  private FlushContext flushContext;
+
+  public DeviceIOTask(FlushContext flushContext) {
+    this.flushContext = flushContext;
+  }
+
+  @Override
+  public void run() {
+    boolean hasNext = true;
+    while (hasNext) {
+      int cursor = flushContext.getCursor();
+      if (cursor < flushContext.getDeviceContexts().size()) {
+        FlushDeviceContext flushDeviceContext = flushContext.getDeviceContexts().get(cursor);
+        if (flushDeviceContext.isFullyEncoded()) {
+          flushOneDevice(flushDeviceContext);
+          flushContext.setCursor(cursor + 1);
+        } else {
+          hasNext = false;
+        }
+      } else {
+        hasNext = false;
+      }
+    }
+  }
+
+  public void flushOneDevice(FlushDeviceContext deviceContext) {
+    long starTime = System.currentTimeMillis();
+    try {
+      flushContext.getWriter().startChunkGroup(deviceContext.getDeviceID().toStringID());
+      for (IChunkWriter chunkWriter : deviceContext.getChunkWriters()) {
+        chunkWriter.writeToFileWriter(flushContext.getWriter());
+      }
+      flushContext.getWriter().setMinPlanIndex(flushContext.getMemTable().getMinPlanIndex());
+      flushContext.getWriter().setMaxPlanIndex(flushContext.getMemTable().getMaxPlanIndex());
+      flushContext.getWriter().endChunkGroup();
+    } catch (IOException e) {
+      throw new FlushRunTimeException(e);
+    }
+    long subTaskTime = System.currentTimeMillis() - starTime;
+    flushContext.getIoTime().addAndGet(subTaskTime);
+    flushContext.getWritingMetrics().recordFlushSubTaskCost(WritingMetrics.IO_TASK, subTaskTime);
+  }
+
+  @Override
+  public FinalTask nextTask() {
+    if (flushContext.getCursor() == flushContext.getDeviceContexts().size()) {
+      // all devices have been flushed
+      return new FinalTask();
+    }
+    return null;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/EncodeSeriesTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/EncodeSeriesTask.java
new file mode 100644
index 0000000000..d6d6bfccd7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/EncodeSeriesTask.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.flush.tasks;
+
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
+import org.apache.iotdb.db.service.metrics.WritingMetrics;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+
+public class EncodeSeriesTask implements Task {
+  private IDeviceID deviceId;
+  private String seriesId;
+  private FlushDeviceContext deviceContext;
+  private FlushContext allContext;
+  private IWritableMemChunk series;
+  private IChunkWriter chunkWriter;
+
+  public EncodeSeriesTask(IDeviceID deviceId, String seriesId, FlushDeviceContext deviceContext,
+      FlushContext allContext,
+      IWritableMemChunk series) {
+    this.deviceId = deviceId;
+    this.seriesId = seriesId;
+    this.deviceContext = deviceContext;
+    this.allContext = allContext;
+    this.series = series;
+  }
+
+  @Override
+  public DeviceIOTask nextTask() {
+    Integer seriesIndex = deviceContext.getSeriesIndexMap().get(seriesId);
+    deviceContext.getChunkWriters()[seriesIndex] = chunkWriter;
+    int encodedSeriesNum = deviceContext.getEncodedCounter().incrementAndGet();
+    if (encodedSeriesNum == deviceContext.getMeasurementIds().size()) {
+      // the whole device has been encoded, try flushing it
+      return new DeviceIOTask(allContext);
+    }
+    // some series are still under encoding, the last encoded one will trigger a DeviceIOTask
+    return null;
+  }
+
+  @Override
+  public void run() {
+    long starTime = System.currentTimeMillis();
+    chunkWriter = series.createIChunkWriter();
+    series.encode(chunkWriter);
+    chunkWriter.sealCurrentPage();
+    chunkWriter.clearPageWriter();
+
+    long subTaskTime = System.currentTimeMillis() - starTime;
+    allContext.getWritingMetrics().recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK, subTaskTime);
+    allContext.getEncodingTime().addAndGet(subTaskTime);
+  }
+
+  public IDeviceID getDeviceId() {
+    return deviceId;
+  }
+
+  public void setDeviceId(IDeviceID deviceId) {
+    this.deviceId = deviceId;
+  }
+
+  public IWritableMemChunk getSeries() {
+    return series;
+  }
+
+  public void setSeries(IWritableMemChunk series) {
+    this.series = series;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FinalTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FinalTask.java
new file mode 100644
index 0000000000..7b4b4f0bfc
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FinalTask.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.flush.tasks;
+
+public class FinalTask implements Task {
+
+  @Override
+  public void run() {
+    Thread.currentThread().interrupt();
+  }
+
+  @Override
+  public Task nextTask() {
+    return null;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FlushContext.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FlushContext.java
new file mode 100644
index 0000000000..786f9211ac
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FlushContext.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.flush.tasks;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.service.metrics.recorder.WritingMetricsManager;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+
+public class FlushContext {
+  private final WritingMetricsManager WRITING_METRICS = WritingMetricsManager.getInstance();
+  private List<FlushDeviceContext> deviceContexts;
+  private AtomicInteger cursor = new AtomicInteger();
+  private RestorableTsFileIOWriter writer;
+  private AtomicLong sortTime = new AtomicLong();
+  private AtomicLong encodingTime = new AtomicLong();
+  private AtomicLong ioTime = new AtomicLong();
+  private IMemTable memTable;
+
+  public List<FlushDeviceContext> getDeviceContexts() {
+    return deviceContexts;
+  }
+
+  public void setDeviceContexts(List<FlushDeviceContext> deviceContexts) {
+    this.deviceContexts = deviceContexts;
+  }
+
+
+  public int getCursor() {
+    return cursor.get();
+  }
+
+  public void setCursor(int cursor) {
+    this.cursor.set(cursor);
+  }
+
+  public WritingMetricsManager getWritingMetrics() {
+    return WRITING_METRICS;
+  }
+
+  public AtomicLong getSortTime() {
+    return sortTime;
+  }
+
+  public AtomicLong getEncodingTime() {
+    return encodingTime;
+  }
+
+  public AtomicLong getIoTime() {
+    return ioTime;
+  }
+
+  public RestorableTsFileIOWriter getWriter() {
+    return writer;
+  }
+
+  public void setWriter(RestorableTsFileIOWriter writer) {
+    this.writer = writer;
+  }
+
+  public IMemTable getMemTable() {
+    return memTable;
+  }
+
+  public void setMemTable(IMemTable memTable) {
+    this.memTable = memTable;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FlushDeviceContext.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FlushDeviceContext.java
new file mode 100644
index 0000000000..a78d79ff89
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FlushDeviceContext.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.flush.tasks;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+
+public class FlushDeviceContext {
+  private IDeviceID deviceID;
+  private List<String> measurementIds;
+  private IChunkWriter[] chunkWriters;
+  private AtomicInteger encodedCounter = new AtomicInteger();
+  private Map<String, Integer> seriesIndexMap;
+
+
+  public AtomicInteger getEncodedCounter() {
+    return encodedCounter;
+  }
+
+  public IDeviceID getDeviceID() {
+    return deviceID;
+  }
+
+  public void setDeviceID(IDeviceID deviceID) {
+    this.deviceID = deviceID;
+  }
+
+  public List<String> getMeasurementIds() {
+    return measurementIds;
+  }
+
+  public void setMeasurementIds(List<String> measurementIds) {
+    this.measurementIds = measurementIds;
+  }
+
+  public IChunkWriter[] getChunkWriters() {
+    return chunkWriters;
+  }
+
+  public void setChunkWriters(IChunkWriter[] chunkWriters) {
+    this.chunkWriters = chunkWriters;
+  }
+
+  public Map<String, Integer> getSeriesIndexMap() {
+    return seriesIndexMap;
+  }
+
+  public void setSeriesIndexMap(Map<String, Integer> seriesIndexMap) {
+    this.seriesIndexMap = seriesIndexMap;
+  }
+
+  public boolean isFullyEncoded() {
+    return encodedCounter.get() == measurementIds.size();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/SortSeriesTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/SortSeriesTask.java
new file mode 100644
index 0000000000..96da7d91f3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/SortSeriesTask.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.flush.tasks;
+
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
+import org.apache.iotdb.db.service.metrics.WritingMetrics;
+
+public class SortSeriesTask implements Task{
+  private IDeviceID deviceId;
+  private String seriesId;
+  private FlushDeviceContext deviceContext;
+  private FlushContext allContext;
+  private IWritableMemChunk series;
+
+  public EncodeSeriesTask nextTask() {
+    return new EncodeSeriesTask(deviceId, seriesId, deviceContext, allContext, series);
+  }
+
+  public void run() {
+    long startTime = System.currentTimeMillis();
+    /*
+     * sort task (first task of flush pipeline)
+     */
+    series.sortTvListForFlush();
+    long subTaskTime = System.currentTimeMillis() - startTime;
+    allContext.getSortTime().addAndGet(subTaskTime);
+    allContext.getWritingMetrics().recordFlushSubTaskCost(WritingMetrics.SORT_TASK, subTaskTime);
+  }
+
+  public IDeviceID getDeviceId() {
+    return deviceId;
+  }
+
+  public void setDeviceId(IDeviceID deviceId) {
+    this.deviceId = deviceId;
+  }
+
+  public void setSeriesId(String seriesId) {
+    this.seriesId = seriesId;
+  }
+
+  public void setDeviceContext(FlushDeviceContext deviceContext) {
+    this.deviceContext = deviceContext;
+  }
+
+  public void setAllContext(FlushContext allContext) {
+    this.allContext = allContext;
+  }
+
+  public IWritableMemChunk getSeries() {
+    return series;
+  }
+
+  public void setSeries(IWritableMemChunk series) {
+    this.series = series;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/Task.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/Task.java
new file mode 100644
index 0000000000..2bbd3ede2c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/Task.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.flush.tasks;
+
+public interface Task {
+  void run();
+  Task nextTask();
+}