Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/09/22 01:03:21 UTC
[iotdb] branch master updated: [IOTDB-4251] Persist ChunkMetadata in TsFileIOWriter ahead of time to save memory (#7276)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 265992dc21 [IOTDB-4251] Persist ChunkMetadata in TsFileIOWriter ahead of time to save memory (#7276)
265992dc21 is described below
commit 265992dc2166d80492a6a9bce3b1768514b23865
Author: Liu Xuxin <37...@users.noreply.github.com>
AuthorDate: Thu Sep 22 09:03:15 2022 +0800
[IOTDB-4251] Persist ChunkMetadata in TsFileIOWriter ahead of time to save memory (#7276)
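The gist of the change, as a minimal self-contained sketch (names such as SpillableMetadata and metadataBudgetInByte are illustrative stand-ins, not the actual TsFileIOWriter fields): instead of holding every ChunkMetadata on the heap until endFile(), the writer spills the accumulated metadata to a temp file beside the TsFile once it exceeds a byte budget, and presumably merges the spilled and in-memory parts back in order when the file is sealed (see the new TSMIterator/DiskTSMIterator below).

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

public class EarlyFlushSketch {
  // Stand-in for ChunkMetadata: something measurable and serializable.
  static class SpillableMetadata {
    final byte[] payload;
    SpillableMetadata(byte[] payload) { this.payload = payload; }
    long sizeInByte() { return payload.length; }
    void serializeTo(OutputStream out) throws IOException { out.write(payload); }
  }

  private final long metadataBudgetInByte; // e.g. memtable threshold * proportion
  private final OutputStream tempFile;     // stands in for the on-disk temp file
  private final List<SpillableMetadata> inMemory = new ArrayList<>();
  private long currentSizeInByte = 0;

  EarlyFlushSketch(long metadataBudgetInByte, OutputStream tempFile) {
    this.metadataBudgetInByte = metadataBudgetInByte;
    this.tempFile = tempFile;
  }

  void add(SpillableMetadata metadata) {
    inMemory.add(metadata);
    currentSizeInByte += metadata.sizeInByte();
  }

  // Mirrors the spirit of checkMetadataSizeAndMayFlush(): once the in-memory
  // metadata exceeds the budget, persist it and release the heap copy.
  void checkMetadataSizeAndMayFlush() throws IOException {
    if (currentSizeInByte < metadataBudgetInByte) {
      return;
    }
    for (SpillableMetadata metadata : inMemory) {
      metadata.serializeTo(tempFile);
    }
    inMemory.clear();
    currentSizeInByte = 0;
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream spill = new ByteArrayOutputStream();
    EarlyFlushSketch writer = new EarlyFlushSketch(16, spill);
    writer.add(new SpillableMetadata(new byte[10]));
    writer.checkMetadataSizeAndMayFlush(); // 10 < 16: keeps everything in memory
    writer.add(new SpillableMetadata(new byte[10]));
    writer.checkMetadataSizeAndMayFlush(); // 20 >= 16: spills 20 bytes to disk
    System.out.println("bytes spilled: " + spill.size()); // prints: bytes spilled: 20
  }
}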
---
.../resources/conf/iotdb-datanode.properties | 8 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 21 +
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 11 +
.../rewrite/task/ReadPointPerformerSubTask.java | 10 +-
.../utils/AlignedSeriesCompactionExecutor.java | 1 +
.../utils/SingleSeriesCompactionExecutor.java | 1 +
.../compaction/performer/ICompactionPerformer.java | 4 +-
.../impl/ReadChunkCompactionPerformer.java | 15 +-
.../impl/ReadPointCompactionPerformer.java | 90 +-
.../writer/AbstractCompactionWriter.java | 12 +-
.../writer/CrossSpaceCompactionWriter.java | 35 +-
.../writer/InnerSpaceCompactionWriter.java | 30 +-
.../iotdb/db/engine/flush/MemTableFlushTask.java | 23 +-
.../db/engine/storagegroup/TsFileProcessor.java | 9 +-
.../db/engine/storagegroup/TsFileResource.java | 8 +-
.../iotdb/db/tools/TsFileSplitByPartitionTool.java | 4 +-
.../file/AbstractTsFileRecoverPerformer.java | 18 +-
.../ReadPointCompactionPerformerTest.java | 70 +-
.../compaction/inner/InnerSeqCompactionTest.java | 7 +-
.../compaction/inner/InnerUnseqCompactionTest.java | 13 +-
.../inner/ReadChunkCompactionPerformerOldTest.java | 3 +-
.../iotdb/tsfile/file/metadata/ChunkMetadata.java | 8 +
.../file/metadata/MetadataIndexConstructor.java | 9 +-
.../tsfile/file/metadata/MetadataIndexNode.java | 2 +-
.../iotdb/tsfile/file/metadata/TsFileMetadata.java | 9 +-
.../write/writer/RestorableTsFileIOWriter.java | 12 +
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 337 ++++--
.../write/writer/tsmiterator/DiskTSMIterator.java | 132 ++
.../write/writer/tsmiterator/TSMIterator.java | 147 +++
.../tsfile/write/TsFileIntegrityCheckingTool.java | 251 ++++
.../writer/TsFileIOWriterMemoryControlTest.java | 1261 ++++++++++++++++++++
31 files changed, 2359 insertions(+), 202 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 699e30407a..cd759c7220 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -466,6 +466,10 @@ timestamp_precision=ms
# Datatype: int
# primitive_array_size=32
+# The proportion of write memory reserved for the chunk metadata kept in a single file writer when flushing a memtable
+# Datatype: double
+# chunk_metadata_size_proportion_in_write=0.1
+
# Ratio of write memory that triggers flushing to disk, 0.4 by default
# If you have an extremely high write load (like batch=1000), it can be set lower than the default, e.g. 0.2
# Datatype: double
@@ -582,6 +586,10 @@ timestamp_precision=ms
# BALANCE: alternate two compaction types
# compaction_priority=BALANCE
+# The proportion of compaction memory reserved for the chunk metadata kept in memory when compacting
+# Datatype: double
+# chunk_metadata_size_proportion_in_compaction=0.05
+
# The target tsfile size in compaction
# Datatype: long, Unit: byte
# target_compaction_file_size=1073741824
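To get a sense of scale for the write-path knob above (a worked example; the memtable threshold below is an assumed figure, only the 0.1 proportion is the default from this commit), the per-writer budget mirrors the TsFileProcessor change further down:

public class WriteBudgetExample {
  public static void main(String[] args) {
    long memtableSizeThreshold = 1L << 30; // assumed 1 GiB, for illustration only
    double chunkMetadataSizeProportionInWrite = 0.1;
    long writerBudget = (long) (memtableSizeThreshold * chunkMetadataSizeProportionInWrite);
    System.out.println(writerBudget); // 107374182 bytes, roughly 102 MiB per file writer
  }
}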
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 734e53fe35..853e37d893 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -149,6 +149,8 @@ public class IoTDBConfig {
/** The proportion of write memory for memtable */
private double writeProportion = 0.8;
+ private double chunkMetadataSizeProportionInWrite = 0.1;
+
/** The proportion of write memory for compaction */
private double compactionProportion = 0.2;
@@ -436,6 +438,8 @@ public class IoTDBConfig {
*/
private CompactionPriority compactionPriority = CompactionPriority.BALANCE;
+ private double chunkMetadataSizeProportionInCompaction = 0.05;
+
/** The target tsfile size in compaction, 1 GB by default */
private long targetCompactionFileSize = 1073741824L;
@@ -3200,6 +3204,23 @@ public class IoTDBConfig {
this.throttleThreshold = throttleThreshold;
}
+ public double getChunkMetadataSizeProportionInWrite() {
+ return chunkMetadataSizeProportionInWrite;
+ }
+
+ public void setChunkMetadataSizeProportionInWrite(double chunkMetadataSizeProportionInWrite) {
+ this.chunkMetadataSizeProportionInWrite = chunkMetadataSizeProportionInWrite;
+ }
+
+ public double getChunkMetadataSizeProportionInCompaction() {
+ return chunkMetadataSizeProportionInCompaction;
+ }
+
+ public void setChunkMetadataSizeProportionInCompaction(
+ double chunkMetadataSizeProportionInCompaction) {
+ this.chunkMetadataSizeProportionInCompaction = chunkMetadataSizeProportionInCompaction;
+ }
+
public long getCacheWindowTimeInMs() {
return cacheWindowTimeInMs;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index a0117f918c..081b54d084 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -629,6 +629,11 @@ public class IoTDBDescriptor {
properties.getProperty(
"concurrent_compaction_thread",
Integer.toString(conf.getConcurrentCompactionThread()))));
+ conf.setChunkMetadataSizeProportionInCompaction(
+ Double.parseDouble(
+ properties.getProperty(
+ "chunk_metadata_size_proportion_in_compaction",
+ Double.toString(conf.getChunkMetadataSizeProportionInCompaction()))));
conf.setTargetCompactionFileSize(
Long.parseLong(
properties.getProperty(
@@ -1458,6 +1463,12 @@ public class IoTDBDescriptor {
// update tsfile-format config
loadTsFileProps(properties);
+ conf.setChunkMetadataSizeProportionInWrite(
+ Double.parseDouble(
+ properties.getProperty(
+ "chunk_metadata_size_proportion_in_write",
+ Double.toString(conf.getChunkMetadataSizeProportionInWrite()))));
+
// update max_deduplicated_path_num
conf.setMaxQueryDeduplicatedPathNum(
Integer.parseInt(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java
index bc04c09c33..759975d048 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java
@@ -30,10 +30,10 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.Callable;
/**
@@ -45,7 +45,7 @@ public class ReadPointPerformerSubTask implements Callable<Void> {
private static final Logger logger =
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
private final String device;
- private final Set<String> measurementList;
+ private final List<String> measurementList;
private final FragmentInstanceContext fragmentInstanceContext;
private final QueryDataSource queryDataSource;
private final AbstractCompactionWriter compactionWriter;
@@ -54,7 +54,7 @@ public class ReadPointPerformerSubTask implements Callable<Void> {
public ReadPointPerformerSubTask(
String device,
- Set<String> measurementList,
+ List<String> measurementList,
FragmentInstanceContext fragmentInstanceContext,
QueryDataSource queryDataSource,
AbstractCompactionWriter compactionWriter,
@@ -79,7 +79,7 @@ public class ReadPointPerformerSubTask implements Callable<Void> {
device,
Collections.singletonList(measurement),
measurementSchemas,
- schemaMap.keySet(),
+ new ArrayList<>(schemaMap.keySet()),
fragmentInstanceContext,
queryDataSource,
false);
@@ -87,7 +87,7 @@ public class ReadPointPerformerSubTask implements Callable<Void> {
if (dataBlockReader.hasNextBatch()) {
compactionWriter.startMeasurement(measurementSchemas, taskId);
ReadPointCompactionPerformer.writeWithReader(
- compactionWriter, dataBlockReader, taskId, false);
+ compactionWriter, dataBlockReader, device, taskId, false);
compactionWriter.endMeasurement(taskId);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
index c9c36378f5..3de4c64a36 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
@@ -151,6 +151,7 @@ public class AlignedSeriesCompactionExecutor {
chunkWriter.estimateMaxSeriesMemSize());
chunkWriter.writeToFileWriter(writer);
}
+ writer.checkMetadataSizeAndMayFlush();
}
private void compactOneAlignedChunk(AlignedChunkReader chunkReader) throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
index 40f2632d0e..d1d4a366e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
@@ -151,6 +151,7 @@ public class SingleSeriesCompactionExecutor {
} else if (pointCountInChunkWriter != 0L) {
flushChunkWriter();
}
+ fileWriter.checkMetadataSizeAndMayFlush();
targetResource.updateStartTime(device, minStartTimestamp);
targetResource.updateEndTime(device, maxEndTimestamp);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/ICompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/ICompactionPerformer.java
index 172eb50ee7..2799c3236b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/ICompactionPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/ICompactionPerformer.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.ExecutionException;
/**
* CompactionPerformer is used to compact multiple files into one or multiple files. Different
@@ -35,7 +36,8 @@ import java.util.List;
public interface ICompactionPerformer {
void perform()
- throws IOException, MetadataException, StorageEngineException, InterruptedException;
+ throws IOException, MetadataException, StorageEngineException, InterruptedException,
+ ExecutionException;
void setTargetFiles(List<TsFileResource> targetFiles);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java
index 1a94214848..ac0fa1ddc4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine.compaction.performer.impl;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.inner.utils.AlignedSeriesCompactionExecutor;
import org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator;
import org.apache.iotdb.db.engine.compaction.inner.utils.SingleSeriesCompactionExecutor;
@@ -28,11 +29,11 @@ import org.apache.iotdb.db.engine.compaction.performer.ISeqCompactionPerformer;
import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
@@ -63,8 +64,17 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
@Override
public void perform()
throws IOException, MetadataException, InterruptedException, StorageEngineException {
+ // the file writer's memory budget is a proportion (5% by default) of each compaction task's memory
+ long sizeForFileWriter =
+ (long)
+ (SystemInfo.getInstance().getMemorySizeForCompaction()
+ / IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread()
+ * IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getChunkMetadataSizeProportionInCompaction());
try (MultiTsFileDeviceIterator deviceIterator = new MultiTsFileDeviceIterator(seqFiles);
- TsFileIOWriter writer = new TsFileIOWriter(targetResource.getTsFile())) {
+ TsFileIOWriter writer =
+ new TsFileIOWriter(targetResource.getTsFile(), true, sizeForFileWriter)) {
while (deviceIterator.hasNextDevice()) {
Pair<String, Boolean> deviceInfo = deviceIterator.nextDevice();
String device = deviceInfo.left;
@@ -138,7 +148,6 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
checkThreadInterrupted();
// TODO: we can provide a configuration item to enable concurrency between series
PartialPath p = new PartialPath(device, seriesIterator.nextSeries());
- IMeasurementSchema measurementSchema;
// TODO: seriesIterator needs to be refactored.
// This statement must be called before the next hasNextSeries() call, or it may be trapped in a
// dead loop.
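Plugging illustrative numbers into the sizeForFileWriter expression above (the total compaction memory and thread count below are assumptions; 0.05 is the default proportion introduced in this commit):

public class CompactionBudgetExample {
  public static void main(String[] args) {
    long memorySizeForCompaction = 2L << 30; // assumed 2 GiB shared by all compaction tasks
    int concurrentCompactionThread = 10;
    double chunkMetadataSizeProportionInCompaction = 0.05;
    // Same shape as the perform() code: integer division first, then the proportion.
    long sizeForFileWriter =
        (long)
            (memorySizeForCompaction
                / concurrentCompactionThread
                * chunkMetadataSizeProportionInCompaction);
    System.out.println(sizeForFileWriter); // 10737418 bytes, roughly 10 MiB
  }
}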
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
index de9d36ea16..d2940da096 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
@@ -61,7 +61,6 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
@@ -97,7 +96,8 @@ public class ReadPointCompactionPerformer
@Override
public void perform()
- throws IOException, MetadataException, StorageEngineException, InterruptedException {
+ throws IOException, MetadataException, StorageEngineException, InterruptedException,
+ ExecutionException {
long queryId = QueryResourceManager.getInstance().assignCompactionQueryId();
FragmentInstanceContext fragmentInstanceContext =
FragmentInstanceContext.createFragmentInstanceContextForCompaction(queryId);
@@ -128,7 +128,6 @@ public class ReadPointCompactionPerformer
}
compactionWriter.endFile();
- updateDeviceStartTimeAndEndTime(targetFiles, compactionWriter);
updatePlanIndexes(targetFiles, seqFiles, unseqFiles);
} finally {
QueryResourceManager.getInstance().endQuery(queryId);
@@ -166,7 +165,7 @@ public class ReadPointCompactionPerformer
device,
existedMeasurements,
measurementSchemas,
- schemaMap.keySet(),
+ new ArrayList<>(schemaMap.keySet()),
fragmentInstanceContext,
queryDataSource,
true);
@@ -175,9 +174,10 @@ public class ReadPointCompactionPerformer
// chunkgroup is serialized only when at least one timeseries under this device has data
compactionWriter.startChunkGroup(device, true);
compactionWriter.startMeasurement(measurementSchemas, 0);
- writeWithReader(compactionWriter, dataBlockReader, 0, true);
+ writeWithReader(compactionWriter, dataBlockReader, device, 0, true);
compactionWriter.endMeasurement(0);
compactionWriter.endChunkGroup();
+ compactionWriter.checkAndMayFlushChunkMetadata();
}
}
@@ -187,45 +187,36 @@ public class ReadPointCompactionPerformer
AbstractCompactionWriter compactionWriter,
FragmentInstanceContext fragmentInstanceContext,
QueryDataSource queryDataSource)
- throws IOException, InterruptedException {
- Map<String, MeasurementSchema> measurementSchemaMap = deviceIterator.getAllMeasurementSchemas();
- int subTaskNums = Math.min(measurementSchemaMap.size(), subTaskNum);
-
- // assign all measurements to different sub tasks
- Set<String>[] measurementsForEachSubTask = new HashSet[subTaskNums];
- int idx = 0;
- for (String measurement : measurementSchemaMap.keySet()) {
- if (measurementsForEachSubTask[idx % subTaskNums] == null) {
- measurementsForEachSubTask[idx % subTaskNums] = new HashSet<>();
- }
- measurementsForEachSubTask[idx++ % subTaskNums].add(measurement);
- }
-
+ throws IOException, InterruptedException, IllegalPathException, ExecutionException {
+ MultiTsFileDeviceIterator.MeasurementIterator measurementIterator =
+ deviceIterator.iterateNotAlignedSeries(device, false);
+ List<String> allMeasurements =
+ new ArrayList<>(deviceIterator.getAllMeasurementSchemas().keySet());
+ allMeasurements.sort((String::compareTo));
+ int subTaskNums = Math.min(allMeasurements.size(), subTaskNum);
+ Map<String, MeasurementSchema> schemaMap = deviceIterator.getAllMeasurementSchemas();
// construct sub tasks and start compacting measurements in parallel
- List<Future<Void>> futures = new ArrayList<>();
compactionWriter.startChunkGroup(device, false);
- for (int i = 0; i < subTaskNums; i++) {
- futures.add(
- CompactionTaskManager.getInstance()
- .submitSubTask(
- new ReadPointPerformerSubTask(
- device,
- measurementsForEachSubTask[i],
- fragmentInstanceContext,
- queryDataSource,
- compactionWriter,
- measurementSchemaMap,
- i)));
- }
-
- // wait for all sub tasks finish
- for (int i = 0; i < subTaskNums; i++) {
- try {
- futures.get(i).get();
- } catch (ExecutionException e) {
- LOGGER.error("[Compaction] SubCompactionTask meet errors ", e);
- throw new IOException(e);
+ for (int taskCount = 0; taskCount < allMeasurements.size(); ) {
+ List<Future<Void>> futures = new ArrayList<>();
+ for (int i = 0; i < subTaskNums && taskCount < allMeasurements.size(); i++) {
+ futures.add(
+ CompactionTaskManager.getInstance()
+ .submitSubTask(
+ new ReadPointPerformerSubTask(
+ device,
+ Collections.singletonList(allMeasurements.get(taskCount++)),
+ fragmentInstanceContext,
+ queryDataSource,
+ compactionWriter,
+ schemaMap,
+ i)));
+ }
+ for (Future<Void> future : futures) {
+ future.get();
}
+ // sync all the sub-tasks, then check the writer's chunk metadata size
+ compactionWriter.checkAndMayFlushChunkMetadata();
}
compactionWriter.endChunkGroup();
@@ -262,7 +253,7 @@ public class ReadPointCompactionPerformer
String deviceId,
List<String> measurementIds,
List<IMeasurementSchema> measurementSchemas,
- Set<String> allSensors,
+ List<String> allSensors,
FragmentInstanceContext fragmentInstanceContext,
QueryDataSource queryDataSource,
boolean isAlign)
@@ -277,11 +268,20 @@ public class ReadPointCompactionPerformer
tsDataType = measurementSchemas.get(0).getType();
}
return new SeriesDataBlockReader(
- seriesPath, allSensors, tsDataType, fragmentInstanceContext, queryDataSource, true);
+ seriesPath,
+ new HashSet<>(allSensors),
+ tsDataType,
+ fragmentInstanceContext,
+ queryDataSource,
+ true);
}
public static void writeWithReader(
- AbstractCompactionWriter writer, IDataBlockReader reader, int subTaskId, boolean isAligned)
+ AbstractCompactionWriter writer,
+ IDataBlockReader reader,
+ String device,
+ int subTaskId,
+ boolean isAligned)
throws IOException {
while (reader.hasNextBatch()) {
TsBlock tsBlock = reader.nextBatch();
@@ -289,6 +289,7 @@ public class ReadPointCompactionPerformer
writer.write(
tsBlock.getTimeColumn(),
tsBlock.getValueColumns(),
+ device,
subTaskId,
tsBlock.getPositionCount());
} else {
@@ -297,6 +298,7 @@ public class ReadPointCompactionPerformer
TimeValuePair timeValuePair = pointReader.nextTimeValuePair();
writer.write(
timeValuePair.getTimestamp(), timeValuePair.getValue().getValue(), subTaskId);
+ writer.updateStartTimeAndEndTime(device, timeValuePair.getTimestamp(), subTaskId);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
index 542c44c4f0..ae01567f9b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
@@ -72,7 +72,8 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
public abstract void write(long timestamp, Object value, int subTaskId) throws IOException;
- public abstract void write(TimeColumn timestamps, Column[] columns, int subTaskId, int batchSize)
+ public abstract void write(
+ TimeColumn timestamps, Column[] columns, String device, int subTaskId, int batchSize)
throws IOException;
public abstract void endFile() throws IOException;
@@ -140,6 +141,8 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
measurementPointCountArray[subTaskId] += 1;
}
+ public abstract void updateStartTimeAndEndTime(String device, long time, int subTaskId);
+
protected void flushChunkToFileWriter(TsFileIOWriter targetWriter, int subTaskId)
throws IOException {
writeRateLimit(chunkWriters[subTaskId].estimateMaxSeriesMemSize());
@@ -177,4 +180,11 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
}
public abstract List<TsFileIOWriter> getFileIOWriter();
+
+ public void checkAndMayFlushChunkMetadata() throws IOException {
+ List<TsFileIOWriter> writers = this.getFileIOWriter();
+ for (TsFileIOWriter writer : writers) {
+ writer.checkMetadataSizeAndMayFlush();
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
index 80902dd1d9..d192c0f6d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
@@ -18,8 +18,10 @@
*/
package org.apache.iotdb.db.engine.compaction.writer;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
@@ -37,6 +39,7 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
// source tsfiles
private List<TsFileResource> seqTsFileResources;
+ private List<TsFileResource> targetTsFileResources;
// Each sub task has its corresponding seq file index.
// The index of the array corresponds to subTaskId.
@@ -57,11 +60,21 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
public CrossSpaceCompactionWriter(
List<TsFileResource> targetResources, List<TsFileResource> seqFileResources)
throws IOException {
+ this.targetTsFileResources = targetResources;
currentDeviceEndTime = new long[seqFileResources.size()];
isEmptyFile = new boolean[seqFileResources.size()];
isDeviceExistedInTargetFiles = new boolean[targetResources.size()];
+ long memorySizeForEachWriter =
+ (long)
+ (SystemInfo.getInstance().getMemorySizeForCompaction()
+ / IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread()
+ * IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getChunkMetadataSizeProportionInCompaction()
+ / targetResources.size());
for (int i = 0; i < targetResources.size(); i++) {
- this.fileWriterList.add(new TsFileIOWriter(targetResources.get(i).getTsFile()));
+ this.fileWriterList.add(
+ new TsFileIOWriter(targetResources.get(i).getTsFile(), true, memorySizeForEachWriter));
isEmptyFile[i] = true;
}
this.seqTsFileResources = seqFileResources;
@@ -111,12 +124,19 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
}
@Override
- public void write(TimeColumn timestamps, Column[] columns, int subTaskId, int batchSize)
+ public void write(
+ TimeColumn timestamps, Column[] columns, String device, int subTaskId, int batchSize)
throws IOException {
// todo control time range of target tsfile
checkTimeAndMayFlushChunkToCurrentFile(timestamps.getStartTime(), subTaskId);
AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl) this.chunkWriters[subTaskId];
chunkWriter.write(timestamps, columns, batchSize);
+ synchronized (this) {
+ // we need to synchronize here to avoid multi-thread contention between sub-tasks
+ TsFileResource resource = targetTsFileResources.get(seqFileIndexArray[subTaskId]);
+ resource.updateStartTime(device, timestamps.getStartTime());
+ resource.updateEndTime(device, timestamps.getEndTime());
+ }
checkChunkSizeAndMayOpenANewChunk(fileWriterList.get(seqFileIndexArray[subTaskId]), subTaskId);
isDeviceExistedInTargetFiles[seqFileIndexArray[subTaskId]] = true;
isEmptyFile[seqFileIndexArray[subTaskId]] = false;
@@ -192,4 +212,15 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
fileIndex++;
}
}
+
+ @Override
+ public void updateStartTimeAndEndTime(String device, long time, int subTaskId) {
+ synchronized (this) {
+ int fileIndex = seqFileIndexArray[subTaskId];
+ TsFileResource resource = targetTsFileResources.get(fileIndex);
+ // we need to synchronize here to avoid multi-thread contention between sub-tasks
+ resource.updateStartTime(device, time);
+ resource.updateEndTime(device, time);
+ }
+ }
}
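The cross-space writer divides the same per-task budget evenly among its target files, so continuing the illustrative figures from the earlier example with four target files, each writer gets a quarter of the budget:

public class CrossSpaceBudgetExample {
  public static void main(String[] args) {
    long perTaskBudget = 10_737_418L; // carried over from the illustrative example above
    int targetFileNum = 4;            // targetResources.size()
    System.out.println(perTaskBudget / targetFileNum); // 2684354 bytes, roughly 2.5 MiB
  }
}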
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
index a73c6c2907..2c3c2e58ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
@@ -18,7 +18,9 @@
*/
package org.apache.iotdb.db.engine.compaction.writer;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
@@ -32,10 +34,19 @@ public class InnerSpaceCompactionWriter extends AbstractCompactionWriter {
private TsFileIOWriter fileWriter;
private boolean isEmptyFile;
+ private TsFileResource resource;
public InnerSpaceCompactionWriter(TsFileResource targetFileResource) throws IOException {
- this.fileWriter = new TsFileIOWriter(targetFileResource.getTsFile());
+ long sizeForFileWriter =
+ (long)
+ (SystemInfo.getInstance().getMemorySizeForCompaction()
+ / IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread()
+ * IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getChunkMetadataSizeProportionInCompaction());
+ this.fileWriter = new TsFileIOWriter(targetFileResource.getTsFile(), true, sizeForFileWriter);
isEmptyFile = true;
+ resource = targetFileResource;
}
@Override
@@ -65,11 +76,17 @@ public class InnerSpaceCompactionWriter extends AbstractCompactionWriter {
}
@Override
- public void write(TimeColumn timestamps, Column[] columns, int subTaskId, int batchSize)
+ public void write(
+ TimeColumn timestamps, Column[] columns, String device, int subTaskId, int batchSize)
throws IOException {
AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl) this.chunkWriters[subTaskId];
chunkWriter.write(timestamps, columns, batchSize);
checkChunkSizeAndMayOpenANewChunk(fileWriter, subTaskId);
+ synchronized (this) {
+ // we need to synchronize here to avoid multi-thread contention between sub-tasks
+ resource.updateStartTime(device, timestamps.getStartTime());
+ resource.updateEndTime(device, timestamps.getEndTime());
+ }
isEmptyFile = false;
}
@@ -89,6 +106,15 @@ public class InnerSpaceCompactionWriter extends AbstractCompactionWriter {
fileWriter = null;
}
+ @Override
+ public void updateStartTimeAndEndTime(String device, long time, int subTaskId) {
+ // we need to synchronize here to avoid multi-thread contention between sub-tasks
+ synchronized (this) {
+ resource.updateStartTime(device, time);
+ resource.updateEndTime(device, time);
+ }
+ }
+
@Override
public List<TsFileIOWriter> getFileIOWriter() {
return Collections.singletonList(fileWriter);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index a3ca350fea..6c6ab965b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -38,6 +38,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -114,14 +117,19 @@ public class MemTableFlushTask {
long sortTime = 0;
// for map do not use get(key) to iterate
- for (Map.Entry<IDeviceID, IWritableMemChunkGroup> memTableEntry :
- memTable.getMemTableMap().entrySet()) {
- encodingTaskQueue.put(new StartFlushGroupIOTask(memTableEntry.getKey().toStringID()));
-
- final Map<String, IWritableMemChunk> value = memTableEntry.getValue().getMemChunkMap();
- for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) {
+ Map<IDeviceID, IWritableMemChunkGroup> memTableMap = memTable.getMemTableMap();
+ List<IDeviceID> deviceIDList = new ArrayList<>(memTableMap.keySet());
+ // sort the IDeviceIDs in lexicographical order
+ deviceIDList.sort(Comparator.comparing(IDeviceID::toStringID));
+ for (IDeviceID deviceID : deviceIDList) {
+ encodingTaskQueue.put(new StartFlushGroupIOTask(deviceID.toStringID()));
+
+ final Map<String, IWritableMemChunk> value = memTableMap.get(deviceID).getMemChunkMap();
+ List<String> seriesInOrder = new ArrayList<>(value.keySet());
+ seriesInOrder.sort((String::compareTo));
+ for (String seriesId : seriesInOrder) {
long startTime = System.currentTimeMillis();
- IWritableMemChunk series = iWritableMemChunkEntry.getValue();
+ IWritableMemChunk series = value.get(seriesId);
/*
* sort task (first task of flush pipeline)
*/
@@ -274,6 +282,7 @@ public class MemTableFlushTask {
this.writer.endChunkGroup();
} else {
((IChunkWriter) ioMessage).writeToFileWriter(this.writer);
+ writer.checkMetadataSizeAndMayFlush();
}
} catch (IOException e) {
LOGGER.error(
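Sorting devices and series before flushing keeps the spilled chunk metadata in index order, which is presumably what lets the new TSMIterator merge the on-disk and in-memory parts sequentially at endFile(). One detail worth a concrete look: the order is lexicographical, not numeric, so d10 sorts before d2:

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class FlushOrderExample {
  public static void main(String[] args) {
    List<String> deviceIds = Arrays.asList("root.sg.d2", "root.sg.d1", "root.sg.d10");
    deviceIds.sort(Comparator.naturalOrder()); // same ordering as IDeviceID::toStringID above
    System.out.println(deviceIds); // [root.sg.d1, root.sg.d10, root.sg.d2]
  }
}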
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 570b97421a..336181281b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -178,7 +178,14 @@ public class TsFileProcessor {
this.storageGroupName = storageGroupName;
this.tsFileResource = new TsFileResource(tsfile, this);
this.storageGroupInfo = storageGroupInfo;
- this.writer = new RestorableTsFileIOWriter(tsfile);
+ this.writer =
+ new RestorableTsFileIOWriter(
+ tsfile,
+ (long)
+ (IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold()
+ * IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getChunkMetadataSizeProportionInWrite()));
this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback;
this.sequence = sequence;
this.walNode = WALManager.getInstance().applyForWALNode(storageGroupName);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 99e5cfd1c2..050f50cf92 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.FilePathUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -505,10 +506,15 @@ public class TsFileResource {
modFile = null;
}
- /** Remove the data file, its resource file, and its modification file physically. */
+ /**
+ * Remove the data file, its resource file, its chunk metadata temp file, and its modification
+ * file physically.
+ */
public boolean remove() {
try {
fsFactory.deleteIfExists(file);
+ fsFactory.deleteIfExists(
+ new File(file.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX));
} catch (IOException e) {
LOGGER.error("TsFile {} cannot be deleted: {}", file, e.getMessage());
return false;
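Because the writer can now leave a metadata sidecar next to the TsFile, every deletion path must remove both files. A minimal sketch of that contract (the ".cmt" value below is an assumption for illustration; the real constant is TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX):

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class TempFileCleanupSketch {
  // Assumed value for illustration; the real constant lives in TsFileIOWriter.
  static final String CHUNK_METADATA_TEMP_FILE_SUFFIX = ".cmt";

  // Delete the data file together with its chunk metadata temp file, mirroring remove().
  static void deleteWithSidecar(File tsFile) throws IOException {
    Files.deleteIfExists(tsFile.toPath());
    Files.deleteIfExists(
        new File(tsFile.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX).toPath());
  }
}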
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSplitByPartitionTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSplitByPartitionTool.java
index b573ed354d..ddfa8789b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSplitByPartitionTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSplitByPartitionTool.java
@@ -492,10 +492,10 @@ public class TsFileSplitByPartitionTool implements AutoCloseable {
protected TsFileResource endFileAndGenerateResource(TsFileIOWriter tsFileIOWriter)
throws IOException {
- tsFileIOWriter.endFile();
- TsFileResource tsFileResource = new TsFileResource(tsFileIOWriter.getFile());
Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap =
tsFileIOWriter.getDeviceTimeseriesMetadataMap();
+ tsFileIOWriter.endFile();
+ TsFileResource tsFileResource = new TsFileResource(tsFileIOWriter.getFile());
for (Entry<String, List<TimeseriesMetadata>> entry : deviceTimeseriesMetadataMap.entrySet()) {
String device = entry.getKey();
for (TimeseriesMetadata timeseriesMetaData : entry.getValue()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/AbstractTsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/AbstractTsFileRecoverPerformer.java
index f2c3934ccf..e506d66c3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/AbstractTsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/AbstractTsFileRecoverPerformer.java
@@ -18,13 +18,16 @@
*/
package org.apache.iotdb.db.wal.recover.file;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.tsfile.exception.NotCompatibleTsFileException;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +58,12 @@ public abstract class AbstractTsFileRecoverPerformer implements Closeable {
*/
protected void recoverWithWriter() throws DataRegionException, IOException {
File tsFile = tsFileResource.getTsFile();
+ File chunkMetadataTempFile =
+ new File(tsFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX);
+ if (chunkMetadataTempFile.exists()) {
+ // delete chunk metadata temp file
+ FileUtils.delete(chunkMetadataTempFile);
+ }
if (!tsFile.exists()) {
logger.error("TsFile {} is missing, will skip its recovery.", tsFile);
return;
@@ -68,7 +77,14 @@ public abstract class AbstractTsFileRecoverPerformer implements Closeable {
// try to remove corrupted part of the TsFile
try {
- writer = new RestorableTsFileIOWriter(tsFile);
+ writer =
+ new RestorableTsFileIOWriter(
+ tsFile,
+ (long)
+ (IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold()
+ * IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getChunkMetadataSizeProportionInWrite()));
} catch (NotCompatibleTsFileException e) {
boolean result = tsFile.delete();
logger.warn(
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
index c5b65ed8d3..efda562500 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
@@ -65,6 +65,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_SEPARATOR;
import static org.junit.Assert.assertEquals;
@@ -95,7 +96,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testSeqInnerSpaceCompactionWithSameTimeseries()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
registerTimeseriesInMManger(2, 3, false);
createFiles(5, 2, 3, 100, 0, 0, 50, 50, false, true);
@@ -170,7 +171,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testSeqInnerSpaceCompactionWithDifferentTimeseries()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
registerTimeseriesInMManger(5, 5, false);
createFiles(2, 2, 3, 100, 0, 0, 50, 50, false, true);
createFiles(2, 3, 5, 50, 250, 250, 50, 50, false, true);
@@ -289,7 +290,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testUnSeqInnerSpaceCompactionWithSameTimeseries()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
registerTimeseriesInMManger(2, 3, false);
createFiles(5, 2, 3, 100, 0, 0, 50, 50, false, false);
@@ -375,7 +376,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testUnSeqInnerSpaceCompactionWithDifferentTimeseries()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
registerTimeseriesInMManger(9, 9, false);
createFiles(2, 2, 3, 100, 0, 0, 50, 50, false, false);
createFiles(2, 3, 5, 50, 150, 150, 50, 50, false, false);
@@ -501,7 +502,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testUnSeqInnerSpaceCompactionWithAllDataDeletedInTimeseries()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(5, 7, false);
createFiles(2, 2, 3, 300, 0, 0, 0, 0, false, false);
@@ -636,7 +637,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testUnSeqInnerSpaceCompactionWithAllDataDeletedInDevice()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(5, 7, false);
createFiles(2, 2, 3, 300, 0, 0, 0, 0, false, false);
@@ -764,7 +765,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testUnSeqInnerSpaceCompactionWithAllDataDeletedInTargetFile()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(5, 7, false);
createFiles(2, 2, 3, 300, 0, 0, 0, 0, false, false);
@@ -856,7 +857,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testAlignedSeqInnerSpaceCompactionWithSameTimeseries()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
registerTimeseriesInMManger(2, 3, true);
createFiles(5, 2, 3, 100, 0, 0, 50, 50, true, true);
@@ -953,7 +954,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testAlignedSeqInnerSpaceCompactionWithDifferentTimeseriesAndEmptyPage()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(50);
registerTimeseriesInMManger(5, 7, true);
createFiles(2, 2, 3, 100, 0, 0, 50, 50, true, true);
@@ -1075,7 +1076,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testAlignedSeqInnerSpaceCompactionWithDifferentTimeseriesAndEmptyChunk()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
registerTimeseriesInMManger(5, 7, true);
createFiles(2, 2, 3, 100, 0, 0, 50, 50, true, true);
createFiles(2, 3, 5, 50, 250, 250, 50, 50, true, true);
@@ -1196,7 +1197,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testAlignedUnSeqInnerSpaceCompactionWithEmptyChunkAndEmptyPage()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(5, 7, true);
createFiles(2, 2, 3, 300, 0, 0, 0, 0, true, false);
@@ -1329,7 +1330,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testAlignedUnSeqInnerSpaceCompactionWithAllDataDeletedInTimeseries()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(5, 7, true);
createFiles(2, 2, 3, 300, 0, 0, 0, 0, true, false);
@@ -1510,7 +1511,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testAlignedUnSeqInnerSpaceCompactionWithAllDataDeletedInDevice()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(5, 7, true);
createFiles(2, 2, 3, 300, 0, 0, 0, 0, true, false);
@@ -1658,7 +1659,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testAlignedUnSeqInnerSpaceCompactionWithSameTimeseries()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
registerTimeseriesInMManger(2, 3, true);
createFiles(5, 2, 3, 100, 0, 0, 50, 50, true, false);
@@ -1757,7 +1758,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testCrossSpaceCompactionWithSameTimeseries()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
registerTimeseriesInMManger(2, 3, false);
createFiles(5, 2, 3, 100, 0, 0, 0, 0, false, true);
createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, false);
@@ -1848,7 +1849,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testCrossSpaceCompactionWithDifferentTimeseries()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(4, 5, false);
createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
@@ -2034,7 +2035,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testCrossSpaceCompactionWithAllDataDeletedInTimeseries()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(4, 5, false);
createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
@@ -2233,7 +2234,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testCrossSpaceCompactionWithAllDataDeletedInDevice()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(4, 5, false);
createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
@@ -2423,7 +2424,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testCrossSpaceCompactionWithAllDataDeletedInOneTargetFile()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(4, 5, false);
createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
@@ -2590,7 +2591,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testCrossSpaceCompactionWithAllDataDeletedInDeviceInSeqFiles()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(4, 5, false);
createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
@@ -2795,7 +2796,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testAlignedCrossSpaceCompactionWithSameTimeseries()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
registerTimeseriesInMManger(2, 3, true);
createFiles(5, 2, 3, 100, 0, 0, 0, 0, true, true);
createFiles(5, 2, 3, 50, 0, 10000, 50, 50, true, false);
@@ -2895,7 +2896,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testAlignedCrossSpaceCompactionWithDifferentTimeseries()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(4, 5, true);
createFiles(2, 2, 3, 300, 0, 0, 50, 50, true, true);
@@ -3060,7 +3061,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testAlignedCrossSpaceCompactionWithAllDataDeletedInTimeseries()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(4, 5, true);
createFiles(2, 2, 3, 300, 0, 0, 50, 50, true, true);
@@ -3298,7 +3299,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testAlignedCrossSpaceCompactionWithAllDataDeletedInOneTargetFile()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(4, 5, true);
createFiles(2, 2, 3, 300, 0, 0, 50, 50, true, true);
@@ -3495,7 +3496,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testCrossSpaceCompactionWithSameTimeseriesInDifferentSourceFiles()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(4, 5, false);
createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
@@ -3618,7 +3619,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testCrossSpaceCompactionWithDifferentDevicesInDifferentSourceFiles()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(5, 7, false);
List<Integer> deviceIndex = new ArrayList<>();
@@ -3758,7 +3759,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testCrossSpaceCompactionWithDifferentMeasurementsInDifferentSourceFiles()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(5, 5, false);
List<Integer> deviceIndex = new ArrayList<>();
@@ -3874,7 +3875,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testCrossSpaceCompactionWithDifferentDevicesAndMeasurementsInDifferentSourceFiles()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(4, 5, false);
List<Integer> deviceIndex = new ArrayList<>();
@@ -4012,7 +4013,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testAlignedCrossSpaceCompactionWithSameTimeseriesInDifferentSourceFiles()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(4, 5, true);
createFiles(2, 2, 3, 300, 0, 0, 50, 50, true, true);
@@ -4241,7 +4242,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testAlignedCrossSpaceCompactionWithDifferentDevicesInDifferentSourceFiles()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(5, 7, true);
List<Integer> deviceIndex = new ArrayList<>();
@@ -4510,7 +4511,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testAlignedCrossSpaceCompactionWithDifferentMeasurementsInDifferentSourceFiles()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(5, 5, true);
List<Integer> deviceIndex = new ArrayList<>();
@@ -4712,7 +4713,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@Test
public void testAlignedCrossSpaceCompactionWithFileTimeIndexResource()
throws IOException, WriteProcessException, MetadataException, StorageEngineException,
- InterruptedException {
+ InterruptedException, ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
registerTimeseriesInMManger(4, 5, true);
createFiles(2, 2, 3, 300, 0, 0, 50, 50, true, true);
@@ -4907,7 +4908,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
}
@Test
- public void testCrossSpaceCompactionWithNewDeviceInUnseqFile() {
+ public void testCrossSpaceCompactionWithNewDeviceInUnseqFile() throws ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
try {
registerTimeseriesInMManger(6, 6, false);
@@ -4982,7 +4983,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
}
@Test
- public void testCrossSpaceCompactionWithDeviceMaxTimeLaterInUnseqFile() {
+ public void testCrossSpaceCompactionWithDeviceMaxTimeLaterInUnseqFile()
+ throws ExecutionException {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
try {
registerTimeseriesInMManger(6, 6, false);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java
index 09e17a763f..095a6d260f 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java
@@ -63,6 +63,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.iotdb.db.engine.compaction.utils.CompactionCheckerUtils.putChunk;
@@ -350,7 +351,7 @@ public class InnerSeqCompactionTest {
}
}
}
- } catch (InterruptedException | StorageEngineException e) {
+ } catch (InterruptedException | StorageEngineException | ExecutionException e) {
e.printStackTrace();
} finally {
IoTDBDescriptor.getInstance()
@@ -365,7 +366,7 @@ public class InnerSeqCompactionTest {
@Test
public void testAppendPage()
throws IOException, MetadataException, InterruptedException, StorageEngineException,
- WriteProcessException {
+ WriteProcessException, ExecutionException {
for (int toMergeFileNum : toMergeFileNums) {
for (CompactionTimeseriesType compactionTimeseriesType : compactionTimeseriesTypes) {
@@ -632,7 +633,7 @@ public class InnerSeqCompactionTest {
@Test
public void testAppendChunk()
throws IOException, IllegalPathException, MetadataException, StorageEngineException,
- WriteProcessException {
+ WriteProcessException, ExecutionException {
long prevChunkPointNumLowerBoundInCompaction =
IoTDBDescriptor.getInstance().getConfig().getChunkPointNumLowerBoundInCompaction();
IoTDBDescriptor.getInstance().getConfig().setChunkPointNumLowerBoundInCompaction(1);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java
index 588b1af97e..1fd50b21fe 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java
@@ -47,6 +47,8 @@ import org.apache.iotdb.tsfile.utils.Pair;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@@ -57,10 +59,12 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import static org.apache.iotdb.db.engine.compaction.utils.CompactionCheckerUtils.putOnePageChunk;
public class InnerUnseqCompactionTest {
+ private static final Logger LOG = LoggerFactory.getLogger(InnerUnseqCompactionTest.class);
static final String COMPACTION_TEST_SG = "root.compactionTest";
static final String[] fullPaths =
new String[] {
@@ -132,7 +136,7 @@ public class InnerUnseqCompactionTest {
@Test
public void test()
throws MetadataException, IOException, StorageEngineException, WriteProcessException,
- InterruptedException {
+ InterruptedException, ExecutionException {
for (int toMergeFileNum : toMergeFileNums) {
for (CompactionTimeseriesType compactionTimeseriesType : compactionTimeseriesTypes) {
for (boolean compactionBeforeHasMod : compactionBeforeHasMods) {
@@ -351,6 +355,13 @@ public class InnerUnseqCompactionTest {
toDeleteTimeseriesAndTime, tsFileResource, false);
}
}
+ LOG.error(
+ "{} {} {} {} {}",
+ toMergeFileNum,
+ compactionTimeseriesType,
+ compactionBeforeHasMod,
+ compactionHasMod,
+ compactionOverlapType);
TsFileResource targetTsFileResource =
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(
toMergeResources, false)
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerOldTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerOldTest.java
index 59b0ab3d3d..b48d494c0b 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerOldTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerOldTest.java
@@ -45,6 +45,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
+import java.util.concurrent.ExecutionException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -78,7 +79,7 @@ public class ReadChunkCompactionPerformerOldTest extends InnerCompactionTest {
@Test
public void testCompact()
throws IOException, MetadataException, InterruptedException, StorageEngineException,
- WriteProcessException {
+ WriteProcessException, ExecutionException {
TsFileResource targetTsFileResource =
new TsFileResource(
new File(
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
index 831f8cd120..9ee1f7f566 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
@@ -186,6 +186,14 @@ public class ChunkMetadata implements IChunkMetadata {
return chunkMetaData;
}
+ public static ChunkMetadata deserializeFrom(ByteBuffer buffer, TSDataType dataType) {
+ ChunkMetadata chunkMetadata = new ChunkMetadata();
+ chunkMetadata.tsDataType = dataType;
+ chunkMetadata.offsetOfChunkHeader = ReadWriteIOUtils.readLong(buffer);
+ chunkMetadata.statistics = Statistics.deserialize(buffer, dataType);
+ return chunkMetadata;
+ }
+
@Override
public long getVersion() {
return version;
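The new deserializeFrom overload above is the read-side counterpart of the compact record that the writer spills to the .cmt temp file: only the chunk's data offset and its statistics are stored, so the caller has to supply the data type. A minimal round-trip sketch (illustrative only, not part of the committed code; it pairs the existing serializeTo with the new overload, assuming an INT64 series):

    // Round-trip sketch: write the compact form, then restore it.
    // The data type is NOT stored in this form and must be passed back in.
    PublicBAOS buffer = new PublicBAOS();
    chunkMetadata.serializeTo(buffer, true); // true: include statistics
    ByteBuffer bytes = ByteBuffer.wrap(buffer.getBuf(), 0, buffer.size());
    ChunkMetadata restored = ChunkMetadata.deserializeFrom(bytes, TSDataType.INT64);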
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
index 062ffd6183..44cdc8b0bf 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
@@ -81,6 +81,11 @@ public class MetadataIndexConstructor {
measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
}
+ return checkAndBuildLevelIndex(deviceMetadataIndexMap, out);
+ }
+
+ public static MetadataIndexNode checkAndBuildLevelIndex(
+ Map<String, MetadataIndexNode> deviceMetadataIndexMap, TsFileOutput out) throws IOException {
// if not exceed the max child nodes num, ignore the device index and directly point to the
// measurement
if (deviceMetadataIndexMap.size() <= config.getMaxDegreeOfIndexNode()) {
@@ -123,7 +128,7 @@ public class MetadataIndexConstructor {
* @param out tsfile output
* @param type MetadataIndexNode type
*/
- private static MetadataIndexNode generateRootNode(
+ public static MetadataIndexNode generateRootNode(
Queue<MetadataIndexNode> metadataIndexNodeQueue, TsFileOutput out, MetadataIndexNodeType type)
throws IOException {
int queueSize = metadataIndexNodeQueue.size();
@@ -148,7 +153,7 @@ public class MetadataIndexConstructor {
return metadataIndexNodeQueue.poll();
}
- private static void addCurrentIndexNodeToQueue(
+ public static void addCurrentIndexNodeToQueue(
MetadataIndexNode currentIndexNode,
Queue<MetadataIndexNode> metadataIndexNodeQueue,
TsFileOutput out)
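These helpers become public because TsFileIOWriter#endFile (below) now drives index construction incrementally while streaming TimeseriesMetadata, instead of handing a complete device map to constructMetadataIndex. A sketch of that calling pattern, using static imports as TsFileIOWriter does (the entry-filling loop and the TsFileOutput `out` are elided):

    // Incremental pattern used by endFile(): fill a leaf node per device,
    // push it to the queue, then collapse the queue into a device root.
    Queue<MetadataIndexNode> queue = new ArrayDeque<>();
    MetadataIndexNode leaf = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
    // ... leaf.addEntry(new MetadataIndexEntry(measurement, out.getPosition())) ...
    addCurrentIndexNodeToQueue(leaf, queue, out);
    MetadataIndexNode deviceRoot =
        generateRootNode(queue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT);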
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
index 3f6f6336b3..1d3972cafe 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
@@ -74,7 +74,7 @@ public class MetadataIndexNode {
this.children.add(metadataIndexEntry);
}
- boolean isFull() {
+ public boolean isFull() {
return children.size() >= config.getMaxDegreeOfIndexNode();
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
index 95e01e2da1..f6f974fc1a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
@@ -105,10 +105,15 @@ public class TsFileMetadata {
* @param outputStream -output stream to determine byte length
* @return -byte length
*/
- public int serializeBloomFilter(OutputStream outputStream, Set<Path> paths) throws IOException {
- int byteLen = 0;
+ public int buildAndSerializeBloomFilter(OutputStream outputStream, Set<Path> paths)
+ throws IOException {
BloomFilter filter = buildBloomFilter(paths);
+ return serializeBloomFilter(outputStream, filter);
+ }
+ public int serializeBloomFilter(OutputStream outputStream, BloomFilter filter)
+ throws IOException {
+ int byteLen = 0;
byte[] bytes = filter.serialize();
byteLen += ReadWriteForEncodingUtils.writeUnsignedVarInt(bytes.length, outputStream);
outputStream.write(bytes);
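Splitting filter construction from serialization lets TsFileIOWriter build the bloom filter incrementally while iterating series in endFile (below), instead of materializing the full path set in memory first. A sketch of the new call sequence, with names from this commit (the iterable of series paths is assumed):

    BloomFilter filter =
        BloomFilter.getEmptyBloomFilter(
            TSFileDescriptor.getInstance().getConfig().getBloomFilterErrorRate(), pathCount);
    for (String seriesPath : seriesPaths) { // assumed: full paths seen while writing
      filter.add(seriesPath);
    }
    int size = tsFileMetadata.serializeBloomFilter(outputStream, filter);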
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index 78253124b8..391426cc34 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -78,6 +78,18 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
this(file, true);
}
+ /**
+ * @param file a given tsfile path you want to (continue to) write
+ * @param maxMetadataSize the threshold of in-memory chunk metadata size; once it is exceeded,
+ *     chunk metadata will be flushed to a temp file
+ * @throws IOException if writing fails, or the file is broken but autoRepair == false.
+ */
+ public RestorableTsFileIOWriter(File file, long maxMetadataSize) throws IOException {
+ this(file, true);
+ this.maxMetadataSize = maxMetadataSize;
+ this.enableMemoryControl = true;
+ this.chunkMetadataTempFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX);
+ this.checkMetadataSizeAndMayFlush();
+ }
+
public RestorableTsFileIOWriter(File file, boolean truncate) throws IOException {
if (logger.isDebugEnabled()) {
logger.debug("{} is opened.", file.getName());
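The new constructor means recovery can also run under memory control: chunk metadata accumulated while restoring an unsealed file is subject to the same threshold and may be spilled to the .cmt temp file immediately. A hypothetical call (the 10 MB budget is an assumed value):

    // Recover an unsealed file, keeping at most ~10 MB of chunk metadata in
    // memory; overflow goes to "<file path>.cmt".
    RestorableTsFileIOWriter writer =
        new RestorableTsFileIOWriter(tsFile, 10 * 1024 * 1024L);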
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 22a68bdfd7..851f03c192 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -26,35 +26,46 @@ import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.BloomFilter;
import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.writer.tsmiterator.TSMIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.TreeMap;
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.addCurrentIndexNodeToQueue;
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.checkAndBuildLevelIndex;
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.generateRootNode;
+
/**
* TsFileIOWriter is used to construct metadata and write data stored in memory to output stream.
*/
@@ -76,7 +87,7 @@ public class TsFileIOWriter implements AutoCloseable {
protected File file;
// current flushed Chunk
- private ChunkMetadata currentChunkMetadata;
+ protected ChunkMetadata currentChunkMetadata;
// current flushed ChunkGroup
protected List<ChunkMetadata> chunkMetadataList = new ArrayList<>();
// all flushed ChunkGroups
@@ -93,6 +104,20 @@ public class TsFileIOWriter implements AutoCloseable {
private long minPlanIndex;
private long maxPlanIndex;
+ // the following variables are used for memory control
+ protected long maxMetadataSize;
+ protected long currentChunkMetadataSize = 0L;
+ protected File chunkMetadataTempFile;
+ protected LocalTsFileOutput tempOutput;
+ protected volatile boolean hasChunkMetadataInDisk = false;
+ protected String currentSeries = null;
+ // record the total number of paths in order to build the bloom filter
+ protected int pathCount = 0;
+ protected boolean enableMemoryControl = false;
+ private Path lastSerializePath = null;
+ protected LinkedList<Long> endPosInCMTForDevice = new LinkedList<>();
+ public static final String CHUNK_METADATA_TEMP_FILE_SUFFIX = ".cmt";
+
/** empty construct function. */
protected TsFileIOWriter() {}
@@ -126,6 +151,15 @@ public class TsFileIOWriter implements AutoCloseable {
this.out = output;
}
+ /** for writing with memory control */
+ public TsFileIOWriter(File file, boolean enableMemoryControl, long maxMetadataSize)
+ throws IOException {
+ this(file);
+ this.enableMemoryControl = enableMemoryControl;
+ this.maxMetadataSize = maxMetadataSize;
+ chunkMetadataTempFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX);
+ }
+
/**
* Writes given bytes to output stream. This method is called when total memory size exceeds the
* chunk group size threshold.
@@ -249,6 +283,9 @@ public class TsFileIOWriter implements AutoCloseable {
/** end chunk and write some log. */
public void endCurrentChunk() {
+ if (enableMemoryControl) {
+ this.currentChunkMetadataSize += currentChunkMetadata.calculateRamSize();
+ }
chunkMetadataList.add(currentChunkMetadata);
currentChunkMetadata = null;
}
@@ -260,47 +297,14 @@ public class TsFileIOWriter implements AutoCloseable {
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void endFile() throws IOException {
- long metaOffset = out.getPosition();
-
- // serialize the SEPARATOR of MetaData
- ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
-
- // group ChunkMetadata by series
- Map<Path, List<IChunkMetadata>> chunkMetadataListMap = new TreeMap<>();
-
- for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
- List<ChunkMetadata> chunkMetadatas = chunkGroupMetadata.getChunkMetadataList();
- for (IChunkMetadata chunkMetadata : chunkMetadatas) {
- Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
- chunkMetadataListMap.computeIfAbsent(series, k -> new ArrayList<>()).add(chunkMetadata);
- }
- }
-
- MetadataIndexNode metadataIndex = flushMetadataIndex(chunkMetadataListMap);
- TsFileMetadata tsFileMetaData = new TsFileMetadata();
- tsFileMetaData.setMetadataIndex(metadataIndex);
- tsFileMetaData.setMetaOffset(metaOffset);
+ checkInMemoryPathCount();
+ readChunkMetadataAndConstructIndexTree();
long footerIndex = out.getPosition();
if (logger.isDebugEnabled()) {
logger.debug("start to flush the footer,file pos:{}", footerIndex);
}
- // write TsFileMetaData
- int size = tsFileMetaData.serializeTo(out.wrapAsStream());
- if (logger.isDebugEnabled()) {
- logger.debug("finish flushing the footer {}, file pos:{}", tsFileMetaData, out.getPosition());
- }
-
- // write bloom filter
- size += tsFileMetaData.serializeBloomFilter(out.wrapAsStream(), chunkMetadataListMap.keySet());
- if (logger.isDebugEnabled()) {
- logger.debug("finish flushing the bloom filter file pos:{}", out.getPosition());
- }
-
- // write TsFileMetaData size
- ReadWriteIOUtils.write(size, out.wrapAsStream()); // write the size of the file metadata.
-
// write magic string
out.write(MAGIC_STRING_BYTES);
@@ -312,63 +316,112 @@ public class TsFileIOWriter implements AutoCloseable {
canWrite = false;
}
- /**
- * Flush TsFileMetadata, including ChunkMetadataList and TimeseriesMetaData
- *
- * @param chunkMetadataListMap chunkMetadata that Path.mask == 0
- * @return MetadataIndexEntry list in TsFileMetadata
- */
- private MetadataIndexNode flushMetadataIndex(Map<Path, List<IChunkMetadata>> chunkMetadataListMap)
- throws IOException {
+ private void checkInMemoryPathCount() {
+ for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
+ pathCount += chunkGroupMetadata.getChunkMetadataList().size();
+ }
+ }
- // convert ChunkMetadataList to this field
- deviceTimeseriesMetadataMap = new LinkedHashMap<>();
- // create device -> TimeseriesMetaDataList Map
- for (Map.Entry<Path, List<IChunkMetadata>> entry : chunkMetadataListMap.entrySet()) {
- // for ordinary path
- flushOneChunkMetadata(entry.getKey(), entry.getValue());
+ private void readChunkMetadataAndConstructIndexTree() throws IOException {
+ if (tempOutput != null) {
+ tempOutput.close();
}
+ long metaOffset = out.getPosition();
- // construct TsFileMetadata and return
- return MetadataIndexConstructor.constructMetadataIndex(deviceTimeseriesMetadataMap, out);
- }
+ // serialize the SEPARATOR of MetaData
+ ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
- /**
- * Flush one chunkMetadata
- *
- * @param path Path of chunk
- * @param chunkMetadataList List of chunkMetadata about path(previous param)
- */
- private void flushOneChunkMetadata(Path path, List<IChunkMetadata> chunkMetadataList)
- throws IOException {
- // create TimeseriesMetaData
- PublicBAOS publicBAOS = new PublicBAOS();
- TSDataType dataType = chunkMetadataList.get(chunkMetadataList.size() - 1).getDataType();
- Statistics seriesStatistics = Statistics.getStatsByType(dataType);
-
- int chunkMetadataListLength = 0;
- boolean serializeStatistic = (chunkMetadataList.size() > 1);
- // flush chunkMetadataList one by one
- for (IChunkMetadata chunkMetadata : chunkMetadataList) {
- if (!chunkMetadata.getDataType().equals(dataType)) {
- continue;
+ TSMIterator tsmIterator =
+ hasChunkMetadataInDisk
+ ? TSMIterator.getTSMIteratorInDisk(
+ chunkMetadataTempFile, chunkGroupMetadataList, endPosInCMTForDevice)
+ : TSMIterator.getTSMIteratorInMemory(chunkGroupMetadataList);
+ Map<String, MetadataIndexNode> deviceMetadataIndexMap = new TreeMap<>();
+ Queue<MetadataIndexNode> measurementMetadataIndexQueue = new ArrayDeque<>();
+ String currentDevice = null;
+ String prevDevice = null;
+ MetadataIndexNode currentIndexNode =
+ new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+ TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
+ int seriesIdxForCurrDevice = 0;
+ BloomFilter filter =
+ BloomFilter.getEmptyBloomFilter(
+ TSFileDescriptor.getInstance().getConfig().getBloomFilterErrorRate(), pathCount);
+
+ int indexCount = 0;
+ while (tsmIterator.hasNext()) {
+ // read in all chunk metadata of one series
+ // construct the timeseries metadata for this series
+ Pair<String, TimeseriesMetadata> timeseriesMetadataPair = tsmIterator.next();
+ TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right;
+ currentSeries = timeseriesMetadataPair.left;
+
+ indexCount++;
+ // build bloom filter
+ filter.add(currentSeries);
+ // construct the index tree node for the series
+ Path currentPath = null;
+ if (timeseriesMetadata.getTSDataType() == TSDataType.VECTOR) {
+ // this series is the time column of the aligned device
+ // the full series path will be like "root.sg.d."
+ // we remove the last . in the series id here
+ currentPath = new Path(currentSeries);
+ currentDevice = currentSeries.substring(0, currentSeries.length() - 1);
+ } else {
+ currentPath = new Path(currentSeries, true);
+ currentDevice = currentPath.getDevice();
}
- chunkMetadataListLength += chunkMetadata.serializeTo(publicBAOS, serializeStatistic);
- seriesStatistics.mergeStatistics(chunkMetadata.getStatistics());
+ if (!currentDevice.equals(prevDevice)) {
+ if (prevDevice != null) {
+ addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
+ deviceMetadataIndexMap.put(
+ prevDevice,
+ generateRootNode(
+ measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
+ currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+ }
+ measurementMetadataIndexQueue = new ArrayDeque<>();
+ seriesIdxForCurrDevice = 0;
+ }
+
+ if (seriesIdxForCurrDevice % config.getMaxDegreeOfIndexNode() == 0) {
+ if (currentIndexNode.isFull()) {
+ addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
+ currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+ }
+ if (timeseriesMetadata.getTSDataType() != TSDataType.VECTOR) {
+ currentIndexNode.addEntry(
+ new MetadataIndexEntry(currentPath.getMeasurement(), out.getPosition()));
+ } else {
+ currentIndexNode.addEntry(new MetadataIndexEntry("", out.getPosition()));
+ }
+ }
+
+ prevDevice = currentDevice;
+ seriesIdxForCurrDevice++;
+ // serialize the timeseries metadata to file
+ timeseriesMetadata.serializeTo(out.wrapAsStream());
}
- TimeseriesMetadata timeseriesMetadata =
- new TimeseriesMetadata(
- (byte)
- ((serializeStatistic ? (byte) 1 : (byte) 0) | chunkMetadataList.get(0).getMask()),
- chunkMetadataListLength,
- path.getMeasurement(),
- dataType,
- seriesStatistics,
- publicBAOS);
- deviceTimeseriesMetadataMap
- .computeIfAbsent(path.getDevice(), k -> new ArrayList<>())
- .add(timeseriesMetadata);
+ addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
+ if (prevDevice != null) {
+ deviceMetadataIndexMap.put(
+ prevDevice,
+ generateRootNode(
+ measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
+ }
+
+ MetadataIndexNode metadataIndex = checkAndBuildLevelIndex(deviceMetadataIndexMap, out);
+
+ TsFileMetadata tsFileMetadata = new TsFileMetadata();
+ tsFileMetadata.setMetadataIndex(metadataIndex);
+ tsFileMetadata.setMetaOffset(metaOffset);
+
+ int size = tsFileMetadata.serializeTo(out.wrapAsStream());
+ size += tsFileMetadata.serializeBloomFilter(out.wrapAsStream(), filter);
+
+ // write TsFileMetaData size
+ ReadWriteIOUtils.write(size, out.wrapAsStream());
}
/**
@@ -412,6 +465,9 @@ public class TsFileIOWriter implements AutoCloseable {
public void close() throws IOException {
canWrite = false;
out.close();
+ if (tempOutput != null) {
+ this.tempOutput.close();
+ }
}
void writeSeparatorMaskForTest() throws IOException {
@@ -490,6 +546,30 @@ public class TsFileIOWriter implements AutoCloseable {
* @return DeviceTimeseriesMetadataMap
*/
public Map<String, List<TimeseriesMetadata>> getDeviceTimeseriesMetadataMap() {
+ Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap = new TreeMap<>();
+ Map<String, Map<String, List<IChunkMetadata>>> chunkMetadataMap = new TreeMap<>();
+ for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
+ for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
+ chunkMetadataMap
+ .computeIfAbsent(chunkGroupMetadata.getDevice(), x -> new TreeMap<>())
+ .computeIfAbsent(chunkMetadata.getMeasurementUid(), x -> new ArrayList<>())
+ .add(chunkMetadata);
+ }
+ }
+ for (String device : chunkMetadataMap.keySet()) {
+ Map<String, List<IChunkMetadata>> seriesToChunkMetadataMap = chunkMetadataMap.get(device);
+ for (Map.Entry<String, List<IChunkMetadata>> entry : seriesToChunkMetadataMap.entrySet()) {
+ try {
+ deviceTimeseriesMetadataMap
+ .computeIfAbsent(device, x -> new ArrayList<>())
+ .add(TSMIterator.constructOneTimeseriesMetadata(entry.getKey(), entry.getValue()));
+ } catch (IOException e) {
+ logger.error("Failed to get device timeseries metadata map", e);
+ return null;
+ }
+ }
+ }
+
return deviceTimeseriesMetadataMap;
}
@@ -508,4 +588,85 @@ public class TsFileIOWriter implements AutoCloseable {
public void setMaxPlanIndex(long maxPlanIndex) {
this.maxPlanIndex = maxPlanIndex;
}
+
+ /**
+ * Check if the size of chunk metadata in memory is greater than the given threshold. If so, the
+ * chunk metadata will be written to a temp files. <b>Notice! If you are writing a aligned device
+ * in row, you should make sure all data of current writing device has been written before this
+ * method is called. For writing not aligned series or writing aligned series in column, you
+ * should make sure that all data of one series is written before you call this function.</b>
+ *
+ * @throws IOException
+ */
+ public void checkMetadataSizeAndMayFlush() throws IOException {
+ // This function should be called after all data of an aligned device has been written
+ if (enableMemoryControl && currentChunkMetadataSize > maxMetadataSize) {
+ try {
+ sortAndFlushChunkMetadata();
+ } catch (IOException e) {
+ logger.error("Meets exception when flushing metadata to temp file for {}", file, e);
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * Sort the chunk metadata in lexicographical order of series path and by the start time of each
+ * chunk, then flush them to a temp file.
+ *
+ * @throws IOException
+ */
+ protected void sortAndFlushChunkMetadata() throws IOException {
+ // group by series
+ List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList =
+ TSMIterator.sortChunkMetadata(
+ chunkGroupMetadataList, currentChunkGroupDeviceId, chunkMetadataList);
+ if (tempOutput == null) {
+ tempOutput = new LocalTsFileOutput(new FileOutputStream(chunkMetadataTempFile));
+ }
+ hasChunkMetadataInDisk = true;
+ for (Pair<Path, List<IChunkMetadata>> pair : sortedChunkMetadataList) {
+ Path seriesPath = pair.left;
+ boolean isNewPath = !seriesPath.equals(lastSerializePath);
+ if (isNewPath) {
+ // record the number of paths in order to construct the bloom filter later
+ pathCount++;
+ }
+ List<IChunkMetadata> iChunkMetadataList = pair.right;
+ writeChunkMetadataToTempFile(iChunkMetadataList, seriesPath, isNewPath);
+ lastSerializePath = seriesPath;
+ logger.debug("Flushing {}", seriesPath);
+ }
+ // clear the cached metadata to release memory
+ chunkGroupMetadataList.clear();
+ if (chunkMetadataList != null) {
+ chunkMetadataList.clear();
+ }
+ }
+
+ private void writeChunkMetadataToTempFile(
+ List<IChunkMetadata> iChunkMetadataList, Path seriesPath, boolean isNewPath)
+ throws IOException {
+ // record layout: [deviceId] measurementId dataType size chunkMetadataBuffer (deviceId is written once per device)
+ if (lastSerializePath == null
+ || !seriesPath.getDevice().equals(lastSerializePath.getDevice())) {
+ // mark the end position of last device
+ endPosInCMTForDevice.add(tempOutput.getPosition());
+ // serialize the device
+ // for each device, we only serialize it once, in order to save io
+ ReadWriteIOUtils.write(seriesPath.getDevice(), tempOutput.wrapAsStream());
+ }
+ if (isNewPath && iChunkMetadataList.size() > 0) {
+ // serialize the public info of this measurement
+ ReadWriteIOUtils.writeVar(seriesPath.getMeasurement(), tempOutput.wrapAsStream());
+ ReadWriteIOUtils.write(iChunkMetadataList.get(0).getDataType(), tempOutput.wrapAsStream());
+ }
+ PublicBAOS buffer = new PublicBAOS();
+ int totalSize = 0;
+ for (IChunkMetadata chunkMetadata : iChunkMetadataList) {
+ totalSize += chunkMetadata.serializeTo(buffer, true);
+ }
+ ReadWriteIOUtils.write(totalSize, tempOutput.wrapAsStream());
+ buffer.writeTo(tempOutput);
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/DiskTSMIterator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/DiskTSMIterator.java
new file mode 100644
index 0000000000..fd02f1438a
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/DiskTSMIterator.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write.writer.tsmiterator;
+
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * This class reads ChunkMetadata iteratively from disk (the .cmt file) and memory (the list of
+ * ChunkGroupMetadata), and constructs TimeseriesMetadata from them. It reads the ChunkMetadata on
+ * disk first; after all ChunkMetadata on disk has been read, it reads the ChunkMetadata in memory.
+ */
+public class DiskTSMIterator extends TSMIterator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DiskTSMIterator.class);
+
+ private LinkedList<Long> endPosForEachDevice;
+ private File cmtFile;
+ private LocalTsFileInput input;
+ private long fileLength = 0;
+ private long currentPos = 0;
+ private long nextEndPosForDevice = 0;
+ private String currentDevice;
+ private boolean remainsInFile = true;
+
+ protected DiskTSMIterator(
+ File cmtFile,
+ List<ChunkGroupMetadata> chunkGroupMetadataList,
+ LinkedList<Long> endPosForEachDevice)
+ throws IOException {
+ super(chunkGroupMetadataList);
+ this.cmtFile = cmtFile;
+ this.endPosForEachDevice = endPosForEachDevice;
+ this.input = new LocalTsFileInput(cmtFile.toPath());
+ this.fileLength = cmtFile.length();
+ this.nextEndPosForDevice = endPosForEachDevice.removeFirst();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return remainsInFile || iterator.hasNext();
+ }
+
+ @Override
+ public Pair<String, TimeseriesMetadata> next() {
+ try {
+ if (remainsInFile) {
+ // deserialize from file
+ return getTimeSeriesMetadataFromFile();
+ } else {
+ // get from memory iterator
+ return super.next();
+ }
+ } catch (IOException e) {
+ LOG.error("Meets IOException when reading timeseries metadata from disk", e);
+ return null;
+ }
+ }
+
+ private Pair<String, TimeseriesMetadata> getTimeSeriesMetadataFromFile() throws IOException {
+ if (currentPos == nextEndPosForDevice) {
+ // deserialize the current device name
+ currentDevice = ReadWriteIOUtils.readString(input.wrapAsInputStream());
+ nextEndPosForDevice =
+ endPosForEachDevice.size() > 0 ? endPosForEachDevice.removeFirst() : fileLength;
+ }
+ // deserialize public info for measurement
+ String measurementUid = ReadWriteIOUtils.readVarIntString(input.wrapAsInputStream());
+ byte dataTypeInByte = ReadWriteIOUtils.readByte(input.wrapAsInputStream());
+ TSDataType dataType = TSDataType.getTsDataType(dataTypeInByte);
+ int chunkBufferSize = ReadWriteIOUtils.readInt(input.wrapAsInputStream());
+ ByteBuffer chunkBuffer = ByteBuffer.allocate(chunkBufferSize);
+ int readSize = ReadWriteIOUtils.readAsPossible(input, chunkBuffer);
+ if (readSize < chunkBufferSize) {
+ throw new IOException(
+ String.format(
+ "Expected to read %s bytes, but actually read %s bytes", chunkBufferSize, readSize));
+ }
+ chunkBuffer.flip();
+
+ // deserialize chunk metadata from chunk buffer
+ List<IChunkMetadata> chunkMetadataList = new ArrayList<>();
+ while (chunkBuffer.hasRemaining()) {
+ chunkMetadataList.add(ChunkMetadata.deserializeFrom(chunkBuffer, dataType));
+ }
+ updateCurrentPos();
+ return new Pair<>(
+ currentDevice + "." + measurementUid,
+ constructOneTimeseriesMetadata(measurementUid, chunkMetadataList));
+ }
+
+ private void updateCurrentPos() throws IOException {
+ currentPos = input.position();
+ if (currentPos >= fileLength) {
+ remainsInFile = false;
+ input.close();
+ }
+ }
+}
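DiskTSMIterator therefore yields series in the same order they were spilled: everything in the .cmt file first, then whatever chunk metadata is still in memory. A consumption sketch (cmtFile, chunkGroupMetadataList and endPositions stand for the writer's corresponding fields):

    TSMIterator it =
        TSMIterator.getTSMIteratorInDisk(cmtFile, chunkGroupMetadataList, endPositions);
    while (it.hasNext()) {
      Pair<String, TimeseriesMetadata> next = it.next();
      String fullPath = next.left;          // e.g. "root.sg.d0.s0"
      TimeseriesMetadata tsm = next.right;  // ready to serialize into the index
    }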
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/TSMIterator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/TSMIterator.java
new file mode 100644
index 0000000000..f11242f296
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/TSMIterator.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.writer.tsmiterator;
+
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * TSMIterator iteratively returns the full path of each series together with its
+ * TimeseriesMetadata. It accepts a data source in memory or on disk: the static method
+ * getTSMIteratorInMemory returns a TSMIterator that reads from memory, and getTSMIteratorInDisk
+ * returns one that reads from disk.
+ */
+public class TSMIterator {
+ private static final Logger LOG = LoggerFactory.getLogger(TSMIterator.class);
+ protected List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList;
+ protected Iterator<Pair<Path, List<IChunkMetadata>>> iterator;
+
+ protected TSMIterator(List<ChunkGroupMetadata> chunkGroupMetadataList) {
+ this.sortedChunkMetadataList = sortChunkMetadata(chunkGroupMetadataList, null, null);
+ this.iterator = sortedChunkMetadataList.iterator();
+ }
+
+ public static TSMIterator getTSMIteratorInMemory(
+ List<ChunkGroupMetadata> chunkGroupMetadataList) {
+ return new TSMIterator(chunkGroupMetadataList);
+ }
+
+ public static TSMIterator getTSMIteratorInDisk(
+ File cmtFile, List<ChunkGroupMetadata> chunkGroupMetadataList, LinkedList<Long> serializePos)
+ throws IOException {
+ return new DiskTSMIterator(cmtFile, chunkGroupMetadataList, serializePos);
+ }
+
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ public Pair<String, TimeseriesMetadata> next() throws IOException {
+ Pair<Path, List<IChunkMetadata>> nextPair = iterator.next();
+ return new Pair<>(
+ nextPair.left.getFullPath(),
+ constructOneTimeseriesMetadata(nextPair.left.getMeasurement(), nextPair.right));
+ }
+
+ public static TimeseriesMetadata constructOneTimeseriesMetadata(
+ String measurementId, List<IChunkMetadata> chunkMetadataList) throws IOException {
+ // create TimeseriesMetaData
+ PublicBAOS publicBAOS = new PublicBAOS();
+ TSDataType dataType = chunkMetadataList.get(chunkMetadataList.size() - 1).getDataType();
+ Statistics seriesStatistics = Statistics.getStatsByType(dataType);
+
+ int chunkMetadataListLength = 0;
+ boolean serializeStatistic = (chunkMetadataList.size() > 1);
+ // flush chunkMetadataList one by one
+ for (IChunkMetadata chunkMetadata : chunkMetadataList) {
+ if (!chunkMetadata.getDataType().equals(dataType)) {
+ continue;
+ }
+ chunkMetadataListLength += chunkMetadata.serializeTo(publicBAOS, serializeStatistic);
+ seriesStatistics.mergeStatistics(chunkMetadata.getStatistics());
+ }
+
+ TimeseriesMetadata timeseriesMetadata =
+ new TimeseriesMetadata(
+ (byte)
+ ((serializeStatistic ? (byte) 1 : (byte) 0) | chunkMetadataList.get(0).getMask()),
+ chunkMetadataListLength,
+ measurementId,
+ dataType,
+ seriesStatistics,
+ publicBAOS);
+ return timeseriesMetadata;
+ }
+
+ public static List<Pair<Path, List<IChunkMetadata>>> sortChunkMetadata(
+ List<ChunkGroupMetadata> chunkGroupMetadataList,
+ String currentDevice,
+ List<ChunkMetadata> chunkMetadataList) {
+ Map<String, Map<Path, List<IChunkMetadata>>> chunkMetadataMap = new TreeMap<>();
+ List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList = new LinkedList<>();
+ for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
+ chunkMetadataMap.computeIfAbsent(chunkGroupMetadata.getDevice(), x -> new TreeMap<>());
+ for (IChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
+ chunkMetadataMap
+ .get(chunkGroupMetadata.getDevice())
+ .computeIfAbsent(
+ new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid()),
+ x -> new ArrayList<>())
+ .add(chunkMetadata);
+ }
+ }
+ if (currentDevice != null) {
+ for (IChunkMetadata chunkMetadata : chunkMetadataList) {
+ chunkMetadataMap
+ .computeIfAbsent(currentDevice, x -> new TreeMap<>())
+ .computeIfAbsent(
+ new Path(currentDevice, chunkMetadata.getMeasurementUid()), x -> new ArrayList<>())
+ .add(chunkMetadata);
+ }
+ }
+
+ for (Map.Entry<String, Map<Path, List<IChunkMetadata>>> entry : chunkMetadataMap.entrySet()) {
+ Map<Path, List<IChunkMetadata>> seriesChunkMetadataMap = entry.getValue();
+ for (Map.Entry<Path, List<IChunkMetadata>> seriesChunkMetadataEntry :
+ seriesChunkMetadataMap.entrySet()) {
+ sortedChunkMetadataList.add(
+ new Pair<>(seriesChunkMetadataEntry.getKey(), seriesChunkMetadataEntry.getValue()));
+ }
+ }
+ return sortedChunkMetadataList;
+ }
+}
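The TreeMap-based grouping in sortChunkMetadata is what guarantees the ordering endFile relies on: devices in lexicographical order, and series in order within each device. A small sketch of the in-memory variant (device and series names are examples):

    List<Pair<Path, List<IChunkMetadata>>> sorted =
        TSMIterator.sortChunkMetadata(chunkGroupMetadataList, null, null);
    // yields e.g. root.sg.d0.s0, root.sg.d0.s1, root.sg.d1.s0, ...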
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java
new file mode 100644
index 0000000000..c97a9a0774
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.read.reader.IChunkReader;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.AlignedChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
+import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+/** This class provides some static methods to check the integrity of a TsFile. */
+public class TsFileIntegrityCheckingTool {
+ private static Logger LOG = LoggerFactory.getLogger(TsFileIntegrityCheckingTool.class);
+
+ /**
+ * This method checks the integrity of the file by reading it from start to end. It mainly
+ * checks the integrity of the chunks.
+ *
+ * @param filename path of the TsFile to check
+ */
+ public static void checkIntegrityBySequenceRead(String filename) {
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) {
+ String headMagicString = reader.readHeadMagic();
+ Assert.assertEquals(TSFileConfig.MAGIC_STRING, headMagicString);
+ String tailMagicString = reader.readTailMagic();
+ Assert.assertEquals(TSFileConfig.MAGIC_STRING, tailMagicString);
+ reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1);
+ List<long[]> timeBatch = new ArrayList<>();
+ int pageIndex = 0;
+ byte marker;
+ while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
+ switch (marker) {
+ case MetaMarker.CHUNK_HEADER:
+ case MetaMarker.TIME_CHUNK_HEADER:
+ case MetaMarker.VALUE_CHUNK_HEADER:
+ case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
+ case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
+ case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
+ ChunkHeader header = reader.readChunkHeader(marker);
+ if (header.getDataSize() == 0) {
+ // empty value chunk
+ break;
+ }
+ Decoder defaultTimeDecoder =
+ Decoder.getDecoderByType(
+ TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+ TSDataType.INT64);
+ Decoder valueDecoder =
+ Decoder.getDecoderByType(header.getEncodingType(), header.getDataType());
+ int dataSize = header.getDataSize();
+ pageIndex = 0;
+ if (header.getDataType() == TSDataType.VECTOR) {
+ timeBatch.clear();
+ }
+ while (dataSize > 0) {
+ valueDecoder.reset();
+ PageHeader pageHeader =
+ reader.readPageHeader(
+ header.getDataType(),
+ (header.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER);
+ ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType());
+ if ((header.getChunkType() & (byte) TsFileConstant.TIME_COLUMN_MASK)
+ == (byte) TsFileConstant.TIME_COLUMN_MASK) { // Time Chunk
+ TimePageReader timePageReader =
+ new TimePageReader(pageHeader, pageData, defaultTimeDecoder);
+ timeBatch.add(timePageReader.getNextTimeBatch());
+ } else if ((header.getChunkType() & (byte) TsFileConstant.VALUE_COLUMN_MASK)
+ == (byte) TsFileConstant.VALUE_COLUMN_MASK) { // Value Chunk
+ ValuePageReader valuePageReader =
+ new ValuePageReader(pageHeader, pageData, header.getDataType(), valueDecoder);
+ TsPrimitiveType[] valueBatch =
+ valuePageReader.nextValueBatch(timeBatch.get(pageIndex));
+ } else { // NonAligned Chunk
+ PageReader pageReader =
+ new PageReader(
+ pageData, header.getDataType(), valueDecoder, defaultTimeDecoder, null);
+ BatchData batchData = pageReader.getAllSatisfiedPageData();
+ }
+ pageIndex++;
+ dataSize -= pageHeader.getSerializedPageSize();
+ }
+ break;
+ case MetaMarker.CHUNK_GROUP_HEADER:
+ ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
+ break;
+ case MetaMarker.OPERATION_INDEX_RANGE:
+ reader.readPlanIndex();
+ break;
+ default:
+ MetaMarker.handleUnexpectedMarker(marker);
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Meet exception when checking integrity of tsfile", e);
+ Assert.fail();
+ }
+ }
+
+ /**
+ * This method checks the integrity of the file by mimicking the query process: it first reads
+ * the metadata index tree to get the timeseries metadata list and the chunk metadata list.
+ * After that, it reads each chunk according to its chunk metadata, deserializes the chunk, and
+ * verifies the correctness of the data.
+ *
+ * @param filename File to be checked
+ * @param originData The origin data in a map format, Device -> SeriesId -> List<List<Time,Val>>,
+ * where each inner list stands for a chunk.
+ */
+ public static void checkIntegrityByQuery(
+ String filename,
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData) {
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) {
+ Map<String, List<TimeseriesMetadata>> allTimeseriesMetadata =
+ reader.getAllTimeseriesMetadata(true);
+ Assert.assertEquals(originData.size(), allTimeseriesMetadata.size());
+ // check each series
+ for (Map.Entry<String, List<TimeseriesMetadata>> entry : allTimeseriesMetadata.entrySet()) {
+ String deviceId = entry.getKey();
+ List<TimeseriesMetadata> timeseriesMetadataList = entry.getValue();
+ boolean vectorMode = false;
+ if (timeseriesMetadataList.size() > 0
+ && timeseriesMetadataList.get(0).getTSDataType() != TSDataType.VECTOR) {
+ Assert.assertEquals(originData.get(deviceId).size(), timeseriesMetadataList.size());
+ } else {
+ vectorMode = true;
+ Assert.assertEquals(originData.get(deviceId).size(), timeseriesMetadataList.size() - 1);
+ }
+
+ if (!vectorMode) {
+ // check integrity of not aligned series
+ for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) {
+ // get its chunk metadata list, and read the chunk
+ String measurementId = timeseriesMetadata.getMeasurementId();
+ List<List<Pair<Long, TsPrimitiveType>>> originChunks =
+ originData.get(deviceId).get(measurementId);
+ List<IChunkMetadata> chunkMetadataList = timeseriesMetadata.getChunkMetadataList();
+ Assert.assertEquals(originChunks.size(), chunkMetadataList.size());
+ chunkMetadataList.sort(Comparator.comparing(IChunkMetadata::getStartTime));
+ for (int i = 0; i < chunkMetadataList.size(); ++i) {
+ Chunk chunk = reader.readMemChunk((ChunkMetadata) chunkMetadataList.get(i));
+ ChunkReader chunkReader = new ChunkReader(chunk, null);
+ List<Pair<Long, TsPrimitiveType>> originValue = originChunks.get(i);
+ // deserialize the chunk and verify it with origin data
+ for (int valIdx = 0; chunkReader.hasNextSatisfiedPage(); ) {
+ IPointReader pointReader = chunkReader.nextPageData().getBatchDataIterator();
+ while (pointReader.hasNextTimeValuePair()) {
+ TimeValuePair pair = pointReader.nextTimeValuePair();
+ Assert.assertEquals(
+ originValue.get(valIdx).left.longValue(), pair.getTimestamp());
+ Assert.assertEquals(originValue.get(valIdx++).right, pair.getValue());
+ }
+ }
+ }
+ }
+ } else {
+ // check integrity of vector type
+ // get the timeseries metadata of the time column
+ TimeseriesMetadata timeColumnMetadata = timeseriesMetadataList.get(0);
+ List<IChunkMetadata> timeChunkMetadataList = timeColumnMetadata.getChunkMetadataList();
+ timeChunkMetadataList.sort(Comparator.comparing(IChunkMetadata::getStartTime));
+
+ for (int i = 1; i < timeseriesMetadataList.size(); ++i) {
+ // traverse each value column
+ List<IChunkMetadata> valueChunkMetadataList =
+ timeseriesMetadataList.get(i).getChunkMetadataList();
+ Assert.assertEquals(timeChunkMetadataList.size(), valueChunkMetadataList.size());
+ List<List<Pair<Long, TsPrimitiveType>>> originDataChunks =
+ originData.get(deviceId).get(timeseriesMetadataList.get(i).getMeasurementId());
+ for (int chunkIdx = 0; chunkIdx < timeChunkMetadataList.size(); ++chunkIdx) {
+ Chunk timeChunk =
+ reader.readMemChunk((ChunkMetadata) timeChunkMetadataList.get(chunkIdx));
+ Chunk valueChunk =
+ reader.readMemChunk((ChunkMetadata) valueChunkMetadataList.get(chunkIdx));
+ // construct an aligned chunk reader using time chunk and value chunk
+ IChunkReader chunkReader =
+ new AlignedChunkReader(timeChunk, Collections.singletonList(valueChunk), null);
+ // verify the values
+ List<Pair<Long, TsPrimitiveType>> originValue = originDataChunks.get(chunkIdx);
+ for (int valIdx = 0; chunkReader.hasNextSatisfiedPage(); ) {
+ IBatchDataIterator pointReader = chunkReader.nextPageData().getBatchDataIterator();
+ while (pointReader.hasNext()) {
+ long time = pointReader.currentTime();
+ Assert.assertEquals(originValue.get(valIdx).left.longValue(), time);
+ Assert.assertEquals(
+ originValue.get(valIdx++).right.getValue(),
+ ((TsPrimitiveType[]) pointReader.currentValue())[0].getValue());
+ pointReader.next();
+ }
+ }
+ }
+ }
+ }
+ }
+
+ } catch (IOException e) {
+ LOG.error("Meet exception when checking integrity of tsfile", e);
+ Assert.fail();
+ }
+ }
+}
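A typical use of the tool, as in TsFileIOWriterMemoryControlTest below, is to record the written points while producing the file and then verify the sealed file both ways (names are from the test):

    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData =
        new HashMap<>();
    // ... while writing: originData.get(device).get(series).add(chunkPoints) ...
    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);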
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
new file mode 100644
index 0000000000..44e4af3678
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
@@ -0,0 +1,1261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write.writer;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.encoding.encoder.TSEncodingBuilder;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+import org.apache.iotdb.tsfile.write.TsFileIntegrityCheckingTool;
+import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.TimeChunkWriter;
+import org.apache.iotdb.tsfile.write.chunk.ValueChunkWriter;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.tsmiterator.TSMIterator;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+public class TsFileIOWriterMemoryControlTest {
+ private static File testFile = new File("target", "1-1-0-0.tsfile");
+ private static File emptyFile = new File("target", "temp");
+ private long TEST_CHUNK_SIZE = 1000;
+ private List<String> sortedSeriesId = new ArrayList<>();
+ private List<String> sortedDeviceId = new ArrayList<>();
+ private boolean init = false;
+
+ @Before
+ public void setUp() throws IOException {
+ if (!init) {
+ init = true;
+ for (int i = 0; i < 2048; ++i) {
+ sortedSeriesId.add("s" + i);
+ sortedDeviceId.add("root.sg.d" + i);
+ }
+ sortedSeriesId.sort((String::compareTo));
+ sortedDeviceId.sort((String::compareTo));
+ }
+ TEST_CHUNK_SIZE = 1000;
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ if (testFile.exists()) {
+ FileUtils.delete(testFile);
+ }
+ if (new File(testFile.getPath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX).exists()) {
+ FileUtils.delete(
+ new File(testFile.getPath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX));
+ }
+ if (emptyFile.exists()) {
+ FileUtils.delete(emptyFile);
+ }
+ }
+
+ /** The following tests are for ChunkMetadata serialization and deserialization. */
+ @Test
+ public void testSerializeAndDeserializeChunkMetadata() throws IOException {
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) {
+ List<ChunkMetadata> originChunkMetadataList = new ArrayList<>();
+ for (int i = 0; i < 10; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ for (int j = 0; j < 5; ++j) {
+ ChunkWriterImpl chunkWriter;
+ switch (j) {
+ case 0:
+ chunkWriter = generateIntData(j, 0L, new ArrayList<>());
+ break;
+ case 1:
+ chunkWriter = generateBooleanData(j, 0, new ArrayList<>());
+ break;
+ case 2:
+ chunkWriter = generateFloatData(j, 0L, new ArrayList<>());
+ break;
+ case 3:
+ chunkWriter = generateDoubleData(j, 0L, new ArrayList<>());
+ break;
+ case 4:
+ default:
+ chunkWriter = generateTextData(j, 0L, new ArrayList<>());
+ break;
+ }
+ chunkWriter.writeToFileWriter(writer);
+ }
+ originChunkMetadataList.addAll(writer.chunkMetadataList);
+ writer.endChunkGroup();
+ }
+ writer.sortAndFlushChunkMetadata();
+ writer.tempOutput.flush();
+
+ TSMIterator iterator =
+ TSMIterator.getTSMIteratorInDisk(
+ writer.chunkMetadataTempFile,
+ writer.chunkGroupMetadataList,
+ writer.endPosInCMTForDevice);
+ for (int i = 0; iterator.hasNext(); ++i) {
+ Pair<String, TimeseriesMetadata> timeseriesMetadataPair = iterator.next();
+ TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right;
+ Assert.assertEquals(sortedSeriesId.get(i % 5), timeseriesMetadata.getMeasurementId());
+ Assert.assertEquals(
+ originChunkMetadataList.get(i).getDataType(), timeseriesMetadata.getTSDataType());
+ Assert.assertEquals(
+ originChunkMetadataList.get(i).getStatistics(), timeseriesMetadata.getStatistics());
+ }
+ }
+ }
+
+ @Test
+ public void testSerializeAndDeserializeAlignedChunkMetadata() throws IOException {
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) {
+ List<ChunkMetadata> originChunkMetadataList = new ArrayList<>();
+ for (int i = 0; i < 10; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ AlignedChunkWriterImpl chunkWriter = generateVectorData(0L, new ArrayList<>(), 6);
+ chunkWriter.writeToFileWriter(writer);
+ originChunkMetadataList.addAll(writer.chunkMetadataList);
+ writer.endChunkGroup();
+ }
+ writer.sortAndFlushChunkMetadata();
+ writer.tempOutput.flush();
+
+ List<String> measurementIds = new ArrayList<>();
+ for (int i = 0; i < 10; ++i) {
+ measurementIds.add(sortedDeviceId.get(i) + ".");
+ for (int j = 1; j <= 6; ++j) {
+ measurementIds.add(sortedDeviceId.get(i) + ".s" + j);
+ }
+ }
+ TSMIterator iterator =
+ TSMIterator.getTSMIteratorInDisk(
+ writer.chunkMetadataTempFile, new ArrayList<>(), writer.endPosInCMTForDevice);
+ for (int i = 0; iterator.hasNext(); ++i) {
+ Pair<String, TimeseriesMetadata> timeseriesMetadataPair = iterator.next();
+ String fullPath = timeseriesMetadataPair.left;
+ TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right;
+ Assert.assertEquals(measurementIds.get(i), fullPath);
+ Assert.assertEquals(
+ originChunkMetadataList.get(i).getDataType(), timeseriesMetadata.getTSDataType());
+ Assert.assertEquals(
+ originChunkMetadataList.get(i).getStatistics(), timeseriesMetadata.getStatistics());
+ }
+ }
+ }
+
+ @Test
+ public void testSerializeAndDeserializeMixedChunkMetadata() throws IOException {
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) {
+ List<IChunkMetadata> originChunkMetadataList = new ArrayList<>();
+ List<String> seriesIds = new ArrayList<>();
+ for (int i = 0; i < 10; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ if (i % 2 == 0) {
+ // write normal series
+ for (int j = 0; j < 5; ++j) {
+ ChunkWriterImpl chunkWriter;
+ switch (j) {
+ case 0:
+ chunkWriter = generateIntData(j, 0L, new ArrayList<>());
+ break;
+ case 1:
+ chunkWriter = generateBooleanData(j, 0L, new ArrayList<>());
+ break;
+ case 2:
+ chunkWriter = generateFloatData(j, 0L, new ArrayList<>());
+ break;
+ case 3:
+ chunkWriter = generateDoubleData(j, 0L, new ArrayList<>());
+ break;
+ case 4:
+ default:
+ chunkWriter = generateTextData(j, 0L, new ArrayList<>());
+ break;
+ }
+ chunkWriter.writeToFileWriter(writer);
+ seriesIds.add(deviceId + "." + sortedSeriesId.get(j));
+ }
+ } else {
+ // write vector
+ AlignedChunkWriterImpl chunkWriter = generateVectorData(0L, new ArrayList<>(), 6);
+ chunkWriter.writeToFileWriter(writer);
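+ // The aligned time column is indexed under the device id with an empty measurement id.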
+ seriesIds.add(deviceId + ".");
+ for (int l = 1; l <= 6; ++l) {
+ seriesIds.add(deviceId + ".s" + l);
+ }
+ }
+ originChunkMetadataList.addAll(writer.chunkMetadataList);
+ writer.endChunkGroup();
+ }
+ writer.sortAndFlushChunkMetadata();
+ writer.tempOutput.flush();
+
+ TSMIterator iterator =
+ TSMIterator.getTSMIteratorInDisk(
+ writer.chunkMetadataTempFile, new ArrayList<>(), writer.endPosInCMTForDevice);
+ for (int i = 0; i < originChunkMetadataList.size(); ++i) {
+ Pair<String, TimeseriesMetadata> timeseriesMetadataPair = iterator.next();
+ Assert.assertEquals(seriesIds.get(i), timeseriesMetadataPair.left);
+ Assert.assertEquals(
+ originChunkMetadataList.get(i).getDataType(),
+ timeseriesMetadataPair.right.getTSDataType());
+ Assert.assertEquals(
+ originChunkMetadataList.get(i).getStatistics(),
+ timeseriesMetadataPair.right.getStatistics());
+ }
+ }
+ }
+
+ /** The following tests are for writing normal series with different numbers of chunks, series and devices. */
+
+ /**
+ * Write a file with 10 devices and 5 series in each device. For each series, we write one chunk
+ * for it. This test makes sure that the file is complete and readable after its chunk metadata
+ * has been flushed to disk ahead of time.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testWriteCompleteFileWithNormalChunk() throws IOException {
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ for (int i = 0; i < 10; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ for (int j = 0; j < 5; ++j) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ ChunkWriterImpl chunkWriter;
+ switch (j) {
+ case 0:
+ chunkWriter = generateIntData(j, 0L, valList);
+ break;
+ case 1:
+ chunkWriter = generateBooleanData(j, 0L, valList);
+ break;
+ case 2:
+ chunkWriter = generateFloatData(j, 0L, valList);
+ break;
+ case 3:
+ chunkWriter = generateDoubleData(j, 0L, valList);
+ break;
+ case 4:
+ default:
+ chunkWriter = generateTextData(j, 0L, valList);
+ break;
+ }
+ chunkWriter.writeToFileWriter(writer);
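+ // With the tiny 1024-byte metadata threshold used here, this check is expected to spill chunk metadata to the temp file.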
+ writer.checkMetadataSizeAndMayFlush();
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ writer.endChunkGroup();
+ }
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ writer.endFile();
+ }
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+ }
+
+ /**
+ * Write a file with 10 devices and 5 series in each device. For each series, we write 10 chunks
+ * for it. This test makes sure that the file is complete and readable after its chunk metadata
+ * has been flushed to disk ahead of time.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testWriteCompleteFileWithMultipleNormalChunk() throws IOException {
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ for (int i = 0; i < 10; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ for (int j = 0; j < 5; ++j) {
+ ChunkWriterImpl chunkWriter;
+ switch (j) {
+ case 0:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 1:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 2:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 3:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 4:
+ default:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ }
+ writer.checkMetadataSizeAndMayFlush();
+ }
+ writer.endChunkGroup();
+ }
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ writer.endFile();
+ }
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+ }
+
+ /**
+ * Write a file with 10 devices and 5 series in each device. For each series, we write 10 chunks
+ * for it. We maintain some chunk metadata in memory when calling endFile().
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testWriteCompleteFileWithMetadataRemainsInMemoryWhenEndFile() throws IOException {
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ for (int i = 0; i < 10; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ for (int j = 0; j < 5; ++j) {
+ ChunkWriterImpl chunkWriter;
+ switch (j) {
+ case 0:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 1:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 2:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 3:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 4:
+ default:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ }
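+ // Skip the size check for the last device so some chunk metadata is still in memory when endFile() is called.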
+ if (i < 9) {
+ writer.checkMetadataSizeAndMayFlush();
+ }
+ }
+ writer.endChunkGroup();
+ }
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ Assert.assertFalse(writer.chunkGroupMetadataList.isEmpty());
+ writer.endFile();
+ }
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+ }
+
+ /**
+ * Write a file with 2 devices and 5 series in each device. For each series, we write 1024 chunks
+ * for it. This test makes sure that the file is complete and readable after its chunk metadata
+ * has been flushed to disk ahead of time.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testWriteCompleteFileWithEnormousNormalChunk() throws IOException {
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+ long originTestChunkSize = TEST_CHUNK_SIZE;
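+ // Shrink the chunk size so writing 1024 chunks per series stays cheap; restored in the finally block.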
+ TEST_CHUNK_SIZE = 10;
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ for (int i = 0; i < 2; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ for (int j = 0; j < 5; ++j) {
+ ChunkWriterImpl chunkWriter;
+ switch (j) {
+ case 0:
+ for (int k = 0; k < 1024; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 1:
+ for (int k = 0; k < 1024; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 2:
+ for (int k = 0; k < 1024; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 3:
+ for (int k = 0; k < 1024; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 4:
+ default:
+ for (int k = 0; k < 1024; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ }
+ writer.checkMetadataSizeAndMayFlush();
+ }
+ writer.endChunkGroup();
+ }
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ writer.endFile();
+ } finally {
+ TEST_CHUNK_SIZE = originTestChunkSize;
+ }
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+ }
+
+ /**
+ * Write a file with 2 devices and 1024 series in each device. For each series, we write 50
+ * chunks for it. This test makes sure that the file is complete and readable after its chunk
+ * metadata has been flushed to disk ahead of time.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testWriteCompleteFileWithEnormousSeriesNum() throws IOException {
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originTimes = new HashMap<>();
+ long originTestChunkSize = TEST_CHUNK_SIZE;
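+ // One point per chunk keeps 1024 series * 50 chunks fast to generate.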
+ TEST_CHUNK_SIZE = 1;
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ for (int i = 0; i < 2; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ for (int j = 0; j < 1024; ++j) {
+ ChunkWriterImpl chunkWriter;
+ switch (j % 5) {
+ case 0:
+ for (int k = 0; k < 50; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 1:
+ for (int k = 0; k < 50; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 2:
+ for (int k = 0; k < 50; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 3:
+ for (int k = 0; k < 50; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 4:
+ default:
+ for (int k = 0; k < 50; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ }
+ writer.checkMetadataSizeAndMayFlush();
+ }
+ writer.endChunkGroup();
+ }
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ writer.endFile();
+ } finally {
+ TEST_CHUNK_SIZE = originTestChunkSize;
+ }
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originTimes);
+ }
+
+ /**
+ * Write a file with 1024 devices and 5 series in each device. For each series, we write 10
+ * chunks for it. This test makes sure that the file is complete and readable after its chunk
+ * metadata has been flushed to disk ahead of time.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testWriteCompleteFileWithEnormousDeviceNum() throws IOException {
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originTimes = new HashMap<>();
+ long originTestChunkSize = TEST_CHUNK_SIZE;
+ TEST_CHUNK_SIZE = 10;
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ for (int i = 0; i < 1024; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ for (int j = 0; j < 5; ++j) {
+ ChunkWriterImpl chunkWriter;
+ switch (j % 5) {
+ case 0:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 1:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 2:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 3:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 4:
+ default:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ }
+ writer.checkMetadataSizeAndMayFlush();
+ }
+ writer.endChunkGroup();
+ }
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ writer.endFile();
+ } finally {
+ TEST_CHUNK_SIZE = originTestChunkSize;
+ }
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originTimes);
+ }
+
+ /** The following tests are for writing aligned series. */
+
+ /**
+ * Test writing 10 aligned devices, each with one aligned chunk of 6 components.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testWriteCompleteFileWithAlignedSeriesWithOneChunk() throws IOException {
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ for (int i = 0; i < 10; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>();
+ AlignedChunkWriterImpl chunkWriter = generateVectorData(0L, valList, 6);
+ for (int j = 1; j <= 6; ++j) {
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent("s" + j, x -> new ArrayList<>())
+ .add(valList.get(j - 1));
+ }
+
+ chunkWriter.writeToFileWriter(writer);
+ writer.endChunkGroup();
+ writer.checkMetadataSizeAndMayFlush();
+ }
+ writer.endFile();
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ }
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+ }
+
+ /**
+ * Test writing one aligned device with 6 components; we write 512 aligned chunks for it.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testWriteCompleteFileWithAlignedSeriesWithMultiChunks() throws IOException {
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+ int chunkNum = 512, seriesNum = 6;
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ for (int i = 0; i < 1; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ for (int k = 0; k < chunkNum; ++k) {
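+ // Each aligned chunk is written in its own chunk group.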
+ writer.startChunkGroup(deviceId);
+ List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>();
+ AlignedChunkWriterImpl chunkWriter =
+ generateVectorData(k * TEST_CHUNK_SIZE, valList, seriesNum);
+ for (int j = 1; j <= seriesNum; ++j) {
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent("s" + j, x -> new ArrayList<>())
+ .add(valList.get(j - 1));
+ }
+
+ chunkWriter.writeToFileWriter(writer);
+ writer.endChunkGroup();
+ }
+ writer.checkMetadataSizeAndMayFlush();
+ }
+ writer.endFile();
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ }
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+ }
+
+ /**
+ * Test writing aligned chunk metadata where each aligned device has 1024 components; we write 5
+ * chunks for each device.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testWriteCompleteFileWithAlignedSeriesWithManyComponents() throws IOException {
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+ int chunkNum = 5, seriesNum = 1024;
+ long originTestPointNum = TEST_CHUNK_SIZE;
+ TEST_CHUNK_SIZE = 10;
+ try {
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ for (int i = 0; i < 10; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ for (int k = 0; k < chunkNum; ++k) {
+ writer.startChunkGroup(deviceId);
+ List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>();
+ AlignedChunkWriterImpl chunkWriter =
+ generateVectorData(k * TEST_CHUNK_SIZE, valList, seriesNum);
+ for (int j = 1; j <= seriesNum; ++j) {
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent("s" + j, x -> new ArrayList<>())
+ .add(valList.get(j - 1));
+ }
+
+ chunkWriter.writeToFileWriter(writer);
+ writer.endChunkGroup();
+ }
+ writer.checkMetadataSizeAndMayFlush();
+ }
+ writer.endFile();
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ }
+ } finally {
+ TEST_CHUNK_SIZE = originTestPointNum;
+ }
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+ }
+
+ @Test
+ public void testWriteCompleteFileWithLotsAlignedSeries() throws IOException {
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+ int chunkNum = 5, seriesNum = 12;
+ long originTestPointNum = TEST_CHUNK_SIZE;
+ TEST_CHUNK_SIZE = 10;
+ int deviceNum = 1024;
+ try {
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ for (int i = 0; i < deviceNum; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ for (int k = 0; k < chunkNum; ++k) {
+ writer.startChunkGroup(deviceId);
+ List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>();
+ AlignedChunkWriterImpl chunkWriter =
+ generateVectorData(k * TEST_CHUNK_SIZE, valList, seriesNum);
+ for (int j = 1; j <= seriesNum; ++j) {
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent("s" + j, x -> new ArrayList<>())
+ .add(valList.get(j - 1));
+ }
+
+ chunkWriter.writeToFileWriter(writer);
+ writer.endChunkGroup();
+ }
+ writer.checkMetadataSizeAndMayFlush();
+ }
+ writer.endFile();
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ }
+ } finally {
+ TEST_CHUNK_SIZE = originTestPointNum;
+ }
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+ }
+
+ @Test
+ public void testWritingAlignedSeriesByColumnWithMultiComponents() throws IOException {
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originValue = new HashMap<>();
+ long originTestChunkSize = TEST_CHUNK_SIZE;
+ TEST_CHUNK_SIZE = 10;
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ for (int i = 0; i < 5; i++) {
+ String deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ TSEncoding timeEncoding =
+ TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
+ TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
+ Encoder encoder = TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType);
+ for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
+ TimeChunkWriter timeChunkWriter =
+ new TimeChunkWriter("", CompressionType.SNAPPY, TSEncoding.PLAIN, encoder);
+ for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) {
+ timeChunkWriter.write(j);
+ }
+ timeChunkWriter.writeToFileWriter(writer);
+ }
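+ // Flush the time-column metadata to disk before any value column is written.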
+ writer.sortAndFlushChunkMetadata();
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ for (int k = 0; k < 1024; ++k) {
+ TSEncodingBuilder builder = TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN);
+ builder.initFromProps(null);
+ for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
+ ValueChunkWriter chunkWriter =
+ new ValueChunkWriter(
+ sortedSeriesId.get(k),
+ CompressionType.SNAPPY,
+ TSDataType.DOUBLE,
+ TSEncoding.PLAIN,
+ builder.getEncoder(TSDataType.DOUBLE));
+ Random random = new Random();
+ List<Pair<Long, TsPrimitiveType>> valueList = new ArrayList<>();
+ for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) {
+ double val = random.nextDouble();
+ chunkWriter.write(j, val, false);
+ valueList.add(new Pair<>(j, new TsPrimitiveType.TsDouble(val)));
+ }
+ chunkWriter.writeToFileWriter(writer);
+ originValue
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(k), x -> new ArrayList<>())
+ .add(valueList);
+ }
+ writer.sortAndFlushChunkMetadata();
+ }
+ writer.endChunkGroup();
+ }
+ writer.endFile();
+ }
+ // Restore the shared chunk size so later tests are unaffected.
+ TEST_CHUNK_SIZE = originTestChunkSize;
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originValue);
+ }
+
+ @Test
+ public void testWritingCompleteMixedFiles() throws IOException {
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ for (int i = 0; i < 5; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ for (int k = 0; k < 10; ++k) {
+ writer.startChunkGroup(deviceId);
+ List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>();
+ AlignedChunkWriterImpl chunkWriter = generateVectorData(k * TEST_CHUNK_SIZE, valList, 6);
+ for (int j = 1; j <= 6; ++j) {
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent("s" + j, x -> new ArrayList<>())
+ .add(valList.get(j - 1));
+ }
+
+ chunkWriter.writeToFileWriter(writer);
+ writer.endChunkGroup();
+ }
+ writer.checkMetadataSizeAndMayFlush();
+ }
+ for (int i = 5; i < 10; ++i) {
+ String deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ for (int j = 0; j < 5; ++j) {
+ ChunkWriterImpl chunkWriter;
+ switch (j) {
+ case 0:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 1:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 2:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 3:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ case 4:
+ default:
+ for (int k = 0; k < 10; ++k) {
+ List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+ chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
+ chunkWriter.writeToFileWriter(writer);
+ originData
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+ .add(valList);
+ }
+ break;
+ }
+ writer.checkMetadataSizeAndMayFlush();
+ }
+ writer.endChunkGroup();
+ }
+ writer.endFile();
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ }
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+ }
+
+ @Test
+ public void testWritingAlignedSeriesByColumn() throws IOException {
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originValue = new HashMap<>();
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ for (int i = 0; i < 5; i++) {
+ String deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
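+ // Write the aligned device column by column: the time chunk first, then one value chunk per series, flushing chunk metadata after each column.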
+ TSEncoding timeEncoding =
+ TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
+ TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
+ Encoder encoder = TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType);
+ TimeChunkWriter timeChunkWriter =
+ new TimeChunkWriter("", CompressionType.SNAPPY, TSEncoding.PLAIN, encoder);
+ for (int j = 0; j < TEST_CHUNK_SIZE; ++j) {
+ timeChunkWriter.write(j);
+ }
+ timeChunkWriter.writeToFileWriter(writer);
+ writer.sortAndFlushChunkMetadata();
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ for (int k = 0; k < 5; ++k) {
+ TSEncodingBuilder builder = TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN);
+ builder.initFromProps(null);
+ ValueChunkWriter chunkWriter =
+ new ValueChunkWriter(
+ sortedSeriesId.get(k),
+ CompressionType.SNAPPY,
+ TSDataType.DOUBLE,
+ TSEncoding.PLAIN,
+ builder.getEncoder(TSDataType.DOUBLE));
+ Random random = new Random();
+ List<Pair<Long, TsPrimitiveType>> valueList = new ArrayList<>();
+ for (int j = 0; j < TEST_CHUNK_SIZE; ++j) {
+ double val = random.nextDouble();
+ chunkWriter.write(j, val, false);
+ valueList.add(new Pair<>((long) j, new TsPrimitiveType.TsDouble(val)));
+ }
+ chunkWriter.writeToFileWriter(writer);
+ originValue
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(k), x -> new ArrayList<>())
+ .add(valueList);
+ writer.sortAndFlushChunkMetadata();
+ }
+ writer.endChunkGroup();
+ }
+ writer.endFile();
+ }
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originValue);
+ }
+
+ @Test
+ public void testWritingAlignedSeriesByColumnWithMultiChunks() throws IOException {
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originValue = new HashMap<>();
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ for (int i = 0; i < 5; i++) {
+ String deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ TSEncoding timeEncoding =
+ TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
+ TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
+ Encoder encoder = TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType);
+ for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
+ TimeChunkWriter timeChunkWriter =
+ new TimeChunkWriter("", CompressionType.SNAPPY, TSEncoding.PLAIN, encoder);
+ for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) {
+ timeChunkWriter.write(j);
+ }
+ timeChunkWriter.writeToFileWriter(writer);
+ }
+ writer.sortAndFlushChunkMetadata();
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ for (int k = 0; k < 5; ++k) {
+ TSEncodingBuilder builder = TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN);
+ builder.initFromProps(null);
+ for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
+ ValueChunkWriter chunkWriter =
+ new ValueChunkWriter(
+ sortedSeriesId.get(k),
+ CompressionType.SNAPPY,
+ TSDataType.DOUBLE,
+ TSEncoding.PLAIN,
+ builder.getEncoder(TSDataType.DOUBLE));
+ Random random = new Random();
+ List<Pair<Long, TsPrimitiveType>> valueList = new ArrayList<>();
+ for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) {
+ double val = random.nextDouble();
+ chunkWriter.write(j, val, false);
+ valueList.add(new Pair<>(j, new TsPrimitiveType.TsDouble(val)));
+ }
+ chunkWriter.writeToFileWriter(writer);
+ originValue
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(k), x -> new ArrayList<>())
+ .add(valueList);
+ }
+ writer.sortAndFlushChunkMetadata();
+ }
+ writer.endChunkGroup();
+ }
+ writer.endFile();
+ }
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originValue);
+ }
+
+ /** The following methods are helpers that generate chunk data for the tests above. */
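+ // Despite its name, this helper generates INT64 values.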
+ private ChunkWriterImpl generateIntData(
+ int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+ ChunkWriterImpl chunkWriter =
+ new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.INT64));
+ Random random = new Random();
+ for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
+ long val = random.nextLong();
+ chunkWriter.write(i, val);
+ record.add(new Pair<>(i, new TsPrimitiveType.TsLong(val)));
+ }
+ return chunkWriter;
+ }
+
+ private ChunkWriterImpl generateFloatData(
+ int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+ ChunkWriterImpl chunkWriter =
+ new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.FLOAT));
+ Random random = new Random();
+ for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
+ float val = random.nextFloat();
+ chunkWriter.write(i, val);
+ record.add(new Pair<>(i, new TsPrimitiveType.TsFloat(val)));
+ }
+ return chunkWriter;
+ }
+
+ private ChunkWriterImpl generateDoubleData(
+ int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+ ChunkWriterImpl chunkWriter =
+ new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.DOUBLE));
+ Random random = new Random();
+ for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
+ double val = random.nextDouble();
+ chunkWriter.write(i, val);
+ record.add(new Pair<>(i, new TsPrimitiveType.TsDouble(val)));
+ }
+ return chunkWriter;
+ }
+
+ private ChunkWriterImpl generateBooleanData(
+ int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+ ChunkWriterImpl chunkWriter =
+ new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.BOOLEAN));
+ Random random = new Random();
+ for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
+ boolean val = random.nextBoolean();
+ chunkWriter.write(i, val);
+ record.add(new Pair<>(i, new TsPrimitiveType.TsBoolean(val)));
+ }
+ return chunkWriter;
+ }
+
+ private AlignedChunkWriterImpl generateVectorData(
+ long startTime, List<List<Pair<Long, TsPrimitiveType>>> record, int seriesNum) {
+ List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
+ TSDataType[] dataTypes =
+ new TSDataType[] {
+ TSDataType.INT32,
+ TSDataType.INT64,
+ TSDataType.FLOAT,
+ TSDataType.DOUBLE,
+ TSDataType.BOOLEAN,
+ TSDataType.TEXT
+ };
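+ // Components s1..sN cycle through the six data types above.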
+ for (int i = 0; i < seriesNum; ++i) {
+ measurementSchemas.add(new MeasurementSchema("s" + (i + 1), dataTypes[i % 6]));
+ }
+ AlignedChunkWriterImpl chunkWriter = new AlignedChunkWriterImpl(measurementSchemas);
+ Random random = new Random();
+ for (int i = 0; i < seriesNum; ++i) {
+ record.add(new ArrayList<>());
+ }
+ for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
+ TsPrimitiveType[] points = new TsPrimitiveType[seriesNum];
+ for (int j = 0; j < seriesNum; ++j) {
+ switch (j % 6) {
+ case 0:
+ points[j] = new TsPrimitiveType.TsInt(random.nextInt());
+ break;
+ case 1:
+ points[j] = new TsPrimitiveType.TsLong(random.nextLong());
+ break;
+ case 2:
+ points[j] = new TsPrimitiveType.TsFloat(random.nextFloat());
+ break;
+ case 3:
+ points[j] = new TsPrimitiveType.TsDouble(random.nextDouble());
+ break;
+ case 4:
+ points[j] = new TsPrimitiveType.TsBoolean(random.nextBoolean());
+ break;
+ case 5:
+ points[j] =
+ new TsPrimitiveType.TsBinary(new Binary(String.valueOf(random.nextDouble())));
+ break;
+ }
+ }
+ for (int j = 0; j < seriesNum; ++j) {
+ record.get(j).add(new Pair<>(i, points[j]));
+ }
+ chunkWriter.write(i, points);
+ }
+ return chunkWriter;
+ }
+
+ private ChunkWriterImpl generateTextData(
+ int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+ ChunkWriterImpl chunkWriter =
+ new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.TEXT));
+ Random random = new Random();
+ for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
+ Binary val = new Binary(String.valueOf(random.nextDouble()));
+ chunkWriter.write(i, val);
+ record.add(new Pair<>(i, new TsPrimitiveType.TsBinary(val)));
+ }
+ return chunkWriter;
+ }
+}