You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/04/20 06:04:07 UTC
[iotdb] branch ISSUE_5792 updated: add MemTableFlushTaskv2
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch ISSUE_5792
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ISSUE_5792 by this push:
new ab618b7495 add MemTableFlushTaskv2
ab618b7495 is described below
commit ab618b7495653fa8a25d1c3bf6188b44bd8635f0
Author: Tian Jiang <jt...@163.com>
AuthorDate: Thu Apr 20 14:06:34 2023 +0800
add MemTableFlushTaskv2
---
.../commons/concurrent/dynamic/DynamicThread.java | 12 +-
.../concurrent/dynamic/DynamicThreadGroup.java | 18 +-
.../iotdb/db/engine/flush/MemTableFlushTaskV2.java | 355 +++++++++++++++++++++
.../iotdb/db/engine/flush/tasks/DeviceIOTask.java | 78 +++++
.../db/engine/flush/tasks/EncodeSeriesTask.java | 85 +++++
.../iotdb/db/engine/flush/tasks/FinalTask.java | 32 ++
.../iotdb/db/engine/flush/tasks/FlushContext.java | 86 +++++
.../db/engine/flush/tasks/FlushDeviceContext.java | 74 +++++
.../db/engine/flush/tasks/SortSeriesTask.java | 74 +++++
.../apache/iotdb/db/engine/flush/tasks/Task.java | 24 ++
10 files changed, 833 insertions(+), 5 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThread.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThread.java
index 4620de5159..1bbb6d16c7 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThread.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThread.java
@@ -35,6 +35,10 @@ public abstract class DynamicThread implements Runnable {
private double maximumIdleRatio = 0.5;
private double minimumIdleRatio = 0.1;
+ public DynamicThread(DynamicThreadGroup threadGroup) {
+ this.threadGroup = threadGroup;
+ }
+
/**
* The implementation must call idleToRunning() and runningToIdle() properly. E.g., {
* {@code {while(! Thread.interrupted ()) { Object obj = blockingQueue.poll(); idleToRunning();
@@ -65,6 +69,10 @@ public abstract class DynamicThread implements Runnable {
}
protected boolean shouldExit() {
+ if (threadGroup == null) {
+ return false;
+ }
+
double idleRatio = idleRatio();
if (idleRatio < minimumIdleRatio) {
// Thread too busy, try adding a new thread
@@ -91,7 +99,9 @@ public abstract class DynamicThread implements Runnable {
try {
runInternal();
} finally {
- threadGroup.onThreadExit(this);
+ if (threadGroup != null) {
+ threadGroup.onThreadExit(this);
+ }
}
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
index 7216437ea9..9d3199d0bb 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
@@ -23,11 +23,12 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
import java.util.function.Supplier;
public class DynamicThreadGroup {
- private ExecutorService hostingPool;
+ private Function<Runnable, Future<?>> poolSubmitter;
private String name;
private Supplier<DynamicThread> threadFactory;
private AtomicInteger threadCnt = new AtomicInteger();
@@ -35,13 +36,16 @@ public class DynamicThreadGroup {
private int maxThreadCnt = 8;
private Map<DynamicThread, Future<?>> threadFutureMap = new ConcurrentHashMap<>();
- public DynamicThreadGroup(String name, ExecutorService hostingPool,
+ public DynamicThreadGroup(String name, Function<Runnable, Future<?>> poolSubmitter,
Supplier<DynamicThread> threadFactory, int minThreadCnt, int maxThreadCnt) {
this.name = name;
- this.hostingPool = hostingPool;
+ this.poolSubmitter = poolSubmitter;
this.threadFactory = threadFactory;
this.minThreadCnt = Math.max(1, minThreadCnt);
this.maxThreadCnt = Math.max(this.minThreadCnt, maxThreadCnt);
+ for (int i = 0; i < this.minThreadCnt; i++) {
+ addThread();
+ }
}
/**
@@ -51,7 +55,7 @@ public class DynamicThreadGroup {
int afterCnt = threadCnt.incrementAndGet();
if (afterCnt <= maxThreadCnt) {
DynamicThread dynamicThread = threadFactory.get();
- Future<?> submit = hostingPool.submit(dynamicThread);
+ Future<?> submit = poolSubmitter.apply(dynamicThread);
threadFutureMap.put(dynamicThread, submit);
} else {
threadCnt.decrementAndGet();
@@ -74,4 +78,10 @@ public class DynamicThreadGroup {
threadCnt.decrementAndGet();
threadFutureMap.remove(dynamicThread);
}
+
+ public void cancelAll() {
+ threadFutureMap.forEach((t,f) -> f.cancel(true));
+ threadFutureMap.clear();
+ threadCnt.set(0);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskV2.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskV2.java
new file mode 100644
index 0000000000..75cc4d03cc
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskV2.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.flush;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.iotdb.commons.concurrent.dynamic.DynamicThread;
+import org.apache.iotdb.commons.concurrent.dynamic.DynamicThreadGroup;
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
+import org.apache.iotdb.db.engine.flush.tasks.FlushContext;
+import org.apache.iotdb.db.engine.flush.tasks.FlushDeviceContext;
+import org.apache.iotdb.db.engine.flush.tasks.SortSeriesTask;
+import org.apache.iotdb.db.engine.flush.tasks.Task;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup;
+import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
+import org.apache.iotdb.db.rescon.SystemInfo;
+import org.apache.iotdb.db.service.metrics.WritingMetrics;
+import org.apache.iotdb.db.service.metrics.recorder.WritingMetricsManager;
+import org.apache.iotdb.metrics.utils.IoTDBMetricsUtils;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * flush task to flush one memtable using a pipeline model to flush, which is sort memtable ->
+ * encoding -> write to disk (io task)
+ */
+public class MemTableFlushTaskV2 {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(MemTableFlushTaskV2.class);
+ private static final FlushSubTaskPoolManager SUB_TASK_POOL_MANAGER =
+ FlushSubTaskPoolManager.getInstance();
+ private static final WritingMetricsManager WRITING_METRICS = WritingMetricsManager.getInstance();
+ private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ private final DynamicThreadGroup sortTasks;
+ private final DynamicThreadGroup encodingTasks;
+ private final Future<?> ioTaskFuture;
+
+ private final LinkedBlockingQueue<Task> sortTaskQueue = new LinkedBlockingQueue<>();
+ private final LinkedBlockingQueue<Task> encodingTaskQueue = new LinkedBlockingQueue<>();
+ private final LinkedBlockingQueue<Task> ioTaskQueue =
+ (config.isEnableMemControl() && SystemInfo.getInstance().isEncodingFasterThanIo())
+ ? new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing())
+ : new LinkedBlockingQueue<>();
+
+ private String storageGroup;
+ private String dataRegionId;
+
+ private IMemTable memTable;
+ private FlushContext allContext;
+
+ /**
+ * @param memTable the memTable to flush
+ * @param writer the writer where memTable will be flushed to (current tsfile writer or vm
+ * writer)
+ * @param storageGroup current database
+ */
+ public MemTableFlushTaskV2(
+ IMemTable memTable,
+ RestorableTsFileIOWriter writer,
+ String storageGroup,
+ String dataRegionId) {
+ this.memTable = memTable;
+ this.storageGroup = storageGroup;
+ this.dataRegionId = dataRegionId;
+ this.allContext = new FlushContext();
+ this.allContext.setWriter(writer);
+ this.allContext.setMemTable(memTable);
+
+ this.sortTasks = new DynamicThreadGroup(storageGroup + "-" + dataRegionId + "-" + memTable,
+ SUB_TASK_POOL_MANAGER::submit, this::newSortThread, 1, 8);
+ this.encodingTasks = new DynamicThreadGroup(storageGroup + "-" + dataRegionId + "-" + memTable,
+ SUB_TASK_POOL_MANAGER::submit, this::newEncodingThread, 1, 8);
+ this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(newIOThread());
+ LOGGER.debug(
+ "flush task of database {} memtable is created, flushing to file {}.",
+ storageGroup,
+ allContext.getWriter().getFile().getName());
+ }
+
+
+ /**
+ * the function for flushing memtable.
+ */
+ public void syncFlushMemTable() throws ExecutionException, InterruptedException {
+ long avgSeriesPointsNum =
+ memTable.getSeriesNumber() == 0
+ ? 0
+ : memTable.getTotalPointsNum() / memTable.getSeriesNumber();
+ LOGGER.info(
+ "The memTable size of SG {} is {}, the avg series points num in chunk is {}, total timeseries number is {}",
+ storageGroup,
+ memTable.memSize(),
+ avgSeriesPointsNum,
+ memTable.getSeriesNumber());
+ WRITING_METRICS.recordFlushingMemTableStatus(
+ storageGroup,
+ memTable.memSize(),
+ memTable.getSeriesNumber(),
+ memTable.getTotalPointsNum(),
+ avgSeriesPointsNum);
+
+ long estimatedTemporaryMemSize = 0L;
+ if (config.isEnableMemControl() && SystemInfo.getInstance().isEncodingFasterThanIo()) {
+ estimatedTemporaryMemSize =
+ memTable.getSeriesNumber() == 0
+ ? 0
+ : memTable.memSize()
+ / memTable.getSeriesNumber()
+ * config.getIoTaskQueueSizeForFlushing();
+ SystemInfo.getInstance().applyTemporaryMemoryForFlushing(estimatedTemporaryMemSize);
+ }
+ long start = System.currentTimeMillis();
+
+ // for map do not use get(key) to iterate
+ Map<IDeviceID, IWritableMemChunkGroup> memTableMap = memTable.getMemTableMap();
+ List<IDeviceID> deviceIDList = new ArrayList<>(memTableMap.keySet());
+ // sort the IDeviceID in lexicographical order
+ deviceIDList.sort(Comparator.comparing(IDeviceID::toStringID));
+ deviceIDList.removeIf(d -> memTableMap.get(d).count() == 0 ||
+ memTableMap.get(d).getMemChunkMap().isEmpty());
+
+ allContext = new FlushContext();
+ allContext.setDeviceContexts(new ArrayList<>());
+
+ for (IDeviceID deviceID : deviceIDList) {
+ FlushDeviceContext flushDeviceContext = new FlushDeviceContext();
+ allContext.getDeviceContexts().add(flushDeviceContext);
+ flushDeviceContext.setDeviceID(deviceID);
+ final Map<String, IWritableMemChunk> value = memTableMap.get(deviceID).getMemChunkMap();
+ List<String> seriesInOrder = new ArrayList<>(value.keySet());
+ // skip the empty device/chunk group
+ seriesInOrder.removeIf(s -> value.get(s).count() == 0);
+ seriesInOrder.sort((String::compareTo));
+ flushDeviceContext.setMeasurementIds(seriesInOrder);
+ flushDeviceContext.setChunkWriters(new IChunkWriter[seriesInOrder.size()]);
+ flushDeviceContext.setSeriesIndexMap(new HashMap<>());
+
+ for (int j = 0; j < seriesInOrder.size(); j++) {
+ // starting from sorting each series
+ String seriesId = seriesInOrder.get(j);
+ flushDeviceContext.getSeriesIndexMap().put(seriesId, j);
+ IWritableMemChunk series = value.get(seriesId);
+
+ SortSeriesTask sortSeriesTask = new SortSeriesTask();
+ sortSeriesTask.setSeriesId(seriesId);
+ sortSeriesTask.setAllContext(allContext);
+ sortSeriesTask.setDeviceId(deviceID);
+ sortSeriesTask.setSeries(series);
+ sortSeriesTask.setDeviceContext(flushDeviceContext);
+
+ sortTaskQueue.put(sortSeriesTask);
+ }
+ }
+
+ try {
+ ioTaskFuture.get();
+ } catch (InterruptedException | ExecutionException e) {
+ ioTaskFuture.cancel(true);
+ encodingTasks.cancelAll();
+ sortTasks.cancelAll();
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ throw e;
+ }
+ encodingTasks.cancelAll();
+ sortTasks.cancelAll();
+
+ try {
+ long writePlanIndicesStartTime = System.currentTimeMillis();
+ allContext.getWriter().writePlanIndices();
+ WRITING_METRICS.recordFlushCost(
+ WritingMetrics.WRITE_PLAN_INDICES,
+ System.currentTimeMillis() - writePlanIndicesStartTime);
+ } catch (IOException e) {
+ throw new ExecutionException(e);
+ }
+
+ if (config.isEnableMemControl()) {
+ if (estimatedTemporaryMemSize != 0) {
+ SystemInfo.getInstance().releaseTemporaryMemoryForFlushing(estimatedTemporaryMemSize);
+ }
+ SystemInfo.getInstance().setEncodingFasterThanIo(
+ allContext.getIoTime().get() >= allContext.getEncodingTime().get());
+ }
+
+ MetricService.getInstance()
+ .timer(
+ System.currentTimeMillis() - start,
+ TimeUnit.MILLISECONDS,
+ Metric.COST_TASK.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ "flush");
+
+ LOGGER.info(
+ "Database {} memtable {} flushing a memtable has finished! Time consumption: {}ms",
+ storageGroup,
+ memTable,
+ System.currentTimeMillis() - start);
+ }
+
+ protected void metricFlush() {
+ if (!storageGroup.startsWith(IoTDBMetricsUtils.DATABASE)) {
+ int lastIndex = storageGroup.lastIndexOf("-");
+ if (lastIndex == -1) {
+ lastIndex = storageGroup.length();
+ }
+ MetricService.getInstance()
+ .gaugeWithInternalReportAsync(
+ memTable.getTotalPointsNum(),
+ Metric.POINTS.toString(),
+ MetricLevel.CORE,
+ Tag.DATABASE.toString(),
+ storageGroup.substring(0, lastIndex),
+ Tag.TYPE.toString(),
+ "flush",
+ Tag.REGION.toString(),
+ dataRegionId);
+ }
+ }
+
+
+ protected class TaskRunner extends DynamicThread {
+
+ private static final String TASK_NAME_SORT = "sort data";
+ private static final String TASK_NAME_ENCODING = "encode data";
+ private static final String TASK_NAME_IO = "write file";
+ private Runnable cleanUp;
+ private String taskName;
+ private BlockingQueue<Task> input;
+ private BlockingQueue<Task> output;
+
+ public TaskRunner(DynamicThreadGroup threadGroup, Runnable cleanUp, String taskName,
+ BlockingQueue<Task> input, BlockingQueue<Task> output) {
+ super(threadGroup);
+ this.cleanUp = cleanUp;
+ this.taskName = taskName;
+ this.input = input;
+ this.output = output;
+ }
+
+ @Override
+ public void runInternal() {
+ LOGGER.debug(
+ "Database {} memtable flushing to file {} starts to {}.",
+ storageGroup,
+ allContext.getWriter().getFile().getName(),
+ taskName);
+ while (!Thread.interrupted()) {
+ Task task;
+ try {
+ task = input.take();
+ idleToRunning();
+ task.run();
+ Task nextTask = task.nextTask();
+ if (nextTask != null) {
+ output.put(nextTask);
+ }
+ runningToIdle();
+ } catch (InterruptedException e1) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+
+ cleanUp.run();
+ }
+ }
+
+ private DynamicThread newSortThread() {
+ return new TaskRunner(sortTasks, this::cleanSortThread, TaskRunner.TASK_NAME_SORT,
+ sortTaskQueue, encodingTaskQueue);
+ }
+
+ private DynamicThread newEncodingThread() {
+ return new TaskRunner(encodingTasks, this::cleanEncodingThread, TaskRunner.TASK_NAME_ENCODING,
+ encodingTaskQueue, ioTaskQueue);
+ }
+
+ private DynamicThread newIOThread() {
+ return new TaskRunner(null, this::cleanIOThread, TaskRunner.TASK_NAME_IO,
+ ioTaskQueue, ioTaskQueue);
+ }
+
+
+ private void cleanSortThread() {
+ metricFlush();
+ LOGGER.info(
+ "Database {}, flushing memtable {} into disk: Sort data cost " + "{} ms.",
+ storageGroup,
+ allContext.getWriter().getFile().getName(),
+ allContext.getSortTime().get());
+ WRITING_METRICS.recordFlushCost(WritingMetrics.FLUSH_STAGE_SORT,
+ allContext.getSortTime().get());
+ }
+
+ private void cleanEncodingThread() {
+ metricFlush();
+ LOGGER.info(
+ "Database {}, flushing memtable {} into disk: Encoding data cost " + "{} ms.",
+ storageGroup,
+ allContext.getWriter().getFile().getName(),
+ allContext.getEncodingTime().get());
+ WRITING_METRICS.recordFlushCost(WritingMetrics.FLUSH_STAGE_ENCODING,
+ allContext.getEncodingTime().get());
+ }
+ private void cleanIOThread() {
+ metricFlush();
+ LOGGER.info(
+ "Database {}, flushing memtable {} into disk: IO cost " + "{} ms.",
+ storageGroup,
+ allContext.getWriter().getFile().getName(),
+ allContext.getIoTime().get());
+ WRITING_METRICS.recordFlushCost(WritingMetrics.FLUSH_STAGE_IO, allContext.getIoTime().get());
+ WRITING_METRICS.recordFlushTsFileSize(storageGroup,
+ allContext.getWriter().getFile().length());
+ }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/DeviceIOTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/DeviceIOTask.java
new file mode 100644
index 0000000000..e7f0012efd
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/DeviceIOTask.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.flush.tasks;
+
+import java.io.IOException;
+import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
+import org.apache.iotdb.db.service.metrics.WritingMetrics;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+
+public class DeviceIOTask implements Task {
+ private FlushContext flushContext;
+
+ public DeviceIOTask(FlushContext flushContext) {
+ this.flushContext = flushContext;
+ }
+
+ @Override
+ public void run() {
+ boolean hasNext = true;
+ while (hasNext) {
+ int cursor = flushContext.getCursor();
+ if (cursor < flushContext.getDeviceContexts().size()) {
+ FlushDeviceContext flushDeviceContext = flushContext.getDeviceContexts().get(cursor);
+ if (flushDeviceContext.isFullyEncoded()) {
+ flushOneDevice(flushDeviceContext);
+ flushContext.setCursor(cursor + 1);
+ } else {
+ hasNext = false;
+ }
+ } else {
+ hasNext = false;
+ }
+ }
+ }
+
+ public void flushOneDevice(FlushDeviceContext deviceContext) {
+ long starTime = System.currentTimeMillis();
+ try {
+ flushContext.getWriter().startChunkGroup(deviceContext.getDeviceID().toStringID());
+ for (IChunkWriter chunkWriter : deviceContext.getChunkWriters()) {
+ chunkWriter.writeToFileWriter(flushContext.getWriter());
+ }
+ flushContext.getWriter().setMinPlanIndex(flushContext.getMemTable().getMinPlanIndex());
+ flushContext.getWriter().setMaxPlanIndex(flushContext.getMemTable().getMaxPlanIndex());
+ flushContext.getWriter().endChunkGroup();
+ } catch (IOException e) {
+ throw new FlushRunTimeException(e);
+ }
+ long subTaskTime = System.currentTimeMillis() - starTime;
+ flushContext.getIoTime().addAndGet(subTaskTime);
+ flushContext.getWritingMetrics().recordFlushSubTaskCost(WritingMetrics.IO_TASK, subTaskTime);
+ }
+
+ @Override
+ public FinalTask nextTask() {
+ if (flushContext.getCursor() == flushContext.getDeviceContexts().size()) {
+ // all devices have been flushed
+ return new FinalTask();
+ }
+ return null;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/EncodeSeriesTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/EncodeSeriesTask.java
new file mode 100644
index 0000000000..d6d6bfccd7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/EncodeSeriesTask.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.flush.tasks;
+
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
+import org.apache.iotdb.db.service.metrics.WritingMetrics;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+
+public class EncodeSeriesTask implements Task {
+ private IDeviceID deviceId;
+ private String seriesId;
+ private FlushDeviceContext deviceContext;
+ private FlushContext allContext;
+ private IWritableMemChunk series;
+ private IChunkWriter chunkWriter;
+
+ public EncodeSeriesTask(IDeviceID deviceId, String seriesId, FlushDeviceContext deviceContext,
+ FlushContext allContext,
+ IWritableMemChunk series) {
+ this.deviceId = deviceId;
+ this.seriesId = seriesId;
+ this.deviceContext = deviceContext;
+ this.allContext = allContext;
+ this.series = series;
+ }
+
+ @Override
+ public DeviceIOTask nextTask() {
+ Integer seriesIndex = deviceContext.getSeriesIndexMap().get(seriesId);
+ deviceContext.getChunkWriters()[seriesIndex] = chunkWriter;
+ int encodedSeriesNum = deviceContext.getEncodedCounter().incrementAndGet();
+ if (encodedSeriesNum == deviceContext.getMeasurementIds().size()) {
+ // the whole device has been encoded, try flushing it
+ return new DeviceIOTask(allContext);
+ }
+ // some series are still under encoding, the last encoded one will trigger a DeviceIOTask
+ return null;
+ }
+
+ @Override
+ public void run() {
+ long starTime = System.currentTimeMillis();
+ chunkWriter = series.createIChunkWriter();
+ series.encode(chunkWriter);
+ chunkWriter.sealCurrentPage();
+ chunkWriter.clearPageWriter();
+
+ long subTaskTime = System.currentTimeMillis() - starTime;
+ allContext.getWritingMetrics().recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK, subTaskTime);
+ allContext.getEncodingTime().addAndGet(subTaskTime);
+ }
+
+ public IDeviceID getDeviceId() {
+ return deviceId;
+ }
+
+ public void setDeviceId(IDeviceID deviceId) {
+ this.deviceId = deviceId;
+ }
+
+ public IWritableMemChunk getSeries() {
+ return series;
+ }
+
+ public void setSeries(IWritableMemChunk series) {
+ this.series = series;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FinalTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FinalTask.java
new file mode 100644
index 0000000000..7b4b4f0bfc
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FinalTask.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.flush.tasks;
+
+public class FinalTask implements Task {
+
+ @Override
+ public void run() {
+ Thread.currentThread().interrupt();
+ }
+
+ @Override
+ public Task nextTask() {
+ return null;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FlushContext.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FlushContext.java
new file mode 100644
index 0000000000..786f9211ac
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FlushContext.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.flush.tasks;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.service.metrics.recorder.WritingMetricsManager;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+
+public class FlushContext {
+ private final WritingMetricsManager WRITING_METRICS = WritingMetricsManager.getInstance();
+ private List<FlushDeviceContext> deviceContexts;
+ private AtomicInteger cursor = new AtomicInteger();
+ private RestorableTsFileIOWriter writer;
+ private AtomicLong sortTime = new AtomicLong();
+ private AtomicLong encodingTime = new AtomicLong();
+ private AtomicLong ioTime = new AtomicLong();
+ private IMemTable memTable;
+
+ public List<FlushDeviceContext> getDeviceContexts() {
+ return deviceContexts;
+ }
+
+ public void setDeviceContexts(List<FlushDeviceContext> deviceContexts) {
+ this.deviceContexts = deviceContexts;
+ }
+
+
+ public int getCursor() {
+ return cursor.get();
+ }
+
+ public void setCursor(int cursor) {
+ this.cursor.set(cursor);
+ }
+
+ public WritingMetricsManager getWritingMetrics() {
+ return WRITING_METRICS;
+ }
+
+ public AtomicLong getSortTime() {
+ return sortTime;
+ }
+
+ public AtomicLong getEncodingTime() {
+ return encodingTime;
+ }
+
+ public AtomicLong getIoTime() {
+ return ioTime;
+ }
+
+ public RestorableTsFileIOWriter getWriter() {
+ return writer;
+ }
+
+ public void setWriter(RestorableTsFileIOWriter writer) {
+ this.writer = writer;
+ }
+
+ public IMemTable getMemTable() {
+ return memTable;
+ }
+
+ public void setMemTable(IMemTable memTable) {
+ this.memTable = memTable;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FlushDeviceContext.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FlushDeviceContext.java
new file mode 100644
index 0000000000..a78d79ff89
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FlushDeviceContext.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.flush.tasks;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+
+public class FlushDeviceContext {
+ private IDeviceID deviceID;
+ private List<String> measurementIds;
+ private IChunkWriter[] chunkWriters;
+ private AtomicInteger encodedCounter = new AtomicInteger();
+ private Map<String, Integer> seriesIndexMap;
+
+
+ public AtomicInteger getEncodedCounter() {
+ return encodedCounter;
+ }
+
+ public IDeviceID getDeviceID() {
+ return deviceID;
+ }
+
+ public void setDeviceID(IDeviceID deviceID) {
+ this.deviceID = deviceID;
+ }
+
+ public List<String> getMeasurementIds() {
+ return measurementIds;
+ }
+
+ public void setMeasurementIds(List<String> measurementIds) {
+ this.measurementIds = measurementIds;
+ }
+
+ public IChunkWriter[] getChunkWriters() {
+ return chunkWriters;
+ }
+
+ public void setChunkWriters(IChunkWriter[] chunkWriters) {
+ this.chunkWriters = chunkWriters;
+ }
+
+ public Map<String, Integer> getSeriesIndexMap() {
+ return seriesIndexMap;
+ }
+
+ public void setSeriesIndexMap(Map<String, Integer> seriesIndexMap) {
+ this.seriesIndexMap = seriesIndexMap;
+ }
+
+ public boolean isFullyEncoded() {
+ return encodedCounter.get() == measurementIds.size();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/SortSeriesTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/SortSeriesTask.java
new file mode 100644
index 0000000000..96da7d91f3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/SortSeriesTask.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.flush.tasks;
+
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
+import org.apache.iotdb.db.service.metrics.WritingMetrics;
+
+public class SortSeriesTask implements Task{
+ private IDeviceID deviceId;
+ private String seriesId;
+ private FlushDeviceContext deviceContext;
+ private FlushContext allContext;
+ private IWritableMemChunk series;
+
+ public EncodeSeriesTask nextTask() {
+ return new EncodeSeriesTask(deviceId, seriesId, deviceContext, allContext, series);
+ }
+
+ public void run() {
+ long startTime = System.currentTimeMillis();
+ /*
+ * sort task (first task of flush pipeline)
+ */
+ series.sortTvListForFlush();
+ long subTaskTime = System.currentTimeMillis() - startTime;
+ allContext.getSortTime().addAndGet(subTaskTime);
+ allContext.getWritingMetrics().recordFlushSubTaskCost(WritingMetrics.SORT_TASK, subTaskTime);
+ }
+
+ public IDeviceID getDeviceId() {
+ return deviceId;
+ }
+
+ public void setDeviceId(IDeviceID deviceId) {
+ this.deviceId = deviceId;
+ }
+
+ public void setSeriesId(String seriesId) {
+ this.seriesId = seriesId;
+ }
+
+ public void setDeviceContext(FlushDeviceContext deviceContext) {
+ this.deviceContext = deviceContext;
+ }
+
+ public void setAllContext(FlushContext allContext) {
+ this.allContext = allContext;
+ }
+
+ public IWritableMemChunk getSeries() {
+ return series;
+ }
+
+ public void setSeries(IWritableMemChunk series) {
+ this.series = series;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/Task.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/Task.java
new file mode 100644
index 0000000000..2bbd3ede2c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/Task.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.flush.tasks;
+
+public interface Task {
+ void run();
+ Task nextTask();
+}