You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/05/11 03:34:55 UTC
[iotdb] branch master updated: [IOTDB-1366] Refactor MetadataIndex in TsFile for Vector (#3081)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d6fe7b0 [IOTDB-1366] Refactor MetadataIndex in TsFile for Vector (#3081)
d6fe7b0 is described below
commit d6fe7b00c5f383f9ec57e1d9b2cb0e902089f6a1
Author: Zesong Sun <sz...@mails.tsinghua.edu.cn>
AuthorDate: Tue May 11 11:34:38 2021 +0800
[IOTDB-1366] Refactor MetadataIndex in TsFile for Vector (#3081)
Refactor MetadataIndex in TsFile for Vector (#3081)
---
.../cluster/query/reader/ClusterReaderFactory.java | 3 +-
.../query/groupby/MergeGroupByExecutorTest.java | 23 ++--
.../query/groupby/RemoteGroupByExecutorTest.java | 23 ++--
.../cluster/server/member/DataGroupMemberTest.java | 28 +++-
.../cluster/server/member/MetaGroupMemberTest.java | 15 ++-
.../db/engine/cache/TimeSeriesMetadataCache.java | 121 +++++++++++++++++
.../db/engine/storagegroup/TsFileResource.java | 15 +--
.../db/query/executor/fill/LastPointReader.java | 51 +++-----
.../query/reader/series/SeriesAggregateReader.java | 2 +-
.../reader/series/SeriesRawDataBatchReader.java | 4 +-
.../iotdb/db/query/reader/series/SeriesReader.java | 7 +-
.../reader/series/SeriesReaderByTimestamp.java | 2 +-
.../query/reader/series/SeriesReaderFactory.java | 94 --------------
.../db/query/reader/series/VectorSeriesReader.java | 144 ---------------------
.../org/apache/iotdb/db/utils/FileLoaderUtils.java | 94 +++++++++++++-
.../iotdb/db/utils/datastructure/VectorTVList.java | 6 +-
.../tsfile/common/constant/TsFileConstant.java | 3 +
.../iotdb/tsfile/file/metadata/ChunkMetadata.java | 11 ++
.../iotdb/tsfile/file/metadata/IChunkMetadata.java | 4 +
.../tsfile/file/metadata/ITimeSeriesMetadata.java | 3 +
.../file/metadata/MetadataIndexConstructor.java | 54 ++++++--
.../tsfile/file/metadata/MetadataIndexNode.java | 2 +-
.../tsfile/file/metadata/TimeseriesMetadata.java | 20 ++-
.../tsfile/file/metadata/VectorChunkMetadata.java | 10 ++
.../file/metadata/VectorTimeSeriesMetadata.java | 16 +++
.../iotdb/tsfile/read/TsFileSequenceReader.java | 80 ++++++++----
.../tsfile/v2/read/TsFileSequenceReaderForV2.java | 6 +-
.../iotdb/tsfile/write/chunk/TimeChunkWriter.java | 3 +-
.../iotdb/tsfile/write/chunk/ValueChunkWriter.java | 3 +-
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 121 ++++++++++++-----
30 files changed, 554 insertions(+), 414 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index bd45755..261f5cd 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -63,7 +63,6 @@ import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
import org.apache.iotdb.db.query.reader.series.SeriesRawDataPointReader;
import org.apache.iotdb.db.query.reader.series.SeriesReader;
import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
-import org.apache.iotdb.db.query.reader.series.SeriesReaderFactory;
import org.apache.iotdb.db.utils.SerializeUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -515,7 +514,7 @@ public class ClusterReaderFactory {
((SlotPartitionTable) metaGroupMember.getPartitionTable()).getNodeSlots(header);
QueryDataSource queryDataSource =
QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter);
- return SeriesReaderFactory.createSeriesReader(
+ return new SeriesReader(
path,
allSensors,
dataType,
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/MergeGroupByExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/MergeGroupByExecutorTest.java
index e04e5a6..5e74cde 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/MergeGroupByExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/MergeGroupByExecutorTest.java
@@ -38,8 +38,9 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.junit.Test;
import java.io.IOException;
-import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
public class MergeGroupByExecutorTest extends BaseQueryTest {
@@ -52,16 +53,12 @@ public class MergeGroupByExecutorTest extends BaseQueryTest {
new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
try {
Filter timeFilter = null;
+ Set<String> deviceMeasurements = new HashSet<>();
+ deviceMeasurements.add(path.getMeasurement());
MergeGroupByExecutor groupByExecutor =
new MergeGroupByExecutor(
- path,
- Collections.singleton(path.getMeasurement()),
- dataType,
- context,
- timeFilter,
- testMetaMember,
- true);
+ path, deviceMeasurements, dataType, context, timeFilter, testMetaMember, true);
AggregationType[] types = AggregationType.values();
for (AggregationType type : types) {
groupByExecutor.addAggregateResult(
@@ -91,16 +88,12 @@ public class MergeGroupByExecutorTest extends BaseQueryTest {
new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
try {
Filter timeFilter = TimeFilter.gtEq(3);
+ Set<String> deviceMeasurements = new HashSet<>();
+ deviceMeasurements.add(path.getMeasurement());
MergeGroupByExecutor groupByExecutor =
new MergeGroupByExecutor(
- path,
- Collections.singleton(path.getMeasurement()),
- dataType,
- context,
- timeFilter,
- testMetaMember,
- true);
+ path, deviceMeasurements, dataType, context, timeFilter, testMetaMember, true);
AggregationType[] types = AggregationType.values();
for (AggregationType type : types) {
groupByExecutor.addAggregateResult(
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutorTest.java
index dac449a..617ba6b 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutorTest.java
@@ -41,8 +41,9 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import static org.junit.Assert.assertTrue;
@@ -61,17 +62,13 @@ public class RemoteGroupByExecutorTest extends BaseQueryTest {
for (int i = 0; i < AggregationType.values().length; i++) {
aggregationTypes.add(i);
}
+ Set<String> deviceMeasurements = new HashSet<>();
+ deviceMeasurements.add(path.getMeasurement());
ClusterReaderFactory readerFactory = new ClusterReaderFactory(testMetaMember);
List<GroupByExecutor> groupByExecutors =
readerFactory.getGroupByExecutors(
- path,
- Collections.singleton(path.getMeasurement()),
- dataType,
- context,
- timeFilter,
- aggregationTypes,
- true);
+ path, deviceMeasurements, dataType, context, timeFilter, aggregationTypes, true);
for (int i = 0; i < groupByExecutors.size(); i++) {
GroupByExecutor groupByExecutor = groupByExecutors.get(i);
@@ -123,17 +120,13 @@ public class RemoteGroupByExecutorTest extends BaseQueryTest {
for (int i = 0; i < AggregationType.values().length; i++) {
aggregationTypes.add(i);
}
+ Set<String> deviceMeasurements = new HashSet<>();
+ deviceMeasurements.add(path.getMeasurement());
ClusterReaderFactory readerFactory = new ClusterReaderFactory(testMetaMember);
List<GroupByExecutor> groupByExecutors =
readerFactory.getGroupByExecutors(
- path,
- Collections.singleton(path.getMeasurement()),
- dataType,
- context,
- timeFilter,
- aggregationTypes,
- true);
+ path, deviceMeasurements, dataType, context, timeFilter, aggregationTypes, true);
for (int i = 0; i < groupByExecutors.size(); i++) {
GroupByExecutor groupByExecutor = groupByExecutors.get(i);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index 4d092cf..ca6050e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -106,6 +106,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -115,6 +116,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.iotdb.cluster.common.TestUtils.getTestMeasurement;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -703,7 +705,7 @@ public class DataGroupMemberTest extends BaseMember {
InsertRowPlan insertPlan = new InsertRowPlan();
insertPlan.setDeviceId(new PartialPath(TestUtils.getTestSg(0)));
insertPlan.setNeedInferType(true);
- insertPlan.setMeasurements(new String[] {TestUtils.getTestMeasurement(0)});
+ insertPlan.setMeasurements(new String[] {getTestMeasurement(0)});
insertPlan.setDataTypes(new TSDataType[insertPlan.getMeasurements().length]);
for (int i = 0; i < 10; i++) {
insertPlan.setTime(i);
@@ -729,6 +731,10 @@ public class DataGroupMemberTest extends BaseMember {
filter.serialize(dataOutputStream);
request.setTimeFilterBytes(byteArrayOutputStream.toByteArray());
+ Set<String> deviceMeasurements = new HashSet<>();
+ deviceMeasurements.add(getTestMeasurement(0));
+ request.setDeviceMeasurements(deviceMeasurements);
+
AtomicReference<Long> result = new AtomicReference<>();
GenericHandler<Long> handler = new GenericHandler<>(TestUtils.getNode(0), result);
new DataAsyncService(dataGroupMember).querySingleSeries(request, handler);
@@ -765,7 +771,7 @@ public class DataGroupMemberTest extends BaseMember {
InsertRowPlan insertPlan = new InsertRowPlan();
insertPlan.setDeviceId(new PartialPath(TestUtils.getTestSg(0)));
insertPlan.setNeedInferType(true);
- insertPlan.setMeasurements(new String[] {TestUtils.getTestMeasurement(0)});
+ insertPlan.setMeasurements(new String[] {getTestMeasurement(0)});
insertPlan.setDataTypes(new TSDataType[insertPlan.getMeasurements().length]);
for (int i = 0; i < 10; i++) {
insertPlan.setTime(i);
@@ -791,6 +797,10 @@ public class DataGroupMemberTest extends BaseMember {
filter.serialize(dataOutputStream);
request.setTimeFilterBytes(byteArrayOutputStream.toByteArray());
+ Set<String> deviceMeasurements = new HashSet<>();
+ deviceMeasurements.add(getTestMeasurement(0));
+ request.setDeviceMeasurements(deviceMeasurements);
+
AtomicReference<Long> result = new AtomicReference<>();
GenericHandler<Long> handler = new GenericHandler<>(TestUtils.getNode(0), result);
new DataAsyncService(dataGroupMember).querySingleSeries(request, handler);
@@ -827,7 +837,7 @@ public class DataGroupMemberTest extends BaseMember {
InsertRowPlan insertPlan = new InsertRowPlan();
insertPlan.setDeviceId(new PartialPath(TestUtils.getTestSg(0)));
insertPlan.setNeedInferType(true);
- insertPlan.setMeasurements(new String[] {TestUtils.getTestMeasurement(0)});
+ insertPlan.setMeasurements(new String[] {getTestMeasurement(0)});
insertPlan.setDataTypes(new TSDataType[insertPlan.getMeasurements().length]);
for (int i = 0; i < 10; i++) {
insertPlan.setTime(i);
@@ -853,6 +863,9 @@ public class DataGroupMemberTest extends BaseMember {
filter.serialize(dataOutputStream);
request.setTimeFilterBytes(byteArrayOutputStream.toByteArray());
request.setAscending(true);
+ Set<String> deviceMeasurements = new HashSet<>();
+ deviceMeasurements.add(getTestMeasurement(0));
+ request.setDeviceMeasurements(deviceMeasurements);
AtomicReference<Long> result = new AtomicReference<>();
GenericHandler<Long> handler = new GenericHandler<>(TestUtils.getNode(0), result);
@@ -890,7 +903,7 @@ public class DataGroupMemberTest extends BaseMember {
InsertRowPlan insertPlan = new InsertRowPlan();
insertPlan.setDeviceId(new PartialPath(TestUtils.getTestSg(0)));
insertPlan.setNeedInferType(true);
- insertPlan.setMeasurements(new String[] {TestUtils.getTestMeasurement(0)});
+ insertPlan.setMeasurements(new String[] {getTestMeasurement(0)});
insertPlan.setDataTypes(new TSDataType[insertPlan.getMeasurements().length]);
for (int i = 0; i < 10; i++) {
insertPlan.setTime(i);
@@ -916,6 +929,9 @@ public class DataGroupMemberTest extends BaseMember {
filter.serialize(dataOutputStream);
request.setTimeFilterBytes(byteArrayOutputStream.toByteArray());
request.setAscending(true);
+ Set<String> deviceMeasurements = new HashSet<>();
+ deviceMeasurements.add(getTestMeasurement(0));
+ request.setDeviceMeasurements(deviceMeasurements);
AtomicReference<Long> result = new AtomicReference<>();
GenericHandler<Long> handler = new GenericHandler<>(TestUtils.getNode(0), result);
@@ -1114,7 +1130,9 @@ public class DataGroupMemberTest extends BaseMember {
request.setQueryId(queryContext.getQueryId());
request.setRequestor(TestUtils.getNode(0));
request.setDataTypeOrdinal(TSDataType.DOUBLE.ordinal());
- request.setDeviceMeasurements(Collections.singleton(TestUtils.getTestMeasurement(0)));
+ Set<String> deviceMeasurements = new HashSet<>();
+ deviceMeasurements.add(getTestMeasurement(0));
+ request.setDeviceMeasurements(deviceMeasurements);
request.setAscending(true);
DataGroupMember dataGroupMember;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 87cc093..475bef2 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -118,8 +118,10 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -132,7 +134,6 @@ import static org.apache.iotdb.cluster.server.NodeCharacter.LEADER;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -885,11 +886,14 @@ public class MetaGroupMemberTest extends BaseMember {
for (int i = 0; i < 10; i++) {
times[i] = i;
}
+ Set<String> deviceMeasurements = new HashSet<>();
+ deviceMeasurements.add(TestUtils.getTestMeasurement(0));
+
for (int i = 0; i < 10; i++) {
IReaderByTimestamp readerByTimestamp =
readerFactory.getReaderByTimestamp(
new PartialPath(TestUtils.getTestSeries(i, 0)),
- Collections.singleton(TestUtils.getTestMeasurement(0)),
+ deviceMeasurements,
TSDataType.DOUBLE,
context,
true);
@@ -943,11 +947,14 @@ public class MetaGroupMemberTest extends BaseMember {
try {
ClusterReaderFactory readerFactory = new ClusterReaderFactory(testMetaMember);
+ Set<String> deviceMeasurements = new HashSet<>();
+ deviceMeasurements.add(TestUtils.getTestMeasurement(0));
+
for (int i = 0; i < 10; i++) {
ManagedSeriesReader reader =
readerFactory.getSeriesReader(
new PartialPath(TestUtils.getTestSeries(i, 0)),
- Collections.singleton(TestUtils.getTestMeasurement(0)),
+ deviceMeasurements,
TSDataType.DOUBLE,
TimeFilter.gtEq(5),
ValueFilter.ltEq(8.0),
@@ -999,7 +1006,7 @@ public class MetaGroupMemberTest extends BaseMember {
request.setRegenerateIdentifier(true);
testMetaMember.processValidHeartbeatReq(request, response);
- assertNotEquals(10, response.getFollowerIdentifier());
+ assertTrue(response.getFollowerIdentifier() != 10);
assertTrue(response.isRequirePartitionTable());
request.setPartitionTableBytes(partitionTable.serialize());
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
index 51a05cb..17d3f32 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.ref.WeakReference;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -225,6 +226,126 @@ public class TimeSeriesMetadataCache {
}
}
+ // Suppress synchronize warning
+ // Suppress high Cognitive Complexity warning
+ @SuppressWarnings({"squid:S1860", "squid:S3776"})
+ public List<TimeseriesMetadata> get(
+ TimeSeriesMetadataCacheKey key,
+ List<String> subSensorList,
+ Set<String> allSensors,
+ boolean debug)
+ throws IOException {
+ if (!CACHE_ENABLE) {
+ // bloom filter part
+ TsFileSequenceReader reader = FileReaderManager.getInstance().get(key.filePath, true);
+ BloomFilter bloomFilter = reader.readBloomFilter();
+ if (bloomFilter != null
+ && !bloomFilter.contains(key.device + IoTDBConstant.PATH_SEPARATOR + key.measurement)) {
+ return Collections.emptyList();
+ }
+ return reader.readTimeseriesMetadata(new Path(key.device, key.measurement), subSensorList);
+ }
+
+ cacheRequestNum.incrementAndGet();
+
+ List<TimeseriesMetadata> res = new ArrayList<>();
+
+ getVectorTimeSeriesMetadataListFromCache(key, subSensorList, res);
+
+ if (!res.isEmpty()) {
+ cacheHitNum.incrementAndGet();
+ printCacheLog(true);
+ } else {
+ if (debug) {
+ DEBUG_LOGGER.info(
+ "Cache miss: {}.{} in file: {}", key.device, key.measurement, key.filePath);
+ DEBUG_LOGGER.info("Device: {}, all sensors: {}", key.device, allSensors);
+ }
+ // allow for the parallelism of different devices
+ synchronized (
+ devices.computeIfAbsent(key.device + SEPARATOR + key.filePath, WeakReference::new)) {
+ // double check
+ getVectorTimeSeriesMetadataListFromCache(key, subSensorList, res);
+ if (!res.isEmpty()) {
+ cacheHitNum.incrementAndGet();
+ printCacheLog(true);
+ } else {
+ Path path = new Path(key.device, key.measurement);
+ // bloom filter part
+ TsFileSequenceReader reader = FileReaderManager.getInstance().get(key.filePath, true);
+ BloomFilter bloomFilter = reader.readBloomFilter();
+ if (bloomFilter != null && !bloomFilter.contains(path.getFullPath())) {
+ if (debug) {
+ DEBUG_LOGGER.info("TimeSeries meta data {} is filter by bloomFilter!", key);
+ }
+ return Collections.emptyList();
+ }
+ printCacheLog(false);
+ List<TimeseriesMetadata> timeSeriesMetadataList =
+ reader.readTimeseriesMetadata(path, allSensors);
+ // put TimeSeriesMetadata of all sensors used in this query into cache
+ lock.writeLock().lock();
+ try {
+ timeSeriesMetadataList.forEach(
+ metadata -> {
+ TimeSeriesMetadataCacheKey k =
+ new TimeSeriesMetadataCacheKey(
+ key.filePath, key.device, metadata.getMeasurementId());
+ if (!lruCache.containsKey(k)) {
+ lruCache.put(k, metadata);
+ }
+ });
+ getVectorTimeSeriesMetadataListFromCache(key, subSensorList, res);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+ }
+ }
+ if (res.isEmpty()) {
+ if (debug) {
+ DEBUG_LOGGER.info("The file doesn't have this time series {}.", key);
+ }
+ return Collections.emptyList();
+ } else {
+ if (debug) {
+ DEBUG_LOGGER.info(
+ "Get timeseries: {}.{} metadata in file: {} from cache: {}.",
+ key.device,
+ key.measurement,
+ key.filePath,
+ res);
+ }
+ for (int i = 0; i < res.size(); i++) {
+ res.set(i, new TimeseriesMetadata(res.get(i)));
+ }
+ return res;
+ }
+ }
+
+ private void getVectorTimeSeriesMetadataListFromCache(
+ TimeSeriesMetadataCacheKey key, List<String> subSensorList, List<TimeseriesMetadata> res) {
+ lock.readLock().lock();
+ try {
+ TimeseriesMetadata timeseriesMetadata = lruCache.get(key);
+ if (timeseriesMetadata != null) {
+ res.add(timeseriesMetadata);
+ for (String subSensor : subSensorList) {
+ timeseriesMetadata =
+ lruCache.get(new TimeSeriesMetadataCacheKey(key.filePath, key.device, subSensor));
+ if (timeseriesMetadata != null) {
+ res.add(timeseriesMetadata);
+ } else {
+ res.clear();
+ break;
+ }
+ }
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
private void printCacheLog(boolean isHit) {
if (!logger.isDebugEnabled()) {
return;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 0a36c53..3f46ac2 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -667,19 +667,8 @@ public class TsFileResource {
*
* @return TimeseriesMetadata or the first ValueTimeseriesMetadata in VectorTimeseriesMetadata
*/
- public TimeseriesMetadata getTimeSeriesMetadata() {
- if (timeSeriesMetadata == null) {
- return null;
- }
- if (timeSeriesMetadata instanceof TimeseriesMetadata) {
- return (TimeseriesMetadata) timeSeriesMetadata;
- } else {
- // it's ok for us to return the first value timeseries metadata,
- // because the MemChunkMetadataLoader is not depend on the timeseries metadata
- return ((VectorTimeSeriesMetadata) timeSeriesMetadata)
- .getValueTimeseriesMetadataList()
- .get(0);
- }
+ public ITimeSeriesMetadata getTimeSeriesMetadata() {
+ return timeSeriesMetadata;
}
public void setUpgradedResources(List<TsFileResource> upgradedResources) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java
index e7eb64f..d9e926d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java
@@ -21,14 +21,10 @@ package org.apache.iotdb.db.query.executor.fill;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.metadata.VectorPartialPath;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.utils.FileLoaderUtils;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
-import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
-import org.apache.iotdb.tsfile.file.metadata.VectorTimeSeriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -39,7 +35,6 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
@@ -58,9 +53,9 @@ public class LastPointReader {
private QueryDataSource dataSource;
- private ChunkMetadata cachedLastChunk;
+ private IChunkMetadata cachedLastChunk;
- private List<TimeseriesMetadata> unseqTimeseriesMetadataList = new ArrayList<>();
+ private List<ITimeSeriesMetadata> unseqTimeseriesMetadataList = new ArrayList<>();
public LastPointReader() {}
@@ -85,10 +80,10 @@ public class LastPointReader {
TimeValuePair resultPoint = retrieveValidLastPointFromSeqFiles();
UnpackOverlappedUnseqFiles(resultPoint.getTimestamp());
- PriorityQueue<ChunkMetadata> sortedChunkMetatdataList = sortUnseqChunkMetadatasByEndtime();
+ PriorityQueue<IChunkMetadata> sortedChunkMetatdataList = sortUnseqChunkMetadatasByEndtime();
while (!sortedChunkMetatdataList.isEmpty()
&& resultPoint.getTimestamp() <= sortedChunkMetatdataList.peek().getEndTime()) {
- ChunkMetadata chunkMetadata = sortedChunkMetatdataList.poll();
+ IChunkMetadata chunkMetadata = sortedChunkMetatdataList.poll();
TimeValuePair chunkLastPoint = getChunkLastPoint(chunkMetadata);
if (chunkLastPoint.getTimestamp() > resultPoint.getTimestamp()
|| (chunkLastPoint.getTimestamp() == resultPoint.getTimestamp()
@@ -108,23 +103,9 @@ public class LastPointReader {
for (int index = seqFileResource.size() - 1; index >= 0; index--) {
TsFileResource resource = seqFileResource.get(index);
ITimeSeriesMetadata timeseriesMetadata;
- if (seriesPath instanceof VectorPartialPath) {
- timeseriesMetadata =
- new VectorTimeSeriesMetadata(
- FileLoaderUtils.loadTimeSeriesMetadata(
- resource, seriesPath, context, timeFilter, deviceMeasurements),
- Collections.singletonList(
- FileLoaderUtils.loadTimeSeriesMetadata(
- resource,
- ((VectorPartialPath) seriesPath).getSubSensorsPathList().get(0),
- context,
- timeFilter,
- deviceMeasurements)));
- } else {
- timeseriesMetadata =
- FileLoaderUtils.loadTimeSeriesMetadata(
- resource, seriesPath, context, timeFilter, deviceMeasurements);
- }
+ timeseriesMetadata =
+ FileLoaderUtils.loadTimeSeriesMetadata(
+ resource, seriesPath, context, timeFilter, deviceMeasurements);
if (timeseriesMetadata != null) {
if (!timeseriesMetadata.isModified()
&& endtimeContainedByTimeFilter(timeseriesMetadata.getStatistics())) {
@@ -135,7 +116,7 @@ public class LastPointReader {
} else {
List<IChunkMetadata> seqChunkMetadataList = timeseriesMetadata.loadChunkMetadataList();
for (int i = seqChunkMetadataList.size() - 1; i >= 0; i--) {
- lastPoint = getChunkLastPoint((ChunkMetadata) seqChunkMetadataList.get(i));
+ lastPoint = getChunkLastPoint(seqChunkMetadataList.get(i));
// last point of this sequence chunk is valid, quit the loop
if (lastPoint.getValue() != null) {
return lastPoint;
@@ -155,7 +136,7 @@ public class LastPointReader {
while (!unseqFileResource.isEmpty()
&& (lBoundTime <= unseqFileResource.peek().getEndTime(seriesPath.getDevice()))) {
- TimeseriesMetadata timeseriesMetadata =
+ ITimeSeriesMetadata timeseriesMetadata =
FileLoaderUtils.loadTimeSeriesMetadata(
unseqFileResource.poll(), seriesPath, context, timeFilter, deviceMeasurements);
@@ -175,7 +156,7 @@ public class LastPointReader {
}
}
- private TimeValuePair getChunkLastPoint(ChunkMetadata chunkMetaData) throws IOException {
+ private TimeValuePair getChunkLastPoint(IChunkMetadata chunkMetaData) throws IOException {
TimeValuePair lastPoint = new TimeValuePair(Long.MIN_VALUE, null);
if (chunkMetaData == null) {
return lastPoint;
@@ -204,7 +185,7 @@ public class LastPointReader {
return lastPoint;
}
- private boolean shouldUpdate(ChunkMetadata cachedChunk, ChunkMetadata newChunk) {
+ private boolean shouldUpdate(IChunkMetadata cachedChunk, IChunkMetadata newChunk) {
return (newChunk.getVersion() > cachedChunk.getVersion())
|| (newChunk.getVersion() == cachedChunk.getVersion()
&& newChunk.getOffsetOfChunkHeader() > cachedChunk.getOffsetOfChunkHeader());
@@ -223,8 +204,8 @@ public class LastPointReader {
return unseqTsFilesSet;
}
- private PriorityQueue<ChunkMetadata> sortUnseqChunkMetadatasByEndtime() throws IOException {
- PriorityQueue<ChunkMetadata> chunkMetadataList =
+ private PriorityQueue<IChunkMetadata> sortUnseqChunkMetadatasByEndtime() throws IOException {
+ PriorityQueue<IChunkMetadata> chunkMetadataList =
new PriorityQueue<>(
(o1, o2) -> {
long endTime1 = o1.getEndTime();
@@ -241,11 +222,9 @@ public class LastPointReader {
? -1
: Long.compare(o2.getOffsetOfChunkHeader(), o1.getOffsetOfChunkHeader()));
});
- for (TimeseriesMetadata timeseriesMetadata : unseqTimeseriesMetadataList) {
+ for (ITimeSeriesMetadata timeseriesMetadata : unseqTimeseriesMetadataList) {
if (timeseriesMetadata != null) {
- for (IChunkMetadata chunkMetadata : timeseriesMetadata.loadChunkMetadataList()) {
- chunkMetadataList.add((ChunkMetadata) chunkMetadata);
- }
+ chunkMetadataList.addAll(timeseriesMetadata.loadChunkMetadataList());
}
}
return chunkMetadataList;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java
index a76327d..04cb907 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java
@@ -45,7 +45,7 @@ public class SeriesAggregateReader implements IAggregateReader {
TsFileFilter fileFilter,
boolean ascending) {
this.seriesReader =
- SeriesReaderFactory.createSeriesReader(
+ new SeriesReader(
seriesPath,
allSensors,
dataType,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
index d53db04..8698955 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
@@ -58,7 +58,7 @@ public class SeriesRawDataBatchReader implements ManagedSeriesReader {
TsFileFilter fileFilter,
boolean ascending) {
this.seriesReader =
- SeriesReaderFactory.createSeriesReader(
+ new SeriesReader(
seriesPath,
allSensors,
dataType,
@@ -84,7 +84,7 @@ public class SeriesRawDataBatchReader implements ManagedSeriesReader {
Set<String> allSensors = new HashSet<>();
allSensors.add(seriesPath.getMeasurement());
this.seriesReader =
- SeriesReaderFactory.createSeriesReader(
+ new SeriesReader(
seriesPath,
allSensors,
dataType,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index 041fa76..3181d10 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -34,7 +34,6 @@ import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
-import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -125,6 +124,7 @@ public class SeriesReader {
boolean ascending) {
this.seriesPath = seriesPath;
this.allSensors = allSensors;
+ this.allSensors.add(seriesPath.getMeasurement());
this.dataType = dataType;
this.context = context;
QueryUtils.filterQueryDataSource(dataSource, fileFilter);
@@ -168,6 +168,7 @@ public class SeriesReader {
boolean ascending) {
this.seriesPath = seriesPath;
this.allSensors = allSensors;
+ this.allSensors.add(seriesPath.getMeasurement());
this.dataType = dataType;
this.context = context;
this.timeFilter = timeFilter;
@@ -935,7 +936,7 @@ public class SeriesReader {
}
protected void unpackSeqTsFileResource() throws IOException {
- TimeseriesMetadata timeseriesMetadata =
+ ITimeSeriesMetadata timeseriesMetadata =
FileLoaderUtils.loadTimeSeriesMetadata(
orderUtils.getNextSeqFileResource(seqFileResource, true),
seriesPath,
@@ -949,7 +950,7 @@ public class SeriesReader {
}
protected void unpackUnseqTsFileResource() throws IOException {
- TimeseriesMetadata timeseriesMetadata =
+ ITimeSeriesMetadata timeseriesMetadata =
FileLoaderUtils.loadTimeSeriesMetadata(
unseqFileResource.remove(0), seriesPath, context, getAnyFilter(), allSensors);
if (timeseriesMetadata != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
index 543c706..27796ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
@@ -48,7 +48,7 @@ public class SeriesReaderByTimestamp implements IReaderByTimestamp {
UnaryFilter timeFilter =
ascending ? TimeFilter.gtEq(Long.MIN_VALUE) : TimeFilter.ltEq(Long.MAX_VALUE);
this.seriesReader =
- SeriesReaderFactory.createSeriesReader(
+ new SeriesReader(
seriesPath,
allSensors,
dataType,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderFactory.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderFactory.java
deleted file mode 100644
index 577e021..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderFactory.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.query.reader.series;
-
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.metadata.VectorPartialPath;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.filter.TsFileFilter;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-
-import java.util.List;
-import java.util.Set;
-
-public class SeriesReaderFactory {
-
- private SeriesReaderFactory() {}
-
- public static SeriesReader createSeriesReader(
- PartialPath seriesPath,
- Set<String> allSensors,
- TSDataType dataType,
- QueryContext context,
- QueryDataSource dataSource,
- Filter timeFilter,
- Filter valueFilter,
- TsFileFilter fileFilter,
- boolean ascending) {
- if (seriesPath instanceof VectorPartialPath) {
- return new VectorSeriesReader(
- seriesPath,
- allSensors,
- dataType,
- context,
- dataSource,
- timeFilter,
- valueFilter,
- fileFilter,
- ascending);
- } else {
- return new SeriesReader(
- seriesPath,
- allSensors,
- dataType,
- context,
- dataSource,
- timeFilter,
- valueFilter,
- fileFilter,
- ascending);
- }
- }
-
- public static SeriesReader createSeriesReader(
- PartialPath seriesPath,
- Set<String> allSensors,
- TSDataType dataType,
- QueryContext context,
- List<TsFileResource> seqFileResource,
- List<TsFileResource> unseqFileResource,
- Filter timeFilter,
- Filter valueFilter,
- boolean ascending) {
- return new SeriesReader(
- seriesPath,
- allSensors,
- dataType,
- context,
- seqFileResource,
- unseqFileResource,
- timeFilter,
- valueFilter,
- ascending);
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesReader.java
deleted file mode 100644
index 6de0150..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesReader.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.query.reader.series;
-
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.metadata.VectorPartialPath;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.filter.TsFileFilter;
-import org.apache.iotdb.db.utils.FileLoaderUtils;
-import org.apache.iotdb.db.utils.TestOnly;
-import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
-import org.apache.iotdb.tsfile.file.metadata.VectorTimeSeriesMetadata;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-public class VectorSeriesReader extends SeriesReader {
-
- private final VectorPartialPath vectorPartialPath;
-
- public VectorSeriesReader(
- PartialPath seriesPath,
- Set<String> allSensors,
- TSDataType dataType,
- QueryContext context,
- QueryDataSource dataSource,
- Filter timeFilter,
- Filter valueFilter,
- TsFileFilter fileFilter,
- boolean ascending) {
- super(
- seriesPath,
- allSensors,
- dataType,
- context,
- dataSource,
- timeFilter,
- valueFilter,
- fileFilter,
- ascending);
- this.allSensors.add(seriesPath.getMeasurement());
- this.vectorPartialPath = (VectorPartialPath) seriesPath;
- }
-
- @TestOnly
- VectorSeriesReader(
- PartialPath seriesPath,
- Set<String> allSensors,
- TSDataType dataType,
- QueryContext context,
- List<TsFileResource> seqFileResource,
- List<TsFileResource> unseqFileResource,
- Filter timeFilter,
- Filter valueFilter,
- boolean ascending) {
- super(
- seriesPath,
- allSensors,
- dataType,
- context,
- seqFileResource,
- unseqFileResource,
- timeFilter,
- valueFilter,
- ascending);
- this.allSensors.add(seriesPath.getMeasurement());
- this.vectorPartialPath = (VectorPartialPath) seriesPath;
- }
-
- @Override
- protected void unpackSeqTsFileResource() throws IOException {
- TsFileResource resource = orderUtils.getNextSeqFileResource(seqFileResource, true);
- TimeseriesMetadata timeseriesMetadata =
- FileLoaderUtils.loadTimeSeriesMetadata(
- resource, vectorPartialPath, context, getAnyFilter(), allSensors);
- if (timeseriesMetadata != null) {
- timeseriesMetadata.setSeq(true);
- List<TimeseriesMetadata> valueTimeseriesMetadataList = new ArrayList<>();
- for (PartialPath subSensor : vectorPartialPath.getSubSensorsPathList()) {
- TimeseriesMetadata valueTimeSeriesMetadata =
- FileLoaderUtils.loadTimeSeriesMetadata(
- resource, subSensor, context, getAnyFilter(), allSensors);
- if (valueTimeSeriesMetadata == null) {
- throw new IOException("File doesn't contains value");
- }
- valueTimeSeriesMetadata.setSeq(true);
- valueTimeseriesMetadataList.add(valueTimeSeriesMetadata);
- }
- VectorTimeSeriesMetadata vectorTimeSeriesMetadata =
- new VectorTimeSeriesMetadata(timeseriesMetadata, valueTimeseriesMetadataList);
- seqTimeSeriesMetadata.add(vectorTimeSeriesMetadata);
- }
- }
-
- @Override
- protected void unpackUnseqTsFileResource() throws IOException {
- TsFileResource resource = unseqFileResource.remove(0);
- TimeseriesMetadata timeseriesMetadata =
- FileLoaderUtils.loadTimeSeriesMetadata(
- resource, vectorPartialPath, context, getAnyFilter(), allSensors);
- if (timeseriesMetadata != null) {
- timeseriesMetadata.setModified(true);
- timeseriesMetadata.setSeq(false);
- List<TimeseriesMetadata> valueTimeseriesMetadataList = new ArrayList<>();
- for (PartialPath subSensor : vectorPartialPath.getSubSensorsPathList()) {
- TimeseriesMetadata valueTimeSeriesMetadata =
- FileLoaderUtils.loadTimeSeriesMetadata(
- resource, subSensor, context, getAnyFilter(), allSensors);
- if (valueTimeSeriesMetadata == null) {
- throw new IOException("File contains value");
- }
- timeseriesMetadata.setModified(true);
- valueTimeSeriesMetadata.setSeq(false);
- valueTimeseriesMetadataList.add(valueTimeSeriesMetadata);
- }
- VectorTimeSeriesMetadata vectorTimeSeriesMetadata =
- new VectorTimeSeriesMetadata(timeseriesMetadata, valueTimeseriesMetadataList);
- unSeqTimeSeriesMetadata.add(vectorTimeSeriesMetadata);
- }
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 6464c4a..b756794 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.VectorPartialPath;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.reader.chunk.MemChunkLoader;
@@ -33,6 +34,7 @@ import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.VectorChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.VectorTimeSeriesMetadata;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
@@ -48,6 +50,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.stream.Collectors;
public class FileLoaderUtils {
@@ -89,14 +92,27 @@ public class FileLoaderUtils {
* @param allSensors measurements queried at the same time of this device
* @param filter any filter, only used to check time range
*/
- public static TimeseriesMetadata loadTimeSeriesMetadata(
+ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+ public static ITimeSeriesMetadata loadTimeSeriesMetadata(
TsFileResource resource,
PartialPath seriesPath,
QueryContext context,
Filter filter,
Set<String> allSensors)
throws IOException {
- TimeseriesMetadata timeSeriesMetadata;
+ // deal with vector
+ if (seriesPath instanceof VectorPartialPath) {
+ return loadVectorTimeSeriesMetadata(
+ resource,
+ seriesPath,
+ ((VectorPartialPath) seriesPath).getSubSensorsPathList(),
+ context,
+ filter,
+ allSensors);
+ }
+
+ // common path
+ ITimeSeriesMetadata timeSeriesMetadata;
if (resource.isClosed()) {
if (!resource.getTsFile().exists()) {
return null;
@@ -140,6 +156,80 @@ public class FileLoaderUtils {
return timeSeriesMetadata;
}
+ private static VectorTimeSeriesMetadata loadVectorTimeSeriesMetadata(
+ TsFileResource resource,
+ PartialPath seriesPath,
+ List<PartialPath> subSensorList,
+ QueryContext context,
+ Filter filter,
+ Set<String> allSensors)
+ throws IOException {
+ VectorTimeSeriesMetadata vectorTimeSeriesMetadata = null;
+ if (resource.isClosed()) {
+ if (!resource.getTsFile().exists()) {
+ return null;
+ }
+ List<TimeseriesMetadata> timeSeriesMetadata =
+ TimeSeriesMetadataCache.getInstance()
+ .get(
+ new TimeSeriesMetadataCache.TimeSeriesMetadataCacheKey(
+ resource.getTsFilePath(),
+ seriesPath.getDevice(),
+ seriesPath.getMeasurement()),
+ subSensorList.stream()
+ .map(PartialPath::getMeasurement)
+ .collect(Collectors.toList()),
+ allSensors,
+ context.isDebug());
+ if (timeSeriesMetadata != null) {
+ timeSeriesMetadata
+ .get(0)
+ .setChunkMetadataLoader(
+ new DiskChunkMetadataLoader(resource, seriesPath, context, filter));
+ for (int i = 1; i < timeSeriesMetadata.size(); i++) {
+ timeSeriesMetadata
+ .get(i)
+ .setChunkMetadataLoader(
+ new DiskChunkMetadataLoader(resource, subSensorList.get(i - 1), context, filter));
+ }
+ vectorTimeSeriesMetadata =
+ new VectorTimeSeriesMetadata(
+ timeSeriesMetadata.get(0),
+ timeSeriesMetadata.subList(1, timeSeriesMetadata.size()));
+ }
+ } else {
+ vectorTimeSeriesMetadata = (VectorTimeSeriesMetadata) resource.getTimeSeriesMetadata();
+ if (vectorTimeSeriesMetadata != null) {
+ vectorTimeSeriesMetadata.setChunkMetadataLoader(
+ new MemChunkMetadataLoader(resource, seriesPath, context, filter));
+ }
+ }
+
+ if (vectorTimeSeriesMetadata != null) {
+ List<Modification> pathModifications =
+ context.getPathModifications(resource.getModFile(), seriesPath);
+ vectorTimeSeriesMetadata.getTimeseriesMetadata().setModified(!pathModifications.isEmpty());
+ if (vectorTimeSeriesMetadata.getTimeseriesMetadata().getStatistics().getStartTime()
+ > vectorTimeSeriesMetadata.getTimeseriesMetadata().getStatistics().getEndTime()) {
+ return null;
+ }
+ if (filter != null
+ && !filter.satisfyStartEndTime(
+ vectorTimeSeriesMetadata.getTimeseriesMetadata().getStatistics().getStartTime(),
+ vectorTimeSeriesMetadata.getTimeseriesMetadata().getStatistics().getEndTime())) {
+ return null;
+ }
+ List<TimeseriesMetadata> valueTimeSeriesMetadataList =
+ vectorTimeSeriesMetadata.getValueTimeseriesMetadataList();
+ for (int i = 0; i < valueTimeSeriesMetadataList.size(); i++) {
+ pathModifications =
+ context.getPathModifications(resource.getModFile(), subSensorList.get(i));
+ valueTimeSeriesMetadataList.get(i).setModified(!pathModifications.isEmpty());
+ }
+ }
+ return vectorTimeSeriesMetadata;
+ }
+
/**
* load all chunk metadata of one time series in one file.
*
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java
index b7726f0..1bb4cfe 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java
@@ -510,7 +510,11 @@ public class VectorTVList extends TVList {
pivotIndex = getValueIndex(pos);
}
- /* Get the row index value in index column. */
+ /**
+ * Get the row index value in index column
+ *
+ * @param index row index
+ */
@Override
public int getValueIndex(int index) {
if (index >= size) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/constant/TsFileConstant.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/constant/TsFileConstant.java
index 7b8e024..8a98764 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/constant/TsFileConstant.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/constant/TsFileConstant.java
@@ -30,5 +30,8 @@ public class TsFileConstant {
public static final String PATH_SEPARATER_NO_REGEX = "\\.";
public static final char DOUBLE_QUOTE = '"';
+ public static final byte TIME_COLUMN_MASK = (byte) 0x80;
+ public static final byte VALUE_COLUMN_MASK = (byte) 0x40;
+
private TsFileConstant() {}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
index c0d30a2..642f285 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.tsfile.file.metadata;
import org.apache.iotdb.tsfile.common.cache.Accountable;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.TimeRange;
@@ -321,6 +322,16 @@ public class ChunkMetadata implements Accountable, IChunkMetadata {
return mask;
}
+ @Override
+ public boolean isTimeColumn() {
+ return mask == TsFileConstant.TIME_COLUMN_MASK;
+ }
+
+ @Override
+ public boolean isValueColumn() {
+ return mask == TsFileConstant.VALUE_COLUMN_MASK;
+ }
+
public void setMask(byte mask) {
this.mask = mask;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java
index b81a18b..1930340 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java
@@ -71,4 +71,8 @@ public interface IChunkMetadata {
int serializeTo(OutputStream outputStream, boolean serializeStatistic) throws IOException;
byte getMask();
+
+ boolean isTimeColumn();
+
+ boolean isValueColumn();
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ITimeSeriesMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ITimeSeriesMetadata.java
index b508f57..6bb6ec1 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ITimeSeriesMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ITimeSeriesMetadata.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.tsfile.file.metadata;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.controller.IChunkMetadataLoader;
import java.io.IOException;
import java.util.List;
@@ -38,4 +39,6 @@ public interface ITimeSeriesMetadata {
List<IChunkMetadata> loadChunkMetadataList() throws IOException;
List<IChunkMetadata> getChunkMetadataList();
+
+ void setChunkMetadataLoader(IChunkMetadataLoader chunkMetadataLoader);
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
index dcba911..5944316 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
@@ -43,7 +43,7 @@ public class MetadataIndexConstructor {
/**
* Construct metadata index tree
*
- * @param deviceTimeseriesMetadataMap device - >List<TimeseriesMetadata>
+ * @param deviceTimeseriesMetadataMap device => TimeseriesMetadata list
* @param out tsfile output
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
@@ -61,18 +61,54 @@ public class MetadataIndexConstructor {
TimeseriesMetadata timeseriesMetadata;
MetadataIndexNode currentIndexNode =
new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+ int serializedTimeseriesMetadataNum = 0;
for (int i = 0; i < entry.getValue().size(); i++) {
timeseriesMetadata = entry.getValue().get(i);
- // when constructing from leaf node, every "degree number of nodes" are related to an entry
- if (i % config.getMaxDegreeOfIndexNode() == 0) {
- if (currentIndexNode.isFull()) {
- addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
- currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+ if (timeseriesMetadata.isTimeColumn()) {
+ // calculate the number of value columns in this vector
+ int numOfValueColumns = 0;
+ for (int j = i + 1; j < entry.getValue().size(); j++) {
+ if (entry.getValue().get(j).isValueColumn()) {
+ numOfValueColumns++;
+ } else {
+ break;
+ }
}
- currentIndexNode.addEntry(
- new MetadataIndexEntry(timeseriesMetadata.getMeasurementId(), out.getPosition()));
+
+ // only add time column of vector into LEAF_MEASUREMENT node
+ if (currentIndexNode.getChildren().isEmpty()
+ || serializedTimeseriesMetadataNum + numOfValueColumns + 1
+ > config.getMaxDegreeOfIndexNode() * 1.5) {
+ currentIndexNode.addEntry(
+ new MetadataIndexEntry(timeseriesMetadata.getMeasurementId(), out.getPosition()));
+ serializedTimeseriesMetadataNum = 0;
+ }
+
+ timeseriesMetadata.serializeTo(out.wrapAsStream());
+ serializedTimeseriesMetadataNum++;
+ for (int j = 0; j < numOfValueColumns; j++) {
+ i += 1;
+ timeseriesMetadata = entry.getValue().get(i);
+ // value columns of vector should not be added into LEAF_MEASUREMENT node
+ timeseriesMetadata.serializeTo(out.wrapAsStream());
+ serializedTimeseriesMetadataNum++;
+ }
+ } else {
+ // when constructing from leaf node, every "degree number of nodes" are related to an
+ // entry
+ if (serializedTimeseriesMetadataNum == 0
+ || serializedTimeseriesMetadataNum >= config.getMaxDegreeOfIndexNode()) {
+ if (currentIndexNode.isFull()) {
+ addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
+ currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+ }
+ currentIndexNode.addEntry(
+ new MetadataIndexEntry(timeseriesMetadata.getMeasurementId(), out.getPosition()));
+ serializedTimeseriesMetadataNum = 0;
+ }
+ timeseriesMetadata.serializeTo(out.wrapAsStream());
+ serializedTimeseriesMetadataNum++;
}
- timeseriesMetadata.serializeTo(out.wrapAsStream());
}
addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
deviceMetadataIndexMap.put(
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
index 427e9ee..3f6f633 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
@@ -75,7 +75,7 @@ public class MetadataIndexNode {
}
boolean isFull() {
- return children.size() == config.getMaxDegreeOfIndexNode();
+ return children.size() >= config.getMaxDegreeOfIndexNode();
}
MetadataIndexEntry peek() {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
index df285dc..aeca568 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.tsfile.file.metadata;
import org.apache.iotdb.tsfile.common.cache.Accountable;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.controller.IChunkMetadataLoader;
@@ -39,9 +40,14 @@ public class TimeseriesMetadata implements Accountable, ITimeSeriesMetadata {
private long startOffsetOfChunkMetaDataList;
/**
* 0 means this time series has only one chunk, no need to save the statistic again in chunk
- * metadata 1 means this time series has more than one chunk, should save the statistic again in
- * chunk metadata if the 8th bit is 1, it means it is the time column of a vector series if the
- * 7th bit is 1, it means it is the value column of a vector series
+ * metadata;
+ *
+ * <p>1 means this time series has more than one chunk, should save the statistic again in chunk
+ * metadata;
+ *
+ * <p>if the 8th bit is 1, it means it is the time column of a vector series;
+ *
+ * <p>if the 7th bit is 1, it means it is the value column of a vector series
*/
private byte timeSeriesMetadataType;
@@ -141,6 +147,14 @@ public class TimeseriesMetadata implements Accountable, ITimeSeriesMetadata {
return timeSeriesMetadataType;
}
+ public boolean isTimeColumn() {
+ return timeSeriesMetadataType == TsFileConstant.TIME_COLUMN_MASK;
+ }
+
+ public boolean isValueColumn() {
+ return timeSeriesMetadataType == TsFileConstant.VALUE_COLUMN_MASK;
+ }
+
public void setTimeSeriesMetadataType(byte timeSeriesMetadataType) {
this.timeSeriesMetadataType = timeSeriesMetadataType;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java
index d576180..974d9d6 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java
@@ -156,6 +156,16 @@ public class VectorChunkMetadata implements IChunkMetadata {
return 0;
}
+ @Override
+ public boolean isTimeColumn() {
+ return false;
+ }
+
+ @Override
+ public boolean isValueColumn() {
+ return false;
+ }
+
public Chunk getTimeChunk() throws IOException {
return timeChunkMetadata.getChunkLoader().loadChunk((ChunkMetadata) timeChunkMetadata);
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java
index 67e6c0c..f82194d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.tsfile.file.metadata;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.controller.IChunkMetadataLoader;
import java.io.IOException;
import java.util.ArrayList;
@@ -50,6 +51,9 @@ public class VectorTimeSeriesMetadata implements ITimeSeriesMetadata {
@Override
public void setModified(boolean modified) {
timeseriesMetadata.setModified(modified);
+ for (TimeseriesMetadata subSensor : valueTimeseriesMetadataList) {
+ subSensor.setModified(modified);
+ }
}
@Override
@@ -60,6 +64,9 @@ public class VectorTimeSeriesMetadata implements ITimeSeriesMetadata {
@Override
public void setSeq(boolean seq) {
timeseriesMetadata.setSeq(seq);
+ for (TimeseriesMetadata subSensor : valueTimeseriesMetadataList) {
+ subSensor.setSeq(seq);
+ }
}
@Override
@@ -91,7 +98,16 @@ public class VectorTimeSeriesMetadata implements ITimeSeriesMetadata {
return null;
}
+ @Override
+ public void setChunkMetadataLoader(IChunkMetadataLoader chunkMetadataLoader) {
+ timeseriesMetadata.setChunkMetadataLoader(chunkMetadataLoader);
+ }
+
public List<TimeseriesMetadata> getValueTimeseriesMetadataList() {
return valueTimeseriesMetadataList;
}
+
+ public TimeseriesMetadata getTimeseriesMetadata() {
+ return timeseriesMetadata;
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 72b86e7..4992770 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -370,35 +370,50 @@ public class TsFileSequenceReader implements AutoCloseable {
}
/**
- * getChunkMetadataList Find the leaf node that contains path, return all the sensors in that leaf
- * node which are also in allSensors set
+ * Find the leaf node that contains this vector and return the time column and all needed subSensors
+ *
+ * @param path path with time column
+ * @param subSensorList value columns that are needed
+ * @return TimeseriesMetadata for the time column and all the needed subSensors; the order of the
+ * elements in this list should be the same as in subSensorList
*/
- public List<TimeseriesMetadata> readTimeseriesMetadata(Path path, Set<String> allSensors)
+ public List<TimeseriesMetadata> readTimeseriesMetadata(Path path, List<String> subSensorList)
throws IOException {
- readFileMetadata();
- MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex();
- Pair<MetadataIndexEntry, Long> metadataIndexPair =
- getMetadataAndEndOffset(deviceMetadataIndexNode, path.getDevice(), true, true);
+ Pair<MetadataIndexEntry, Long> metadataIndexPair = getLeafMetadataIndexPair(path);
if (metadataIndexPair == null) {
- return null;
+ return Collections.emptyList();
}
+ Map<String, TimeseriesMetadata> timeseriesMetadataMap = new HashMap<>();
ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
- MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode;
- if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
+ while (buffer.hasRemaining()) {
+ TimeseriesMetadata timeseriesMetadata;
try {
- metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer);
+ timeseriesMetadata = TimeseriesMetadata.deserializeFrom(buffer, true);
} catch (BufferOverflowException e) {
- logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file);
+ logger.error(
+ "Something error happened while deserializing TimeseriesMetadata of file {}", file);
throw e;
}
- metadataIndexPair =
- getMetadataAndEndOffset(metadataIndexNode, path.getMeasurement(), false, false);
+ timeseriesMetadataMap.put(timeseriesMetadata.getMeasurementId(), timeseriesMetadata);
+ }
+
+ List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
+ for (String subSensor : subSensorList) {
+ timeseriesMetadataList.add(timeseriesMetadataMap.get(subSensor));
}
+ return timeseriesMetadataList;
+ }
+
+ /* Find the leaf node that contains path, return all the sensors in that leaf node which are also in allSensors set */
+ public List<TimeseriesMetadata> readTimeseriesMetadata(Path path, Set<String> allSensors)
+ throws IOException {
+ Pair<MetadataIndexEntry, Long> metadataIndexPair = getLeafMetadataIndexPair(path);
if (metadataIndexPair == null) {
- return null;
+ return Collections.emptyList();
}
List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
- buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
+
+ ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
while (buffer.hasRemaining()) {
TimeseriesMetadata timeseriesMetadata;
try {
@@ -415,6 +430,30 @@ public class TsFileSequenceReader implements AutoCloseable {
return timeseriesMetadataList;
}
+ /* Get leaf MetadataIndexPair which contains path */
+ private Pair<MetadataIndexEntry, Long> getLeafMetadataIndexPair(Path path) throws IOException {
+ readFileMetadata();
+ MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex();
+ Pair<MetadataIndexEntry, Long> metadataIndexPair =
+ getMetadataAndEndOffset(deviceMetadataIndexNode, path.getDevice(), true, true);
+ if (metadataIndexPair == null) {
+ return null;
+ }
+ ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
+ MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode;
+ if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
+ try {
+ metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer);
+ } catch (BufferOverflowException e) {
+ logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file);
+ throw e;
+ }
+ metadataIndexPair =
+ getMetadataAndEndOffset(metadataIndexNode, path.getMeasurement(), false, false);
+ }
+ return metadataIndexPair;
+ }
+
public List<TimeseriesMetadata> readTimeseriesMetadata(String device, Set<String> measurements)
throws IOException {
readFileMetadata();
@@ -633,7 +672,7 @@ public class TsFileSequenceReader implements AutoCloseable {
}
}
- /** TimeseriesMetadata don't need deserialize chunk metadata list */
+ /* The TimeseriesMetadata returned here do not need their chunk metadata lists deserialized */
public Map<String, List<TimeseriesMetadata>> getAllTimeseriesMetadata() throws IOException {
if (tsFileMetaData == null) {
readFileMetadata();
@@ -659,7 +698,7 @@ public class TsFileSequenceReader implements AutoCloseable {
return timeseriesMetadataMap;
}
- /** This method will only deserialize the TimeseriesMetadata, not including chunk metadata list */
+ /* This method will only deserialize the TimeseriesMetadata, not including chunk metadata list */
private List<TimeseriesMetadata> getDeviceTimeseriesMetadataWithoutChunkMetadata(String device)
throws IOException {
MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex();
@@ -684,10 +723,7 @@ public class TsFileSequenceReader implements AutoCloseable {
return deviceTimeseriesMetadata;
}
- /**
- * This method will not only deserialize the TimeseriesMetadata, but also all the chunk metadata
- * list meanwhile.
- */
+ /* This method deserializes not only the TimeseriesMetadata but also all of their chunk metadata lists. */
private List<TimeseriesMetadata> getDeviceTimeseriesMetadata(String device) throws IOException {
MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex();
Pair<MetadataIndexEntry, Long> metadataIndexPair =
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java
index 2541e83..d07007b 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java
@@ -196,10 +196,8 @@ public class TsFileSequenceReaderForV2 extends TsFileSequenceReader implements A
return searchResult >= 0 ? timeseriesMetadataList.get(searchResult) : null;
}
- /**
- * Find the leaf node that contains path, return all the sensors in that leaf node which are also
- * in allSensors set
- */
+ /* Finds the leaf node that contains path and returns all the sensors in that leaf node that are
+ also in the allSensors set */
@SuppressWarnings("squid:S3776")
@Override
public List<TimeseriesMetadata> readTimeseriesMetadata(Path path, Set<String> allSensors)
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
index 522eff5..d0447ed 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.tsfile.write.chunk;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.compress.ICompressor;
import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
@@ -228,7 +229,7 @@ public class TimeChunkWriter {
statistics,
pageBuffer.size(),
numOfPages,
- 0x80);
+ TsFileConstant.TIME_COLUMN_MASK);
long dataOffset = writer.getPos();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
index 17e137d..d117722 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.tsfile.write.chunk;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.compress.ICompressor;
import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
@@ -225,7 +226,7 @@ public class ValueChunkWriter {
statistics,
pageBuffer.size(),
numOfPages,
- 0x40);
+ TsFileConstant.VALUE_COLUMN_MASK);
long dataOffset = writer.getPos();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 41ed225..9fe7453 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -234,6 +234,7 @@ public class TsFileIOWriter {
*
* @throws IOException if I/O error occurs
*/
+ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void endFile() throws IOException {
long metaOffset = out.getPosition();
@@ -241,15 +242,44 @@ public class TsFileIOWriter {
ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
// group ChunkMetadata by series
+ // only contains ordinary path and time column of vector series
Map<Path, List<IChunkMetadata>> chunkMetadataListMap = new TreeMap<>();
+
+ // time column -> ChunkMetadataList TreeMap of value columns in vector
+ Map<Path, Map<Path, List<IChunkMetadata>>> vectorToPathsMap = new HashMap<>();
+
for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
- for (IChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
- Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
- chunkMetadataListMap.computeIfAbsent(series, k -> new ArrayList<>()).add(chunkMetadata);
+ List<ChunkMetadata> chunkMetadatas = chunkGroupMetadata.getChunkMetadataList();
+ int idx = 0;
+ while (idx < chunkMetadatas.size()) {
+ IChunkMetadata chunkMetadata = chunkMetadatas.get(idx);
+ if (chunkMetadata.getMask() == 0) {
+ Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
+ chunkMetadataListMap.computeIfAbsent(series, k -> new ArrayList<>()).add(chunkMetadata);
+ idx++;
+ } else if (chunkMetadata.isTimeColumn()) {
+ // time column of a vector series
+ Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
+ chunkMetadataListMap.computeIfAbsent(series, k -> new ArrayList<>()).add(chunkMetadata);
+ idx++;
+ Map<Path, List<IChunkMetadata>> chunkMetadataListMapInVector = new TreeMap<>();
+
+ // value columns of a vector series
+ while (idx < chunkMetadatas.size() && chunkMetadatas.get(idx).isValueColumn()) {
+ chunkMetadata = chunkMetadatas.get(idx);
+ Path vectorSeries =
+ new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
+ chunkMetadataListMapInVector
+ .computeIfAbsent(vectorSeries, k -> new ArrayList<>())
+ .add(chunkMetadata);
+ idx++;
+ }
+ vectorToPathsMap.put(series, chunkMetadataListMapInVector);
+ }
}
}
- MetadataIndexNode metadataIndex = flushMetadataIndex(chunkMetadataListMap);
+ MetadataIndexNode metadataIndex = flushMetadataIndex(chunkMetadataListMap, vectorToPathsMap);
TsFileMetadata tsFileMetaData = new TsFileMetadata();
tsFileMetaData.setMetadataIndex(metadataIndex);
tsFileMetaData.setMetaOffset(metaOffset);
@@ -290,49 +320,70 @@ public class TsFileIOWriter {
*
* @return MetadataIndexEntry list in TsFileMetadata
*/
- private MetadataIndexNode flushMetadataIndex(Map<Path, List<IChunkMetadata>> chunkMetadataListMap)
+ private MetadataIndexNode flushMetadataIndex(
+ Map<Path, List<IChunkMetadata>> chunkMetadataListMap,
+ Map<Path, Map<Path, List<IChunkMetadata>>> vectorToPathsMap)
throws IOException {
// convert ChunkMetadataList to this field
deviceTimeseriesMetadataMap = new LinkedHashMap<>();
// create device -> TimeseriesMetaDataList Map
for (Map.Entry<Path, List<IChunkMetadata>> entry : chunkMetadataListMap.entrySet()) {
- Path path = entry.getKey();
- String device = path.getDevice();
-
- // create TimeseriesMetaData
- PublicBAOS publicBAOS = new PublicBAOS();
- TSDataType dataType = entry.getValue().get(entry.getValue().size() - 1).getDataType();
- Statistics seriesStatistics = Statistics.getStatsByType(dataType);
-
- int chunkMetadataListLength = 0;
- boolean serializeStatistic = (entry.getValue().size() > 1);
- // flush chunkMetadataList one by one
- for (IChunkMetadata chunkMetadata : entry.getValue()) {
- if (!chunkMetadata.getDataType().equals(dataType)) {
- continue;
- }
- chunkMetadataListLength += chunkMetadata.serializeTo(publicBAOS, serializeStatistic);
- seriesStatistics.mergeStatistics(chunkMetadata.getStatistics());
- }
- TimeseriesMetadata timeseriesMetadata =
- new TimeseriesMetadata(
- (byte)
- ((serializeStatistic ? (byte) 1 : (byte) 0) | entry.getValue().get(0).getMask()),
- chunkMetadataListLength,
- path.getMeasurement(),
- dataType,
- seriesStatistics,
- publicBAOS);
- deviceTimeseriesMetadataMap
- .computeIfAbsent(device, k -> new ArrayList<>())
- .add(timeseriesMetadata);
+ // for ordinary path
+ flushOneChunkMetadata(entry.getKey(), entry.getValue(), vectorToPathsMap);
}
// construct TsFileMetadata and return
return MetadataIndexConstructor.constructMetadataIndex(deviceTimeseriesMetadataMap, out);
}
+ /**
+  * Builds the TimeseriesMetadata of one series from its chunk metadata and appends it to
+  * deviceTimeseriesMetadataMap under the device of the series. When the series is the time column
+  * of a vector, the value columns registered for it in vectorToPathsMap are flushed right after
+  * it, exactly once.
+  *
+  * @param path the series whose chunk metadata is flushed
+  * @param chunkMetadataList chunk metadata of the series; must contain at least one element
+  * @param vectorToPathsMap time column path -> (value column path -> chunk metadata list)
+  * @throws IOException if serializing a chunk metadata fails
+  */
+ private void flushOneChunkMetadata(
+     Path path,
+     List<IChunkMetadata> chunkMetadataList,
+     Map<Path, Map<Path, List<IChunkMetadata>>> vectorToPathsMap)
+     throws IOException {
+   // create TimeseriesMetaData
+   PublicBAOS publicBAOS = new PublicBAOS();
+   // the last chunk decides the data type; chunks of any other type are skipped below
+   TSDataType dataType = chunkMetadataList.get(chunkMetadataList.size() - 1).getDataType();
+   Statistics seriesStatistics = Statistics.getStatsByType(dataType);
+
+   int chunkMetadataListLength = 0;
+   // per-chunk statistics are only serialized when the series has more than one chunk
+   boolean serializeStatistic = (chunkMetadataList.size() > 1);
+   // flush chunkMetadataList one by one
+   for (IChunkMetadata chunkMetadata : chunkMetadataList) {
+     if (!chunkMetadata.getDataType().equals(dataType)) {
+       continue;
+     }
+     chunkMetadataListLength += chunkMetadata.serializeTo(publicBAOS, serializeStatistic);
+     seriesStatistics.mergeStatistics(chunkMetadata.getStatistics());
+   }
+
+   TimeseriesMetadata timeseriesMetadata =
+       new TimeseriesMetadata(
+           (byte)
+               ((serializeStatistic ? (byte) 1 : (byte) 0) | chunkMetadataList.get(0).getMask()),
+           chunkMetadataListLength,
+           path.getMeasurement(),
+           dataType,
+           seriesStatistics,
+           publicBAOS);
+   deviceTimeseriesMetadataMap
+       .computeIfAbsent(path.getDevice(), k -> new ArrayList<>())
+       .add(timeseriesMetadata);
+
+   // for VECTOR
+   for (IChunkMetadata chunkMetadata : chunkMetadataList) {
+     if (!chunkMetadata.isTimeColumn()) {
+       continue;
+     }
+     // chunkMetadata is time column of a vector series
+     Map<Path, List<IChunkMetadata>> vectorMap = vectorToPathsMap.get(path);
+     // a time column without registered value columns has nothing more to flush
+     if (vectorMap != null) {
+       for (Map.Entry<Path, List<IChunkMetadata>> entry : vectorMap.entrySet()) {
+         flushOneChunkMetadata(entry.getKey(), entry.getValue(), vectorToPathsMap);
+       }
+     }
+     // The value columns belong to the path, not to a single chunk: flushing them again for
+     // every further time column chunk would append duplicate TimeseriesMetadata.
+     break;
+   }
+ }
+
/**
* get the length of normal OutputStream.
*