You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/09 09:47:38 UTC
[iotdb] branch master updated: [IOTDB-2807]Speed up the cross space compaction by multi-threads (#5415)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 99c824f4d0 [IOTDB-2807]Speed up the cross space compaction by multi-threads (#5415)
99c824f4d0 is described below
commit 99c824f4d0d370c73eb97635c256e4ed990a7df0
Author: 周沛辰 <45...@users.noreply.github.com>
AuthorDate: Sat Apr 9 17:47:33 2022 +0800
[IOTDB-2807]Speed up the cross space compaction by multi-threads (#5415)
---
.../iotdb/commons/concurrent/ThreadName.java | 1 +
.../resources/conf/iotdb-engine.properties | 6 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 14 +
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 7 +
.../engine/compaction/CompactionTaskManager.java | 18 ++
.../db/engine/compaction/CompactionUtils.java | 120 +++++---
.../cross/rewrite/task/SubCompactionTask.java | 107 +++++++
.../writer/AbstractCompactionWriter.java | 64 ++--
.../writer/CrossSpaceCompactionWriter.java | 80 ++---
.../writer/InnerSpaceCompactionWriter.java | 27 +-
.../db/engine/compaction/CompactionUtilsTest.java | 321 ++++++++++++++++++++-
.../compaction/cross/CrossSpaceCompactionTest.java | 3 +
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 8 +-
13 files changed, 642 insertions(+), 134 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index f6ea467f74..4df1111ea7 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -37,6 +37,7 @@ public enum ThreadName {
FLUSH_SERVICE("Flush"),
FLUSH_SUB_TASK_SERVICE("Flush-SubTask"),
COMPACTION_SERVICE("Compaction"),
+ COMPACTION_SUB_SERVICE("Sub-Compaction"),
COMPACTION_SCHEDULE("Compaction_Schedule"),
WAL_DAEMON("WAL-Sync"),
WAL_FORCE_DAEMON("WAL-Force"),
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 3d19ffeab5..a16b373130 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -525,6 +525,12 @@ timestamp_precision=ms
# Datatype: int
# query_timeout_threshold=60000
+# The number of sub compaction threads to be set up to perform compaction.
+# Currently only works for nonAligned data in cross space compaction and unseq inner space compaction.
+# If the configured value is less than or equal to 0, it is set to 1.
+# Datatype: int
+# sub_compaction_thread_num=4
+
####################
### Metadata Cache Configuration
####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index b4bde3f2b8..54464410d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -442,6 +442,12 @@ public class IoTDBConfig {
/** The interval of compaction task submission from queue in CompactionTaskManager */
private long compactionSubmissionIntervalInMs = 60_000L;
+ /**
+ * The number of sub compaction threads to be set up to perform compaction. Currently only works
+ * for nonAligned data in cross space compaction and unseq inner space compaction.
+ */
+ private int subCompactionTaskNum = 4;
+
/** whether to cache meta data(ChunkMetaData and TsFileMetaData) or not. */
private boolean metaDataCacheEnable = true;
@@ -2552,6 +2558,14 @@ public class IoTDBConfig {
compactionSubmissionIntervalInMs = interval;
}
+ public int getSubCompactionTaskNum() {
+ return subCompactionTaskNum;
+ }
+
+ public void setSubCompactionTaskNum(int subCompactionTaskNum) {
+ this.subCompactionTaskNum = subCompactionTaskNum;
+ }
+
public String getDeviceIDTransformationMethod() {
return deviceIDTransformationMethod;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 6a88bd8333..ec955aa40a 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -380,6 +380,13 @@ public class IoTDBDescriptor {
properties.getProperty(
"compaction_priority", conf.getCompactionPriority().toString())));
+ int subtaskNum =
+ Integer.parseInt(
+ properties.getProperty(
+ "sub_compaction_thread_num", Integer.toString(conf.getSubCompactionTaskNum())));
+ subtaskNum = subtaskNum <= 0 ? 1 : subtaskNum;
+ conf.setSubCompactionTaskNum(subtaskNum);
+
conf.setQueryTimeoutThreshold(
Integer.parseInt(
properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index 7d7c9ecb4b..bf54c18b91 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -39,6 +39,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
@@ -55,6 +56,10 @@ public class CompactionTaskManager implements IService {
// The thread pool that executes the compaction task. The default number of threads for this pool
// is 10.
private WrappedScheduledExecutorService taskExecutionPool;
+
+ // The thread pool that executes the sub compaction task.
+ private ScheduledExecutorService subCompactionTaskExecutionPool;
+
public static volatile AtomicInteger currentTaskNum = new AtomicInteger(0);
private FixedPriorityBlockingQueue<AbstractCompactionTask> candidateCompactionTaskQueue =
new FixedPriorityBlockingQueue<>(1024, new CompactionTaskComparator());
@@ -86,6 +91,11 @@ public class CompactionTaskManager implements IService {
IoTDBThreadPoolFactory.newScheduledThreadPool(
IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread(),
ThreadName.COMPACTION_SERVICE.getName());
+ this.subCompactionTaskExecutionPool =
+ IoTDBThreadPoolFactory.newScheduledThreadPool(
+ IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread()
+ * IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum(),
+ ThreadName.COMPACTION_SUB_SERVICE.getName());
currentTaskNum = new AtomicInteger(0);
compactionTaskSubmissionThreadPool =
IoTDBThreadPoolFactory.newScheduledThreadPool(1, ThreadName.COMPACTION_SERVICE.getName());
@@ -310,6 +320,14 @@ public class CompactionTaskManager implements IService {
: "taskExecutionPool is terminated");
}
+ public synchronized Future<Void> submitSubTask(Callable<Void> subCompactionTask) {
+ if (subCompactionTaskExecutionPool != null && !subCompactionTaskExecutionPool.isTerminated()) {
+ Future<Void> future = subCompactionTaskExecutionPool.submit(subCompactionTask);
+ return future;
+ }
+ return null;
+ }
+
/**
* Abort all compactions of a storage group. The running compaction tasks will be returned as a
* list, the compaction threads for the storage group are not terminated until all the tasks in the
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
index 6e9cf68291..a4328449ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.engine.compaction;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.cross.rewrite.task.SubCompactionTask;
import org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator;
import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter;
import org.apache.iotdb.db.engine.compaction.writer.CrossSpaceCompactionWriter;
@@ -44,12 +45,14 @@ import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,22 +60,26 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.stream.Collectors;
/**
* This tool can be used to perform inner space or cross space compaction of aligned and non aligned
* timeseries . Currently, we use {@link
* org.apache.iotdb.db.engine.compaction.inner.utils.InnerSpaceCompactionUtils} to speed up if it is
- * an inner space compaction.
+ * a seq inner space compaction.
*/
public class CompactionUtils {
private static final Logger logger =
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
+ private static final int subTaskNum =
+ IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum();
public static void compact(
List<TsFileResource> seqFileResources,
@@ -108,6 +115,7 @@ public class CompactionUtils {
}
compactionWriter.endFile();
+ updateDeviceStartTimeAndEndTime(targetFileResources, compactionWriter);
updatePlanIndexes(targetFileResources, seqFileResources, unseqFileResources);
} finally {
QueryResourceManager.getInstance().endQuery(queryId);
@@ -157,9 +165,9 @@ public class CompactionUtils {
if (dataBatchReader.hasNextBatch()) {
// chunkgroup is serialized only when at least one timeseries under this device has data
compactionWriter.startChunkGroup(device, true);
- compactionWriter.startMeasurement(measurementSchemas);
- writeWithReader(compactionWriter, dataBatchReader);
- compactionWriter.endMeasurement();
+ compactionWriter.startMeasurement(measurementSchemas, 0);
+ writeWithReader(compactionWriter, dataBatchReader, 0);
+ compactionWriter.endMeasurement(0);
compactionWriter.endChunkGroup();
}
}
@@ -170,59 +178,58 @@ public class CompactionUtils {
AbstractCompactionWriter compactionWriter,
QueryContext queryContext,
QueryDataSource queryDataSource)
- throws MetadataException, IOException {
- boolean hasStartChunkGroup = false;
+ throws IOException, InterruptedException {
MultiTsFileDeviceIterator.MeasurementIterator measurementIterator =
deviceIterator.iterateNotAlignedSeries(device, false);
Set<String> allMeasurements = measurementIterator.getAllMeasurements();
+ int subTaskNums = Math.min(allMeasurements.size(), subTaskNum);
+
+ // assign all measurements to different sub tasks
+ Set<String>[] measurementsForEachSubTask = new HashSet[subTaskNums];
+ int idx = 0;
for (String measurement : allMeasurements) {
- List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
- try {
- if (IoTDBDescriptor.getInstance().getConfig().isEnableIDTable()) {
- measurementSchemas.add(IDTableManager.getInstance().getSeriesSchema(device, measurement));
- } else {
- measurementSchemas.add(
- IoTDB.schemaProcessor.getSeriesSchema(new PartialPath(device, measurement)));
- }
- } catch (PathNotExistException e) {
- logger.info("A deleted path is skipped: {}", e.getMessage());
- continue;
+ if (measurementsForEachSubTask[idx % subTaskNums] == null) {
+ measurementsForEachSubTask[idx % subTaskNums] = new HashSet<String>();
}
+ measurementsForEachSubTask[idx++ % subTaskNums].add(measurement);
+ }
- IBatchReader dataBatchReader =
- constructReader(
- device,
- Collections.singletonList(measurement),
- measurementSchemas,
- allMeasurements,
- queryContext,
- queryDataSource,
- false);
+ // construct sub tasks and start compacting measurements in parallel
+ List<Future<Void>> futures = new ArrayList<>();
+ compactionWriter.startChunkGroup(device, false);
+ for (int i = 0; i < subTaskNums; i++) {
+ futures.add(
+ CompactionTaskManager.getInstance()
+ .submitSubTask(
+ new SubCompactionTask(
+ device,
+ measurementsForEachSubTask[i],
+ queryContext,
+ queryDataSource,
+ compactionWriter,
+ i)));
+ }
- if (dataBatchReader.hasNextBatch()) {
- if (!hasStartChunkGroup) {
- // chunkgroup is serialized only when at least one timeseries under this device has
- // data
- compactionWriter.startChunkGroup(device, false);
- hasStartChunkGroup = true;
- }
- compactionWriter.startMeasurement(measurementSchemas);
- writeWithReader(compactionWriter, dataBatchReader);
- compactionWriter.endMeasurement();
+ // wait for all sub tasks finish
+ for (int i = 0; i < subTaskNums; i++) {
+ try {
+ futures.get(i).get();
+ } catch (InterruptedException | ExecutionException e) {
+ logger.error("SubCompactionTask meet errors ", e);
+ Thread.interrupted();
+ throw new InterruptedException();
}
}
- if (hasStartChunkGroup) {
- compactionWriter.endChunkGroup();
- }
+ compactionWriter.endChunkGroup();
}
- private static void writeWithReader(AbstractCompactionWriter writer, IBatchReader reader)
- throws IOException {
+ public static void writeWithReader(
+ AbstractCompactionWriter writer, IBatchReader reader, int subTaskId) throws IOException {
while (reader.hasNextBatch()) {
BatchData batchData = reader.nextBatch();
while (batchData.hasCurrent()) {
- writer.write(batchData.currentTime(), batchData.currentValue());
+ writer.write(batchData.currentTime(), batchData.currentValue(), subTaskId);
batchData.next();
}
}
@@ -232,7 +239,7 @@ public class CompactionUtils {
* @param measurementIds if device is aligned, then measurementIds contain all measurements. If
* device is not aligned, then measurementIds only contain one measurement.
*/
- private static IBatchReader constructReader(
+ public static IBatchReader constructReader(
String deviceId,
List<String> measurementIds,
List<IMeasurementSchema> measurementSchemas,
@@ -268,6 +275,29 @@ public class CompactionUtils {
}
}
+ private static void updateDeviceStartTimeAndEndTime(
+ List<TsFileResource> targetResources, AbstractCompactionWriter compactionWriter) {
+ List<TsFileIOWriter> targetFileWriters = compactionWriter.getFileIOWriter();
+ for (int i = 0; i < targetFileWriters.size(); i++) {
+ TsFileIOWriter fileIOWriter = targetFileWriters.get(i);
+ TsFileResource fileResource = targetResources.get(i);
+ // The tmp target file may not have any data points written due to the existence of the
+ // mods file, and it will be deleted after compaction. So skip the target file that has been
+ // deleted.
+ if (!fileResource.getTsFile().exists()) {
+ continue;
+ }
+ for (Map.Entry<String, List<TimeseriesMetadata>> entry :
+ fileIOWriter.getDeviceTimeseriesMetadataMap().entrySet()) {
+ String device = entry.getKey();
+ for (TimeseriesMetadata timeseriesMetadata : entry.getValue()) {
+ fileResource.updateStartTime(device, timeseriesMetadata.getStatistics().getStartTime());
+ fileResource.updateEndTime(device, timeseriesMetadata.getStatistics().getEndTime());
+ }
+ }
+ }
+ }
+
private static void updatePlanIndexes(
List<TsFileResource> targetResources,
List<TsFileResource> seqResources,
@@ -280,7 +310,7 @@ public class CompactionUtils {
// in the new file
for (int i = 0; i < targetResources.size(); i++) {
TsFileResource targetResource = targetResources.get(i);
- // remove the target file been deleted from list
+ // remove the target file that has been deleted from list
if (!targetResource.getTsFile().exists()) {
targetResources.remove(i--);
continue;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
new file mode 100644
index 0000000000..315122e3cf
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.cross.rewrite.task;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionUtils;
+import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.metadata.idtable.IDTableManager;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+/**
+ * This class is used to implement reading the measurements and writing to the target files in
+ * parallel in the compaction. Currently, it only works for nonAligned data in cross space
+ * compaction and unseq inner space compaction.
+ */
+public class SubCompactionTask implements Callable<Void> {
+ private static final Logger logger =
+ LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
+ private final String device;
+ private final Set<String> measurementList;
+ private final QueryContext queryContext;
+ private final QueryDataSource queryDataSource;
+ private final AbstractCompactionWriter compactionWriter;
+ private final int taskId;
+
+ public SubCompactionTask(
+ String device,
+ Set<String> measurementList,
+ QueryContext queryContext,
+ QueryDataSource queryDataSource,
+ AbstractCompactionWriter compactionWriter,
+ int taskId) {
+ this.device = device;
+ this.measurementList = measurementList;
+ this.queryContext = queryContext;
+ this.queryDataSource = queryDataSource;
+ this.compactionWriter = compactionWriter;
+ this.taskId = taskId;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ for (String measurement : measurementList) {
+ List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
+ try {
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableIDTable()) {
+ measurementSchemas.add(IDTableManager.getInstance().getSeriesSchema(device, measurement));
+ } else {
+ measurementSchemas.add(
+ IoTDB.schemaProcessor.getSeriesSchema(new PartialPath(device, measurement)));
+ }
+ } catch (PathNotExistException e) {
+ logger.info("A deleted path is skipped: {}", e.getMessage());
+ continue;
+ }
+
+ IBatchReader dataBatchReader =
+ CompactionUtils.constructReader(
+ device,
+ Collections.singletonList(measurement),
+ measurementSchemas,
+ measurementList,
+ queryContext,
+ queryDataSource,
+ false);
+
+ if (dataBatchReader.hasNextBatch()) {
+ compactionWriter.startMeasurement(measurementSchemas, taskId);
+ CompactionUtils.writeWithReader(compactionWriter, dataBatchReader, taskId);
+ compactionWriter.endMeasurement(taskId);
+ }
+ }
+ return null;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
index 053631a516..5c1460230d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.engine.compaction.CompactionMetricsManager;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.constant.CompactionType;
import org.apache.iotdb.db.engine.compaction.constant.ProcessChunkType;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
@@ -38,8 +37,12 @@ import java.io.IOException;
import java.util.List;
public abstract class AbstractCompactionWriter implements AutoCloseable {
+ protected static final int subTaskNum =
+ IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum();
- protected IChunkWriter chunkWriter;
+ // Each sub task has its own chunk writer.
+ // The index of the array corresponds to subTaskId.
+ protected IChunkWriter[] chunkWriters = new IChunkWriter[subTaskNum];
protected boolean isAlign;
@@ -49,25 +52,26 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
private final boolean enableMetrics =
MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric();
- // point count in current measurment, which is used to check size
- private int measurementPointCount;
+ // Each sub task has point count in current measurement, which is used to check size.
+ // The index of the array corresponds to subTaskId.
+ protected int[] measurementPointCountArray = new int[subTaskNum];
public abstract void startChunkGroup(String deviceId, boolean isAlign) throws IOException;
public abstract void endChunkGroup() throws IOException;
- public void startMeasurement(List<IMeasurementSchema> measurementSchemaList) {
- measurementPointCount = 0;
+ public void startMeasurement(List<IMeasurementSchema> measurementSchemaList, int subTaskId) {
+ measurementPointCountArray[subTaskId] = 0;
if (isAlign) {
- chunkWriter = new AlignedChunkWriterImpl(measurementSchemaList);
+ chunkWriters[subTaskId] = new AlignedChunkWriterImpl(measurementSchemaList);
} else {
- chunkWriter = new ChunkWriterImpl(measurementSchemaList.get(0), true);
+ chunkWriters[subTaskId] = new ChunkWriterImpl(measurementSchemaList.get(0), true);
}
}
- public abstract void endMeasurement() throws IOException;
+ public abstract void endMeasurement(int subTaskId) throws IOException;
- public abstract void write(long timestamp, Object value) throws IOException;
+ public abstract void write(long timestamp, Object value, int subTaskId) throws IOException;
public abstract void write(long[] timestamps, Object values);
@@ -75,9 +79,9 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
public abstract void close() throws IOException;
- protected void writeDataPoint(Long timestamp, Object value) {
+ protected void writeDataPoint(Long timestamp, Object value, int subTaskId) {
if (!isAlign) {
- ChunkWriterImpl chunkWriter = (ChunkWriterImpl) this.chunkWriter;
+ ChunkWriterImpl chunkWriter = (ChunkWriterImpl) this.chunkWriters[subTaskId];
switch (chunkWriter.getDataType()) {
case TEXT:
chunkWriter.write(timestamp, (Binary) value);
@@ -101,7 +105,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType());
}
} else {
- AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl) this.chunkWriter;
+ AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl) this.chunkWriters[subTaskId];
for (TsPrimitiveType val : (TsPrimitiveType[]) value) {
if (val == null) {
chunkWriter.write(timestamp, null, true);
@@ -133,28 +137,37 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
}
chunkWriter.write(timestamp);
}
- measurementPointCount++;
+ measurementPointCountArray[subTaskId] += 1;
}
- protected void checkChunkSizeAndMayOpenANewChunk(TsFileIOWriter fileWriter) throws IOException {
- if (measurementPointCount % 10 == 0 && checkChunkSize()) {
- writeRateLimit(chunkWriter.estimateMaxSeriesMemSize());
+ protected void flushChunkToFileWriter(TsFileIOWriter targetWriter, int subTaskId)
+ throws IOException {
+ writeRateLimit(chunkWriters[subTaskId].estimateMaxSeriesMemSize());
+ synchronized (targetWriter) {
+ chunkWriters[subTaskId].writeToFileWriter(targetWriter);
+ }
+ }
+
+ protected void checkChunkSizeAndMayOpenANewChunk(TsFileIOWriter fileWriter, int subTaskId)
+ throws IOException {
+ if (measurementPointCountArray[subTaskId] % 10 == 0 && checkChunkSize(subTaskId)) {
+ flushChunkToFileWriter(fileWriter, subTaskId);
CompactionMetricsManager.recordWriteInfo(
this instanceof CrossSpaceCompactionWriter
? CompactionType.CROSS_COMPACTION
: CompactionType.INNER_UNSEQ_COMPACTION,
ProcessChunkType.DESERIALIZE_CHUNK,
this.isAlign,
- chunkWriter.estimateMaxSeriesMemSize());
- chunkWriter.writeToFileWriter(fileWriter);
+ chunkWriters[subTaskId].estimateMaxSeriesMemSize());
}
}
- private boolean checkChunkSize() {
- if (chunkWriter instanceof AlignedChunkWriterImpl) {
- return ((AlignedChunkWriterImpl) chunkWriter).checkIsChunkSizeOverThreshold(targetChunkSize);
+ protected boolean checkChunkSize(int subTaskId) {
+ if (chunkWriters[subTaskId] instanceof AlignedChunkWriterImpl) {
+ return ((AlignedChunkWriterImpl) chunkWriters[subTaskId])
+ .checkIsChunkSizeOverThreshold(targetChunkSize);
} else {
- return chunkWriter.estimateMaxSeriesMemSize() > targetChunkSize;
+ return chunkWriters[subTaskId].estimateMaxSeriesMemSize() > targetChunkSize;
}
}
@@ -163,8 +176,5 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
CompactionTaskManager.getInstance().getMergeWriteRateLimiter(), bytesLength);
}
- protected void updateDeviceStartAndEndTime(TsFileResource targetResource, long timestamp) {
- targetResource.updateStartTime(deviceId, timestamp);
- targetResource.updateEndTime(deviceId, timestamp);
- }
+ public abstract List<TsFileIOWriter> getFileIOWriter();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
index 2ebe22581d..3e245cfc35 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.engine.compaction.writer;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
-import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import java.io.IOException;
@@ -36,68 +35,73 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
// source tsfiles
private List<TsFileResource> seqTsFileResources;
- private int seqFileIndex;
+ // Each sub task has its corresponding seq file index.
+ // The index of the array corresponds to subTaskId.
+ private int[] seqFileIndexArray = new int[subTaskNum];
+ // device end time in each source seq file
private final long[] currentDeviceEndTime;
+ // whether each target file is empty or not
private final boolean[] isEmptyFile;
- private final boolean[] hasTargetFileStartChunkGroup;
+ // whether each target file has device data or not
+ private final boolean[] isDeviceExistedInTargetFiles;
- private final List<TsFileResource> targetTsFileResources;
+ // current chunk group header size
+ private int chunkGroupHeaderSize;
public CrossSpaceCompactionWriter(
List<TsFileResource> targetResources, List<TsFileResource> seqFileResources)
throws IOException {
currentDeviceEndTime = new long[seqFileResources.size()];
isEmptyFile = new boolean[seqFileResources.size()];
- hasTargetFileStartChunkGroup = new boolean[seqFileResources.size()];
+ isDeviceExistedInTargetFiles = new boolean[targetResources.size()];
for (int i = 0; i < targetResources.size(); i++) {
- this.fileWriterList.add(new RestorableTsFileIOWriter(targetResources.get(i).getTsFile()));
+ this.fileWriterList.add(new TsFileIOWriter(targetResources.get(i).getTsFile()));
isEmptyFile[i] = true;
}
this.seqTsFileResources = seqFileResources;
- this.targetTsFileResources = targetResources;
- seqFileIndex = 0;
}
@Override
public void startChunkGroup(String deviceId, boolean isAlign) throws IOException {
this.deviceId = deviceId;
this.isAlign = isAlign;
- this.seqFileIndex = 0;
+ this.seqFileIndexArray = new int[subTaskNum];
checkIsDeviceExistAndGetDeviceEndTime();
- for (int i = 0; i < seqTsFileResources.size(); i++) {
- hasTargetFileStartChunkGroup[i] = false;
+ for (int i = 0; i < fileWriterList.size(); i++) {
+ chunkGroupHeaderSize = fileWriterList.get(i).startChunkGroup(deviceId);
}
}
@Override
public void endChunkGroup() throws IOException {
for (int i = 0; i < seqTsFileResources.size(); i++) {
- if (hasTargetFileStartChunkGroup[i]) {
- fileWriterList.get(i).endChunkGroup();
+ TsFileIOWriter targetFileWriter = fileWriterList.get(i);
+ if (isDeviceExistedInTargetFiles[i]) {
+ targetFileWriter.endChunkGroup();
+ } else {
+ targetFileWriter.truncate(targetFileWriter.getPos() - chunkGroupHeaderSize);
}
+ isDeviceExistedInTargetFiles[i] = false;
}
deviceId = null;
}
@Override
- public void endMeasurement() throws IOException {
- writeRateLimit(chunkWriter.estimateMaxSeriesMemSize());
- chunkWriter.writeToFileWriter(fileWriterList.get(seqFileIndex));
- chunkWriter = null;
- seqFileIndex = 0;
+ public void endMeasurement(int subTaskId) throws IOException {
+ flushChunkToFileWriter(fileWriterList.get(seqFileIndexArray[subTaskId]), subTaskId);
+ seqFileIndexArray[subTaskId] = 0;
}
@Override
- public void write(long timestamp, Object value) throws IOException {
- checkTimeAndMayFlushChunkToCurrentFile(timestamp);
- checkAndMayStartChunkGroup();
- writeDataPoint(timestamp, value);
- updateDeviceStartAndEndTime(targetTsFileResources.get(seqFileIndex), timestamp);
- checkChunkSizeAndMayOpenANewChunk(fileWriterList.get(seqFileIndex));
- isEmptyFile[seqFileIndex] = false;
+ public void write(long timestamp, Object value, int subTaskId) throws IOException {
+ checkTimeAndMayFlushChunkToCurrentFile(timestamp, subTaskId);
+ writeDataPoint(timestamp, value, subTaskId);
+ checkChunkSizeAndMayOpenANewChunk(fileWriterList.get(seqFileIndexArray[subTaskId]), subTaskId);
+ isDeviceExistedInTargetFiles[seqFileIndexArray[subTaskId]] = true;
+ isEmptyFile[seqFileIndexArray[subTaskId]] = false;
}
@Override
@@ -123,16 +127,21 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
}
fileWriterList = null;
seqTsFileResources = null;
- chunkWriter = null;
}
- private void checkTimeAndMayFlushChunkToCurrentFile(long timestamp) throws IOException {
+ @Override
+ public List<TsFileIOWriter> getFileIOWriter() {
+ return fileWriterList;
+ }
+
+ private void checkTimeAndMayFlushChunkToCurrentFile(long timestamp, int subTaskId)
+ throws IOException {
+ int fileIndex = seqFileIndexArray[subTaskId];
// if timestamp is later than the current source seq tsfile, then flush chunk writer
- while (timestamp > currentDeviceEndTime[seqFileIndex]) {
- if (seqFileIndex != seqTsFileResources.size() - 1) {
- writeRateLimit(chunkWriter.estimateMaxSeriesMemSize());
- chunkWriter.writeToFileWriter(fileWriterList.get(seqFileIndex));
- seqFileIndex++;
+ while (timestamp > currentDeviceEndTime[fileIndex]) {
+ if (fileIndex != seqTsFileResources.size() - 1) {
+ flushChunkToFileWriter(fileWriterList.get(fileIndex), subTaskId);
+ seqFileIndexArray[subTaskId] = ++fileIndex;
} else {
// If the seq file is deleted for various reasons, the following two situations may occur
// when selecting the source files: (1) unseq files may have some devices or measurements
@@ -168,11 +177,4 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
fileIndex++;
}
}
-
- private void checkAndMayStartChunkGroup() throws IOException {
- if (!hasTargetFileStartChunkGroup[seqFileIndex]) {
- fileWriterList.get(seqFileIndex).startChunkGroup(deviceId);
- hasTargetFileStartChunkGroup[seqFileIndex] = true;
- }
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
index 7b0e31095d..af2cc53c67 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
@@ -19,22 +19,20 @@
package org.apache.iotdb.db.engine.compaction.writer;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
public class InnerSpaceCompactionWriter extends AbstractCompactionWriter {
private TsFileIOWriter fileWriter;
private boolean isEmptyFile;
- private final TsFileResource targetTsFileResource;
-
public InnerSpaceCompactionWriter(TsFileResource targetFileResource) throws IOException {
- fileWriter = new RestorableTsFileIOWriter(targetFileResource.getTsFile());
+ this.fileWriter = new TsFileIOWriter(targetFileResource.getTsFile());
isEmptyFile = true;
- this.targetTsFileResource = targetFileResource;
}
@Override
@@ -50,17 +48,14 @@ public class InnerSpaceCompactionWriter extends AbstractCompactionWriter {
}
@Override
- public void endMeasurement() throws IOException {
- writeRateLimit(chunkWriter.estimateMaxSeriesMemSize());
- chunkWriter.writeToFileWriter(fileWriter);
- chunkWriter = null;
+ public void endMeasurement(int subTaskId) throws IOException {
+ flushChunkToFileWriter(fileWriter, subTaskId);
}
@Override
- public void write(long timestamp, Object value) throws IOException {
- writeDataPoint(timestamp, value);
- updateDeviceStartAndEndTime(targetTsFileResource, timestamp);
- checkChunkSizeAndMayOpenANewChunk(fileWriter);
+ public void write(long timestamp, Object value, int subTaskId) throws IOException {
+ writeDataPoint(timestamp, value, subTaskId);
+ checkChunkSizeAndMayOpenANewChunk(fileWriter, subTaskId);
isEmptyFile = false;
}
@@ -80,7 +75,11 @@ public class InnerSpaceCompactionWriter extends AbstractCompactionWriter {
if (fileWriter != null && fileWriter.canWrite()) {
fileWriter.close();
}
- chunkWriter = null;
fileWriter = null;
}
+
+ @Override
+ public List<TsFileIOWriter> getFileIOWriter() {
+ return Collections.singletonList(fileWriter);
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionUtilsTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionUtilsTest.java
index a6aab11e1c..69ff88dee0 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionUtilsTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionUtilsTest.java
@@ -30,9 +30,14 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -202,6 +207,20 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources, true);
CompactionUtils.compact(seqResources, unseqResources, targetResources);
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
+ assertEquals(
+ 0, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ assertEquals(
+ 0, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ assertEquals(
+ 250, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ assertEquals(
+ 600, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ assertEquals(
+ 600, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4"));
+ for (int i = 0; i < 5; i++) {
+ assertEquals(
+ 749, targetResources.get(0).getEndTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i));
+ }
for (int i = 0; i < 5; i++) {
for (int j = 0; j < 5; j++) {
@@ -1752,7 +1771,7 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
}
/**
- * Total 5 seq files and 5 unseq files, each file has different nonAligned timeseries.
+ * Total 4 seq files and 5 unseq files, each file has different nonAligned timeseries.
*
* <p>Seq files<br>
* first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range
@@ -1836,6 +1855,34 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
CompactionUtils.compact(seqResources, unseqResources, targetResources);
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+ List<String> deviceIdList = new ArrayList<>();
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
+ for (int i = 0; i < 2; i++) {
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ check(targetResources.get(i), deviceIdList);
+ }
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+ for (int i = 2; i < 4; i++) {
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ check(targetResources.get(i), deviceIdList);
+ }
+
Map<String, Long> measurementMaxTime = new HashMap<>();
for (int i = 0; i < 4; i++) {
@@ -1905,7 +1952,7 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
}
/**
- * Total 5 seq files and 5 unseq files, each file has different nonAligned timeseries.
+ * Total 4 seq files and 5 unseq files, each file has different nonAligned timeseries.
*
* <p>Seq files<br>
* first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range
@@ -2002,6 +2049,34 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
CompactionUtils.compact(seqResources, unseqResources, targetResources);
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+ List<String> deviceIdList = new ArrayList<>();
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
+ for (int i = 0; i < 2; i++) {
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ check(targetResources.get(i), deviceIdList);
+ }
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+ for (int i = 2; i < 4; i++) {
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ check(targetResources.get(i), deviceIdList);
+ }
+
Map<String, Long> measurementMaxTime = new HashMap<>();
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 5; j++) {
@@ -2072,7 +2147,7 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
}
/**
- * Total 5 seq files and 5 unseq files, each file has different nonAligned timeseries.
+ * Total 4 seq files and 5 unseq files, each file has different nonAligned timeseries.
*
* <p>Seq files<br>
* first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range
@@ -2165,6 +2240,32 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
CompactionUtils.compact(seqResources, unseqResources, targetResources);
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+ List<String> deviceIdList = new ArrayList<>();
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
+ for (int i = 0; i < 2; i++) {
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ check(targetResources.get(i), deviceIdList);
+ }
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+ for (int i = 2; i < 4; i++) {
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ check(targetResources.get(i), deviceIdList);
+ }
+
Map<String, Long> measurementMaxTime = new HashMap<>();
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 5; j++) {
@@ -2231,7 +2332,7 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
}
/**
- * Total 5 seq files and 5 unseq files, each file has different nonAligned timeseries.
+ * Total 4 seq files and 5 unseq files, each file has different nonAligned timeseries.
*
* <p>Seq files<br>
* first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range
@@ -2245,8 +2346,8 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
* fourth and fifth file has d0 and s0 ~ s4, time range is 450 ~ 549 and 550 ~ 649, value range is
* 20450 ~ 20549 and 20550 ~ 20649.
*
- * <p>The data of d0, d1 and d2 is deleted in each file. Data in the first target file is all
- * deleted.
+ * <p>The data of d0, d1 and d2 is deleted in each file. Data in the first and second target file
+ * is all deleted.
*/
@Test
public void testCrossSpaceCompactionWithAllDataDeletedInOneTargetFile()
@@ -2320,6 +2421,21 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
CompactionUtils.compact(seqResources, unseqResources, targetResources);
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+ Assert.assertEquals(2, targetResources.size());
+ List<String> deviceIdList = new ArrayList<>();
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+ for (int i = 0; i < 2; i++) {
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ check(targetResources.get(i), deviceIdList);
+ }
+
Map<String, Long> measurementMaxTime = new HashMap<>();
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 5; j++) {
@@ -2380,7 +2496,7 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
}
/**
- * Total 5 seq files and 5 unseq files, each file has different nonAligned timeseries.
+ * Total 4 seq files and 5 unseq files, each file has different nonAligned timeseries.
*
* <p>Seq files<br>
* first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range
@@ -2477,6 +2593,48 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
CompactionUtils.compact(seqResources, unseqResources, targetResources);
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+ Assert.assertEquals(4, targetResources.size());
+ List<String> deviceIdList = new ArrayList<>();
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
+ for (int i = 0; i < 2; i++) {
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ check(targetResources.get(i), deviceIdList);
+ }
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+ for (int i = 2; i < 3; i++) {
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ check(targetResources.get(i), deviceIdList);
+ }
+ deviceIdList.clear();
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+ for (int i = 3; i < 4; i++) {
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ check(targetResources.get(i), deviceIdList);
+ }
+
Map<String, Long> measurementMaxTime = new HashMap<>();
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 5; j++) {
@@ -2933,6 +3091,35 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
CompactionUtils.compact(seqResources, unseqResources, targetResources);
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+ Assert.assertEquals(4, targetResources.size());
+ List<String> deviceIdList = new ArrayList<>();
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10000");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10001");
+ for (int i = 0; i < 2; i++) {
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10000"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10001"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10002"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10003"));
+ check(targetResources.get(i), deviceIdList);
+ }
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10002");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10003");
+ for (int i = 2; i < 4; i++) {
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10000"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10001"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10002"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10003"));
+ check(targetResources.get(i), deviceIdList);
+ }
+
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 4;
i++) {
@@ -3433,6 +3620,54 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
CompactionUtils.compact(seqResources, unseqResources, targetResources);
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+
+ Assert.assertEquals(4, targetResources.size());
+ List<String> deviceIdList = new ArrayList<>();
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
+ for (int i = 0; i < 2; i++) {
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ check(targetResources.get(i), deviceIdList);
+ }
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+ for (int i = 2; i < 3; i++) {
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ check(targetResources.get(i), deviceIdList);
+ }
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d5");
+ for (int i = 3; i < 4; i++) {
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d5"));
+ check(targetResources.get(i), deviceIdList);
+ }
} catch (MetadataException
| IOException
| WriteProcessException
@@ -3456,6 +3691,35 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
CompactionUtils.compact(seqResources, unseqResources, targetResources);
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+ Assert.assertEquals(2, targetResources.size());
+ List<String> deviceIdList = new ArrayList<>();
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
+ for (int i = 0; i < 1; i++) {
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ check(targetResources.get(i), deviceIdList);
+ }
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+ for (int i = 1; i < 2; i++) {
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ check(targetResources.get(i), deviceIdList);
+ }
+
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 4; j++) {
PartialPath path =
@@ -3515,4 +3779,47 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
CompactionFileGeneratorUtils.generateMods(deleteMap, resource, false);
}
}
+
+ /**
+ * Check whether the target file contains an empty chunk group. The assertion fails if the file
+ * contains a chunk group whose deviceID is not in the deviceIdList.
+ */
+ public void check(TsFileResource targetResource, List<String> deviceIdList) throws IOException {
+ byte marker;
+ try (TsFileSequenceReader reader =
+ new TsFileSequenceReader(targetResource.getTsFile().getAbsolutePath())) {
+ reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1);
+ while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
+ switch (marker) {
+ case MetaMarker.CHUNK_HEADER:
+ case MetaMarker.TIME_CHUNK_HEADER:
+ case MetaMarker.VALUE_CHUNK_HEADER:
+ case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
+ case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
+ case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
+ ChunkHeader header = reader.readChunkHeader(marker);
+ int dataSize = header.getDataSize();
+ reader.position(reader.position() + dataSize);
+ break;
+ case MetaMarker.CHUNK_GROUP_HEADER:
+ ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
+ String deviceID = chunkGroupHeader.getDeviceID();
+ if (!deviceIdList.contains(deviceID)) {
+ Assert.fail(
+ "Target file "
+ + targetResource.getTsFile().getPath()
+ + " contains empty chunk group "
+ + deviceID);
+ }
+ break;
+ case MetaMarker.OPERATION_INDEX_RANGE:
+ reader.readPlanIndex();
+ break;
+ default:
+ // the disk file is corrupted, using this file may be dangerous
+ throw new IOException("Unexpected marker " + marker);
+ }
+ }
+ }
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java
index a413419f32..5e0824911e 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine.compaction.cross;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.cross.rewrite.manage.CrossSpaceCompactionResource;
import org.apache.iotdb.db.engine.compaction.cross.rewrite.selector.ICrossSpaceMergeFileSelector;
import org.apache.iotdb.db.engine.compaction.cross.rewrite.selector.RewriteCompactionFileSelector;
@@ -108,6 +109,7 @@ public class CrossSpaceCompactionTest {
TSFileDescriptor.getInstance().getConfig().getCompressor(),
Collections.emptyMap());
}
+ CompactionTaskManager.getInstance().start();
Thread.currentThread().setName("pool-1-IoTDB-Compaction-1");
}
@@ -118,6 +120,7 @@ public class CrossSpaceCompactionTest {
ChunkCache.getInstance().clear();
TimeSeriesMetadataCache.getInstance().clear();
IoTDB.configManager.clear();
+ CompactionTaskManager.getInstance().stop();
EnvironmentUtils.cleanAllDir();
Thread.currentThread().setName(oldThreadName);
new CompactionConfigRestorer().restoreCompactionConfig();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 7eb9a95110..2f865f297f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -142,14 +142,14 @@ public class TsFileIOWriter implements AutoCloseable {
out.write(VERSION_NUMBER_BYTE);
}
- public void startChunkGroup(String deviceId) throws IOException {
+ public int startChunkGroup(String deviceId) throws IOException {
this.currentChunkGroupDeviceId = deviceId;
if (logger.isDebugEnabled()) {
logger.debug("start chunk group:{}, file position {}", deviceId, out.getPosition());
}
chunkMetadataList = new ArrayList<>();
ChunkGroupHeader chunkGroupHeader = new ChunkGroupHeader(currentChunkGroupDeviceId);
- chunkGroupHeader.serializeTo(out.wrapAsStream());
+ return chunkGroupHeader.serializeTo(out.wrapAsStream());
}
/**
@@ -458,6 +458,10 @@ public class TsFileIOWriter implements AutoCloseable {
out.flush();
}
+ public void truncate(long offset) throws IOException {
+ out.truncate(offset);
+ }
+
/**
* this function is only for Test.
*