You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/10/04 09:45:32 UTC
[iotdb] 01/01: control metadata size in TsFileIOWriter
This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch IOTDB-4517
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b873fed67b30b0c4b1e9185b9428fb962e34e32c
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Tue Oct 4 17:45:11 2022 +0800
control metadata size in TsFileIOWriter
---
.../resources/conf/iotdb-engine.properties | 8 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 21 +
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 10 +
.../db/engine/compaction/CompactionUtils.java | 79 +-
.../cross/rewrite/task/SubCompactionTask.java | 5 +-
.../utils/AlignedSeriesCompactionExecutor.java | 1 +
.../inner/utils/InnerSpaceCompactionUtils.java | 12 +-
.../utils/SingleSeriesCompactionExecutor.java | 1 +
.../writer/AbstractCompactionWriter.java | 7 +
.../writer/CrossSpaceCompactionWriter.java | 47 +
.../writer/InnerSpaceCompactionWriter.java | 34 +-
.../iotdb/db/engine/flush/MemTableFlushTask.java | 16 +-
.../db/engine/storagegroup/TsFileProcessor.java | 9 +-
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 2 +-
.../org/apache/iotdb/db/utils/FileLoaderUtils.java | 2 +-
.../writelog/recover/TsFileRecoverPerformer.java | 12 +-
.../iotdb/tsfile/file/metadata/ChunkMetadata.java | 8 +
.../file/metadata/MetadataIndexConstructor.java | 39 +-
.../tsfile/file/metadata/MetadataIndexNode.java | 2 +-
.../iotdb/tsfile/file/metadata/TsFileMetadata.java | 13 +
.../tsfile/file/metadata/enums/TSDataType.java | 2 +-
.../iotdb/tsfile/read/TsFileSequenceReader.java | 5 +-
.../tsfile/v2/read/TsFileSequenceReaderForV2.java | 20 +-
.../write/writer/RestorableTsFileIOWriter.java | 12 +
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 359 ++++--
.../write/writer/tsmiterator/DiskTSMIterator.java | 132 ++
.../write/writer/tsmiterator/TSMIterator.java | 147 +++
.../tsfile/write/MetadataIndexConstructorTest.java | 2 +-
.../iotdb/tsfile/write/TsFileIOWriterTest.java | 2 +-
.../tsfile/write/TsFileIntegrityCheckingTool.java | 251 ++++
.../writer/TsFileIOWriterMemoryControlTest.java | 1303 ++++++++++++++++++++
31 files changed, 2411 insertions(+), 152 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index e17a900a25..9f494b582c 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -353,6 +353,10 @@ timestamp_precision=ms
# Datatype: int
# primitive_array_size=32
+# the percentage of write memory that remains reserved for chunk metadata in a single file writer when flushing a memtable
+# Datatype: double
+# chunk_metadata_size_proportion_in_write=0.1
+
# Ratio of write memory for invoking flush disk, 0.4 by default
# If you have extremely high write load (like batch=1000), it can be set lower than the default value like 0.2
# Datatype: double
@@ -449,6 +453,10 @@ timestamp_precision=ms
# BALANCE: alternate two compaction types
# compaction_priority=BALANCE
+# size proportion of chunk metadata maintained in memory when compacting
+# Datatype: double
+# chunk_metadata_size_proportion_in_compaction=0.05
+
# The target tsfile size in compaction
# Datatype: long, Unit: byte
# target_compaction_file_size=1073741824
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index f4cfc65920..c9151ffcf3 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -128,6 +128,8 @@ public class IoTDBConfig {
/** The proportion of write memory for write process */
private double writeProportion = 0.8;
+ private double chunkMetadataSizeProportionInWrite = 0.1;
+
/** The proportion of write memory for compaction */
private double compactionProportion = 0.2;
@@ -398,6 +400,8 @@ public class IoTDBConfig {
*/
private CompactionPriority compactionPriority = CompactionPriority.BALANCE;
+ private double chunkMetadataSizeProportionInCompaction = 0.05;
+
/** The target tsfile size in compaction, 1 GB by default */
private long targetCompactionFileSize = 1073741824L;
@@ -2773,4 +2777,21 @@ public class IoTDBConfig {
public void setCustomizedProperties(Properties customizedProperties) {
this.customizedProperties = customizedProperties;
}
+
+ public double getChunkMetadataSizeProportionInWrite() {
+ return chunkMetadataSizeProportionInWrite;
+ }
+
+ public void setChunkMetadataSizeProportionInWrite(double chunkMetadataSizeProportionInWrite) {
+ this.chunkMetadataSizeProportionInWrite = chunkMetadataSizeProportionInWrite;
+ }
+
+ public double getChunkMetadataSizeProportionInCompaction() {
+ return chunkMetadataSizeProportionInCompaction;
+ }
+
+ public void setChunkMetadataSizeProportionInCompaction(
+ double chunkMetadataSizeProportionInCompaction) {
+ this.chunkMetadataSizeProportionInCompaction = chunkMetadataSizeProportionInCompaction;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 815113f6b4..01606eb873 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -341,6 +341,11 @@ public class IoTDBDescriptor {
"max_waiting_time_when_insert_blocked",
Integer.toString(conf.getMaxWaitingTimeWhenInsertBlocked()))));
+ conf.setChunkMetadataSizeProportionInCompaction(
+ Double.parseDouble(
+ properties.getProperty(
+ "chunk_metadata_size_proportion_in_compaction",
+ Double.toString(conf.getChunkMetadataSizeProportionInCompaction()))));
conf.setEstimatedSeriesSize(
Integer.parseInt(
properties.getProperty(
@@ -928,6 +933,11 @@ public class IoTDBDescriptor {
.setKerberosPrincipal(
properties.getProperty("kerberos_principal", conf.getKerberosPrincipal()));
TSFileDescriptor.getInstance().getConfig().setBatchSize(conf.getBatchSize());
+ conf.setChunkMetadataSizeProportionInWrite(
+ Double.parseDouble(
+ properties.getProperty(
+ "chunk_metadata_size_proportion_in_write",
+ Double.toString(conf.getChunkMetadataSizeProportionInWrite()))));
// timed flush memtable, timed close tsfile
loadTimedService(properties);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
index 97bcff78ff..e32930864e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
@@ -60,7 +60,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -165,48 +165,51 @@ public class CompactionUtils {
QueryContext queryContext,
QueryDataSource queryDataSource)
throws IOException, InterruptedException {
- Map<String, MeasurementSchema> measurementSchemaMap =
- deviceIterator.getAllSchemasOfCurrentDevice();
- int subTaskNums = Math.min(measurementSchemaMap.size(), subTaskNum);
-
- // assign all measurements to different sub tasks
- Set<String>[] measurementsForEachSubTask = new HashSet[subTaskNums];
- int idx = 0;
- for (String measurement : measurementSchemaMap.keySet()) {
- if (measurementsForEachSubTask[idx % subTaskNums] == null) {
- measurementsForEachSubTask[idx % subTaskNums] = new HashSet<String>();
+ Map<String, MeasurementSchema> schemaMap = deviceIterator.getAllSchemasOfCurrentDevice();
+ List<String> allMeasurements = new ArrayList<>(schemaMap.keySet());
+ allMeasurements.sort((String::compareTo));
+ int subTaskNums = Math.min(allMeasurements.size(), subTaskNum);
+ // construct sub tasks and start compacting measurements in parallel
+ if (subTaskNums > 0) {
+ // assign the measurements for each subtask
+ List<String>[] measurementListArray = new List[subTaskNums];
+ for (int i = 0, size = allMeasurements.size(); i < size; ++i) {
+ int index = i % subTaskNums;
+ if (measurementListArray[index] == null) {
+ measurementListArray[index] = new LinkedList<>();
+ }
+ measurementListArray[index].add(allMeasurements.get(i));
}
- measurementsForEachSubTask[idx++ % subTaskNums].add(measurement);
- }
- // construct sub tasks and start compacting measurements in parallel
- List<Future<Void>> futures = new ArrayList<>();
- compactionWriter.startChunkGroup(device, false);
- for (int i = 0; i < subTaskNums; i++) {
- futures.add(
- CompactionTaskManager.getInstance()
- .submitSubTask(
- new SubCompactionTask(
- device,
- measurementsForEachSubTask[i],
- queryContext,
- queryDataSource,
- compactionWriter,
- measurementSchemaMap,
- i)));
- }
+ // construct sub tasks and start compacting measurements in parallel
+ List<Future<Void>> futures = new ArrayList<>();
+ compactionWriter.startChunkGroup(device, false);
+ for (int i = 0; i < subTaskNums; i++) {
+ futures.add(
+ CompactionTaskManager.getInstance()
+ .submitSubTask(
+ new SubCompactionTask(
+ device,
+ measurementListArray[i],
+ queryContext,
+ queryDataSource,
+ compactionWriter,
+ schemaMap,
+ i)));
+ }
- // wait for all sub tasks finish
- for (int i = 0; i < subTaskNums; i++) {
- try {
- futures.get(i).get();
- } catch (InterruptedException | ExecutionException e) {
- logger.error("SubCompactionTask meet errors ", e);
- Thread.interrupted();
- throw new InterruptedException();
+ // wait for all sub tasks finish
+ for (int i = 0; i < subTaskNums; i++) {
+ try {
+ futures.get(i).get();
+ } catch (InterruptedException | ExecutionException e) {
+ logger.error("SubCompactionTask meet errors ", e);
+ Thread.interrupted();
+ throw new InterruptedException();
+ }
}
}
-
+ compactionWriter.checkAndMayFlushChunkMetadata();
compactionWriter.endChunkGroup();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
index c688deb8bd..f5d5278437 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
@@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.Callable;
/**
@@ -45,7 +44,7 @@ public class SubCompactionTask implements Callable<Void> {
private static final Logger logger =
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
private final String device;
- private final Set<String> measurementList;
+ private final List<String> measurementList;
private final QueryContext queryContext;
private final QueryDataSource queryDataSource;
@@ -57,7 +56,7 @@ public class SubCompactionTask implements Callable<Void> {
public SubCompactionTask(
String device,
- Set<String> measurementList,
+ List<String> measurementList,
QueryContext queryContext,
QueryDataSource queryDataSource,
AbstractCompactionWriter compactionWriter,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
index af5353153f..dbd5b98bec 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
@@ -135,6 +135,7 @@ public class AlignedSeriesCompactionExecutor {
chunkWriter.estimateMaxSeriesMemSize());
chunkWriter.writeToFileWriter(writer);
}
+ writer.checkMetadataSizeAndMayFlush();
}
private void compactOneAlignedChunk(AlignedChunkReader chunkReader) throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
index 06004c9c6e..a6248e9835 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
@@ -64,8 +65,17 @@ public class InnerSpaceCompactionUtils {
public static void compact(TsFileResource targetResource, List<TsFileResource> tsFileResources)
throws IOException, MetadataException, InterruptedException {
+ // size for file writer is 5% of per compaction task memory budget
+ long sizeForFileWriter =
+ (long)
+ (SystemInfo.getInstance().getMemorySizeForCompaction()
+ / IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread()
+ * IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getChunkMetadataSizeProportionInCompaction());
try (MultiTsFileDeviceIterator deviceIterator = new MultiTsFileDeviceIterator(tsFileResources);
- TsFileIOWriter writer = new TsFileIOWriter(targetResource.getTsFile())) {
+ TsFileIOWriter writer =
+ new TsFileIOWriter(targetResource.getTsFile(), true, sizeForFileWriter)) {
while (deviceIterator.hasNextDevice()) {
Pair<String, Boolean> deviceInfo = deviceIterator.nextDevice();
String device = deviceInfo.left;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
index 2d49094f44..d614b3dbe3 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
@@ -138,6 +138,7 @@ public class SingleSeriesCompactionExecutor {
}
targetResource.updateStartTime(device, minStartTimestamp);
targetResource.updateEndTime(device, maxEndTimestamp);
+ fileWriter.checkMetadataSizeAndMayFlush();
}
private void constructChunkWriterFromReadChunk(Chunk chunk) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
index 5c1460230d..72096069e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
@@ -177,4 +177,11 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
}
public abstract List<TsFileIOWriter> getFileIOWriter();
+
+ public void checkAndMayFlushChunkMetadata() throws IOException {
+ List<TsFileIOWriter> writers = this.getFileIOWriter();
+ for (TsFileIOWriter writer : writers) {
+ writer.checkMetadataSizeAndMayFlush();
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
index 3e245cfc35..3a413d4cf4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
@@ -18,8 +18,10 @@
*/
package org.apache.iotdb.db.engine.compaction.writer;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
@@ -27,6 +29,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
// target fileIOWriters
@@ -34,6 +38,7 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
// source tsfiles
private List<TsFileResource> seqTsFileResources;
+ private List<TsFileResource> targetTsFileResources;
// Each sub task has its corresponding seq file index.
// The index of the array corresponds to subTaskId.
@@ -51,17 +56,46 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
// current chunk group header size
private int chunkGroupHeaderSize;
+ private AtomicLong[] startTimeForCurDeviceForEachFile;
+ private AtomicLong[] endTimeForCurDeviceForEachFile;
+ private AtomicBoolean[] hasCurDeviceForEachFile;
+ private AtomicLong[][] startTimeForEachDevice = new AtomicLong[subTaskNum][];
+ private AtomicLong[][] endTimeForEachDevice = new AtomicLong[subTaskNum][];
+
public CrossSpaceCompactionWriter(
List<TsFileResource> targetResources, List<TsFileResource> seqFileResources)
throws IOException {
currentDeviceEndTime = new long[seqFileResources.size()];
isEmptyFile = new boolean[seqFileResources.size()];
isDeviceExistedInTargetFiles = new boolean[targetResources.size()];
+ this.targetTsFileResources = targetResources;
+ startTimeForCurDeviceForEachFile = new AtomicLong[targetResources.size()];
+ endTimeForCurDeviceForEachFile = new AtomicLong[targetResources.size()];
+ hasCurDeviceForEachFile = new AtomicBoolean[targetResources.size()];
+ long memorySizeForEachWriter =
+ (long)
+ (SystemInfo.getInstance().getMemorySizeForCompaction()
+ / IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread()
+ * IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getChunkMetadataSizeProportionInCompaction()
+ / targetResources.size());
for (int i = 0; i < targetResources.size(); i++) {
this.fileWriterList.add(new TsFileIOWriter(targetResources.get(i).getTsFile()));
isEmptyFile[i] = true;
+ startTimeForCurDeviceForEachFile[i] = new AtomicLong(Long.MAX_VALUE);
+ endTimeForCurDeviceForEachFile[i] = new AtomicLong(Long.MIN_VALUE);
+ hasCurDeviceForEachFile[i] = new AtomicBoolean(false);
}
this.seqTsFileResources = seqFileResources;
+ for (int i = 0, size = targetResources.size(); i < subTaskNum; ++i) {
+ startTimeForEachDevice[i] = new AtomicLong[size];
+ endTimeForEachDevice[i] = new AtomicLong[size];
+ for (int j = 0; j < size; ++j) {
+ startTimeForEachDevice[i][j] = new AtomicLong(Long.MAX_VALUE);
+ endTimeForEachDevice[i][j] = new AtomicLong(Long.MIN_VALUE);
+ }
+ }
}
@Override
@@ -86,6 +120,16 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
}
isDeviceExistedInTargetFiles[i] = false;
}
+ for (int i = 0, size = targetTsFileResources.size(); i < size; ++i) {
+ for (int j = 0; j < subTaskNum; ++j) {
+ targetTsFileResources
+ .get(i)
+ .updateStartTime(deviceId, startTimeForEachDevice[j][i].getAndSet(Long.MAX_VALUE));
+ targetTsFileResources
+ .get(i)
+ .updateEndTime(deviceId, endTimeForEachDevice[j][i].getAndSet(Long.MIN_VALUE));
+ }
+ }
deviceId = null;
}
@@ -99,6 +143,9 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
public void write(long timestamp, Object value, int subTaskId) throws IOException {
checkTimeAndMayFlushChunkToCurrentFile(timestamp, subTaskId);
writeDataPoint(timestamp, value, subTaskId);
+ int fileIndex = seqFileIndexArray[subTaskId];
+ startTimeForEachDevice[subTaskId][fileIndex].accumulateAndGet(timestamp, Math::min);
+ endTimeForEachDevice[subTaskId][fileIndex].accumulateAndGet(timestamp, Math::max);
checkChunkSizeAndMayOpenANewChunk(fileWriterList.get(seqFileIndexArray[subTaskId]), subTaskId);
isDeviceExistedInTargetFiles[seqFileIndexArray[subTaskId]] = true;
isEmptyFile[seqFileIndexArray[subTaskId]] = false;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
index af2cc53c67..18fa51d7a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
@@ -18,21 +18,43 @@
*/
package org.apache.iotdb.db.engine.compaction.writer;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
public class InnerSpaceCompactionWriter extends AbstractCompactionWriter {
private TsFileIOWriter fileWriter;
private boolean isEmptyFile;
+ private TsFileResource resource;
+ private AtomicLong[] startTimeOfCurDevice;
+ private AtomicLong[] endTimeOfCurDevice;
public InnerSpaceCompactionWriter(TsFileResource targetFileResource) throws IOException {
- this.fileWriter = new TsFileIOWriter(targetFileResource.getTsFile());
+ long sizeForFileWriter =
+ (long)
+ (SystemInfo.getInstance().getMemorySizeForCompaction()
+ / IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread()
+ * IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getChunkMetadataSizeProportionInCompaction());
+ this.fileWriter = new TsFileIOWriter(targetFileResource.getTsFile(), true, sizeForFileWriter);
isEmptyFile = true;
+ resource = targetFileResource;
+ int concurrentThreadNum =
+ Math.max(1, IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum());
+ startTimeOfCurDevice = new AtomicLong[concurrentThreadNum];
+ endTimeOfCurDevice = new AtomicLong[concurrentThreadNum];
+ for (int i = 0; i < concurrentThreadNum; ++i) {
+ startTimeOfCurDevice[i] = new AtomicLong(Long.MAX_VALUE);
+ endTimeOfCurDevice[i] = new AtomicLong(Long.MIN_VALUE);
+ }
}
@Override
@@ -44,6 +66,14 @@ public class InnerSpaceCompactionWriter extends AbstractCompactionWriter {
@Override
public void endChunkGroup() throws IOException {
+ for (int i = 0; i < startTimeOfCurDevice.length; ++i) {
+ resource.updateStartTime(
+ fileWriter.getCurrentChunkGroupDeviceId(), startTimeOfCurDevice[i].get());
+ resource.updateEndTime(
+ fileWriter.getCurrentChunkGroupDeviceId(), endTimeOfCurDevice[i].get());
+ startTimeOfCurDevice[i].set(Long.MAX_VALUE);
+ endTimeOfCurDevice[i].set(Long.MIN_VALUE);
+ }
fileWriter.endChunkGroup();
}
@@ -57,6 +87,8 @@ public class InnerSpaceCompactionWriter extends AbstractCompactionWriter {
writeDataPoint(timestamp, value, subTaskId);
checkChunkSizeAndMayOpenANewChunk(fileWriter, subTaskId);
isEmptyFile = false;
+ startTimeOfCurDevice[subTaskId].set(Math.min(startTimeOfCurDevice[subTaskId].get(), timestamp));
+ endTimeOfCurDevice[subTaskId].set(Math.max(endTimeOfCurDevice[subTaskId].get(), timestamp));
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 2fd5f4db64..5ab620f7fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -39,6 +39,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -114,12 +117,14 @@ public class MemTableFlushTask {
long start = System.currentTimeMillis();
long sortTime = 0;
- // for map do not use get(key) to iterate
- for (Map.Entry<IDeviceID, IWritableMemChunkGroup> memTableEntry :
- memTable.getMemTableMap().entrySet()) {
- encodingTaskQueue.put(new StartFlushGroupIOTask(memTableEntry.getKey().toStringID()));
+ Map<IDeviceID, IWritableMemChunkGroup> memTableMap = memTable.getMemTableMap();
+ List<IDeviceID> deviceIDList = new ArrayList<>(memTableMap.keySet());
+ // sort the IDeviceID in lexicographical order
+ deviceIDList.sort(Comparator.comparing(IDeviceID::toStringID));
+ for (IDeviceID deviceID : deviceIDList) {
+ encodingTaskQueue.put(new StartFlushGroupIOTask(deviceID.toStringID()));
- final Map<String, IWritableMemChunk> value = memTableEntry.getValue().getMemChunkMap();
+ final Map<String, IWritableMemChunk> value = memTableMap.get(deviceID).getMemChunkMap();
for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) {
long startTime = System.currentTimeMillis();
IWritableMemChunk series = iWritableMemChunkEntry.getValue();
@@ -275,6 +280,7 @@ public class MemTableFlushTask {
this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
this.writer.endChunkGroup();
+ writer.checkMetadataSizeAndMayFlush();
} else {
((IChunkWriter) ioMessage).writeToFileWriter(this.writer);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 1cb31057e5..a56061935a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -172,7 +172,14 @@ public class TsFileProcessor {
this.storageGroupName = storageGroupName;
this.tsFileResource = new TsFileResource(tsfile, this);
this.storageGroupInfo = storageGroupInfo;
- this.writer = new RestorableTsFileIOWriter(tsfile);
+ this.writer =
+ new RestorableTsFileIOWriter(
+ tsfile,
+ (long)
+ (IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold()
+ * IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getChunkMetadataSizeProportionInWrite()));
this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback;
this.sequence = sequence;
logger.info("create a new tsfile processor {}", tsfile.getAbsolutePath());
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 8c26ec903c..6ec8e69fc3 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -1423,7 +1423,7 @@ public class PlanExecutor implements IPlanExecutor {
private void loadNewTsFileVerifyMetadata(TsFileSequenceReader tsFileSequenceReader)
throws MetadataException, QueryProcessException, IOException {
Map<String, List<TimeseriesMetadata>> metadataSet =
- tsFileSequenceReader.getAllTimeseriesMetadata();
+ tsFileSequenceReader.getAllTimeseriesMetadata(false);
for (Map.Entry<String, List<TimeseriesMetadata>> entry : metadataSet.entrySet()) {
String deviceId = entry.getKey();
PartialPath devicePath = new PartialPath(deviceId);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 5d61e6c304..6273add29f 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -69,7 +69,7 @@ public class FileLoaderUtils {
public static void updateTsFileResource(
TsFileSequenceReader reader, TsFileResource tsFileResource) throws IOException {
for (Entry<String, List<TimeseriesMetadata>> entry :
- reader.getAllTimeseriesMetadata().entrySet()) {
+ reader.getAllTimeseriesMetadata(false).entrySet()) {
for (TimeseriesMetadata timeseriesMetaData : entry.getValue()) {
tsFileResource.updateStartTime(
entry.getKey(), timeseriesMetaData.getStatistics().getStartTime());
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index b7722b67dd..dd87894e39 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.writelog.recover;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -113,7 +114,14 @@ public class TsFileRecoverPerformer {
// remove corrupted part of the TsFile
RestorableTsFileIOWriter restorableTsFileIOWriter;
try {
- restorableTsFileIOWriter = new RestorableTsFileIOWriter(file);
+ restorableTsFileIOWriter =
+ new RestorableTsFileIOWriter(
+ file,
+ (long)
+ (IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold()
+ * IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getChunkMetadataSizeProportionInWrite()));
} catch (NotCompatibleTsFileException e) {
boolean result = file.delete();
logger.warn("TsFile {} is incompatible. Delete it successfully {}", filePath, result);
@@ -180,7 +188,7 @@ public class TsFileRecoverPerformer {
try (TsFileSequenceReader reader =
new TsFileSequenceReader(tsFileResource.getTsFile().getAbsolutePath(), true)) {
for (Entry<String, List<TimeseriesMetadata>> entry :
- reader.getAllTimeseriesMetadata().entrySet()) {
+ reader.getAllTimeseriesMetadata(false).entrySet()) {
for (TimeseriesMetadata timeseriesMetaData : entry.getValue()) {
tsFileResource.updateStartTime(
entry.getKey(), timeseriesMetaData.getStatistics().getStartTime());
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
index 831f8cd120..9ee1f7f566 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
@@ -186,6 +186,14 @@ public class ChunkMetadata implements IChunkMetadata {
return chunkMetaData;
}
+ public static ChunkMetadata deserializeFrom(ByteBuffer buffer, TSDataType dataType) {
+ ChunkMetadata chunkMetadata = new ChunkMetadata();
+ chunkMetadata.tsDataType = dataType;
+ chunkMetadata.offsetOfChunkHeader = ReadWriteIOUtils.readLong(buffer);
+ chunkMetadata.statistics = Statistics.deserialize(buffer, dataType);
+ return chunkMetadata;
+ }
+
@Override
public long getVersion() {
return version;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
index 062ffd6183..baa93ce9da 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
@@ -123,7 +123,7 @@ public class MetadataIndexConstructor {
* @param out tsfile output
* @param type MetadataIndexNode type
*/
- private static MetadataIndexNode generateRootNode(
+ public static MetadataIndexNode generateRootNode(
Queue<MetadataIndexNode> metadataIndexNodeQueue, TsFileOutput out, MetadataIndexNodeType type)
throws IOException {
int queueSize = metadataIndexNodeQueue.size();
@@ -148,7 +148,7 @@ public class MetadataIndexConstructor {
return metadataIndexNodeQueue.poll();
}
- private static void addCurrentIndexNodeToQueue(
+ public static void addCurrentIndexNodeToQueue(
MetadataIndexNode currentIndexNode,
Queue<MetadataIndexNode> metadataIndexNodeQueue,
TsFileOutput out)
@@ -156,4 +156,39 @@ public class MetadataIndexConstructor {
currentIndexNode.setEndOffset(out.getPosition());
metadataIndexNodeQueue.add(currentIndexNode);
}
+
+ public static MetadataIndexNode checkAndBuildLevelIndex(
+ Map<String, MetadataIndexNode> deviceMetadataIndexMap, TsFileOutput out) throws IOException {
+ // if not exceed the max child nodes num, ignore the device index and directly point to the
+ // measurement
+ if (deviceMetadataIndexMap.size() <= config.getMaxDegreeOfIndexNode()) {
+ MetadataIndexNode metadataIndexNode =
+ new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
+ for (Map.Entry<String, MetadataIndexNode> entry : deviceMetadataIndexMap.entrySet()) {
+ metadataIndexNode.addEntry(new MetadataIndexEntry(entry.getKey(), out.getPosition()));
+ entry.getValue().serializeTo(out.wrapAsStream());
+ }
+ metadataIndexNode.setEndOffset(out.getPosition());
+ return metadataIndexNode;
+ }
+
+ // else, build level index for devices
+ Queue<MetadataIndexNode> deviceMetadataIndexQueue = new ArrayDeque<>();
+ MetadataIndexNode currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
+
+ for (Map.Entry<String, MetadataIndexNode> entry : deviceMetadataIndexMap.entrySet()) {
+ // when constructing from internal node, each node is related to an entry
+ if (currentIndexNode.isFull()) {
+ addCurrentIndexNodeToQueue(currentIndexNode, deviceMetadataIndexQueue, out);
+ currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
+ }
+ currentIndexNode.addEntry(new MetadataIndexEntry(entry.getKey(), out.getPosition()));
+ entry.getValue().serializeTo(out.wrapAsStream());
+ }
+ addCurrentIndexNodeToQueue(currentIndexNode, deviceMetadataIndexQueue, out);
+ MetadataIndexNode deviceMetadataIndexNode =
+ generateRootNode(deviceMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_DEVICE);
+ deviceMetadataIndexNode.setEndOffset(out.getPosition());
+ return deviceMetadataIndexNode;
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
index 3f6f6336b3..1d3972cafe 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
@@ -74,7 +74,7 @@ public class MetadataIndexNode {
this.children.add(metadataIndexEntry);
}
- boolean isFull() {
+ public boolean isFull() {
return children.size() >= config.getMaxDegreeOfIndexNode();
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
index 95e01e2da1..7b70f54ebe 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
@@ -119,6 +119,19 @@ public class TsFileMetadata {
return byteLen;
}
+ public int serializeBloomFilter(OutputStream outputStream, BloomFilter filter)
+ throws IOException {
+ int byteLen = 0;
+ byte[] bytes = filter.serialize();
+ byteLen += ReadWriteForEncodingUtils.writeUnsignedVarInt(bytes.length, outputStream);
+ outputStream.write(bytes);
+ byteLen += bytes.length;
+ byteLen += ReadWriteForEncodingUtils.writeUnsignedVarInt(filter.getSize(), outputStream);
+ byteLen +=
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(filter.getHashFunctionSize(), outputStream);
+ return byteLen;
+ }
+
/**
* build bloom filter
*
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
index 48d9b2a38c..73ae05703c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
@@ -62,7 +62,7 @@ public enum TSDataType {
return getTsDataType(type);
}
- private static TSDataType getTsDataType(byte type) {
+ public static TSDataType getTsDataType(byte type) {
switch (type) {
case 0:
return TSDataType.BOOLEAN;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 90ec11349f..3a413b2f98 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -929,7 +929,8 @@ public class TsFileSequenceReader implements AutoCloseable {
}
/* TimeseriesMetadata don't need deserialize chunk metadata list */
- public Map<String, List<TimeseriesMetadata>> getAllTimeseriesMetadata() throws IOException {
+ public Map<String, List<TimeseriesMetadata>> getAllTimeseriesMetadata(boolean needChunkMetadata)
+ throws IOException {
if (tsFileMetaData == null) {
readFileMetadata();
}
@@ -949,7 +950,7 @@ public class TsFileSequenceReader implements AutoCloseable {
null,
metadataIndexNode.getNodeType(),
timeseriesMetadataMap,
- false);
+ needChunkMetadata);
}
return timeseriesMetadataMap;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java
index 49553e42d9..00536ca1dc 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java
@@ -22,7 +22,13 @@ import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.file.metadata.*;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@@ -41,8 +47,15 @@ import org.apache.iotdb.tsfile.v2.file.metadata.TsFileMetadataV2;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
import java.util.stream.Collectors;
public class TsFileSequenceReaderForV2 extends TsFileSequenceReader implements AutoCloseable {
@@ -413,7 +426,8 @@ public class TsFileSequenceReaderForV2 extends TsFileSequenceReader implements A
}
@Override
- public Map<String, List<TimeseriesMetadata>> getAllTimeseriesMetadata() throws IOException {
+ public Map<String, List<TimeseriesMetadata>> getAllTimeseriesMetadata(boolean needChunkMetadata)
+ throws IOException {
if (tsFileMetaData == null) {
readFileMetadata();
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index 70a5d8cf9f..ae2afdb8e1 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -78,6 +78,18 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
this(file, true);
}
+ /**
+ * @param file a given tsfile path you want to (continue to) write
+ * @throws IOException if write failed, or the file is broken but autoRepair==false.
+ */
+ public RestorableTsFileIOWriter(File file, long maxMetadataSize) throws IOException {
+ this(file, true);
+ this.maxMetadataSize = maxMetadataSize;
+ this.enableMemoryControl = true;
+ this.chunkMetadataTempFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX);
+ this.checkMetadataSizeAndMayFlush();
+ }
+
public RestorableTsFileIOWriter(File file, boolean truncate) throws IOException {
if (logger.isDebugEnabled()) {
logger.debug("{} is opened.", file.getName());
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 2f865f297f..7fdbac27df 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -26,35 +26,47 @@ import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.BloomFilter;
import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.writer.tsmiterator.TSMIterator;
+import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.TreeMap;
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.addCurrentIndexNodeToQueue;
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.checkAndBuildLevelIndex;
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.generateRootNode;
+
/**
* TsFileIOWriter is used to construct metadata and write data stored in memory to output stream.
*/
@@ -93,6 +105,21 @@ public class TsFileIOWriter implements AutoCloseable {
private long minPlanIndex;
private long maxPlanIndex;
+ // the following variable is used for memory control
+ protected long maxMetadataSize;
+ protected long currentChunkMetadataSize = 0L;
+ protected File chunkMetadataTempFile;
+ protected LocalTsFileOutput tempOutput;
+ protected volatile boolean hasChunkMetadataInDisk = false;
+ protected String currentSeries = null;
+ // record the total number of paths in order to build the bloom filter
+ protected int pathCount = 0;
+ protected boolean enableMemoryControl = false;
+ private Path lastSerializePath = null;
+ protected LinkedList<Long> endPosInCMTForDevice = new LinkedList<>();
+ private volatile int chunkMetadataCount = 0;
+ public static final String CHUNK_METADATA_TEMP_FILE_SUFFIX = ".meta";
+
/** empty construct function. */
protected TsFileIOWriter() {}
@@ -126,6 +153,15 @@ public class TsFileIOWriter implements AutoCloseable {
this.out = output;
}
+ /** for write with memory control */
+ public TsFileIOWriter(File file, boolean enableMemoryControl, long maxMetadataSize)
+ throws IOException {
+ this(file);
+ this.enableMemoryControl = enableMemoryControl;
+ this.maxMetadataSize = maxMetadataSize;
+ chunkMetadataTempFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX);
+ }
+
/**
* Writes given bytes to output stream. This method is called when total memory size exceeds the
* chunk group size threshold.
@@ -236,6 +272,10 @@ public class TsFileIOWriter implements AutoCloseable {
/** end chunk and write some log. */
public void endCurrentChunk() {
+ if (enableMemoryControl) {
+ this.currentChunkMetadataSize += currentChunkMetadata.calculateRamSize();
+ }
+ chunkMetadataCount++;
chunkMetadataList.add(currentChunkMetadata);
currentChunkMetadata = null;
}
@@ -247,47 +287,14 @@ public class TsFileIOWriter implements AutoCloseable {
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void endFile() throws IOException {
- long metaOffset = out.getPosition();
-
- // serialize the SEPARATOR of MetaData
- ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
-
- // group ChunkMetadata by series
- Map<Path, List<IChunkMetadata>> chunkMetadataListMap = new TreeMap<>();
-
- for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
- List<ChunkMetadata> chunkMetadatas = chunkGroupMetadata.getChunkMetadataList();
- for (IChunkMetadata chunkMetadata : chunkMetadatas) {
- Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
- chunkMetadataListMap.computeIfAbsent(series, k -> new ArrayList<>()).add(chunkMetadata);
- }
- }
-
- MetadataIndexNode metadataIndex = flushMetadataIndex(chunkMetadataListMap);
- TsFileMetadata tsFileMetaData = new TsFileMetadata();
- tsFileMetaData.setMetadataIndex(metadataIndex);
- tsFileMetaData.setMetaOffset(metaOffset);
+ checkInMemoryPathCount();
+ readChunkMetadataAndConstructIndexTree();
long footerIndex = out.getPosition();
if (logger.isDebugEnabled()) {
logger.debug("start to flush the footer,file pos:{}", footerIndex);
}
- // write TsFileMetaData
- int size = tsFileMetaData.serializeTo(out.wrapAsStream());
- if (logger.isDebugEnabled()) {
- logger.debug("finish flushing the footer {}, file pos:{}", tsFileMetaData, out.getPosition());
- }
-
- // write bloom filter
- size += tsFileMetaData.serializeBloomFilter(out.wrapAsStream(), chunkMetadataListMap.keySet());
- if (logger.isDebugEnabled()) {
- logger.debug("finish flushing the bloom filter file pos:{}", out.getPosition());
- }
-
- // write TsFileMetaData size
- ReadWriteIOUtils.write(size, out.wrapAsStream()); // write the size of the file metadata.
-
// write magic string
out.write(MAGIC_STRING_BYTES);
@@ -296,66 +303,121 @@ public class TsFileIOWriter implements AutoCloseable {
if (resourceLogger.isDebugEnabled() && file != null) {
resourceLogger.debug("{} writer is closed.", file.getName());
}
+ if (file != null) {
+ File chunkMetadataFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX);
+ if (chunkMetadataFile.exists()) {
+ FileUtils.delete(chunkMetadataFile);
+ }
+ }
canWrite = false;
}
- /**
- * Flush TsFileMetadata, including ChunkMetadataList and TimeseriesMetaData
- *
- * @param chunkMetadataListMap chunkMetadata that Path.mask == 0
- * @return MetadataIndexEntry list in TsFileMetadata
- */
- private MetadataIndexNode flushMetadataIndex(Map<Path, List<IChunkMetadata>> chunkMetadataListMap)
- throws IOException {
-
- // convert ChunkMetadataList to this field
- deviceTimeseriesMetadataMap = new LinkedHashMap<>();
- // create device -> TimeseriesMetaDataList Map
- for (Map.Entry<Path, List<IChunkMetadata>> entry : chunkMetadataListMap.entrySet()) {
- // for ordinary path
- flushOneChunkMetadata(entry.getKey(), entry.getValue());
+ private void readChunkMetadataAndConstructIndexTree() throws IOException {
+ if (tempOutput != null) {
+ tempOutput.close();
}
+ long metaOffset = out.getPosition();
- // construct TsFileMetadata and return
- return MetadataIndexConstructor.constructMetadataIndex(deviceTimeseriesMetadataMap, out);
- }
+ // serialize the SEPARATOR of MetaData
+ ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
- /**
- * Flush one chunkMetadata
- *
- * @param path Path of chunk
- * @param chunkMetadataList List of chunkMetadata about path(previous param)
- */
- private void flushOneChunkMetadata(Path path, List<IChunkMetadata> chunkMetadataList)
- throws IOException {
- // create TimeseriesMetaData
- PublicBAOS publicBAOS = new PublicBAOS();
- TSDataType dataType = chunkMetadataList.get(chunkMetadataList.size() - 1).getDataType();
- Statistics seriesStatistics = Statistics.getStatsByType(dataType);
-
- int chunkMetadataListLength = 0;
- boolean serializeStatistic = (chunkMetadataList.size() > 1);
- // flush chunkMetadataList one by one
- for (IChunkMetadata chunkMetadata : chunkMetadataList) {
- if (!chunkMetadata.getDataType().equals(dataType)) {
- continue;
+ TSMIterator tsmIterator =
+ hasChunkMetadataInDisk
+ ? TSMIterator.getTSMIteratorInDisk(
+ chunkMetadataTempFile, chunkGroupMetadataList, endPosInCMTForDevice)
+ : TSMIterator.getTSMIteratorInMemory(chunkGroupMetadataList);
+ Map<String, MetadataIndexNode> deviceMetadataIndexMap = new TreeMap<>();
+ Queue<MetadataIndexNode> measurementMetadataIndexQueue = new ArrayDeque<>();
+ String currentDevice = null;
+ String prevDevice = null;
+ MetadataIndexNode currentIndexNode =
+ new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+ TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
+ int seriesIdxForCurrDevice = 0;
+ BloomFilter filter =
+ BloomFilter.getEmptyBloomFilter(
+ TSFileDescriptor.getInstance().getConfig().getBloomFilterErrorRate(), pathCount);
+
+ int indexCount = 0;
+ while (tsmIterator.hasNext()) {
+ // read in all chunk metadata of one series
+ // construct the timeseries metadata for this series
+ Pair<String, TimeseriesMetadata> timeseriesMetadataPair = tsmIterator.next();
+ TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right;
+ currentSeries = timeseriesMetadataPair.left;
+
+ indexCount++;
+ // build bloom filter
+ filter.add(currentSeries);
+ // construct the index tree node for the series
+ Path currentPath = null;
+ if (timeseriesMetadata.getTSDataType() == TSDataType.VECTOR) {
+ // this series is the time column of the aligned device
+ // the full series path will be like "root.sg.d."
+ // we remove the last . in the series id here
+ currentPath = new Path(currentSeries);
+ currentDevice = currentSeries.substring(0, currentSeries.length() - 1);
+ } else {
+ currentPath = new Path(currentSeries, true);
+ currentDevice = currentPath.getDevice();
+ }
+ if (!currentDevice.equals(prevDevice)) {
+ if (prevDevice != null) {
+ addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
+ deviceMetadataIndexMap.put(
+ prevDevice,
+ generateRootNode(
+ measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
+ currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+ }
+ measurementMetadataIndexQueue = new ArrayDeque<>();
+ seriesIdxForCurrDevice = 0;
+ }
+
+ if (seriesIdxForCurrDevice % config.getMaxDegreeOfIndexNode() == 0) {
+ if (currentIndexNode.isFull()) {
+ addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
+ currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+ }
+ if (timeseriesMetadata.getTSDataType() != TSDataType.VECTOR) {
+ currentIndexNode.addEntry(
+ new MetadataIndexEntry(currentPath.getMeasurement(), out.getPosition()));
+ } else {
+ currentIndexNode.addEntry(new MetadataIndexEntry("", out.getPosition()));
+ }
}
- chunkMetadataListLength += chunkMetadata.serializeTo(publicBAOS, serializeStatistic);
- seriesStatistics.mergeStatistics(chunkMetadata.getStatistics());
+
+ prevDevice = currentDevice;
+ seriesIdxForCurrDevice++;
+ // serialize the timeseries metadata to file
+ timeseriesMetadata.serializeTo(out.wrapAsStream());
+ }
+
+ addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
+ if (prevDevice != null) {
+ deviceMetadataIndexMap.put(
+ prevDevice,
+ generateRootNode(
+ measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
}
- TimeseriesMetadata timeseriesMetadata =
- new TimeseriesMetadata(
- (byte)
- ((serializeStatistic ? (byte) 1 : (byte) 0) | chunkMetadataList.get(0).getMask()),
- chunkMetadataListLength,
- path.getMeasurement(),
- dataType,
- seriesStatistics,
- publicBAOS);
- deviceTimeseriesMetadataMap
- .computeIfAbsent(path.getDevice(), k -> new ArrayList<>())
- .add(timeseriesMetadata);
+ MetadataIndexNode metadataIndex = checkAndBuildLevelIndex(deviceMetadataIndexMap, out);
+
+ TsFileMetadata tsFileMetadata = new TsFileMetadata();
+ tsFileMetadata.setMetadataIndex(metadataIndex);
+ tsFileMetadata.setMetaOffset(metaOffset);
+
+ int size = tsFileMetadata.serializeTo(out.wrapAsStream());
+ size += tsFileMetadata.serializeBloomFilter(out.wrapAsStream(), filter);
+
+ // write TsFileMetaData size
+ ReadWriteIOUtils.write(size, out.wrapAsStream());
+ }
+
+ private void checkInMemoryPathCount() {
+ for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
+ pathCount += chunkGroupMetadata.getChunkMetadataList().size();
+ }
}
/**
@@ -399,6 +461,9 @@ public class TsFileIOWriter implements AutoCloseable {
public void close() throws IOException {
canWrite = false;
out.close();
+ if (tempOutput != null) {
+ this.tempOutput.close();
+ }
}
void writeSeparatorMaskForTest() throws IOException {
@@ -477,6 +542,30 @@ public class TsFileIOWriter implements AutoCloseable {
* @return DeviceTimeseriesMetadataMap
*/
public Map<String, List<TimeseriesMetadata>> getDeviceTimeseriesMetadataMap() {
+ Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap = new TreeMap<>();
+ Map<String, Map<String, List<IChunkMetadata>>> chunkMetadataMap = new TreeMap<>();
+ for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
+ for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
+ chunkMetadataMap
+ .computeIfAbsent(chunkGroupMetadata.getDevice(), x -> new TreeMap<>())
+ .computeIfAbsent(chunkMetadata.getMeasurementUid(), x -> new ArrayList<>())
+ .add(chunkMetadata);
+ }
+ }
+ for (String device : chunkMetadataMap.keySet()) {
+ Map<String, List<IChunkMetadata>> seriesToChunkMetadataMap = chunkMetadataMap.get(device);
+ for (Map.Entry<String, List<IChunkMetadata>> entry : seriesToChunkMetadataMap.entrySet()) {
+ try {
+ deviceTimeseriesMetadataMap
+ .computeIfAbsent(device, x -> new ArrayList<>())
+ .add(TSMIterator.constructOneTimeseriesMetadata(entry.getKey(), entry.getValue()));
+ } catch (IOException e) {
+ logger.error("Failed to get device timeseries metadata map", e);
+ return null;
+ }
+ }
+ }
+
return deviceTimeseriesMetadataMap;
}
@@ -495,4 +584,98 @@ public class TsFileIOWriter implements AutoCloseable {
public void setMaxPlanIndex(long maxPlanIndex) {
this.maxPlanIndex = maxPlanIndex;
}
+
+ /**
+ * Check if the size of chunk metadata in memory is greater than the given threshold. If so, the
+ * chunk metadata will be written to a temp file. <b>Notice! If you are writing an aligned device
+ * in row, you should make sure all data of current writing device has been written before this
+ * method is called. For writing not aligned series or writing aligned series in column, you
+ * should make sure that all data of one series is written before you call this function.</b>
+ *
+ * @throws IOException if an error occurs while flushing chunk metadata to the temp file
+ */
+ public void checkMetadataSizeAndMayFlush() throws IOException {
+ // This function should be called after all data of an aligned device has been written
+ if (enableMemoryControl && currentChunkMetadataSize > maxMetadataSize) {
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Flushing chunk metadata, total size is {}, count is {}, avg size is {}",
+ currentChunkMetadataSize,
+ chunkMetadataCount,
+ currentChunkMetadataSize / chunkMetadataCount);
+ }
+ sortAndFlushChunkMetadata();
+ } catch (IOException e) {
+ logger.error("Meets exception when flushing metadata to temp file for {}", file, e);
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * Sort the chunk metadata by the lexicographical order and the start time of the chunk, then
+ * flush them to a temp file.
+ *
+ * @throws IOException if an error occurs while writing chunk metadata to the temp file
+ */
+ protected void sortAndFlushChunkMetadata() throws IOException {
+ // group by series
+ List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList =
+ TSMIterator.sortChunkMetadata(
+ chunkGroupMetadataList, currentChunkGroupDeviceId, chunkMetadataList);
+ if (tempOutput == null) {
+ tempOutput = new LocalTsFileOutput(new FileOutputStream(chunkMetadataTempFile));
+ }
+ hasChunkMetadataInDisk = true;
+ for (Pair<Path, List<IChunkMetadata>> pair : sortedChunkMetadataList) {
+ Path seriesPath = pair.left;
+ boolean isNewPath = !seriesPath.equals(lastSerializePath);
+ if (isNewPath) {
+ // record the number of paths to construct the bloom filter later
+ pathCount++;
+ }
+ List<IChunkMetadata> iChunkMetadataList = pair.right;
+ writeChunkMetadataToTempFile(iChunkMetadataList, seriesPath, isNewPath);
+ lastSerializePath = seriesPath;
+ logger.debug("Flushing {}", seriesPath);
+ }
+ // clear the cache metadata to release the memory
+ chunkGroupMetadataList.clear();
+ if (chunkMetadataList != null) {
+ chunkMetadataList.clear();
+ }
+ chunkMetadataCount = 0;
+ currentChunkMetadataSize = 0;
+ }
+
+ private void writeChunkMetadataToTempFile(
+ List<IChunkMetadata> iChunkMetadataList, Path seriesPath, boolean isNewPath)
+ throws IOException {
+ // [DeviceId] measurementId datatype size chunkMetadataBuffer
+ if (lastSerializePath == null
+ || !seriesPath.getDevice().equals(lastSerializePath.getDevice())) {
+ // mark the end position of last device
+ endPosInCMTForDevice.add(tempOutput.getPosition());
+ // serialize the device
+ // for each device, we only serialize it once, in order to save io
+ ReadWriteIOUtils.write(seriesPath.getDevice(), tempOutput.wrapAsStream());
+ }
+ if (isNewPath && iChunkMetadataList.size() > 0) {
+ // serialize the public info of this measurement
+ ReadWriteIOUtils.writeVar(seriesPath.getMeasurement(), tempOutput.wrapAsStream());
+ ReadWriteIOUtils.write(iChunkMetadataList.get(0).getDataType(), tempOutput.wrapAsStream());
+ }
+ PublicBAOS buffer = new PublicBAOS();
+ int totalSize = 0;
+ for (IChunkMetadata chunkMetadata : iChunkMetadataList) {
+ totalSize += chunkMetadata.serializeTo(buffer, true);
+ }
+ ReadWriteIOUtils.write(totalSize, tempOutput.wrapAsStream());
+ buffer.writeTo(tempOutput);
+ }
+
+ public String getCurrentChunkGroupDeviceId() {
+ return currentChunkGroupDeviceId;
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/DiskTSMIterator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/DiskTSMIterator.java
new file mode 100644
index 0000000000..fd02f1438a
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/DiskTSMIterator.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write.writer.tsmiterator;
+
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * This class reads ChunkMetadata iteratively from disk (.cmt file) and memory (list of
+ * ChunkGroupMetadata), and constructs them as TimeseriesMetadata. It reads ChunkMetadata on disk
+ * first, and after all ChunkMetadata on disk is read, it reads ChunkMetadata in memory.
+ */
+public class DiskTSMIterator extends TSMIterator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DiskTSMIterator.class);
+
+ private LinkedList<Long> endPosForEachDevice;
+ private File cmtFile;
+ private LocalTsFileInput input;
+ private long fileLength = 0;
+ private long currentPos = 0;
+ private long nextEndPosForDevice = 0;
+ private String currentDevice;
+ private boolean remainsInFile = true;
+
+ protected DiskTSMIterator(
+ File cmtFile,
+ List<ChunkGroupMetadata> chunkGroupMetadataList,
+ LinkedList<Long> endPosForEachDevice)
+ throws IOException {
+ super(chunkGroupMetadataList);
+ this.cmtFile = cmtFile;
+ this.endPosForEachDevice = endPosForEachDevice;
+ this.input = new LocalTsFileInput(cmtFile.toPath());
+ this.fileLength = cmtFile.length();
+ this.nextEndPosForDevice = endPosForEachDevice.removeFirst();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return remainsInFile || iterator.hasNext();
+ }
+
+ @Override
+ public Pair<String, TimeseriesMetadata> next() {
+ try {
+ if (remainsInFile) {
+ // deserialize from file
+ return getTimeSerisMetadataFromFile();
+ } else {
+ // get from memory iterator
+ return super.next();
+ }
+ } catch (IOException e) {
+ LOG.error("Meets IOException when reading timeseries metadata from disk", e);
+ return null;
+ }
+ }
+
+ private Pair<String, TimeseriesMetadata> getTimeSerisMetadataFromFile() throws IOException {
+ if (currentPos == nextEndPosForDevice) {
+ // deserialize the current device name
+ currentDevice = ReadWriteIOUtils.readString(input.wrapAsInputStream());
+ nextEndPosForDevice =
+ endPosForEachDevice.size() > 0 ? endPosForEachDevice.removeFirst() : fileLength;
+ }
+ // deserialize public info for measurement
+ String measurementUid = ReadWriteIOUtils.readVarIntString(input.wrapAsInputStream());
+ byte dataTypeInByte = ReadWriteIOUtils.readByte(input.wrapAsInputStream());
+ TSDataType dataType = TSDataType.getTsDataType(dataTypeInByte);
+ int chunkBufferSize = ReadWriteIOUtils.readInt(input.wrapAsInputStream());
+ ByteBuffer chunkBuffer = ByteBuffer.allocate(chunkBufferSize);
+ int readSize = ReadWriteIOUtils.readAsPossible(input, chunkBuffer);
+ if (readSize < chunkBufferSize) {
+ throw new IOException(
+ String.format(
+ "Expected to read %s bytes, but actually read %s bytes", chunkBufferSize, readSize));
+ }
+ chunkBuffer.flip();
+
+ // deserialize chunk metadata from chunk buffer
+ List<IChunkMetadata> chunkMetadataList = new ArrayList<>();
+ while (chunkBuffer.hasRemaining()) {
+ chunkMetadataList.add(ChunkMetadata.deserializeFrom(chunkBuffer, dataType));
+ }
+ updateCurrentPos();
+ return new Pair<>(
+ currentDevice + "." + measurementUid,
+ constructOneTimeseriesMetadata(measurementUid, chunkMetadataList));
+ }
+
+ private void updateCurrentPos() throws IOException {
+ currentPos = input.position();
+ if (currentPos >= fileLength) {
+ remainsInFile = false;
+ input.close();
+ }
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/TSMIterator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/TSMIterator.java
new file mode 100644
index 0000000000..f11242f296
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/TSMIterator.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.writer.tsmiterator;
+
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * TSMIterator returns the full path of each series and its TimeseriesMetadata iteratively. It
+ * accepts a data source from memory or disk. The static method getTSMIteratorInMemory returns a
+ * TSMIterator that reads from memory, and the static method getTSMIteratorInDisk returns a
+ * TSMIterator that reads from disk.
+ */
+public class TSMIterator {
+ private static final Logger LOG = LoggerFactory.getLogger(TSMIterator.class);
+ protected List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList;
+ protected Iterator<Pair<Path, List<IChunkMetadata>>> iterator;
+
+ protected TSMIterator(List<ChunkGroupMetadata> chunkGroupMetadataList) {
+ this.sortedChunkMetadataList = sortChunkMetadata(chunkGroupMetadataList, null, null);
+ this.iterator = sortedChunkMetadataList.iterator();
+ }
+
+ public static TSMIterator getTSMIteratorInMemory(
+ List<ChunkGroupMetadata> chunkGroupMetadataList) {
+ return new TSMIterator(chunkGroupMetadataList);
+ }
+
+ public static TSMIterator getTSMIteratorInDisk(
+ File cmtFile, List<ChunkGroupMetadata> chunkGroupMetadataList, LinkedList<Long> serializePos)
+ throws IOException {
+ return new DiskTSMIterator(cmtFile, chunkGroupMetadataList, serializePos);
+ }
+
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ public Pair<String, TimeseriesMetadata> next() throws IOException {
+ Pair<Path, List<IChunkMetadata>> nextPair = iterator.next();
+ return new Pair<>(
+ nextPair.left.getFullPath(),
+ constructOneTimeseriesMetadata(nextPair.left.getMeasurement(), nextPair.right));
+ }
+
+ public static TimeseriesMetadata constructOneTimeseriesMetadata(
+ String measurementId, List<IChunkMetadata> chunkMetadataList) throws IOException {
+ // create TimeseriesMetaData
+ PublicBAOS publicBAOS = new PublicBAOS();
+ TSDataType dataType = chunkMetadataList.get(chunkMetadataList.size() - 1).getDataType();
+ Statistics seriesStatistics = Statistics.getStatsByType(dataType);
+
+ int chunkMetadataListLength = 0;
+ boolean serializeStatistic = (chunkMetadataList.size() > 1);
+ // flush chunkMetadataList one by one
+ for (IChunkMetadata chunkMetadata : chunkMetadataList) {
+ if (!chunkMetadata.getDataType().equals(dataType)) {
+ continue;
+ }
+ chunkMetadataListLength += chunkMetadata.serializeTo(publicBAOS, serializeStatistic);
+ seriesStatistics.mergeStatistics(chunkMetadata.getStatistics());
+ }
+
+ TimeseriesMetadata timeseriesMetadata =
+ new TimeseriesMetadata(
+ (byte)
+ ((serializeStatistic ? (byte) 1 : (byte) 0) | chunkMetadataList.get(0).getMask()),
+ chunkMetadataListLength,
+ measurementId,
+ dataType,
+ seriesStatistics,
+ publicBAOS);
+ return timeseriesMetadata;
+ }
+
+ public static List<Pair<Path, List<IChunkMetadata>>> sortChunkMetadata(
+ List<ChunkGroupMetadata> chunkGroupMetadataList,
+ String currentDevice,
+ List<ChunkMetadata> chunkMetadataList) {
+ Map<String, Map<Path, List<IChunkMetadata>>> chunkMetadataMap = new TreeMap<>();
+ List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList = new LinkedList<>();
+ for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
+ chunkMetadataMap.computeIfAbsent(chunkGroupMetadata.getDevice(), x -> new TreeMap<>());
+ for (IChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
+ chunkMetadataMap
+ .get(chunkGroupMetadata.getDevice())
+ .computeIfAbsent(
+ new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid()),
+ x -> new ArrayList<>())
+ .add(chunkMetadata);
+ }
+ }
+ if (currentDevice != null) {
+ for (IChunkMetadata chunkMetadata : chunkMetadataList) {
+ chunkMetadataMap
+ .computeIfAbsent(currentDevice, x -> new TreeMap<>())
+ .computeIfAbsent(
+ new Path(currentDevice, chunkMetadata.getMeasurementUid()), x -> new ArrayList<>())
+ .add(chunkMetadata);
+ }
+ }
+
+ for (Map.Entry<String, Map<Path, List<IChunkMetadata>>> entry : chunkMetadataMap.entrySet()) {
+ Map<Path, List<IChunkMetadata>> seriesChunkMetadataMap = entry.getValue();
+ for (Map.Entry<Path, List<IChunkMetadata>> seriesChunkMetadataEntry :
+ seriesChunkMetadataMap.entrySet()) {
+ sortedChunkMetadataList.add(
+ new Pair<>(seriesChunkMetadataEntry.getKey(), seriesChunkMetadataEntry.getValue()));
+ }
+ }
+ return sortedChunkMetadataList;
+ }
+}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorTest.java
index 5e518d4280..f7137e95be 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorTest.java
@@ -234,7 +234,7 @@ public class MetadataIndexConstructorTest {
assertFalse(iterator.hasNext());
Map<String, List<TimeseriesMetadata>> allTimeseriesMetadata =
- reader.getAllTimeseriesMetadata();
+ reader.getAllTimeseriesMetadata(false);
for (int j = 0; j < actualDevices.size(); j++) {
for (int i = 0; i < actualMeasurements.get(j).size(); i++) {
assertEquals(
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
index 4c2321928a..ba955912f1 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
@@ -147,7 +147,7 @@ public class TsFileIOWriterTest {
// make sure each timeseriesMetadata appears only once (presumably uniqueness — verified via pathSet below)
Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap =
- reader.getAllTimeseriesMetadata();
+ reader.getAllTimeseriesMetadata(false);
Set<String> pathSet = new HashSet<>();
for (Map.Entry<String, List<TimeseriesMetadata>> entry :
deviceTimeseriesMetadataMap.entrySet()) {
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java
new file mode 100644
index 0000000000..c97a9a0774
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.read.reader.IChunkReader;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.AlignedChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
+import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+/** This class provides some static methods to check the integrity of a TsFile. */
+public class TsFileIntegrityCheckingTool {
+ private static Logger LOG = LoggerFactory.getLogger(TsFileIntegrityCheckingTool.class);
+
+ /**
+ * This method checks the integrity of the file by reading it from start to end. It mainly
+ * checks the integrity of the chunks.
+ *
+ * @param filename path of the TsFile to be checked
+ */
+ public static void checkIntegrityBySequenceRead(String filename) {
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) {
+ String headMagicString = reader.readHeadMagic();
+ Assert.assertEquals(TSFileConfig.MAGIC_STRING, headMagicString);
+ String tailMagicString = reader.readTailMagic();
+ Assert.assertEquals(TSFileConfig.MAGIC_STRING, tailMagicString);
+ reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1);
+ List<long[]> timeBatch = new ArrayList<>();
+ int pageIndex = 0;
+ byte marker;
+ while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
+ switch (marker) {
+ case MetaMarker.CHUNK_HEADER:
+ case MetaMarker.TIME_CHUNK_HEADER:
+ case MetaMarker.VALUE_CHUNK_HEADER:
+ case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
+ case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
+ case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
+ ChunkHeader header = reader.readChunkHeader(marker);
+ if (header.getDataSize() == 0) {
+ // empty value chunk
+ break;
+ }
+ Decoder defaultTimeDecoder =
+ Decoder.getDecoderByType(
+ TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+ TSDataType.INT64);
+ Decoder valueDecoder =
+ Decoder.getDecoderByType(header.getEncodingType(), header.getDataType());
+ int dataSize = header.getDataSize();
+ pageIndex = 0;
+ if (header.getDataType() == TSDataType.VECTOR) {
+ timeBatch.clear();
+ }
+ while (dataSize > 0) {
+ valueDecoder.reset();
+ PageHeader pageHeader =
+ reader.readPageHeader(
+ header.getDataType(),
+ (header.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER);
+ ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType());
+ if ((header.getChunkType() & (byte) TsFileConstant.TIME_COLUMN_MASK)
+ == (byte) TsFileConstant.TIME_COLUMN_MASK) { // Time Chunk
+ TimePageReader timePageReader =
+ new TimePageReader(pageHeader, pageData, defaultTimeDecoder);
+ timeBatch.add(timePageReader.getNextTimeBatch());
+ } else if ((header.getChunkType() & (byte) TsFileConstant.VALUE_COLUMN_MASK)
+ == (byte) TsFileConstant.VALUE_COLUMN_MASK) { // Value Chunk
+ ValuePageReader valuePageReader =
+ new ValuePageReader(pageHeader, pageData, header.getDataType(), valueDecoder);
+ TsPrimitiveType[] valueBatch =
+ valuePageReader.nextValueBatch(timeBatch.get(pageIndex));
+ } else { // NonAligned Chunk
+ PageReader pageReader =
+ new PageReader(
+ pageData, header.getDataType(), valueDecoder, defaultTimeDecoder, null);
+ BatchData batchData = pageReader.getAllSatisfiedPageData();
+ }
+ pageIndex++;
+ dataSize -= pageHeader.getSerializedPageSize();
+ }
+ break;
+ case MetaMarker.CHUNK_GROUP_HEADER:
+ ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
+ break;
+ case MetaMarker.OPERATION_INDEX_RANGE:
+ reader.readPlanIndex();
+ break;
+ default:
+ MetaMarker.handleUnexpectedMarker(marker);
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Meet exception when checking integrity of tsfile", e);
+ Assert.fail();
+ }
+ }
+
+ /**
+ * This method checks the integrity of the file by mimicking the process of a query, which reads
+ * the metadata index tree first, and gets the timeseries metadata list and chunk metadata list.
+ * After that, this method acquires a single chunk according to its chunk metadata, then it
+ * deserializes the chunk, and verifies the correctness of the data.
+ *
+ * @param filename path of the file to be checked
+ * @param originData The origin data in a map format, Device -> SeriesId -> List<List<Time,Val>>,
+ * where each inner list stands for a chunk.
+ */
+ public static void checkIntegrityByQuery(
+ String filename,
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData) {
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) {
+ Map<String, List<TimeseriesMetadata>> allTimeseriesMetadata =
+ reader.getAllTimeseriesMetadata(true);
+ Assert.assertEquals(originData.size(), allTimeseriesMetadata.size());
+ // check each series
+ for (Map.Entry<String, List<TimeseriesMetadata>> entry : allTimeseriesMetadata.entrySet()) {
+ String deviceId = entry.getKey();
+ List<TimeseriesMetadata> timeseriesMetadataList = entry.getValue();
+ boolean vectorMode = false;
+ if (timeseriesMetadataList.size() > 0
+ && timeseriesMetadataList.get(0).getTSDataType() != TSDataType.VECTOR) {
+ Assert.assertEquals(originData.get(deviceId).size(), timeseriesMetadataList.size());
+ } else {
+ vectorMode = true;
+ Assert.assertEquals(originData.get(deviceId).size(), timeseriesMetadataList.size() - 1);
+ }
+
+ if (!vectorMode) {
+ // check integrity of not aligned series
+ for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) {
+ // get its chunk metadata list, and read the chunk
+ String measurementId = timeseriesMetadata.getMeasurementId();
+ List<List<Pair<Long, TsPrimitiveType>>> originChunks =
+ originData.get(deviceId).get(measurementId);
+ List<IChunkMetadata> chunkMetadataList = timeseriesMetadata.getChunkMetadataList();
+ Assert.assertEquals(originChunks.size(), chunkMetadataList.size());
+ chunkMetadataList.sort(Comparator.comparing(IChunkMetadata::getStartTime));
+ for (int i = 0; i < chunkMetadataList.size(); ++i) {
+ Chunk chunk = reader.readMemChunk((ChunkMetadata) chunkMetadataList.get(i));
+ ChunkReader chunkReader = new ChunkReader(chunk, null);
+ List<Pair<Long, TsPrimitiveType>> originValue = originChunks.get(i);
+ // deserialize the chunk and verify it with origin data
+ for (int valIdx = 0; chunkReader.hasNextSatisfiedPage(); ) {
+ IPointReader pointReader = chunkReader.nextPageData().getBatchDataIterator();
+ while (pointReader.hasNextTimeValuePair()) {
+ TimeValuePair pair = pointReader.nextTimeValuePair();
+ Assert.assertEquals(
+ originValue.get(valIdx).left.longValue(), pair.getTimestamp());
+ Assert.assertEquals(originValue.get(valIdx++).right, pair.getValue());
+ }
+ }
+ }
+ }
+ } else {
+ // check integrity of vector type
+ // get the timeseries metadata of the time column
+ TimeseriesMetadata timeColumnMetadata = timeseriesMetadataList.get(0);
+ List<IChunkMetadata> timeChunkMetadataList = timeColumnMetadata.getChunkMetadataList();
+ timeChunkMetadataList.sort(Comparator.comparing(IChunkMetadata::getStartTime));
+
+ for (int i = 1; i < timeseriesMetadataList.size(); ++i) {
+ // traverse each value column
+ List<IChunkMetadata> valueChunkMetadataList =
+ timeseriesMetadataList.get(i).getChunkMetadataList();
+ Assert.assertEquals(timeChunkMetadataList.size(), valueChunkMetadataList.size());
+ List<List<Pair<Long, TsPrimitiveType>>> originDataChunks =
+ originData.get(deviceId).get(timeseriesMetadataList.get(i).getMeasurementId());
+ for (int chunkIdx = 0; chunkIdx < timeChunkMetadataList.size(); ++chunkIdx) {
+ Chunk timeChunk =
+ reader.readMemChunk((ChunkMetadata) timeChunkMetadataList.get(chunkIdx));
+ Chunk valueChunk =
+ reader.readMemChunk((ChunkMetadata) valueChunkMetadataList.get(chunkIdx));
+ // construct an aligned chunk reader using time chunk and value chunk
+ IChunkReader chunkReader =
+ new AlignedChunkReader(timeChunk, Collections.singletonList(valueChunk), null);
+ // verify the values
+ List<Pair<Long, TsPrimitiveType>> originValue = originDataChunks.get(chunkIdx);
+ for (int valIdx = 0; chunkReader.hasNextSatisfiedPage(); ) {
+ IBatchDataIterator pointReader = chunkReader.nextPageData().getBatchDataIterator();
+ while (pointReader.hasNext()) {
+ long time = pointReader.currentTime();
+ Assert.assertEquals(originValue.get(valIdx).left.longValue(), time);
+ Assert.assertEquals(
+ originValue.get(valIdx++).right.getValue(),
+ ((TsPrimitiveType[]) pointReader.currentValue())[0].getValue());
+ pointReader.next();
+ }
+ }
+ }
+ }
+ }
+ }
+
+ } catch (IOException e) {
+ LOG.error("Meet exception when checking integrity of tsfile", e);
+ Assert.fail();
+ }
+ }
+}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
new file mode 100644
index 0000000000..b7c6ff84db
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
@@ -0,0 +1,1303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write.writer;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.encoding.encoder.TSEncodingBuilder;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+import org.apache.iotdb.tsfile.write.TsFileIntegrityCheckingTool;
+import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.TimeChunkWriter;
+import org.apache.iotdb.tsfile.write.chunk.ValueChunkWriter;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.tsmiterator.TSMIterator;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+public class TsFileIOWriterMemoryControlTest {
+ private static File testFile = new File("target", "1-1-0-0.tsfile");
+ private static File emptyFile = new File("target", "temp");
+ private long TEST_CHUNK_SIZE = 1000;
+ private List<String> sortedSeriesId = new ArrayList<>();
+ private List<String> sortedDeviceId = new ArrayList<>();
+ private boolean init = false;
+
+ @Before
+ public void setUp() throws IOException {
+ if (!init) {
+ init = true;
+ for (int i = 0; i < 2048; ++i) {
+ sortedSeriesId.add("s" + i);
+ sortedDeviceId.add("root.sg.d" + i);
+ }
+ sortedSeriesId.sort((String::compareTo));
+ sortedDeviceId.sort((String::compareTo));
+ }
+ TEST_CHUNK_SIZE = 1000;
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ if (testFile.exists()) {
+ FileUtils.delete(testFile);
+ }
+ if (new File(testFile.getPath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX).exists()) {
+ FileUtils.delete(
+ new File(testFile.getPath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX));
+ }
+ if (emptyFile.exists()) {
+ FileUtils.delete(emptyFile);
+ }
+ }
+
+ /** The following tests are for ChunkMetadata serialization and deserialization. */
+ @Test
+ public void testSerializeAndDeserializeChunkMetadata() throws IOException {
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) {
+ List<ChunkMetadata> originChunkMetadataList = new ArrayList<>();
+ for (int i = 0; i < 10; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ for (int j = 0; j < 5; ++j) {
+ ChunkWriterImpl chunkWriter;
+ switch (j) {
+ case 0:
+ chunkWriter = generateIntData(j, 0L, new ArrayList<>());
+ break;
+ case 1:
+ chunkWriter = generateBooleanData(j, 0, new ArrayList<>());
+ break;
+ case 2:
+ chunkWriter = generateFloatData(j, 0L, new ArrayList<>());
+ break;
+ case 3:
+ chunkWriter = generateDoubleData(j, 0L, new ArrayList<>());
+ break;
+ case 4:
+ default:
+ chunkWriter = generateTextData(j, 0L, new ArrayList<>());
+ break;
+ }
+ chunkWriter.writeToFileWriter(writer);
+ }
+ originChunkMetadataList.addAll(writer.chunkMetadataList);
+ writer.endChunkGroup();
+ }
+ writer.sortAndFlushChunkMetadata();
+ writer.tempOutput.flush();
+
+ TSMIterator iterator =
+ TSMIterator.getTSMIteratorInDisk(
+ writer.chunkMetadataTempFile,
+ writer.chunkGroupMetadataList,
+ writer.endPosInCMTForDevice);
+ for (int i = 0; iterator.hasNext(); ++i) {
+ Pair<String, TimeseriesMetadata> timeseriesMetadataPair = iterator.next();
+ TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right;
+ Assert.assertEquals(sortedSeriesId.get(i % 5), timeseriesMetadata.getMeasurementId());
+ Assert.assertEquals(
+ originChunkMetadataList.get(i).getDataType(), timeseriesMetadata.getTSDataType());
+ Assert.assertEquals(
+ originChunkMetadataList.get(i).getStatistics(), timeseriesMetadata.getStatistics());
+ }
+ }
+ }
+
+ @Test
+ public void testSerializeAndDeserializeAlignedChunkMetadata() throws IOException {
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) {
+ List<ChunkMetadata> originChunkMetadataList = new ArrayList<>();
+ for (int i = 0; i < 10; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ AlignedChunkWriterImpl chunkWriter = generateVectorData(0L, new ArrayList<>(), 6);
+ chunkWriter.writeToFileWriter(writer);
+ originChunkMetadataList.addAll(writer.chunkMetadataList);
+ writer.endChunkGroup();
+ }
+ writer.sortAndFlushChunkMetadata();
+ writer.tempOutput.flush();
+
+ List<String> measurementIds = new ArrayList<>();
+ for (int i = 0; i < 10; ++i) {
+ measurementIds.add(sortedDeviceId.get(i) + ".");
+ for (int j = 1; j <= 6; ++j) {
+ measurementIds.add(sortedDeviceId.get(i) + ".s" + j);
+ }
+ }
+ TSMIterator iterator =
+ TSMIterator.getTSMIteratorInDisk(
+ writer.chunkMetadataTempFile, new ArrayList<>(), writer.endPosInCMTForDevice);
+ for (int i = 0; iterator.hasNext(); ++i) {
+ Pair<String, TimeseriesMetadata> timeseriesMetadataPair = iterator.next();
+ String fullPath = timeseriesMetadataPair.left;
+ TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right;
+ Assert.assertEquals(measurementIds.get(i), fullPath);
+ Assert.assertEquals(
+ originChunkMetadataList.get(i).getDataType(), timeseriesMetadata.getTSDataType());
+ Assert.assertEquals(
+ originChunkMetadataList.get(i).getStatistics(), timeseriesMetadata.getStatistics());
+ }
+ }
+ }
+
+ @Test
+ public void testSerializeAndDeserializeMixedChunkMetadata() throws IOException {
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) {
+ List<IChunkMetadata> originChunkMetadataList = new ArrayList<>();
+ List<String> seriesIds = new ArrayList<>();
+ for (int i = 0; i < 10; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ if (i % 2 == 0) {
+ // write normal series
+ for (int j = 0; j < 5; ++j) {
+ ChunkWriterImpl chunkWriter;
+ switch (j) {
+ case 0:
+ chunkWriter = generateIntData(j, 0L, new ArrayList<>());
+ break;
+ case 1:
+ chunkWriter = generateBooleanData(j, 0L, new ArrayList<>());
+ break;
+ case 2:
+ chunkWriter = generateFloatData(j, 0L, new ArrayList<>());
+ break;
+ case 3:
+ chunkWriter = generateDoubleData(j, 0L, new ArrayList<>());
+ break;
+ case 4:
+ default:
+ chunkWriter = generateTextData(j, 0L, new ArrayList<>());
+ break;
+ }
+ chunkWriter.writeToFileWriter(writer);
+ seriesIds.add(deviceId + "." + sortedSeriesId.get(j));
+ }
+ } else {
+ // write vector
+ AlignedChunkWriterImpl chunkWriter = generateVectorData(0L, new ArrayList<>(), 6);
+ chunkWriter.writeToFileWriter(writer);
+ seriesIds.add(deviceId + ".");
+ for (int l = 1; l <= 6; ++l) {
+ seriesIds.add(deviceId + ".s" + l);
+ }
+ }
+ originChunkMetadataList.addAll(writer.chunkMetadataList);
+ writer.endChunkGroup();
+ }
+ writer.sortAndFlushChunkMetadata();
+ writer.tempOutput.flush();
+
+ TSMIterator iterator =
+ TSMIterator.getTSMIteratorInDisk(
+ writer.chunkMetadataTempFile, new ArrayList<>(), writer.endPosInCMTForDevice);
+ for (int i = 0; i < originChunkMetadataList.size(); ++i) {
+ Pair<String, TimeseriesMetadata> timeseriesMetadataPair = iterator.next();
+ Assert.assertEquals(seriesIds.get(i), timeseriesMetadataPair.left);
+ Assert.assertEquals(
+ originChunkMetadataList.get(i).getDataType(),
+ timeseriesMetadataPair.right.getTSDataType());
+ Assert.assertEquals(
+ originChunkMetadataList.get(i).getStatistics(),
+ timeseriesMetadataPair.right.getStatistics());
+ }
+ }
+ }
+
+ /** The following tests are for writing normal series in different numbers. */
+
+ /**
+ * Write a file with 10 devices and 5 series in each device. For each series, we write one chunk
+ * for it. This test makes sure that chunk metadata flushed to disk still yields a complete, readable file.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testWriteCompleteFileWithNormalChunk() throws IOException {
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ for (int i = 0; i < 10; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ for (int j = 0; j < 5; ++j) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ ChunkWriterImpl chunkWriter;
+ switch (j) {
+ case 0:
+ chunkWriter = generateIntData(j, 0L, valList);
+ break;
+ case 1:
+ chunkWriter = generateBooleanData(j, 0L, valList);
+ break;
+ case 2:
+ chunkWriter = generateFloatData(j, 0L, valList);
+ break;
+ case 3:
+ chunkWriter = generateDoubleData(j, 0L, valList);
+ break;
+ case 4:
+ default:
+ chunkWriter = generateTextData(j, 0L, valList);
+ break;
+ }
+ chunkWriter.writeToFileWriter(writer);
+ writer.checkMetadataSizeAndMayFlush();
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ writer.endChunkGroup();
+ }
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ writer.endFile();
+ }
+ Assert.assertFalse(
+ new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+ .exists());
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+ }
+
+ /**
+ * Write a file with 10 devices and 5 series in each device. For each series, we write 10 chunks
+ * for it. This test makes sure that chunk metadata flushed to disk still yields a complete, readable file.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testWriteCompleteFileWithMultipleNormalChunk() throws IOException {
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ for (int i = 0; i < 10; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ for (int j = 0; j < 5; ++j) {
+ ChunkWriterImpl chunkWriter;
+ switch (j) {
+ case 0:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 1:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 2:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 3:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 4:
+ default:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ }
+ writer.checkMetadataSizeAndMayFlush();
+ }
+ writer.endChunkGroup();
+ }
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ writer.endFile();
+ }
+ Assert.assertFalse(
+ new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+ .exists());
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+ }
+
+ /**
+ * Write a file with 10 devices and 5 series in each device. For each series, we write 10 chunks
+ * for it. Some chunk metadata still remains in memory when endFile() is called.
+ *
+ * @throws IOException if writing or verifying the file fails
+ */
+ @Test
+ public void testWriteCompleteFileWithMetadataRemainsInMemoryWhenEndFile() throws IOException {
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ for (int i = 0; i < 10; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ for (int j = 0; j < 5; ++j) {
+ ChunkWriterImpl chunkWriter;
+ switch (j) {
+ case 0:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 1:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 2:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 3:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 4:
+ default:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ }
+ if (i < 9) {
+ writer.checkMetadataSizeAndMayFlush();
+ }
+ }
+ writer.endChunkGroup();
+ }
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ Assert.assertFalse(writer.chunkGroupMetadataList.isEmpty());
+ writer.endFile();
+ }
+ Assert.assertFalse(
+ new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+ .exists());
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+ }
+
+ /**
+ * Write a file with 2 devices and 5 series in each device. For each series, we write 1024 chunks
+ * for it. This test makes sure that a very large number of chunks per series is handled
+ * correctly when chunk metadata is flushed to disk.
+ *
+ * @throws IOException if writing or verifying the file fails
+ */
+ @Test
+ public void testWriteCompleteFileWithEnormousNormalChunk() throws IOException {
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+ long originTestChunkSize = TEST_CHUNK_SIZE;
+ TEST_CHUNK_SIZE = 10;
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ for (int i = 0; i < 2; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ for (int j = 0; j < 5; ++j) {
+ ChunkWriterImpl chunkWriter;
+ switch (j) {
+ case 0:
+ for (int k = 0; k < 1024; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 1:
+ for (int k = 0; k < 1024; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 2:
+ for (int k = 0; k < 1024; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 3:
+ for (int k = 0; k < 1024; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 4:
+ default:
+ for (int k = 0; k < 1024; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ }
+ writer.checkMetadataSizeAndMayFlush();
+ }
+ writer.endChunkGroup();
+ }
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ writer.endFile();
+ } finally {
+ TEST_CHUNK_SIZE = originTestChunkSize;
+ }
+ Assert.assertFalse(
+ new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+ .exists());
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+ }
+
+ /**
+ * Write a file with 2 devices and 1024 series in each device. For each series, we write 50 chunks
+ * for it. This test makes sure that a very large number of series per device is handled
+ * correctly when chunk metadata is flushed to disk.
+ *
+ * @throws IOException if writing or verifying the file fails
+ */
+ @Test
+ public void testWriteCompleteFileWithEnormousSeriesNum() throws IOException {
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originTimes = new HashMap<>();
+ long originTestChunkSize = TEST_CHUNK_SIZE;
+ TEST_CHUNK_SIZE = 1;
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ for (int i = 0; i < 2; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ for (int j = 0; j < 1024; ++j) {
+ ChunkWriterImpl chunkWriter;
+ switch (j % 5) {
+ case 0:
+ for (int k = 0; k < 50; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 1:
+ for (int k = 0; k < 50; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 2:
+ for (int k = 0; k < 50; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 3:
+ for (int k = 0; k < 50; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 4:
+ default:
+ for (int k = 0; k < 50; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ }
+ writer.checkMetadataSizeAndMayFlush();
+ }
+ writer.endChunkGroup();
+ }
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ writer.endFile();
+ } finally {
+ TEST_CHUNK_SIZE = originTestChunkSize;
+ }
+ Assert.assertFalse(
+ new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+ .exists());
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originTimes);
+ }
+
+ /**
+ * Write a file with 1024 devices and 5 series in each device. For each series, we write 10 chunks
+ * for it. This test makes sure that a very large number of devices is handled correctly when
+ * chunk metadata is flushed to disk.
+ *
+ * @throws IOException if writing or verifying the file fails
+ */
+ @Test
+ public void testWriteCompleteFileWithEnormousDeviceNum() throws IOException {
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originTimes = new HashMap<>();
+ long originTestChunkSize = TEST_CHUNK_SIZE;
+ TEST_CHUNK_SIZE = 10;
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ for (int i = 0; i < 1024; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ for (int j = 0; j < 5; ++j) {
+ ChunkWriterImpl chunkWriter;
+ switch (j % 5) {
+ case 0:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 1:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 2:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 3:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 4:
+ default:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ }
+ writer.checkMetadataSizeAndMayFlush();
+ }
+ writer.endChunkGroup();
+ }
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ writer.endFile();
+ } finally {
+ TEST_CHUNK_SIZE = originTestChunkSize;
+ }
+ Assert.assertFalse(
+ new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+ .exists());
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originTimes);
+ }
+
+ /** The following tests are for writing aligned series. */
+
+ /**
+ * Test writing 10 devices, each with one aligned series of 6 components written as a single
+ * chunk group.
+ *
+ * @throws IOException if writing or verifying the file fails
+ */
+ @Test
+ public void testWriteCompleteFileWithAlignedSeriesWithOneChunk() throws IOException {
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ for (int i = 0; i < 10; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>();
+ AlignedChunkWriterImpl chunkWriter = generateVectorData(0L, valList, 6);
+ for (int j = 1; j <= 6; ++j) {
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent("s" + j, x -> new ArrayList<>())
+ .add(valList.get(j - 1));
+ }
+
+ chunkWriter.writeToFileWriter(writer);
+ writer.endChunkGroup();
+ writer.checkMetadataSizeAndMayFlush();
+ }
+ writer.endFile();
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ }
+ Assert.assertFalse(
+ new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+ .exists());
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+ }
+
+ /**
+ * Test writing one device with a 6-component aligned series, for which we write 512 chunk
+ * groups.
+ *
+ * @throws IOException if writing or verifying the file fails
+ */
+ @Test
+ public void testWriteCompleteFileWithAlignedSeriesWithMultiChunks() throws IOException {
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+ int chunkNum = 512, seriesNum = 6;
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ for (int i = 0; i < 1; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ for (int k = 0; k < chunkNum; ++k) {
+ writer.startChunkGroup(deviceId);
+ List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>();
+ AlignedChunkWriterImpl chunkWriter =
+ generateVectorData(k * TEST_CHUNK_SIZE, valList, seriesNum);
+ for (int j = 1; j <= seriesNum; ++j) {
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent("s" + j, x -> new ArrayList<>())
+ .add(valList.get(j - 1));
+ }
+
+ chunkWriter.writeToFileWriter(writer);
+ writer.endChunkGroup();
+ }
+ writer.checkMetadataSizeAndMayFlush();
+ }
+ writer.endFile();
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ }
+ Assert.assertFalse(
+ new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+ .exists());
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+ }
+
+ /**
+ * Test writing aligned chunk metadata where each aligned series has 1024 components.
+ *
+ * @throws IOException if writing or verifying the file fails
+ */
+ @Test
+ public void testWriteCompleteFileWithAlignedSeriesWithManyComponents() throws IOException {
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+ int chunkNum = 5, seriesNum = 1024;
+ long originTestPointNum = TEST_CHUNK_SIZE;
+ TEST_CHUNK_SIZE = 10;
+ try {
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ for (int i = 0; i < 10; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ for (int k = 0; k < chunkNum; ++k) {
+ writer.startChunkGroup(deviceId);
+ List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>();
+ AlignedChunkWriterImpl chunkWriter =
+ generateVectorData(k * TEST_CHUNK_SIZE, valList, seriesNum);
+ for (int j = 1; j <= seriesNum; ++j) {
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent("s" + j, x -> new ArrayList<>())
+ .add(valList.get(j - 1));
+ }
+
+ chunkWriter.writeToFileWriter(writer);
+ writer.endChunkGroup();
+ }
+ writer.checkMetadataSizeAndMayFlush();
+ }
+ writer.endFile();
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ }
+ } finally {
+ TEST_CHUNK_SIZE = originTestPointNum;
+ }
+ Assert.assertFalse(
+ new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+ .exists());
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+ }
+
+ @Test
+ public void testWriteCompleteFileWithLotsAlignedSeries() throws IOException {
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+ int chunkNum = 5, seriesNum = 12;
+ long originTestPointNum = TEST_CHUNK_SIZE;
+ TEST_CHUNK_SIZE = 10;
+ int deviceNum = 1024;
+ try {
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ for (int i = 0; i < deviceNum; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ for (int k = 0; k < chunkNum; ++k) {
+ writer.startChunkGroup(deviceId);
+ List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>();
+ AlignedChunkWriterImpl chunkWriter =
+ generateVectorData(k * TEST_CHUNK_SIZE, valList, seriesNum);
+ for (int j = 1; j <= seriesNum; ++j) {
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent("s" + j, x -> new ArrayList<>())
+ .add(valList.get(j - 1));
+ }
+
+ chunkWriter.writeToFileWriter(writer);
+ writer.endChunkGroup();
+ }
+ writer.checkMetadataSizeAndMayFlush();
+ }
+ writer.endFile();
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ }
+ } finally {
+ TEST_CHUNK_SIZE = originTestPointNum;
+ }
+ Assert.assertFalse(
+ new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+ .exists());
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+ }
+
+ @Test
+ public void testWritingAlignedSeriesByColumnWithMultiComponents() throws IOException {
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originValue = new HashMap<>();
+ TEST_CHUNK_SIZE = 10;
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ for (int i = 0; i < 5; i++) {
+ String deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ TSEncoding timeEncoding =
+ TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
+ TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
+ Encoder encoder = TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType);
+ for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
+ TimeChunkWriter timeChunkWriter =
+ new TimeChunkWriter("", CompressionType.SNAPPY, TSEncoding.PLAIN, encoder);
+ for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) {
+ timeChunkWriter.write(j);
+ }
+ timeChunkWriter.writeToFileWriter(writer);
+ }
+ writer.sortAndFlushChunkMetadata();
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ for (int k = 0; k < 1024; ++k) {
+ TSEncodingBuilder builder = TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN);
+ builder.initFromProps(null);
+ for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
+ ValueChunkWriter chunkWriter =
+ new ValueChunkWriter(
+ sortedSeriesId.get(k),
+ CompressionType.SNAPPY,
+ TSDataType.DOUBLE,
+ TSEncoding.PLAIN,
+ builder.getEncoder(TSDataType.DOUBLE));
+ Random random = new Random();
+ List<Pair<Long, TsPrimitiveType>> valueList = new ArrayList<>();
+ for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) {
+ double val = random.nextDouble();
+ chunkWriter.write(j, val, false);
+ valueList.add(new Pair<>((long) j, new TsPrimitiveType.TsDouble(val)));
+ }
+ chunkWriter.writeToFileWriter(writer);
+ originValue
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(k), x -> new ArrayList<>())
+ .add(valueList);
+ }
+ writer.sortAndFlushChunkMetadata();
+ }
+ writer.endChunkGroup();
+ }
+ writer.endFile();
+ }
+ Assert.assertFalse(
+ new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+ .exists());
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originValue);
+ }
+
+ @Test
+ public void testWritingCompleteMixedFiles() throws IOException {
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ for (int i = 0; i < 5; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ for (int k = 0; k < 10; ++k) {
+ writer.startChunkGroup(deviceId);
+ List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>();
+ AlignedChunkWriterImpl chunkWriter = generateVectorData(k * TEST_CHUNK_SIZE, valList, 6);
+ for (int j = 1; j <= 6; ++j) {
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent("s" + j, x -> new ArrayList<>())
+ .add(valList.get(j - 1));
+ }
+
+ chunkWriter.writeToFileWriter(writer);
+ writer.endChunkGroup();
+ }
+ writer.checkMetadataSizeAndMayFlush();
+ }
+ for (int i = 5; i < 10; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ for (int j = 0; j < 5; ++j) {
+ ChunkWriterImpl chunkWriter;
+ switch (j) {
+ case 0:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 1:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 2:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 3:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 4:
+ default:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ }
+ writer.checkMetadataSizeAndMayFlush();
+ }
+ writer.endChunkGroup();
+ }
+ writer.endFile();
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ }
+ Assert.assertFalse(
+ new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+ .exists());
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+ }
+
+ @Test
+ public void testWritingAlignedSeriesByColumn() throws IOException {
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originValue = new HashMap<>();
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ for (int i = 0; i < 5; i++) {
+ String deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ TSEncoding timeEncoding =
+ TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
+ TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
+ Encoder encoder = TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType);
+ TimeChunkWriter timeChunkWriter =
+ new TimeChunkWriter("", CompressionType.SNAPPY, TSEncoding.PLAIN, encoder);
+ for (int j = 0; j < TEST_CHUNK_SIZE; ++j) {
+ timeChunkWriter.write(j);
+ }
+ timeChunkWriter.writeToFileWriter(writer);
+ writer.sortAndFlushChunkMetadata();
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ for (int k = 0; k < 5; ++k) {
+ TSEncodingBuilder builder = TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN);
+ builder.initFromProps(null);
+ ValueChunkWriter chunkWriter =
+ new ValueChunkWriter(
+ sortedSeriesId.get(k),
+ CompressionType.SNAPPY,
+ TSDataType.DOUBLE,
+ TSEncoding.PLAIN,
+ builder.getEncoder(TSDataType.DOUBLE));
+ Random random = new Random();
+ List<Pair<Long, TsPrimitiveType>> valueList = new ArrayList<>();
+ for (int j = 0; j < TEST_CHUNK_SIZE; ++j) {
+ double val = random.nextDouble();
+ chunkWriter.write(j, val, false);
+ valueList.add(new Pair<>((long) j, new TsPrimitiveType.TsDouble(val)));
+ }
+ chunkWriter.writeToFileWriter(writer);
+ originValue
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(k), x -> new ArrayList<>())
+ .add(valueList);
+ writer.sortAndFlushChunkMetadata();
+ }
+ writer.endChunkGroup();
+ }
+ writer.endFile();
+ }
+ Assert.assertFalse(
+ new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+ .exists());
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originValue);
+ }
+
+ @Test
+ public void testWritingAlignedSeriesByColumnWithMultiChunks() throws IOException {
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originValue = new HashMap<>();
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ for (int i = 0; i < 5; i++) {
+ String deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ TSEncoding timeEncoding =
+ TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
+ TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
+ Encoder encoder = TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType);
+ for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
+ TimeChunkWriter timeChunkWriter =
+ new TimeChunkWriter("", CompressionType.SNAPPY, TSEncoding.PLAIN, encoder);
+ for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) {
+ timeChunkWriter.write(j);
+ }
+ timeChunkWriter.writeToFileWriter(writer);
+ }
+ writer.sortAndFlushChunkMetadata();
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ for (int k = 0; k < 5; ++k) {
+ TSEncodingBuilder builder = TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN);
+ builder.initFromProps(null);
+ for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
+ ValueChunkWriter chunkWriter =
+ new ValueChunkWriter(
+ sortedSeriesId.get(k),
+ CompressionType.SNAPPY,
+ TSDataType.DOUBLE,
+ TSEncoding.PLAIN,
+ builder.getEncoder(TSDataType.DOUBLE));
+ Random random = new Random();
+ List<Pair<Long, TsPrimitiveType>> valueList = new ArrayList<>();
+ for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) {
+ double val = random.nextDouble();
+ chunkWriter.write(j, val, false);
+ valueList.add(new Pair<>((long) j, new TsPrimitiveType.TsDouble(val)));
+ }
+ chunkWriter.writeToFileWriter(writer);
+ originValue
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(k), x -> new ArrayList<>())
+ .add(valueList);
+ }
+ writer.sortAndFlushChunkMetadata();
+ }
+ writer.endChunkGroup();
+ }
+ writer.endFile();
+ }
+ Assert.assertFalse(
+ new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+ .exists());
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originValue);
+ }
+
+ /** The following private methods generate the per-type test data used by the tests above. */
+ private ChunkWriterImpl generateIntData(
+ int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+ ChunkWriterImpl chunkWriter =
+ new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.INT64));
+ Random random = new Random();
+ for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
+ long val = random.nextLong();
+ chunkWriter.write(i, val);
+ record.add(new Pair<>(i, new TsPrimitiveType.TsLong(val)));
+ }
+ return chunkWriter;
+ }
+
+ private ChunkWriterImpl generateFloatData(
+ int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+ ChunkWriterImpl chunkWriter =
+ new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.FLOAT));
+ Random random = new Random();
+ for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
+ float val = random.nextFloat();
+ chunkWriter.write(i, val);
+ record.add(new Pair<>(i, new TsPrimitiveType.TsFloat(val)));
+ }
+ return chunkWriter;
+ }
+
+ private ChunkWriterImpl generateDoubleData(
+ int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+ ChunkWriterImpl chunkWriter =
+ new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.DOUBLE));
+ Random random = new Random();
+ for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
+ double val = random.nextDouble();
+ chunkWriter.write(i, val);
+ record.add(new Pair<>(i, new TsPrimitiveType.TsDouble(val)));
+ }
+ return chunkWriter;
+ }
+
+ private ChunkWriterImpl generateBooleanData(
+ int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+ ChunkWriterImpl chunkWriter =
+ new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.BOOLEAN));
+ Random random = new Random();
+ for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
+ boolean val = random.nextBoolean();
+ chunkWriter.write(i, val);
+ record.add(new Pair<>(i, new TsPrimitiveType.TsBoolean(val)));
+ }
+ return chunkWriter;
+ }
+
+ private AlignedChunkWriterImpl generateVectorData(
+ long startTime, List<List<Pair<Long, TsPrimitiveType>>> record, int seriesNum) {
+ List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
+ TSDataType[] dataTypes =
+ new TSDataType[] {
+ TSDataType.INT32,
+ TSDataType.INT64,
+ TSDataType.FLOAT,
+ TSDataType.DOUBLE,
+ TSDataType.BOOLEAN,
+ TSDataType.TEXT
+ };
+ for (int i = 0; i < seriesNum; ++i) {
+ measurementSchemas.add(new MeasurementSchema("s" + (i + 1), dataTypes[i % 6]));
+ }
+ AlignedChunkWriterImpl chunkWriter = new AlignedChunkWriterImpl(measurementSchemas);
+ Random random = new Random();
+ for (int i = 0; i < seriesNum; ++i) {
+ record.add(new ArrayList<>());
+ }
+ for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
+ TsPrimitiveType[] points = new TsPrimitiveType[seriesNum];
+ for (int j = 0; j < seriesNum; ++j) {
+ switch (j % 6) {
+ case 0:
+ points[j] = new TsPrimitiveType.TsInt(random.nextInt());
+ break;
+ case 1:
+ points[j] = new TsPrimitiveType.TsLong(random.nextLong());
+ break;
+ case 2:
+ points[j] = new TsPrimitiveType.TsFloat(random.nextFloat());
+ break;
+ case 3:
+ points[j] = new TsPrimitiveType.TsDouble(random.nextDouble());
+ break;
+ case 4:
+ points[j] = new TsPrimitiveType.TsBoolean(random.nextBoolean());
+ break;
+ case 5:
+ points[j] =
+ new TsPrimitiveType.TsBinary(new Binary(String.valueOf(random.nextDouble())));
+ break;
+ }
+ }
+ for (int j = 0; j < seriesNum; ++j) {
+ record.get(j).add(new Pair<>(i, points[j]));
+ }
+ chunkWriter.write(i, points);
+ }
+ return chunkWriter;
+ }
+
+ private ChunkWriterImpl generateTextData(
+ int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+ ChunkWriterImpl chunkWriter =
+ new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.TEXT));
+ Random random = new Random();
+ for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
+ Binary val = new Binary(String.valueOf(random.nextDouble()));
+ chunkWriter.write(i, val);
+ record.add(new Pair<>(i, new TsPrimitiveType.TsBinary(val)));
+ }
+ return chunkWriter;
+ }
+}