You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/04/21 03:55:39 UTC
[iotdb] branch ISSUE_5792 updated: add comment
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch ISSUE_5792
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ISSUE_5792 by this push:
new a0dc6c9619 add comment
a0dc6c9619 is described below
commit a0dc6c9619b7be7a0e1e941b9797eacc4e6aedf2
Author: Tian Jiang <jt...@163.com>
AuthorDate: Fri Apr 21 11:58:07 2023 +0800
add comment
---
.../concurrent/dynamic/DynamicThreadGroup.java | 7 ++
.../iotdb/commons/concurrent/pipeline}/Task.java | 13 ++-
.../commons/concurrent/pipeline/TaskRunner.java | 73 +++++++++++++++++
.../iotdb/db/engine/flush/MemTableFlushTaskV2.java | 92 ++++------------------
.../iotdb/db/engine/flush/tasks/DeviceIOTask.java | 16 +++-
.../db/engine/flush/tasks/EncodeSeriesTask.java | 6 ++
.../iotdb/db/engine/flush/tasks/FinalTask.java | 5 ++
.../iotdb/db/engine/flush/tasks/FlushContext.java | 4 +
.../db/engine/flush/tasks/SortSeriesTask.java | 4 +
.../db/engine/storagegroup/TsFileProcessor.java | 6 +-
10 files changed, 145 insertions(+), 81 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
index 2266d5c579..240917d076 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.commons.concurrent.dynamic;
+import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,4 +99,10 @@ public class DynamicThreadGroup {
threadFutureMap.clear();
threadCnt.set(0);
}
+
+ public void join() throws ExecutionException, InterruptedException {
+ for (Future<?> future : threadFutureMap.values()) {
+ future.get();
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/Task.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/pipeline/Task.java
similarity index 68%
rename from server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/Task.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/concurrent/pipeline/Task.java
index ee524be535..b528f48025 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/Task.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/pipeline/Task.java
@@ -16,10 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.engine.flush.tasks;
+package org.apache.iotdb.commons.concurrent.pipeline;
+/**
+ * A pipeline task that will generate the next task that should be sent to the next stage in the
+ * pipeline after being run.
+ */
public interface Task {
+
+ /** Run the task. Must be called before nextTask(). */
void run();
+ /**
+ * Generate the next task (if any). Must be called after run().
+ *
+ * @return the task for the next stage, or null if there are no further tasks.
+ */
Task nextTask();
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/pipeline/TaskRunner.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/pipeline/TaskRunner.java
new file mode 100644
index 0000000000..b9b308c677
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/pipeline/TaskRunner.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.concurrent.pipeline;
+
+import org.apache.iotdb.commons.concurrent.dynamic.DynamicThread;
+import org.apache.iotdb.commons.concurrent.dynamic.DynamicThreadGroup;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * A thread that will continuously take tasks from an input queue, run each task, and put the next
+ * task (if any) into the output queue. By connecting multiple TaskRunners with input queues and
+ * output queues, a pipeline is formed.
+ */
+public class TaskRunner extends DynamicThread {
+
+ private Runnable cleanUp;
+ private BlockingQueue<Task> input;
+ private BlockingQueue<Task> output;
+
+ public TaskRunner(
+ DynamicThreadGroup threadGroup,
+ Runnable cleanUp,
+ BlockingQueue<Task> input,
+ BlockingQueue<Task> output) {
+ super(threadGroup);
+ this.cleanUp = cleanUp;
+ this.input = input;
+ this.output = output;
+ }
+
+ @Override
+ public void runInternal() {
+ while (!Thread.interrupted()) {
+ Task task;
+ try {
+ task = input.take();
+ idleToRunning();
+ task.run();
+ Task nextTask = task.nextTask();
+ if (nextTask != null) {
+ output.put(nextTask);
+ }
+ runningToIdle();
+
+ if (shouldExit()) {
+ break;
+ }
+ } catch (InterruptedException e1) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+
+ cleanUp.run();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskV2.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskV2.java
index 1329fed663..6a4b95eaa6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskV2.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskV2.java
@@ -20,6 +20,8 @@ package org.apache.iotdb.db.engine.flush;
import org.apache.iotdb.commons.concurrent.dynamic.DynamicThread;
import org.apache.iotdb.commons.concurrent.dynamic.DynamicThreadGroup;
+import org.apache.iotdb.commons.concurrent.pipeline.Task;
+import org.apache.iotdb.commons.concurrent.pipeline.TaskRunner;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
@@ -29,7 +31,6 @@ import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
import org.apache.iotdb.db.engine.flush.tasks.FlushContext;
import org.apache.iotdb.db.engine.flush.tasks.FlushDeviceContext;
import org.apache.iotdb.db.engine.flush.tasks.SortSeriesTask;
-import org.apache.iotdb.db.engine.flush.tasks.Task;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup;
@@ -51,9 +52,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -70,7 +69,7 @@ public class MemTableFlushTaskV2 {
private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private final DynamicThreadGroup sortTasks;
private final DynamicThreadGroup encodingTasks;
- private final Future<?> ioTaskFuture;
+ private final DynamicThreadGroup ioTask;
private final LinkedBlockingQueue<Task> sortTaskQueue = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<Task> encodingTaskQueue = new LinkedBlockingQueue<>();
@@ -116,7 +115,12 @@ public class MemTableFlushTaskV2 {
this::newEncodingThread,
config.getFlushMemTableMinSubThread(),
config.getFlushMemTableMaxSubThread());
- this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(newIOThread());
+ this.ioTask = new DynamicThreadGroup(
+ storageGroup + "-" + dataRegionId + "-" + memTable,
+ SUB_TASK_POOL_MANAGER::submit,
+ this::newIOThread,
+ 1,
+ 1);
LOGGER.debug(
"flush task of database {} memtable is created, flushing to file {}.",
storageGroup,
@@ -164,6 +168,7 @@ public class MemTableFlushTaskV2 {
allContext.setDeviceContexts(new ArrayList<>());
for (IDeviceID deviceID : deviceIDList) {
+ // create a context for each device
FlushDeviceContext flushDeviceContext = new FlushDeviceContext();
flushDeviceContext.setDeviceID(deviceID);
allContext.getDeviceContexts().add(flushDeviceContext);
@@ -173,6 +178,7 @@ public class MemTableFlushTaskV2 {
// skip the empty device/chunk group
seriesInOrder.removeIf(s -> memChunkMap.get(s).count() == 0);
seriesInOrder.sort((String::compareTo));
+ // record the series order in the device context
flushDeviceContext.setMeasurementIds(seriesInOrder);
flushDeviceContext.setChunkWriters(new IChunkWriter[seriesInOrder.size()]);
flushDeviceContext.setSeriesIndexMap(new HashMap<>());
@@ -195,9 +201,9 @@ public class MemTableFlushTaskV2 {
}
try {
- ioTaskFuture.get();
+ ioTask.join();
} catch (InterruptedException | ExecutionException e) {
- ioTaskFuture.cancel(true);
+ ioTask.cancelAll();
encodingTasks.cancelAll();
sortTasks.cancelAll();
if (e instanceof InterruptedException) {
@@ -263,82 +269,16 @@ public class MemTableFlushTaskV2 {
}
}
- protected class TaskRunner extends DynamicThread {
-
- private static final String TASK_NAME_SORT = "sort data";
- private static final String TASK_NAME_ENCODING = "encode data";
- private static final String TASK_NAME_IO = "write file";
- private Runnable cleanUp;
- private String taskName;
- private BlockingQueue<Task> input;
- private BlockingQueue<Task> output;
-
- public TaskRunner(
- DynamicThreadGroup threadGroup,
- Runnable cleanUp,
- String taskName,
- BlockingQueue<Task> input,
- BlockingQueue<Task> output) {
- super(threadGroup);
- this.cleanUp = cleanUp;
- this.taskName = taskName;
- this.input = input;
- this.output = output;
- }
-
- @Override
- public void runInternal() {
- LOGGER.debug(
- "Database {} memtable flushing to file {} starts to {}.",
- storageGroup,
- allContext.getWriter().getFile().getName(),
- taskName);
- while (!Thread.interrupted()) {
- Task task;
- try {
- task = input.take();
- idleToRunning();
- task.run();
- Task nextTask = task.nextTask();
- if (nextTask != null) {
- output.put(nextTask);
- }
- runningToIdle();
-
- if (shouldExit()) {
- break;
- }
- } catch (InterruptedException e1) {
- Thread.currentThread().interrupt();
- break;
- }
- }
-
- cleanUp.run();
- }
- }
-
private DynamicThread newSortThread() {
- return new TaskRunner(
- sortTasks,
- this::cleanSortThread,
- TaskRunner.TASK_NAME_SORT,
- sortTaskQueue,
- encodingTaskQueue);
+ return new TaskRunner(sortTasks, this::cleanSortThread, sortTaskQueue, encodingTaskQueue);
}
private DynamicThread newEncodingThread() {
- return new TaskRunner(
- encodingTasks,
- this::cleanEncodingThread,
- TaskRunner.TASK_NAME_ENCODING,
- encodingTaskQueue,
- ioTaskQueue);
+ return new TaskRunner(encodingTasks, this::cleanEncodingThread, encodingTaskQueue, ioTaskQueue);
}
private DynamicThread newIOThread() {
- return new TaskRunner(
- null, this::cleanIOThread, TaskRunner.TASK_NAME_IO, ioTaskQueue, ioTaskQueue);
+ return new TaskRunner(null, this::cleanIOThread, ioTaskQueue, ioTaskQueue);
}
private void cleanSortThread() {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/DeviceIOTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/DeviceIOTask.java
index 161853117c..cc3c9ba3c6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/DeviceIOTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/DeviceIOTask.java
@@ -18,13 +18,24 @@
*/
package org.apache.iotdb.db.engine.flush.tasks;
+import org.apache.iotdb.commons.concurrent.pipeline.Task;
import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
import org.apache.iotdb.db.service.metrics.WritingMetrics;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import java.io.IOException;
+/**
+ * A DeviceIOTask will check if the current device in the FlushContext (indicated by
+ * FlushContext.cursor) is fully encoded. If it is fully encoded, it will be flushed to the
+ * underlying file and the cursor will be moved to the next device.
+ * <p>
+ * The above procedure repeats until it reaches a device that is still being encoded or until all
+ * devices are flushed. If all devices are flushed, a FinalTask will be generated. Otherwise, no
+ * new task will be generated.
+ */
public class DeviceIOTask implements Task {
+
private FlushContext flushContext;
public DeviceIOTask(FlushContext flushContext) {
@@ -39,12 +50,15 @@ public class DeviceIOTask implements Task {
if (cursor < flushContext.getDeviceContexts().size()) {
FlushDeviceContext flushDeviceContext = flushContext.getDeviceContexts().get(cursor);
if (flushDeviceContext.isFullyEncoded()) {
+ // the current device is ready, flush it and move to the next device
flushOneDevice(flushDeviceContext);
flushContext.setCursor(cursor + 1);
} else {
+ // the current device is still being encoded, so it cannot be flushed yet
hasNext = false;
}
} else {
+ // all devices are flushed
hasNext = false;
}
}
@@ -70,7 +84,7 @@ public class DeviceIOTask implements Task {
@Override
public FinalTask nextTask() {
- if (flushContext.getCursor() == flushContext.getDeviceContexts().size()) {
+ if (flushContext.allFlushed()) {
// all devices have been flushed
return new FinalTask();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/EncodeSeriesTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/EncodeSeriesTask.java
index 083db4ba5d..b03010b1d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/EncodeSeriesTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/EncodeSeriesTask.java
@@ -18,12 +18,18 @@
*/
package org.apache.iotdb.db.engine.flush.tasks;
+import org.apache.iotdb.commons.concurrent.pipeline.Task;
import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
import org.apache.iotdb.db.service.metrics.WritingMetrics;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+/**
+ * EncodeSeriesTask encodes a timeseries to its byte form. If all timeseries of the device are
+ * encoded, a DeviceIOTask will be generated. Otherwise, no new task will be generated.
+ */
public class EncodeSeriesTask implements Task {
+
private IDeviceID deviceId;
private String seriesId;
private FlushDeviceContext deviceContext;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FinalTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FinalTask.java
index 7b4b4f0bfc..632f9b4ef3 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FinalTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FinalTask.java
@@ -18,6 +18,11 @@
*/
package org.apache.iotdb.db.engine.flush.tasks;
+import org.apache.iotdb.commons.concurrent.pipeline.Task;
+
+/**
+ * FinalTask ends the current TaskRunner by interruption.
+ */
public class FinalTask implements Task {
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FlushContext.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FlushContext.java
index 6e4e2acb5e..87523c8bc3 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FlushContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/FlushContext.java
@@ -83,4 +83,8 @@ public class FlushContext {
public void setMemTable(IMemTable memTable) {
this.memTable = memTable;
}
+
+ public boolean allFlushed() {
+ return cursor.get() == deviceContexts.size();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/SortSeriesTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/SortSeriesTask.java
index e9d9f73711..a5c529f6ed 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/SortSeriesTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/tasks/SortSeriesTask.java
@@ -18,10 +18,14 @@
*/
package org.apache.iotdb.db.engine.flush.tasks;
+import org.apache.iotdb.commons.concurrent.pipeline.Task;
import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
import org.apache.iotdb.db.service.metrics.WritingMetrics;
+/**
+ * SortSeriesTask sorts a timeseries and generates the associated encoding task.
+ */
public class SortSeriesTask implements Task {
private IDeviceID deviceId;
private String seriesId;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 395c372ac3..6e0a248270 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -32,7 +32,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.flush.CloseFileListener;
import org.apache.iotdb.db.engine.flush.FlushListener;
import org.apache.iotdb.db.engine.flush.FlushManager;
-import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
+import org.apache.iotdb.db.engine.flush.MemTableFlushTaskV2;
import org.apache.iotdb.db.engine.flush.NotifyFlushMemTable;
import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunk;
import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunkGroup;
@@ -1077,8 +1077,8 @@ public class TsFileProcessor {
} else {
try {
writer.mark();
- MemTableFlushTask flushTask =
- new MemTableFlushTask(
+ MemTableFlushTaskV2 flushTask =
+ new MemTableFlushTaskV2(
memTableToFlush,
writer,
storageGroupName,