You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2020/04/25 13:39:43 UTC
[incubator-iotdb] 01/01: TsFileMetadataIndex pro
This is an automated email from the ASF dual-hosted git repository.
sunzesong pushed a commit to branch tsfile_metadata_index_pro
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 5de0f0dfa0240a71eadf85555872c244ac492211
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Sat Apr 25 21:39:10 2020 +0800
TsFileMetadataIndex pro
---
.../apache/iotdb/tsfile/TsFileSequenceRead.java | 18 +-
.../iotdb/flink/tsfile/util/TSFileConfigUtil.java | 1 +
.../util/TSFileConfigUtilCompletenessTest.java | 102 +++---
.../db/engine/cache/TimeSeriesMetadataCache.java | 3 +-
.../iotdb/db/engine/cache/TsFileMetaDataCache.java | 2 +-
.../apache/iotdb/db/tools/TsFileSketchTool.java | 47 ++-
.../org/apache/iotdb/db/utils/FileLoaderUtils.java | 22 +-
.../java/org/apache/iotdb/db/utils/MergeUtils.java | 10 +-
.../writelog/recover/TsFileRecoverPerformer.java | 15 +-
.../apache/iotdb/spark/tsfile/DefaultSource.scala | 2 +-
.../iotdb/spark/tsfile/NarrowConverter.scala | 2 +-
.../apache/iotdb/spark/tsfile/WideConverter.scala | 6 +-
.../iotdb/tsfile/common/conf/TSFileConfig.java | 12 +
.../iotdb/tsfile/common/conf/TSFileDescriptor.java | 2 +
.../tsfile/file/metadata/TimeseriesMetadata.java | 14 +-
.../iotdb/tsfile/file/metadata/TsFileMetadata.java | 42 +--
.../metadata/enums/ChildMetadataIndexType.java | 86 +++++
.../iotdb/tsfile/read/TsFileSequenceReader.java | 365 +++++++++++++++------
.../read/controller/MetadataQuerierByFileImpl.java | 29 +-
.../apache/iotdb/tsfile/utils/MetadataIndex.java | 88 +++++
.../write/writer/RestorableTsFileIOWriter.java | 9 +-
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 163 +++++++--
.../tsfile/file/metadata/utils/TestHelper.java | 13 +-
.../iotdb/tsfile/file/metadata/utils/Utils.java | 5 +-
.../iotdb/tsfile/write/TsFileIOWriterTest.java | 3 +-
25 files changed, 773 insertions(+), 288 deletions(-)
diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
index 0ec5fb4..9a4fba0 100644
--- a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
+++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
@@ -37,7 +37,7 @@ import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.reader.page.PageReader;
-import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.MetadataIndex;
public class TsFileSequenceRead {
@@ -47,7 +47,8 @@ public class TsFileSequenceRead {
filename = args[0];
}
TsFileSequenceReader reader = new TsFileSequenceReader(filename);
- System.out.println("file length: " + FSFactoryProducer.getFSFactory().getFile(filename).length());
+ System.out
+ .println("file length: " + FSFactoryProducer.getFSFactory().getFile(filename).length());
System.out.println("file magic head: " + reader.readHeadMagic());
System.out.println("file magic tail: " + reader.readTailMagic());
System.out.println("Level 1 metadata position: " + reader.getFileMetadataPos());
@@ -57,7 +58,8 @@ public class TsFileSequenceRead {
// first SeriesChunks (headers and data) in one ChunkGroup, then the CHUNK_GROUP_FOOTER
// Because we do not know how many chunks a ChunkGroup may have, we should read one byte (the marker) ahead and
// judge accordingly.
- reader.position(TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER.getBytes().length);
+ reader.position(TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER
+ .getBytes().length);
System.out.println("[Chunk Group]");
System.out.println("position: " + reader.position());
byte marker;
@@ -107,13 +109,13 @@ public class TsFileSequenceRead {
}
}
System.out.println("[Metadata]");
- Map<String, Pair<Long, Integer>> deviceOffsetsMap = metaData.getDeviceMetadataIndex();
- for (Map.Entry<String, Pair<Long, Integer>> entry: deviceOffsetsMap.entrySet()) {
- String deviceId = entry.getKey();
+ List<MetadataIndex> metadataIndexList = metaData.getDeviceMetadataIndex();
+ for (MetadataIndex metadataIndex : metadataIndexList) {
Map<String, List<ChunkMetadata>> seriesMetaData =
- reader.readChunkMetadataInDevice(deviceId);
+ reader.readChunkMetadataInDevice(metadataIndex.getName()); // TODO
System.out.println(String
- .format("\t[Device]Device %s, Number of Measurements %d", deviceId, seriesMetaData.size()));
+ .format("\t[Device]Device %s, Number of Measurements %d", metadataIndex.getName(),
+ seriesMetaData.size())); // TODO
for (Map.Entry<String, List<ChunkMetadata>> serie : seriesMetaData.entrySet()) {
System.out.println("\t\tMeasurement:" + serie.getKey());
for (ChunkMetadata chunkMetadata : serie.getValue()) {
diff --git a/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java
index 41596ea..ce47091 100644
--- a/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java
+++ b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java
@@ -50,6 +50,7 @@ public class TSFileConfigUtil {
globalConfig.setKerberosKeytabFilePath(config.getKerberosKeytabFilePath());
globalConfig.setKerberosPrincipal(config.getKerberosPrincipal());
globalConfig.setMaxNumberOfPointsInPage(config.getMaxNumberOfPointsInPage());
+ globalConfig.setMaxNumberOfIndexItemsInNode(config.getMaxNumberOfIndexItemsInNode());
globalConfig.setMaxStringLength(config.getMaxStringLength());
globalConfig.setPageCheckSizeThreshold(config.getPageCheckSizeThreshold());
globalConfig.setPageSizeInByte(config.getPageSizeInByte());
diff --git a/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/util/TSFileConfigUtilCompletenessTest.java b/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/util/TSFileConfigUtilCompletenessTest.java
index 2af10b6..fc6f5fe 100644
--- a/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/util/TSFileConfigUtilCompletenessTest.java
+++ b/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/util/TSFileConfigUtilCompletenessTest.java
@@ -19,66 +19,66 @@
package org.apache.iotdb.flink.util;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.junit.Test;
+import static org.junit.Assert.assertTrue;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
-
-import static org.junit.Assert.assertTrue;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.junit.Test;
/**
* This test is used to help maintain the {@link org.apache.iotdb.flink.tsfile.util.TSFileConfigUtil}.
*/
public class TSFileConfigUtilCompletenessTest {
- @Test
- public void testTSFileConfigUtilCompleteness() {
- String[] addedSetters = {
- "setBatchSize",
- "setBloomFilterErrorRate",
- "setCompressor",
- "setCoreSitePath",
- "setDeltaBlockSize",
- "setDfsClientFailoverProxyProvider",
- "setDfsHaAutomaticFailoverEnabled",
- "setDfsHaNamenodes",
- "setDfsNameServices",
- "setDftSatisfyRate",
- "setEndian",
- "setFloatPrecision",
- "setFreqType",
- "setGroupSizeInByte",
- "setHdfsIp",
- "setHdfsPort",
- "setHdfsSitePath",
- "setKerberosKeytabFilePath",
- "setKerberosPrincipal",
- "setMaxNumberOfPointsInPage",
- "setMaxStringLength",
- "setPageCheckSizeThreshold",
- "setPageSizeInByte",
- "setPlaMaxError",
- "setRleBitWidth",
- "setSdtMaxError",
- "setTimeEncoder",
- "setTimeSeriesDataType",
- "setTSFileStorageFs",
- "setUseKerberos",
- "setValueEncoder"
- };
- Set<String> newSetters = Arrays.stream(TSFileConfig.class.getMethods())
- .map(Method::getName)
- .filter(s -> s.startsWith("set"))
- .filter(s -> !Arrays.asList(addedSetters).contains(s))
- .collect(Collectors.toSet());
- assertTrue(
- String.format(
- "New setters in TSFileConfig are detected, please add them to " +
- "org.apache.iotdb.flink.tsfile.util.TSFileConfigUtil. The setters need to be added: %s",
- newSetters),
- newSetters.isEmpty());
- }
+ @Test
+ public void testTSFileConfigUtilCompleteness() {
+ String[] addedSetters = {
+ "setBatchSize",
+ "setBloomFilterErrorRate",
+ "setCompressor",
+ "setCoreSitePath",
+ "setDeltaBlockSize",
+ "setDfsClientFailoverProxyProvider",
+ "setDfsHaAutomaticFailoverEnabled",
+ "setDfsHaNamenodes",
+ "setDfsNameServices",
+ "setDftSatisfyRate",
+ "setEndian",
+ "setFloatPrecision",
+ "setFreqType",
+ "setGroupSizeInByte",
+ "setHdfsIp",
+ "setHdfsPort",
+ "setHdfsSitePath",
+ "setKerberosKeytabFilePath",
+ "setKerberosPrincipal",
+ "setMaxNumberOfPointsInPage",
+ "setMaxNumberOfIndexItemsInNode",
+ "setMaxStringLength",
+ "setPageCheckSizeThreshold",
+ "setPageSizeInByte",
+ "setPlaMaxError",
+ "setRleBitWidth",
+ "setSdtMaxError",
+ "setTimeEncoder",
+ "setTimeSeriesDataType",
+ "setTSFileStorageFs",
+ "setUseKerberos",
+ "setValueEncoder"
+ };
+ Set<String> newSetters = Arrays.stream(TSFileConfig.class.getMethods())
+ .map(Method::getName)
+ .filter(s -> s.startsWith("set"))
+ .filter(s -> !Arrays.asList(addedSetters).contains(s))
+ .collect(Collectors.toSet());
+ assertTrue(
+ String.format(
+ "New setters in TSFileConfig are detected, please add them to " +
+ "org.apache.iotdb.flink.tsfile.util.TSFileConfigUtil. The setters need to be added: %s",
+ newSetters),
+ newSetters.isEmpty());
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
index 7fe0ea0..89cd9b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.BloomFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,7 +93,7 @@ public class TimeSeriesMetadataCache {
return null;
}
TsFileSequenceReader reader = FileReaderManager.getInstance().get(key.filePath, true);
- return reader.readDeviceMetadata(key.device).get(key.measurement);
+ return reader.readMeasurementMetadata(new Path(key.device, key.measurement));
}
cacheRequestNum.incrementAndGet();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java
index 6870e53..9021dd6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java
@@ -64,7 +64,7 @@ public class TsFileMetaDataCache {
if (deviceIndexMapEntrySize == 0 && value.getDeviceMetadataIndex() != null
&& value.getDeviceMetadataIndex().size() > 0) {
deviceIndexMapEntrySize = RamUsageEstimator
- .sizeOf(value.getDeviceMetadataIndex().entrySet().iterator().next());
+ .sizeOf(value.getDeviceMetadataIndex().iterator().next());
}
// totalChunkNum, invalidChunkNum
long valueSize = 4 + 4L;
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
index 68c3037..7ff2011 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
@@ -19,16 +19,6 @@
package org.apache.iotdb.db.tools;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
-import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
-import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.utils.BloomFilter;
-import org.apache.iotdb.tsfile.utils.Pair;
-
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
@@ -37,6 +27,15 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.utils.BloomFilter;
+import org.apache.iotdb.tsfile.utils.MetadataIndex;
public class TsFileSketchTool {
@@ -62,16 +61,13 @@ public class TsFileSketchTool {
TsFileSequenceReader reader = new TsFileSequenceReader(filename);
TsFileMetadata tsFileMetaData = reader.readFileMetadata();
List<String> tsDeviceSortedList = tsFileMetaData.getDeviceMetadataIndex()
- .keySet()
- .stream()
- .sorted().collect(Collectors.toList());
+ .stream().map(MetadataIndex::getName).sorted().collect(Collectors.toList());
Map<String, Map<String, List<ChunkMetadata>>> tsDeviceSeriesMetadataMap = new LinkedHashMap<>();
for (String deviceId : tsDeviceSortedList) {
Map<String, List<ChunkMetadata>> seriesMetadataMap =
reader.readChunkMetadataInDevice(deviceId);
tsDeviceSeriesMetadataMap.put(deviceId, seriesMetadataMap);
}
-
// begin print
StringBuilder str1 = new StringBuilder();
@@ -89,8 +85,9 @@ public class TsFileSketchTool {
+ reader.readVersionNumber());
// device begins
for (
- Entry<String, Map<String, List<ChunkMetadata>>> entry : tsDeviceSeriesMetadataMap.entrySet()) {
- printlnBoth(pw, str1.toString() + "\t[Chunks] of "+ entry.getKey() +
+ Entry<String, Map<String, List<ChunkMetadata>>> entry : tsDeviceSeriesMetadataMap
+ .entrySet()) {
+ printlnBoth(pw, str1.toString() + "\t[Chunks] of " + entry.getKey() +
", num of Chunks:" + entry.getValue().size());
// chunk begins
long chunkEndPos = 0;
@@ -108,7 +105,8 @@ public class TsFileSketchTool {
printlnBoth(pw,
String.format("%20s", "") + "|\t\t" + chunk.getHeader().getNumOfPages() + " pages");
chunkEndPos =
- chunkMetaData.getOffsetOfChunkHeader() + chunk.getHeader().getSerializedSize() + chunk
+ chunkMetaData.getOffsetOfChunkHeader() + chunk.getHeader().getSerializedSize()
+ + chunk
.getHeader().getDataSize();
}
}
@@ -128,19 +126,18 @@ public class TsFileSketchTool {
// metadata begins
if (tsDeviceSortedList.isEmpty()) {
- printlnBoth(pw, String.format("%20s", reader.getFileMetadataPos() - 1)
- + "|\t[marker] 2");
+ printlnBoth(pw, String.format("%20s", reader.getFileMetadataPos() - 1)
+ + "|\t[marker] 2");
} else {
printlnBoth(pw,
String.format("%20s", (tsFileMetaData.getDeviceMetadataIndex()
- .get(tsDeviceSortedList.get(0))).left - 1)
- + "|\t[marker] 2");
+ .get(0).getOffset() - 1)
+ + "|\t[marker] 2"));
}
- for (Entry<String, Pair<Long,Integer>> entry
- : tsFileMetaData.getDeviceMetadataIndex().entrySet()) {
+ for (MetadataIndex metadataIndex : tsFileMetaData.getDeviceMetadataIndex()) {
printlnBoth(pw,
- String.format("%20s", entry.getValue().left)
- + "|\t[DeviceMetadata] of " + entry.getKey());
+ String.format("%20s", metadataIndex.getOffset())
+ + "|\t[DeviceMetadata] of " + metadataIndex.getName());
}
printlnBoth(pw, String.format("%20s", reader.getFileMetadataPos()) + "|\t[TsFileMetaData]");
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 8f4661a..d099860 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -18,14 +18,15 @@
*/
package org.apache.iotdb.db.utils;
-import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.engine.modification.Modification;
-import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.db.query.reader.chunk.DiskChunkLoader;
import org.apache.iotdb.db.query.reader.chunk.MemChunkLoader;
import org.apache.iotdb.db.query.reader.chunk.MemChunkReader;
import org.apache.iotdb.db.query.reader.chunk.metadata.DiskChunkMetadataLoader;
@@ -42,12 +43,6 @@ import org.apache.iotdb.tsfile.read.reader.IChunkReader;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
public class FileLoaderUtils {
private FileLoaderUtils() {
@@ -72,7 +67,8 @@ public class FileLoaderUtils {
public static void updateTsFileResource(TsFileMetadata metaData, TsFileSequenceReader reader,
TsFileResource tsFileResource) throws IOException {
- for (String device : metaData.getDeviceMetadataIndex().keySet()) {
+ List<String> deviceList = reader.getDevicesByMetadata(metaData.getDeviceMetadataIndex());
+ for (String device : deviceList) {
Map<String, TimeseriesMetadata> chunkMetadataListInOneDevice = reader
.readDeviceMetadata(device);
for (TimeseriesMetadata timeseriesMetaData : chunkMetadataListInOneDevice.values()) {
@@ -84,7 +80,6 @@ public class FileLoaderUtils {
/**
- *
* @param resource TsFile
* @param seriesPath Timeseries path
* @param allSensors measurements queried at the same time of this device
@@ -128,6 +123,7 @@ public class FileLoaderUtils {
/**
* load all chunk metadata of one time series in one file.
+ *
* @param timeSeriesMetadata the corresponding TimeSeriesMetadata in that file.
*/
public static List<ChunkMetadata> loadChunkMetadataList(TimeseriesMetadata timeSeriesMetadata)
@@ -138,6 +134,7 @@ public class FileLoaderUtils {
/**
* load all page readers in one chunk that satisfying the timeFilter
+ *
* @param chunkMetaData the corresponding chunk metadata
* @param timeFilter it should be a TimeFilter instead of a ValueFilter
*/
@@ -159,7 +156,8 @@ public class FileLoaderUtils {
return chunkReader.loadPageReaderList();
}
- public static List<ChunkMetadata> getChunkMetadataList(Path path, String filePath) throws IOException {
+ public static List<ChunkMetadata> getChunkMetadataList(Path path, String filePath)
+ throws IOException {
TsFileSequenceReader tsFileReader = FileReaderManager.getInstance().get(filePath, true);
return tsFileReader.getChunkMetadataList(path);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
index d1ddbab..4ee6351 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
@@ -36,7 +36,7 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
-import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.MetadataIndex;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,7 +48,7 @@ public class MergeUtils {
private MergeUtils() {
// util class
}
-
+
public static void writeTVPair(TimeValuePair timeValuePair, IChunkWriter chunkWriter) {
switch (chunkWriter.getDataType()) {
case TEXT:
@@ -148,8 +148,10 @@ public class MergeUtils {
public static long getFileMetaSize(TsFileResource seqFile, TsFileSequenceReader sequenceReader) throws IOException {
long minPos = Long.MAX_VALUE;
TsFileMetadata fileMetaData = sequenceReader.readFileMetadata();
- for (Pair<Long, Integer> deviceMetaData : fileMetaData.getDeviceMetadataIndex().values()) {
- long timeseriesMetaDataEndOffset = deviceMetaData.left + deviceMetaData.right;
+ // FIXME
+ List<MetadataIndex> metadataIndexList = fileMetaData.getDeviceMetadataIndex();
+ for(int i = 1; i < metadataIndexList.size(); i++){
+ long timeseriesMetaDataEndOffset = metadataIndexList.get(i).getOffset();
minPos = timeseriesMetaDataEndOffset < minPos ? timeseriesMetaDataEndOffset : minPos;
}
return seqFile.getFileSize() - minPos;
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index 71726cb..9bbce15 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -43,7 +43,6 @@ import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,8 +75,9 @@ public class TsFileRecoverPerformer {
/**
* 1. recover the TsFile by RestorableTsFileIOWriter and truncate the file to remaining corrected
* data 2. redo the WALs to recover unpersisted data 3. flush and close the file 4. clean WALs
- * @return a RestorableTsFileIOWriter if the file is not closed before crush, so this writer
- * can be used to continue writing
+ *
+ * @return a RestorableTsFileIOWriter if the file is not closed before crush, so this writer can
+ * be used to continue writing
*/
public RestorableTsFileIOWriter recover() throws StorageGroupProcessorException {
@@ -113,7 +113,8 @@ public class TsFileRecoverPerformer {
}
// write .resource file
long fileVersion =
- Long.parseLong(resource.getFile().getName().split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[1]);
+ Long.parseLong(
+ resource.getFile().getName().split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[1]);
resource.setHistoricalVersions(Collections.singleton(fileVersion));
resource.serialize();
}
@@ -158,10 +159,8 @@ public class TsFileRecoverPerformer {
try (TsFileSequenceReader reader =
new TsFileSequenceReader(resource.getFile().getAbsolutePath(), false)) {
TsFileMetadata fileMetadata = reader.readFileMetadata();
-
- Map<String, Pair<Long, Integer>> deviceMetaDataMap = fileMetadata.getDeviceMetadataIndex();
- for (Map.Entry<String, Pair<Long, Integer>> entry: deviceMetaDataMap.entrySet()) {
- String deviceId = entry.getKey();
+ List<String> devices = reader.getDevicesByMetadata(fileMetadata.getDeviceMetadataIndex());
+ for (String deviceId : devices) {
for (TimeseriesMetadata timeseriesMetadata : reader.readDeviceMetadata(deviceId).values()) {
resource.updateStartTime(deviceId, timeseriesMetadata.getStatistics().getStartTime());
resource.updateStartTime(deviceId, timeseriesMetadata.getStatistics().getEndTime());
diff --git a/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/DefaultSource.scala b/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/DefaultSource.scala
index f230491..248359a 100755
--- a/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/DefaultSource.scala
+++ b/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/DefaultSource.scala
@@ -115,7 +115,7 @@ private[tsfile] class DefaultSource extends FileFormat with DataSourceRegister {
}
if (options.getOrElse(DefaultSource.isNarrowForm, "").equals("narrow_form")) {
- val deviceNames = tsFileMetaData.getDeviceMetadataIndex.keySet()
+ val deviceNames = reader.getDevicesByMetadata(tsFileMetaData.getDeviceMetadataIndex)
val measurementNames = reader.getAllMeasurements.keySet()
// construct queryExpression based on queriedSchema and filters
diff --git a/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/NarrowConverter.scala b/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/NarrowConverter.scala
index f922dd6..9c820d0 100644
--- a/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/NarrowConverter.scala
+++ b/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/NarrowConverter.scala
@@ -166,7 +166,7 @@ object NarrowConverter extends Converter {
* @return query expression
*/
def toQueryExpression(schema: StructType,
- device_name: util.Set[String],
+ device_name: util.List[String],
measurement_name: util.Set[String],
filters: Seq[Filter],
in: TsFileSequenceReader,
diff --git a/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/WideConverter.scala b/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/WideConverter.scala
index e5b7de2..dc1f20a 100755
--- a/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/WideConverter.scala
+++ b/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/WideConverter.scala
@@ -62,7 +62,7 @@ object WideConverter extends Converter {
def getSeries(tsFileMetaData: TsFileMetadata, reader: TsFileSequenceReader): util.ArrayList[Series] = {
val series = new util.ArrayList[Series]()
- val devices = tsFileMetaData.getDeviceMetadataIndex.keySet()
+ val devices = tsFileMetaData.getDeviceMetadataIndex
val measurements = reader.getAllMeasurements
devices.foreach(d => {
@@ -92,7 +92,7 @@ object WideConverter extends Converter {
val in = new HDFSInput(f.getPath, conf)
val reader = new TsFileSequenceReader(in)
val tsFileMetaData = reader.readFileMetadata
- val devices = tsFileMetaData.getDeviceMetadataIndex.keySet()
+ val devices = tsFileMetaData.getDeviceMetadataIndex
val measurements = reader.getAllMeasurements
devices.foreach(d => {
@@ -133,7 +133,7 @@ object WideConverter extends Converter {
} else { // Remove nonexistent schema according to the current file's metadata.
// This may happen when queried TsFiles in the same folder do not have the same schema.
- val devices = tsFileMetaData.getDeviceMetadataIndex.keySet()
+ val devices = tsFileMetaData.getDeviceMetadataIndex
val measurementIds = reader.getAllMeasurements.keySet()
requiredSchema.foreach(f => {
if (!QueryConstant.RESERVED_TIME.equals(f.name)) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
index c5c8627..4ef34df 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
@@ -83,6 +83,10 @@ public class TSFileConfig implements Serializable {
*/
private int maxNumberOfPointsInPage = 1024 * 1024;
/**
+ * The maximum number of index items in a metadataIndex node, default value is 1024
+ */
+ private int maxNumberOfIndexItemsInNode = 5;
+ /**
* Data type for input timestamp, TsFile supports INT32 or INT64.
*/
private String timeSeriesDataType = "INT64";
@@ -229,6 +233,14 @@ public class TSFileConfig implements Serializable {
this.maxNumberOfPointsInPage = maxNumberOfPointsInPage;
}
+ public int getMaxNumberOfIndexItemsInNode() {
+ return maxNumberOfIndexItemsInNode;
+ }
+
+ public void setMaxNumberOfIndexItemsInNode(int maxNumberOfIndexItemsInNode) {
+ this.maxNumberOfIndexItemsInNode = maxNumberOfIndexItemsInNode;
+ }
+
public String getTimeSeriesDataType() {
return timeSeriesDataType;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileDescriptor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileDescriptor.java
index 2ccee2f..ea34b8e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileDescriptor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileDescriptor.java
@@ -120,6 +120,8 @@ public class TSFileDescriptor {
}
conf.setMaxNumberOfPointsInPage(Integer.parseInt(
properties.getProperty("max_number_of_points_in_page", Integer.toString(conf.getMaxNumberOfPointsInPage()))));
+ conf.setMaxNumberOfIndexItemsInNode(Integer.parseInt(
+ properties.getProperty("max_number_of_index_items_in_node", Integer.toString(conf.getMaxNumberOfIndexItemsInNode()))));
conf.setTimeSeriesDataType(properties.getProperty("time_series_data_type", conf.getTimeSeriesDataType()));
conf.setMaxStringLength(
Integer.parseInt(properties.getProperty("max_string_length", Integer.toString(conf.getMaxStringLength()))));
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
index e0fa68f..0522ed7 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
@@ -19,15 +19,14 @@
package org.apache.iotdb.tsfile.file.metadata;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.controller.IChunkMetadataLoader;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.List;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.controller.IChunkMetadataLoader;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
public class TimeseriesMetadata {
@@ -36,9 +35,10 @@ public class TimeseriesMetadata {
private String measurementId;
private TSDataType tsDataType;
-
+
private Statistics<?> statistics;
-// modified is true when there are modifications of the series, or from unseq file
+
+ // modified is true when there are modifications of the series, or from unseq file
private boolean modified;
private IChunkMetadataLoader chunkMetadataLoader;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
index 3195c08..4fbeb3f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
@@ -19,19 +19,19 @@
package org.apache.iotdb.tsfile.file.metadata;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.ChildMetadataIndexType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.BloomFilter;
+import org.apache.iotdb.tsfile.utils.MetadataIndex;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
/**
* TSFileMetaData collects all metadata info and saves in its data structure.
@@ -48,8 +48,8 @@ public class TsFileMetadata {
// bloom filter
private BloomFilter bloomFilter;
- // DeviceId -> offset and length of Map<String, TimeseriesMetadata>
- private Map<String, Pair<Long, Integer>> deviceMetadataIndex;
+ // List of <name, offset, childMetadataIndexType>
+ private List<MetadataIndex> deviceMetadataIndex;
// offset -> version
private List<Pair<Long, Long>> versionInfo;
@@ -68,16 +68,17 @@ public class TsFileMetadata {
// deviceMetadataIndex
int deviceNum = ReadWriteIOUtils.readInt(buffer);
- Map<String, Pair<Long, Integer>> deviceMetaDataMap = new HashMap<>();
+ List<MetadataIndex> deviceMetaDataList = new ArrayList<>();
if (deviceNum > 0) {
for (int i = 0; i < deviceNum; i++) {
- String deviceId = ReadWriteIOUtils.readString(buffer);
+ String name = ReadWriteIOUtils.readString(buffer);
long offset = ReadWriteIOUtils.readLong(buffer);
- int length = ReadWriteIOUtils.readInt(buffer);
- deviceMetaDataMap.put(deviceId, new Pair<>(offset, length));
+ ChildMetadataIndexType type = ChildMetadataIndexType
+ .deserialize(ReadWriteIOUtils.readShort(buffer));
+ deviceMetaDataList.add(new MetadataIndex(name, offset, type));
}
}
- fileMetaData.setDeviceMetadataIndex(deviceMetaDataMap);
+ fileMetaData.setDeviceMetadataIndex(deviceMetaDataList);
fileMetaData.totalChunkNum = ReadWriteIOUtils.readInt(buffer);
fileMetaData.invalidChunkNum = ReadWriteIOUtils.readInt(buffer);
@@ -123,10 +124,11 @@ public class TsFileMetadata {
// deviceMetadataIndex
if (deviceMetadataIndex != null) {
byteLen += ReadWriteIOUtils.write(deviceMetadataIndex.size(), outputStream);
- for (Map.Entry<String, Pair<Long, Integer>> entry : deviceMetadataIndex.entrySet()) {
- byteLen += ReadWriteIOUtils.write(entry.getKey(), outputStream);
- byteLen += ReadWriteIOUtils.write(entry.getValue().left, outputStream);
- byteLen += ReadWriteIOUtils.write(entry.getValue().right, outputStream);
+ for (MetadataIndex metadataIndex : deviceMetadataIndex) {
+ byteLen += ReadWriteIOUtils.write(metadataIndex.getName(), outputStream);
+ byteLen += ReadWriteIOUtils.write(metadataIndex.getOffset(), outputStream);
+ byteLen += ReadWriteIOUtils
+ .write(metadataIndex.getChildMetadataIndexType().serialize(), outputStream);
}
} else {
byteLen += ReadWriteIOUtils.write(0, outputStream);
@@ -152,7 +154,7 @@ public class TsFileMetadata {
/**
* use the given outputStream to serialize bloom filter.
*
- * @param outputStream -output stream to determine byte length
+ * @param outputStream -output stream to determine byte length
* @return -byte length
*/
public int serializeBloomFilter(OutputStream outputStream, Set<Path> paths)
@@ -208,11 +210,11 @@ public class TsFileMetadata {
this.metaOffset = metaOffset;
}
- public Map<String, Pair<Long, Integer>> getDeviceMetadataIndex() {
+ public List<MetadataIndex> getDeviceMetadataIndex() {
return deviceMetadataIndex;
}
- public void setDeviceMetadataIndex(Map<String, Pair<Long, Integer>> deviceMetadataIndex) {
+ public void setDeviceMetadataIndex(List<MetadataIndex> deviceMetadataIndex) {
this.deviceMetadataIndex = deviceMetadataIndex;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/ChildMetadataIndexType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/ChildMetadataIndexType.java
new file mode 100644
index 0000000..615d457
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/ChildMetadataIndexType.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.file.metadata.enums;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public enum ChildMetadataIndexType {
+ DEVICE_INDEX, DEVICE, MEASUREMENT_INDEX, MEASUREMENT;
+
+ /**
+ * deserialize short number.
+ *
+ * @param i short number
+ * @return ChildMetadataIndexType
+ */
+ public static ChildMetadataIndexType deserialize(short i) {
+ if (i >= 4) {
+ throw new IllegalArgumentException("Invalid input: " + i);
+ }
+ switch (i) {
+ case 0:
+ return DEVICE_INDEX;
+ case 1:
+ return DEVICE;
+ case 2:
+ return MEASUREMENT_INDEX;
+ default:
+ return MEASUREMENT;
+ }
+ }
+
+ public static ChildMetadataIndexType deserializeFrom(ByteBuffer buffer) {
+ return deserialize(buffer.getShort());
+ }
+
+ public static int getSerializedSize() {
+ return Short.BYTES;
+ }
+
+ public void serializeTo(ByteBuffer byteBuffer) {
+ byteBuffer.putShort(serialize());
+ }
+
+ public void serializeTo(DataOutputStream outputStream) throws IOException {
+ outputStream.writeShort(serialize());
+ }
+
+ /**
+ * return a serialize child metadata index type.
+ *
+ * @return -enum type
+ */
+ public short serialize() {
+ switch (this) {
+ case DEVICE_INDEX:
+ return 0;
+ case DEVICE:
+ return 1;
+ case MEASUREMENT_INDEX:
+ return 2;
+ case MEASUREMENT:
+ return 3;
+ default:
+ return -1;
+ }
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index eb0c6ae..601209f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -18,6 +18,23 @@
*/
package org.apache.iotdb.tsfile.read;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.compress.IUnCompressor;
@@ -30,6 +47,7 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.ChildMetadataIndexType;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
@@ -38,6 +56,7 @@ import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
import org.apache.iotdb.tsfile.read.reader.TsFileInput;
+import org.apache.iotdb.tsfile.utils.MetadataIndex;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.utils.VersionUtils;
@@ -45,15 +64,6 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
public class TsFileSequenceReader implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(TsFileSequenceReader.class);
@@ -88,7 +98,7 @@ public class TsFileSequenceReader implements AutoCloseable {
/**
* construct function for TsFileSequenceReader.
*
- * @param file -given file name
+ * @param file -given file name
* @param loadMetadataSize -whether load meta data size
*/
public TsFileSequenceReader(String file, boolean loadMetadataSize) throws IOException {
@@ -129,7 +139,7 @@ public class TsFileSequenceReader implements AutoCloseable {
/**
* construct function for TsFileSequenceReader.
*
- * @param input -given input
+ * @param input -given input
* @param loadMetadataSize -load meta data size
*/
public TsFileSequenceReader(TsFileInput input, boolean loadMetadataSize) throws IOException {
@@ -147,10 +157,10 @@ public class TsFileSequenceReader implements AutoCloseable {
/**
* construct function for TsFileSequenceReader.
*
- * @param input the input of a tsfile. The current position should be a markder and
- * then a chunk Header, rather than the magic number
- * @param fileMetadataPos the position of the file metadata in the TsFileInput from the beginning
- * of the input to the current position
+ * @param input the input of a tsfile. The current position should be a markder and then a chunk
+ * Header, rather than the magic number
+ * @param fileMetadataPos the position of the file metadata in the TsFileInput from the beginning
+ * of the input to the current position
* @param fileMetadataSize the byte size of the file metadata in the input
*/
public TsFileSequenceReader(TsFileInput input, long fileMetadataPos, int fileMetadataSize) {
@@ -213,7 +223,7 @@ public class TsFileSequenceReader implements AutoCloseable {
* this function does not modify the position of the file reader.
*
* @param movePosition whether move the position of the file reader after reading the magic header
- * to the end of the magic head string.
+ * to the end of the magic head string.
*/
public String readHeadMagic(boolean movePosition) throws IOException {
ByteBuffer magicStringBytes = ByteBuffer.allocate(TSFileConfig.MAGIC_STRING.getBytes().length);
@@ -244,6 +254,7 @@ public class TsFileSequenceReader implements AutoCloseable {
/**
* this function does not modify the position of the file reader.
+ *
* @throws IOException io error
*/
public TsFileMetadata readFileMetadata() throws IOException {
@@ -254,8 +265,8 @@ public class TsFileSequenceReader implements AutoCloseable {
}
/**
- * this function reads measurements and TimeseriesMetaDatas in given device
- * Thread Safe
+ * this function reads measurements and TimeseriesMetaDatas in given device Thread Safe
+ *
* @param device name
* @return the map measurementId -> TimeseriesMetaData in one device
* @throws IOException io error
@@ -270,7 +281,7 @@ public class TsFileSequenceReader implements AutoCloseable {
if (cachedDeviceMetadata.containsKey(device)) {
return cachedDeviceMetadata.get(device);
}
- } finally{
+ } finally {
cacheLock.readLock().unlock();
}
@@ -280,9 +291,6 @@ public class TsFileSequenceReader implements AutoCloseable {
return cachedDeviceMetadata.get(device);
}
readFileMetadata();
- if (!tsFileMetaData.getDeviceMetadataIndex().containsKey(device)) {
- return new HashMap<>();
- }
Map<String, TimeseriesMetadata> deviceMetadata = readDeviceMetadataFromDisk(device);
cachedDeviceMetadata.put(device, deviceMetadata);
return deviceMetadata;
@@ -291,21 +299,83 @@ public class TsFileSequenceReader implements AutoCloseable {
}
}
- private Map<String, TimeseriesMetadata> readDeviceMetadataFromDisk(String device) throws IOException {
+ private Map<String, TimeseriesMetadata> readDeviceMetadataFromDisk(String device)
+ throws IOException {
readFileMetadata();
- if (!tsFileMetaData.getDeviceMetadataIndex().containsKey(device)) {
- return Collections.emptyMap();
- }
- Pair<Long, Integer> deviceMetadataIndex = tsFileMetaData.getDeviceMetadataIndex().get(device);
+ List<TimeseriesMetadata> timeseriesMetadataList = getDeviceTimeseriesMetadata(device);
Map<String, TimeseriesMetadata> deviceMetadata = new HashMap<>();
- ByteBuffer buffer = readData(deviceMetadataIndex.left, deviceMetadataIndex.right);
- while (buffer.hasRemaining()) {
- TimeseriesMetadata tsMetaData = TimeseriesMetadata.deserializeFrom(buffer);
- deviceMetadata.put(tsMetaData.getMeasurementId(), tsMetaData);
+ for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) {
+ deviceMetadata.put(timeseriesMetadata.getMeasurementId(), timeseriesMetadata);
}
return deviceMetadata;
}
+ public TimeseriesMetadata readMeasurementMetadata(Path path) throws IOException {
+ readFileMetadata();
+ List<MetadataIndex> deviceMetadataIndexList = tsFileMetaData.getDeviceMetadataIndex();
+ Pair<MetadataIndex, Long> metadataIndexPair = getDeviceMetaDataAndEndOffset(
+ deviceMetadataIndexList, path.getDevice());
+ ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(),
+ metadataIndexPair.right);
+ while (!metadataIndexPair.left.getChildMetadataIndexType()
+ .equals(ChildMetadataIndexType.MEASUREMENT)) {
+ List<MetadataIndex> measurementMetadataIndexList = new ArrayList<>();
+ while (buffer.hasRemaining()) {
+ measurementMetadataIndexList.add(MetadataIndex.deserializeFrom(buffer));
+ }
+ metadataIndexPair = getMeasurementMetaDataAndEndOffset(
+ measurementMetadataIndexList, path.getMeasurement());
+ }
+ List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
+ buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
+ while (buffer.hasRemaining()) {
+ timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer));
+ }
+ String[] measurementNameList = timeseriesMetadataList.stream()
+ .map(TimeseriesMetadata::getMeasurementId).collect(Collectors.toList())
+ .toArray(new String[timeseriesMetadataList.size()]);
+
+ return timeseriesMetadataList
+ .get(Arrays.binarySearch(measurementNameList, path.getMeasurement()));
+ }
+
+ public List<String> getDevicesByMetadata(List<MetadataIndex> metadataIndexList)
+ throws IOException {
+ Set<String> deviceSet = new TreeSet<>();
+ for (int i = 0; i < metadataIndexList.size() - 1; i++) {
+ MetadataIndex metadataIndex = metadataIndexList.get(i);
+ switch (metadataIndex.getChildMetadataIndexType()) {
+ case DEVICE:
+ case MEASUREMENT:
+ case MEASUREMENT_INDEX:
+ for (MetadataIndex index : metadataIndexList) {
+ if (!index.getName().equals("")) {
+ deviceSet.add(index.getName());
+ }
+ }
+ break;
+ case DEVICE_INDEX:
+ ByteBuffer buffer = readData(metadataIndex.getOffset(),
+ metadataIndexList.get(i + 1).getOffset());
+ List<MetadataIndex> nextMetadataIndexList = new ArrayList<>();
+ while (buffer.hasRemaining()) {
+ MetadataIndex nextMetadataIndex = MetadataIndex.deserializeFrom(buffer);
+ nextMetadataIndexList.add(nextMetadataIndex);
+
+ }
+ for (String name : getDevicesByMetadata(nextMetadataIndexList)) {
+ if (!name.equals("")) {
+ deviceSet.add(name);
+ }
+ }
+ break;
+ default:
+ throw new IOException("Error TsFileMetadata to get devices");
+ }
+
+ }
+ return new ArrayList<>(deviceSet);
+ }
/**
* read all ChunkMetaDatas of given device
@@ -314,28 +384,23 @@ public class TsFileSequenceReader implements AutoCloseable {
* @return measurement -> ChunkMetadata list
* @throws IOException io error
*/
- public Map<String, List<ChunkMetadata>> readChunkMetadataInDevice(String device) throws IOException {
+ public Map<String, List<ChunkMetadata>> readChunkMetadataInDevice(String device)
+ throws IOException {
if (tsFileMetaData == null) {
readFileMetadata();
}
- if (tsFileMetaData.getDeviceMetadataIndex() == null
- || !tsFileMetaData.getDeviceMetadataIndex().containsKey(device)) {
- return new HashMap<>();
- }
- Pair<Long, Integer> deviceMetaData = tsFileMetaData.getDeviceMetadataIndex().get(device);
- ByteBuffer buffer = readData(deviceMetaData.left, deviceMetaData.right);
long start = 0;
int size = 0;
- while (buffer.hasRemaining()) {
- TimeseriesMetadata timeseriesMetaData = TimeseriesMetadata.deserializeFrom(buffer);
+ List<TimeseriesMetadata> timeseriesMetadataMap = getDeviceTimeseriesMetadata(device);
+ for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataMap) {
if (start == 0) {
- start = timeseriesMetaData.getOffsetOfChunkMetaDataList();
+ start = timeseriesMetadata.getOffsetOfChunkMetaDataList();
}
- size += timeseriesMetaData.getDataSizeOfChunkMetaDataList();
+ size += timeseriesMetadata.getDataSizeOfChunkMetaDataList();
}
// read buffer of all ChunkMetadatas of this device
- buffer = readData(start, size);
+ ByteBuffer buffer = readData(start, size);
Map<String, List<ChunkMetadata>> seriesMetadata = new HashMap<>();
@@ -364,20 +429,125 @@ public class TsFileSequenceReader implements AutoCloseable {
if (tsFileMetaData == null) {
readFileMetadata();
}
- Map<String, Pair<Long, Integer>> deviceMetaDataMap = tsFileMetaData.getDeviceMetadataIndex();
- for (Map.Entry<String, Pair<Long, Integer>> entry : deviceMetaDataMap.entrySet()) {
- String deviceId = entry.getKey();
- Pair<Long, Integer> deviceMetaData = entry.getValue();
- ByteBuffer buffer = readData(deviceMetaData.left, deviceMetaData.right);
- while (buffer.hasRemaining()) {
- TimeseriesMetadata tsMetaData = TimeseriesMetadata.deserializeFrom(buffer);
- paths.add(new Path(deviceId, tsMetaData.getMeasurementId()));
+ List<MetadataIndex> metadataIndexList = tsFileMetaData.getDeviceMetadataIndex();
+ for (int i = 0; i < metadataIndexList.size() - 1; i++) {
+ ByteBuffer buffer = readData(metadataIndexList.get(i).getOffset(),
+ metadataIndexList.get(i + 1).getOffset());
+ Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap = new TreeMap<>();
+ analyzeMetadataIndex(metadataIndexList.get(i), buffer, timeseriesMetadataMap);
+ for (Entry<String, List<TimeseriesMetadata>> entry : timeseriesMetadataMap.entrySet()) {
+ for (TimeseriesMetadata timeseriesMetadata : entry.getValue()) {
+ paths.add(new Path(entry.getKey(), timeseriesMetadata.getMeasurementId()));
+ }
}
}
return paths;
}
/**
+ * Traverse the metadata index from top-level MetadataIndex to get TimeseriesMetadatas
+ *
+ * @param metadataIndex top-level MetadataIndex
+ * @param buffer byte buffer
+ * @param timeseriesMetadataMap map deviceId -> timeseriesMetadata list
+ */
+ private void analyzeMetadataIndex(MetadataIndex metadataIndex, ByteBuffer buffer,
+ Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap) throws IOException {
+ String deviceId;
+ switch (metadataIndex.getChildMetadataIndexType()) {
+ case DEVICE_INDEX:
+ case DEVICE:
+ case MEASUREMENT_INDEX:
+ List<MetadataIndex> metadataIndexList = new ArrayList<>();
+ while (buffer.hasRemaining()) {
+ metadataIndexList.add(MetadataIndex.deserializeFrom(buffer));
+ }
+ for (int i = 0; i < metadataIndexList.size() - 1; i++) {
+ ByteBuffer nextBuffer = readData(metadataIndexList.get(i).getOffset(),
+ metadataIndexList.get(i + 1).getOffset());
+ analyzeMetadataIndex(metadataIndexList.get(i), nextBuffer, timeseriesMetadataMap);
+ }
+ break;
+ case MEASUREMENT:
+ deviceId = metadataIndex.getName();
+ List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
+ while (buffer.hasRemaining()) {
+ timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer));
+ }
+ if (timeseriesMetadataMap.containsKey(deviceId)) {
+ timeseriesMetadataList.addAll(timeseriesMetadataMap.get(deviceId));
+ }
+ timeseriesMetadataMap.put(deviceId, timeseriesMetadataList);
+ break;
+ }
+ }
+
+ private List<TimeseriesMetadata> getDeviceTimeseriesMetadata(String device) throws IOException {
+ List<MetadataIndex> deviceMetadataIndexList = tsFileMetaData.getDeviceMetadataIndex();
+ Pair<MetadataIndex, Long> metadataIndexPair = getDeviceMetaDataAndEndOffset(
+ deviceMetadataIndexList, device);
+ ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
+ Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap = new TreeMap<>();
+ analyzeMetadataIndex(metadataIndexPair.left, buffer, timeseriesMetadataMap);
+ List<TimeseriesMetadata> deviceTimeseriesMetadata = new ArrayList<>();
+ for (List<TimeseriesMetadata> timeseriesMetadataList : timeseriesMetadataMap.values()) {
+ deviceTimeseriesMetadata.addAll(timeseriesMetadataList);
+ }
+ return deviceTimeseriesMetadata;
+ }
+
+ private Pair<MetadataIndex, Long> getDeviceMetaDataAndEndOffset(
+ List<MetadataIndex> metadataIndexList, String device) throws IOException {
+ int size = metadataIndexList.size();
+ String[] deviceNameList = metadataIndexList.stream().map(MetadataIndex::getName).collect(
+ Collectors.toList()).toArray(new String[size]);
+ int deviceIndex = Arrays.binarySearch(deviceNameList, device);
+ if (deviceIndex < 0) {
+ deviceIndex = -deviceIndex - 2;
+ }
+ if (deviceIndex == size - 1) {
+ deviceIndex--;
+ }
+ MetadataIndex metadataIndex = metadataIndexList.get(deviceIndex);
+ if (!metadataIndex.getChildMetadataIndexType().equals(ChildMetadataIndexType.DEVICE_INDEX)) {
+ return new Pair<>(metadataIndex, metadataIndexList.get(deviceIndex + 1).getOffset());
+ }
+ List<MetadataIndex> nextMetadataIndexList = new ArrayList<>();
+ ByteBuffer buffer = readData(metadataIndex.getOffset(),
+ metadataIndexList.get(deviceIndex + 1).getOffset());
+ while (buffer.hasRemaining()) {
+ nextMetadataIndexList.add(MetadataIndex.deserializeFrom(buffer));
+ }
+ return getDeviceMetaDataAndEndOffset(nextMetadataIndexList, device);
+ }
+
+ private Pair<MetadataIndex, Long> getMeasurementMetaDataAndEndOffset(
+ List<MetadataIndex> metadataIndexList, String measurement) throws IOException {
+ int size = metadataIndexList.size();
+ String[] measurementnameList = metadataIndexList.stream().map(MetadataIndex::getName).collect(
+ Collectors.toList()).toArray(new String[size]);
+ int deviceIndex = Arrays.binarySearch(measurementnameList, measurement);
+ if (deviceIndex < 0) {
+ deviceIndex = -deviceIndex - 2;
+ }
+ if (deviceIndex == size - 1) {
+ deviceIndex--;
+ }
+ MetadataIndex metadataIndex = metadataIndexList.get(deviceIndex);
+ if (!metadataIndex.getChildMetadataIndexType()
+ .equals(ChildMetadataIndexType.MEASUREMENT_INDEX)) {
+ return new Pair<>(metadataIndex, metadataIndexList.get(deviceIndex + 1).getOffset());
+ }
+ List<MetadataIndex> nextMetadataIndexList = new ArrayList<>();
+ ByteBuffer buffer = readData(metadataIndex.getOffset(),
+ metadataIndexList.get(deviceIndex + 1).getOffset());
+ while (buffer.hasRemaining()) {
+ nextMetadataIndexList.add(MetadataIndex.deserializeFrom(buffer));
+ }
+ return getDeviceMetaDataAndEndOffset(nextMetadataIndexList, measurement);
+ }
+
+ /**
* read data from current position of the input, and deserialize it to a CHUNK_GROUP_FOOTER. <br>
* This method is not threadsafe.
*
@@ -391,7 +561,7 @@ public class TsFileSequenceReader implements AutoCloseable {
/**
* read data from current position of the input, and deserialize it to a CHUNK_GROUP_FOOTER.
*
- * @param position the offset of the chunk group footer in the file
+ * @param position the offset of the chunk group footer in the file
* @param markerRead true if the offset does not contains the marker , otherwise false
* @return a CHUNK_GROUP_FOOTER
* @throws IOException io error
@@ -424,9 +594,9 @@ public class TsFileSequenceReader implements AutoCloseable {
/**
* read the chunk's header.
*
- * @param position the file offset of this chunk's header
+ * @param position the file offset of this chunk's header
* @param chunkHeaderSize the size of chunk's header
- * @param markerRead true if the offset does not contains the marker , otherwise false
+ * @param markerRead true if the offset does not contains the marker , otherwise false
*/
private ChunkHeader readChunkHeader(long position, int chunkHeaderSize, boolean markerRead)
throws IOException {
@@ -531,13 +701,13 @@ public class TsFileSequenceReader implements AutoCloseable {
* changed.
*
* @param position the start position of data in the tsFileInput, or the current position if
- * position = -1
- * @param size the size of data that want to read
+ * position = -1
+ * @param size the size of data that want to read
* @return data that been read.
*/
private ByteBuffer readData(long position, int size) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(size);
- if (position == -1) {
+ if (position < 0) {
if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) != size) {
throw new IOException("reach the end of the data");
}
@@ -551,6 +721,19 @@ public class TsFileSequenceReader implements AutoCloseable {
}
/**
+ * read data from tsFileInput, from the current position (if position = -1), or the given
+ * position.
+ *
+ * @param start the start position of data in the tsFileInput, or the current position if position
+ * = -1
+ * @param end the end position of data that want to read
+ * @return data that been read.
+ */
+ private ByteBuffer readData(long start, long end) throws IOException {
+ return readData(start, (int) (end - start));
+ }
+
+ /**
* notice, the target bytebuffer are not flipped.
*/
public int readRaw(long position, int length, ByteBuffer target) throws IOException {
@@ -560,10 +743,10 @@ public class TsFileSequenceReader implements AutoCloseable {
/**
* Self Check the file and return the position before where the data is safe.
*
- * @param newSchema the schema on each time series in the file
- * @param chunkGroupMetadataList ChunkGroupMetadata List
- * @param fastFinish if true and the file is complete, then newSchema and newMetaData parameter
- * will be not modified.
+ * @param newSchema the schema on each time series in the file
+ * @param chunkGroupMetadataList ChunkGroupMetadata List
+ * @param fastFinish if true and the file is complete, then newSchema and newMetaData parameter
+ * will be not modified.
* @return the position of the file that is fine. All data after the position in the file should
* be truncated.
*/
@@ -589,7 +772,7 @@ public class TsFileSequenceReader implements AutoCloseable {
String deviceID;
int headerLength = TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER
- .getBytes().length;
+ .getBytes().length;
if (fileSize < headerLength) {
return TsFileCheckStatus.INCOMPATIBLE_FILE;
}
@@ -697,10 +880,7 @@ public class TsFileSequenceReader implements AutoCloseable {
* @return List of ChunkMetaData
*/
public List<ChunkMetadata> getChunkMetadataList(Path path) throws IOException {
- Map<String, TimeseriesMetadata> timeseriesMetaDataMap =
- readDeviceMetadata(path.getDevice());
-
- TimeseriesMetadata timeseriesMetaData = timeseriesMetaDataMap.get(path.getMeasurement());
+ TimeseriesMetadata timeseriesMetaData = readMeasurementMetadata(path);
if (timeseriesMetaData == null) {
return new ArrayList<>();
}
@@ -712,7 +892,6 @@ public class TsFileSequenceReader implements AutoCloseable {
/**
* get ChunkMetaDatas in given TimeseriesMetaData
*
- * @param timeseriesMetaData
* @return List of ChunkMetaData
*/
public List<ChunkMetadata> readChunkMetaDataList(TimeseriesMetadata timeseriesMetaData)
@@ -732,24 +911,29 @@ public class TsFileSequenceReader implements AutoCloseable {
return chunkMetadataList;
}
-
/**
* get all measurements in this file
+ *
* @return measurement -> datatype
*/
- public Map<String, TSDataType> getAllMeasurements() throws IOException{
+ public Map<String, TSDataType> getAllMeasurements() throws IOException {
if (tsFileMetaData == null) {
readFileMetadata();
}
Map<String, TSDataType> result = new HashMap<>();
- for (Map.Entry<String, Pair<Long, Integer>> entry : tsFileMetaData.getDeviceMetadataIndex()
- .entrySet()) {
+ List<MetadataIndex> metadataIndexList = tsFileMetaData.getDeviceMetadataIndex();
+ for (int i = 0; i < metadataIndexList.size() - 1; i++) {
// read TimeseriesMetaData from file
- ByteBuffer buffer = readData(entry.getValue().left, entry.getValue().right);
- while (buffer.hasRemaining()) {
- TimeseriesMetadata timeserieMetaData = TimeseriesMetadata.deserializeFrom(buffer);
- result.put(timeserieMetaData.getMeasurementId(), timeserieMetaData.getTSDataType());
+ ByteBuffer buffer = readData(metadataIndexList.get(i).getOffset(),
+ metadataIndexList.get(i + 1).getOffset());
+ Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap = new TreeMap<>();
+ analyzeMetadataIndex(metadataIndexList.get(i), buffer, timeseriesMetadataMap);
+ for (List<TimeseriesMetadata> timeseriesMetadataList : timeseriesMetadataMap.values()) {
+ for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) {
+ result.put(timeseriesMetadata.getMeasurementId(), timeseriesMetadata.getTSDataType());
+ }
}
+
}
return result;
}
@@ -758,21 +942,18 @@ public class TsFileSequenceReader implements AutoCloseable {
* get device names which has valid chunks in [start, end)
*
* @param start start of the partition
- * @param end end of the partition
+ * @param end end of the partition
* @return device names in range
*/
public List<String> getDeviceNameInRange(long start, long end) throws IOException {
List<String> res = new ArrayList<>();
readFileMetadata();
- for (Map.Entry<String, Pair<Long, Integer>> entry : tsFileMetaData.getDeviceMetadataIndex()
- .entrySet()) {
-
- Map<String, List<ChunkMetadata>> seriesMetadataMap = readChunkMetadataInDevice(
- entry.getKey());
-
+ List<String> devices = getDevicesByMetadata(tsFileMetaData.getDeviceMetadataIndex());
+ for (String device : devices) {
+ Map<String, List<ChunkMetadata>> seriesMetadataMap = readChunkMetadataInDevice(device);
if (hasDataInPartition(seriesMetadataMap, start, end)) {
- res.add(entry.getKey());
+ res.add(device);
}
}
@@ -782,15 +963,16 @@ public class TsFileSequenceReader implements AutoCloseable {
/**
* Check if the device has at least one Chunk in this partition
*
- * @param seriesMetadataMap chunkMetaDataList of each measurement
+ * @param seriesMetadataMap chunkMetaDataList of each measurement
* @param start the start position of the space partition
- * @param end the end position of the space partition
+ * @param end the end position of the space partition
*/
private boolean hasDataInPartition(Map<String, List<ChunkMetadata>> seriesMetadataMap,
long start, long end) {
for (List<ChunkMetadata> chunkMetadataList : seriesMetadataMap.values()) {
for (ChunkMetadata chunkMetadata : chunkMetadataList) {
- LocateStatus location = MetadataQuerierByFileImpl.checkLocateStatus(chunkMetadata, start, end);
+ LocateStatus location = MetadataQuerierByFileImpl
+ .checkLocateStatus(chunkMetadata, start, end);
if (location == LocateStatus.in) {
return true;
}
@@ -799,14 +981,11 @@ public class TsFileSequenceReader implements AutoCloseable {
return false;
}
-
/**
- * The location of a chunkGroupMetaData with respect to a space partition constraint.
- * <p>
- * in - the middle point of the chunkGroupMetaData is located in the current space partition.
- * before - the middle point of the chunkGroupMetaData is located before the current space
- * partition. after - the middle point of the chunkGroupMetaData is located after the current
- * space partition.
+ * The location of a chunkGroupMetaData with respect to a space partition constraint. <p> in - the
+ * middle point of the chunkGroupMetaData is located in the current space partition. before - the
+ * middle point of the chunkGroupMetaData is located before the current space partition. after -
+ * the middle point of the chunkGroupMetaData is located after the current space partition.
*/
public enum LocateStatus {
in, before, after
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java
index aa86346..d437914 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java
@@ -18,7 +18,17 @@
*/
package org.apache.iotdb.tsfile.read.controller;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
import org.apache.iotdb.tsfile.common.cache.LRUCache;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
@@ -28,8 +38,6 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader.LocateStatus;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.TimeRange;
-import java.io.IOException;
-import java.util.*;
public class MetadataQuerierByFileImpl implements IMetadataQuerier {
@@ -101,8 +109,11 @@ public class MetadataQuerierByFileImpl implements IMetadataQuerier {
String selectedDevice = deviceMeasurements.getKey();
// s1, s2, s3
Set<String> selectedMeasurements = deviceMeasurements.getValue();
- if (fileMetaData.getDeviceMetadataIndex() == null
- || !fileMetaData.getDeviceMetadataIndex().containsKey(selectedDevice)) {
+
+ List<String> devices = this.tsFileReader
+ .getDevicesByMetadata(fileMetaData.getDeviceMetadataIndex());
+ String[] deviceNames = devices.toArray(new String[devices.size()]);
+ if (Arrays.binarySearch(deviceNames, selectedDevice) < 0) {
continue;
}
@@ -177,7 +188,8 @@ public class MetadataQuerierByFileImpl implements IMetadataQuerier {
TreeMap<String, Set<String>> deviceMeasurementsMap = new TreeMap<>();
for (Path path : paths) {
- deviceMeasurementsMap.computeIfAbsent(path.getDevice(), key -> new HashSet<>()).add(path.getMeasurement());
+ deviceMeasurementsMap.computeIfAbsent(path.getDevice(), key -> new HashSet<>())
+ .add(path.getMeasurement());
}
for (Map.Entry<String, Set<String>> deviceMeasurements : deviceMeasurementsMap.entrySet()) {
String selectedDevice = deviceMeasurements.getKey();
@@ -233,17 +245,14 @@ public class MetadataQuerierByFileImpl implements IMetadataQuerier {
return resTimeRanges;
}
-
/**
* Check the location of a given chunkGroupMetaData with respect to a space partition constraint.
*
- * @param chunkMetaData the given chunkMetaData
+ * @param chunkMetaData the given chunkMetaData
* @param spacePartitionStartPos the start position of the space partition
- * @param spacePartitionEndPos the end position of the space partition
+ * @param spacePartitionEndPos the end position of the space partition
* @return LocateStatus
*/
-
-
public static LocateStatus checkLocateStatus(ChunkMetadata chunkMetaData,
long spacePartitionStartPos, long spacePartitionEndPos) {
long startOffsetOfChunk = chunkMetaData.getOffsetOfChunkHeader();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/MetadataIndex.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/MetadataIndex.java
new file mode 100644
index 0000000..689dfc2
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/MetadataIndex.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.utils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import org.apache.iotdb.tsfile.file.metadata.enums.ChildMetadataIndexType;
+
+public class MetadataIndex {
+
+ private String name;
+ private long offset;
+ private ChildMetadataIndexType childMetadataIndexType;
+
+ public MetadataIndex() {
+ }
+
+ public MetadataIndex(String name, long offset,
+ ChildMetadataIndexType childMetadataIndexType) {
+ this.name = name;
+ this.offset = offset;
+ this.childMetadataIndexType = childMetadataIndexType;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public ChildMetadataIndexType getChildMetadataIndexType() {
+ return childMetadataIndexType;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public void setOffset(long offset) {
+ this.offset = offset;
+ }
+
+ public void setChildMetadataIndexType(
+ ChildMetadataIndexType childMetadataIndexType) {
+ this.childMetadataIndexType = childMetadataIndexType;
+ }
+
+ public String toString() {
+ return "<" + name + "," + offset + "," + childMetadataIndexType + ">";
+ }
+
+ public int serializeTo(OutputStream outputStream) throws IOException {
+ int byteLen = 0;
+ byteLen += ReadWriteIOUtils.write(name, outputStream);
+ byteLen += ReadWriteIOUtils.write(offset, outputStream);
+ byteLen += ReadWriteIOUtils.write(childMetadataIndexType.serialize(), outputStream);
+ return byteLen;
+ }
+
+ public static MetadataIndex deserializeFrom(ByteBuffer buffer) {
+ MetadataIndex metadataIndex = new MetadataIndex();
+ metadataIndex.setName(ReadWriteIOUtils.readString(buffer));
+ metadataIndex.setOffset(ReadWriteIOUtils.readLong(buffer));
+ metadataIndex.setChildMetadataIndexType(
+ ChildMetadataIndexType.deserialize(ReadWriteIOUtils.readShort(buffer)));
+ return metadataIndex;
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index a55e8a5..89c4b0f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -38,7 +38,7 @@ import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.TsFileCheckStatus;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.MetadataIndex;
import org.apache.iotdb.tsfile.utils.VersionUtils;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
@@ -129,9 +129,10 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
if (reader.isComplete()) {
reader.loadMetadataSize();
TsFileMetadata metaData = reader.readFileMetadata();
- for (Pair<Long, Integer> deviceMetaData : metaData.getDeviceMetadataIndex().values()) {
- if (position > deviceMetaData.left) {
- position = deviceMetaData.left;
+ // TODO
+ for (MetadataIndex deviceMetaData : metaData.getDeviceMetadataIndex()) {
+ if (position > deviceMetaData.getOffset()) {
+ position = deviceMetaData.getOffset();
}
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 06460b0..1346bc1 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -18,7 +18,17 @@
*/
package org.apache.iotdb.tsfile.write.writer;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.TreeMap;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.MetaMarker;
@@ -28,6 +38,7 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.ChildMetadataIndexType;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -36,6 +47,7 @@ import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.MetadataIndex;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -43,14 +55,6 @@ import org.apache.iotdb.tsfile.utils.VersionUtils;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
/**
* TsFileIOWriter is used to construct metadata and write data stored in memory to output stream.
@@ -62,6 +66,7 @@ public class TsFileIOWriter {
protected static final TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
private static final Logger logger = LoggerFactory.getLogger(TsFileIOWriter.class);
private static final Logger resourceLogger = LoggerFactory.getLogger("FileMonitor");
+
static {
magicStringBytes = BytesUtils.stringToBytes(TSFileConfig.MAGIC_STRING);
versionNumberBytes = TSFileConfig.VERSION_NUMBER.getBytes();
@@ -154,7 +159,8 @@ public class TsFileIOWriter {
ChunkGroupFooter chunkGroupFooter = new ChunkGroupFooter(currentChunkGroupDeviceId, dataSize,
chunkMetadataList.size());
chunkGroupFooter.serializeTo(out.wrapAsStream());
- chunkGroupMetadataList.add(new ChunkGroupMetadata(currentChunkGroupDeviceId, chunkMetadataList));
+ chunkGroupMetadataList
+ .add(new ChunkGroupMetadata(currentChunkGroupDeviceId, chunkMetadataList));
currentChunkGroupDeviceId = null;
chunkMetadataList = null;
}
@@ -162,11 +168,11 @@ public class TsFileIOWriter {
/**
* start a {@linkplain ChunkMetadata ChunkMetaData}.
*
- * @param measurementSchema - schema of this time series
+ * @param measurementSchema - schema of this time series
* @param compressionCodecName - compression name of this time series
- * @param tsDataType - data type
- * @param statistics - Chunk statistics
- * @param dataSize - the serialized size of all pages
+ * @param tsDataType - data type
+ * @param statistics - Chunk statistics
+ * @param dataSize - the serialized size of all pages
* @throws IOException if I/O error occurs
*/
public void startFlushChunk(MeasurementSchema measurementSchema,
@@ -225,17 +231,17 @@ public class TsFileIOWriter {
// group ChunkMetadata by series
Map<Path, List<ChunkMetadata>> chunkMetadataListMap = new TreeMap<>();
- for (ChunkGroupMetadata chunkGroupMetadata: chunkGroupMetadataList) {
+ for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
chunkMetadataListMap.computeIfAbsent(series, k -> new ArrayList<>()).add(chunkMetadata);
}
}
- Map<String, Pair<Long, Integer>> deviceMetaDataMap = flushAllChunkMetadataList(chunkMetadataListMap);
+ List<MetadataIndex> deviceMetaDataList = flushAllChunkMetadataList(chunkMetadataListMap);
TsFileMetadata tsFileMetaData = new TsFileMetadata();
- tsFileMetaData.setDeviceMetadataIndex(deviceMetaDataMap);
+ tsFileMetaData.setDeviceMetadataIndex(deviceMetaDataList);
tsFileMetaData.setVersionInfo(versionInfo);
tsFileMetaData.setTotalChunkNum(totalChunkNum);
tsFileMetaData.setInvalidChunkNum(invalidChunkNum);
@@ -274,9 +280,10 @@ public class TsFileIOWriter {
/**
* Flush ChunkMetadataList and TimeseriesMetaData
+ *
* @return DeviceMetaDataMap in TsFileMetaData
*/
- private Map<String, Pair<Long, Integer>> flushAllChunkMetadataList(
+ private List<MetadataIndex> flushAllChunkMetadataList(
Map<Path, List<ChunkMetadata>> chunkMetadataListMap) throws IOException {
// convert ChunkMetadataList to this field
@@ -304,22 +311,119 @@ public class TsFileIOWriter {
deviceTimeseriesMetadataMap.computeIfAbsent(device, k -> new ArrayList<>())
.add(timeseriesMetaData);
}
- // create DeviceMetaDataMap device -> Pair<TimeseriesMetaDataOffset, TimeseriesMetaDataLength>
- Map<String, Pair<Long, Integer>> deviceMetadataMap = new HashMap<>();
+
+ // create TsFileMetadata
+ Map<String, Queue<MetadataIndex>> deviceMetadataIndexMap = new TreeMap<>();
+ int maxNumOfIndexItems = config.getMaxNumberOfIndexItemsInNode();
+
+ // for timeseriesMetadata of each device
for (Map.Entry<String, List<TimeseriesMetadata>> entry : deviceTimeseriesMetadataMap
.entrySet()) {
- String device = entry.getKey();
- List<TimeseriesMetadata> timeseriesMetadataList = entry.getValue();
- long offsetOfFirstTimeseriesMetaDataInDevice = out.getPosition();
- int size = 0;
- for (TimeseriesMetadata timeseriesMetaData : timeseriesMetadataList) {
- size += timeseriesMetaData.serializeTo(out.wrapAsStream());
+ if (entry.getValue().size() == 0) {
+ continue;
+ }
+ Queue<MetadataIndex> measurementMetadataIndexQueue = new ArrayDeque<>();
+ TimeseriesMetadata timeseriesMetadata;
+ for (int i = 0; i < entry.getValue().size(); i++) {
+ timeseriesMetadata = entry.getValue().get(i);
+ if (i % maxNumOfIndexItems == 0) {
+ measurementMetadataIndexQueue
+ .add(new MetadataIndex(timeseriesMetadata.getMeasurementId(), out.getPosition(),
+ ChildMetadataIndexType.MEASUREMENT));
+ }
+ timeseriesMetadata.serializeTo(out.wrapAsStream());
+ }
+ measurementMetadataIndexQueue
+ .add(new MetadataIndex("", out.getPosition(), ChildMetadataIndexType.MEASUREMENT));
+
+ int queueSize = measurementMetadataIndexQueue.size();
+ MetadataIndex metadataIndex;
+ while (queueSize > maxNumOfIndexItems) {
+ for (int i = 0; i < queueSize; i++) {
+ metadataIndex = measurementMetadataIndexQueue.poll();
+ if (i % maxNumOfIndexItems == 0) {
+ if (i != 0) {
+ addEmptyMetadataIndex(ChildMetadataIndexType.MEASUREMENT_INDEX);
+ }
+ // add next measurement index item to parent node
+ measurementMetadataIndexQueue.add(new MetadataIndex(entry.getKey(),
+ metadataIndex.getOffset(), ChildMetadataIndexType.MEASUREMENT_INDEX));
+ }
+ metadataIndex.serializeTo(out.wrapAsStream());
+ }
+ addEmptyMetadataIndex(ChildMetadataIndexType.MEASUREMENT);
+ queueSize = measurementMetadataIndexQueue.size();
+ }
+
+ deviceMetadataIndexMap.put(entry.getKey(), measurementMetadataIndexQueue);
+ }
+
+ List<MetadataIndex> metadataIndexList = new ArrayList<>();
+ // if not exceed the max child nodes num, ignore the device index and directly point to the measurement
+ if (deviceMetadataIndexMap.size() < maxNumOfIndexItems) {
+ for (Map.Entry<String, Queue<MetadataIndex>> entry : deviceMetadataIndexMap.entrySet()) {
+ metadataIndexList.add(new MetadataIndex(entry.getKey(), out.getPosition(),
+ ChildMetadataIndexType.MEASUREMENT_INDEX));
+ for (MetadataIndex metadataIndex : entry.getValue()) {
+ new MetadataIndex(metadataIndex.getName(), metadataIndex.getOffset(),
+ ChildMetadataIndexType.MEASUREMENT).serializeTo(out.wrapAsStream());
+ }
+ }
+ metadataIndexList
+ .add(new MetadataIndex("", out.getPosition(), ChildMetadataIndexType.MEASUREMENT_INDEX));
+ return metadataIndexList;
+ }
+
+ // else, build level index for devices
+ Queue<MetadataIndex> deviceMetadaIndexQueue = new ArrayDeque<>();
+ for (Map.Entry<String, Queue<MetadataIndex>> entry : deviceMetadataIndexMap.entrySet()) {
+ deviceMetadaIndexQueue.add(new MetadataIndex(entry.getKey(), out.getPosition(),
+ ChildMetadataIndexType.DEVICE));
+ for (MetadataIndex measurementMetadataIndex : entry.getValue()) {
+ new MetadataIndex(measurementMetadataIndex.getName(), measurementMetadataIndex.getOffset(),
+ ChildMetadataIndexType.MEASUREMENT).serializeTo(out.wrapAsStream());
}
- deviceMetadataMap
- .put(device, new Pair<>(offsetOfFirstTimeseriesMetaDataInDevice, size));
}
+
+ int queueSize = deviceMetadaIndexQueue.size();
+ MetadataIndex deviceMetadataIndex;
+ while (queueSize > maxNumOfIndexItems) {
+ for (int i = 0; i < queueSize; i++) {
+ deviceMetadataIndex = deviceMetadaIndexQueue.poll();
+ if (i % maxNumOfIndexItems == 0) {
+ if (i != 0) {
+ new MetadataIndex(deviceMetadataIndex.getName(), deviceMetadataIndex.getOffset(),
+ ChildMetadataIndexType.DEVICE).serializeTo(out.wrapAsStream());
+ ;
+ }
+ // add next device index item to parent node
+ deviceMetadaIndexQueue.add(new MetadataIndex(deviceMetadataIndex.getName(),
+ out.getPosition(), ChildMetadataIndexType.DEVICE
+ ));
+ }
+ deviceMetadataIndex.serializeTo(out.wrapAsStream());
+ }
+ addEmptyMetadataIndex(ChildMetadataIndexType.DEVICE);
+ queueSize = deviceMetadaIndexQueue.size();
+ }
+ deviceMetadaIndexQueue.forEach(
+ metadataIndex -> metadataIndex
+ .setChildMetadataIndexType(ChildMetadataIndexType.DEVICE_INDEX));
+ metadataIndexList.addAll(deviceMetadaIndexQueue);
+ metadataIndexList
+ .add(new MetadataIndex("", out.getPosition(), ChildMetadataIndexType.DEVICE_INDEX));
+
// return
- return deviceMetadataMap;
+ return metadataIndexList;
+ }
+
+ /**
+ * add an empty index item for easily calculating the end position
+ *
+ * @param type child metadata index type
+ */
+ private void addEmptyMetadataIndex(ChildMetadataIndexType type) throws IOException {
+ new MetadataIndex("", out.getPosition(), type).serializeTo(out.wrapAsStream());
}
/**
@@ -422,8 +526,7 @@ public class TsFileIOWriter {
}
/**
- * write MetaMarker.VERSION with version
- * Then, cache offset-version in versionInfo
+ * write MetaMarker.VERSION with version Then, cache offset-version in versionInfo
*/
public void writeVersion(long version) throws IOException {
ReadWriteIOUtils.write(MetaMarker.VERSION, out.wrapAsStream());
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/utils/TestHelper.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/utils/TestHelper.java
index d805f68..be8b3b0 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/utils/TestHelper.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/utils/TestHelper.java
@@ -19,16 +19,16 @@
package org.apache.iotdb.tsfile.file.metadata.utils;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.header.PageHeaderTest;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.ChildMetadataIndexType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.utils.MetadataIndex;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -41,10 +41,11 @@ public class TestHelper {
return metaData;
}
- private static Map<String, Pair<Long, Integer>> generateDeviceMetaDataIndex() {
- Map<String, Pair<Long, Integer>> deviceMetaDataIndex = new HashMap<>();
+ private static List<MetadataIndex> generateDeviceMetaDataIndex() {
+ List<MetadataIndex> deviceMetaDataIndex = new ArrayList<>();
for (int i = 0; i < 5; i++) {
- deviceMetaDataIndex.put("d" + i, new Pair<Long, Integer>((long) i * 5, 5));
+ deviceMetaDataIndex
+ .add(new MetadataIndex("d" + i, (long) i * 5, ChildMetadataIndexType.MEASUREMENT));
}
return deviceMetaDataIndex;
}
@@ -60,7 +61,7 @@ public class TestHelper {
public static MeasurementSchema createSimpleMeasurementSchema(String measurementuid) {
return new MeasurementSchema(measurementuid, TSDataType.INT64, TSEncoding.RLE);
}
-
+
public static TimeseriesMetadata createSimpleTimseriesMetaData(String measurementuid) {
Statistics<?> statistics = Statistics.getStatsByType(PageHeaderTest.DATA_TYPE);
statistics.setEmpty(false);
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/utils/Utils.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/utils/Utils.java
index c39294b..92c8359 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/utils/Utils.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/utils/Utils.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.utils.MetadataIndex;
import org.apache.iotdb.tsfile.utils.Pair;
import org.junit.Assert;
@@ -123,8 +124,8 @@ public class Utils {
.isTwoObjectsNotNULL(metadata1.getDeviceMetadataIndex(), metadata2.getDeviceMetadataIndex(),
"Delta object metadata list")) {
- Map<String, Pair<Long, Integer>> deviceMetaDataMap1 = metadata1.getDeviceMetadataIndex();
- Map<String, Pair<Long, Integer>> deviceMetaDataMap2 = metadata2.getDeviceMetadataIndex();
+ List<MetadataIndex> deviceMetaDataMap1 = metadata1.getDeviceMetadataIndex();
+ List<MetadataIndex> deviceMetaDataMap2 = metadata2.getDeviceMetadataIndex();
return deviceMetaDataMap1.size() == deviceMetaDataMap2.size();
}
}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
index 29afae4..dc416d9 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
@@ -109,6 +109,7 @@ public class TsFileIOWriterTest {
// FileMetaData
TsFileMetadata metaData = reader.readFileMetadata();
- Assert.assertEquals(1, metaData.getDeviceMetadataIndex().size());
+ // with an empty end MetadataIndex
+ Assert.assertEquals(2, metaData.getDeviceMetadataIndex().size());
}
}