You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/03/15 01:19:20 UTC
[iotdb] 03/05: need haonan
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch TYQuery
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9e466ca06a9f038ca57e16bc919791e393a15085
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Sun Mar 14 19:52:08 2021 +0800
need haonan
---
.../org/apache/iotdb/db/engine/StorageEngine.java | 4 +-
.../iotdb/db/engine/memtable/AbstractMemTable.java | 13 +++
.../apache/iotdb/db/engine/memtable/IMemTable.java | 9 ++
.../db/engine/querycontext/QueryDataSource.java | 11 +--
.../db/engine/querycontext/ReadOnlyMemChunk.java | 5 ++
.../engine/storagegroup/StorageGroupProcessor.java | 99 ++++++++++------------
.../db/engine/storagegroup/TsFileProcessor.java | 95 +++++++++++++--------
.../db/engine/storagegroup/TsFileResource.java | 40 ++++-----
.../org/apache/iotdb/db/metadata/MManager.java | 65 ++++++++------
.../iotdb/db/query/context/QueryContext.java | 11 ++-
.../chunk/metadata/DiskChunkMetadataLoader.java | 5 ++
.../chunk/metadata/MemChunkMetadataLoader.java | 5 ++
.../org/apache/iotdb/db/utils/FileLoaderUtils.java | 1 -
.../storagegroup/StorageGroupProcessorTest.java | 23 +++--
.../iotdb/db/engine/storagegroup/TTLTest.java | 34 ++++----
.../engine/storagegroup/TsFileProcessorTest.java | 42 ++++-----
.../reader/series/SeriesAggregateReaderTest.java | 20 ++---
.../reader/series/SeriesReaderByTimestampTest.java | 1 -
.../tsfile/file/metadata/TimeseriesMetadata.java | 4 +
.../file/metadata/VectorTimeSeriesMetadata.java | 29 ++++---
.../read/controller/IChunkMetadataLoader.java | 2 +
21 files changed, 285 insertions(+), 233 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index b774e52..2b5cd02 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -665,10 +665,8 @@ public class StorageEngine implements IService {
throws StorageEngineException, QueryProcessException {
PartialPath fullPath = (PartialPath) seriesExpression.getSeriesPath();
PartialPath deviceId = fullPath.getDevicePath();
- String measurementId = seriesExpression.getSeriesPath().getMeasurement();
StorageGroupProcessor storageGroupProcessor = getProcessor(deviceId);
- return storageGroupProcessor.query(
- deviceId, measurementId, context, filePathsManager, seriesExpression.getFilter());
+ return storageGroupProcessor.query(fullPath, context, filePathsManager, seriesExpression.getFilter());
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 3538aa1..4d68a78 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -274,7 +274,20 @@ public abstract class AbstractMemTable implements IMemTable {
measurement, dataType, encoding, chunkCopy, props, curSize, deletionList);
}
+
+ // TODO BY HAONAN HOU
@Override
+ public ReadOnlyMemChunk query(
+ String deviceId,
+ String measurement,
+ IMeasurementSchema schema,
+ long timeLowerBound,
+ List<TimeRange> deletionList)
+ throws IOException, QueryProcessException, MetadataException {
+ return null;
+ }
+
+ @Override
public void delete(
PartialPath originalPath, PartialPath devicePath, long startTimestamp, long endTimestamp) {
Map<String, IWritableMemChunk> deviceMap = memTableMap.get(devicePath.getFullPath());
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index b3fb16a..e9e9c33 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -94,6 +94,15 @@ public interface IMemTable {
List<TimeRange> deletionList)
throws IOException, QueryProcessException, MetadataException;
+
+ ReadOnlyMemChunk query(
+ String deviceId,
+ String measurement,
+ IMeasurementSchema schema,
+ long timeLowerBound,
+ List<TimeRange> deletionList)
+ throws IOException, QueryProcessException, MetadataException;
+
/** putBack all the memory resources. */
void clear();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
index b3dfb98..a3fc963 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
@@ -19,16 +19,13 @@
package org.apache.iotdb.db.engine.querycontext;
+import java.util.List;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
-import java.util.List;
-
public class QueryDataSource {
- private PartialPath seriesPath;
private List<TsFileResource> seqResources;
private List<TsFileResource> unseqResources;
@@ -36,18 +33,12 @@ public class QueryDataSource {
private long dataTTL = Long.MAX_VALUE;
public QueryDataSource(
- PartialPath seriesPath,
List<TsFileResource> seqResources,
List<TsFileResource> unseqResources) {
- this.seriesPath = seriesPath;
this.seqResources = seqResources;
this.unseqResources = unseqResources;
}
- public PartialPath getSeriesPath() {
- return seriesPath;
- }
-
public List<TsFileResource> getSeqResources() {
return seqResources;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
index 8236e15..c635968 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
@@ -59,6 +59,7 @@ public class ReadOnlyMemChunk {
private int chunkDataSize;
+ // TODO BY HAONAN HOU
public ReadOnlyMemChunk(
String measurementUid,
TSDataType dataType,
@@ -124,6 +125,9 @@ public class ReadOnlyMemChunk {
case DOUBLE:
statsByType.update(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble());
break;
+ case VECTOR:
+ statsByType.update(timeValuePair.getTimestamp());
+ break;
default:
throw new QueryProcessException("Unsupported data type:" + dataType);
}
@@ -143,6 +147,7 @@ public class ReadOnlyMemChunk {
return !chunkPointReader.hasNextTimeValuePair();
}
+ // TODO BY HAONAN HOU
public ChunkMetadata getChunkMetaData() {
return cachedMetaData;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 1f9d80e..e67b691 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -18,6 +18,38 @@
*/
package org.apache.iotdb.db.engine.storagegroup;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
+import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -77,43 +109,9 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
-
-import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
-import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
-import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
-
/**
* For sequence data, a StorageGroupProcessor has some TsFileProcessors, in which there is only one
* TsFileProcessor in the working status. <br>
@@ -1535,8 +1533,7 @@ public class StorageGroupProcessor {
// TODO need a read lock, please consider the concurrency with flush manager threads.
public QueryDataSource query(
- PartialPath deviceId,
- String measurementId,
+ PartialPath fullPath,
QueryContext context,
QueryFileManager filePathsManager,
Filter timeFilter)
@@ -1547,8 +1544,7 @@ public class StorageGroupProcessor {
getFileResourceListForQuery(
tsFileManagement.getTsFileList(true),
upgradeSeqFileList,
- deviceId,
- measurementId,
+ fullPath,
context,
timeFilter,
true);
@@ -1556,12 +1552,11 @@ public class StorageGroupProcessor {
getFileResourceListForQuery(
tsFileManagement.getTsFileList(false),
upgradeUnseqFileList,
- deviceId,
- measurementId,
+ fullPath,
context,
timeFilter,
false);
- QueryDataSource dataSource = new QueryDataSource(deviceId, seqResources, unseqResources);
+ QueryDataSource dataSource = new QueryDataSource(seqResources, unseqResources);
// used files should be added before mergeLock is unlocked, or they may be deleted by
// running merge
// is null only in tests
@@ -1592,24 +1587,24 @@ public class StorageGroupProcessor {
private List<TsFileResource> getFileResourceListForQuery(
Collection<TsFileResource> tsFileResources,
List<TsFileResource> upgradeTsFileResources,
- PartialPath deviceId,
- String measurementId,
+ PartialPath fullPath,
QueryContext context,
Filter timeFilter,
boolean isSeq)
throws MetadataException {
+ String deviceId = fullPath.getDevice();
if (config.isDebugOn()) {
DEBUG_LOGGER.info(
"Path: {}.{}, get tsfile list: {} isSeq: {} timefilter: {}",
- deviceId.getFullPath(),
- measurementId,
+ deviceId,
+ fullPath.getMeasurement(),
tsFileResources,
isSeq,
(timeFilter == null ? "null" : timeFilter));
}
- IMeasurementSchema schema = IoTDB.metaManager.getSeriesSchema(deviceId, measurementId);
+ IMeasurementSchema schema = IoTDB.metaManager.getSeriesSchema(fullPath);
List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
long timeLowerBound =
@@ -1618,7 +1613,7 @@ public class StorageGroupProcessor {
// for upgrade files and old files must be closed
for (TsFileResource tsFileResource : upgradeTsFileResources) {
- if (!tsFileResource.isSatisfied(deviceId.getFullPath(), timeFilter, isSeq, dataTTL)) {
+ if (!tsFileResource.isSatisfied(deviceId, timeFilter, isSeq, dataTTL)) {
continue;
}
closeQueryLock.readLock().lock();
@@ -1630,7 +1625,7 @@ public class StorageGroupProcessor {
}
for (TsFileResource tsFileResource : tsFileResources) {
- if (!tsFileResource.isSatisfied(deviceId.getFullPath(), timeFilter, isSeq, dataTTL)) {
+ if (!tsFileResource.isSatisfied(fullPath.getDevice(), timeFilter, isSeq, dataTTL)) {
continue;
}
closeQueryLock.readLock().lock();
@@ -1641,11 +1636,9 @@ public class StorageGroupProcessor {
tsFileResource
.getUnsealedFileProcessor()
.query(
- deviceId.getFullPath(),
- measurementId,
- schema.getType(),
- schema.getEncodingType(),
- schema.getProps(),
+ deviceId,
+ fullPath.getMeasurement(),
+ schema,
context,
tsfileResourcesForQuery);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 1e705ab..773e5dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -18,6 +18,16 @@
*/
package org.apache.iotdb.db.engine.storagegroup;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -57,28 +67,18 @@ import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.VectorChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
@SuppressWarnings("java:S1135") // ignore todos
public class TsFileProcessor {
@@ -91,7 +91,9 @@ public class TsFileProcessor {
private StorageGroupInfo storageGroupInfo;
private TsFileProcessorInfo tsFileProcessorInfo;
- /** sync this object in query() and asyncTryToFlush() */
+ /**
+ * sync this object in query() and asyncTryToFlush()
+ */
private final ConcurrentLinkedDeque<IMemTable> flushingMemTables = new ConcurrentLinkedDeque<>();
private List<Pair<Modification, IMemTable>> modsToMemtable = new ArrayList<>();
@@ -113,7 +115,9 @@ public class TsFileProcessor {
private IMemTable workMemTable;
- /** this callback is called before the workMemtable is added into the flushingMemTables. */
+ /**
+ * this callback is called before the workMemtable is added into the flushingMemTables.
+ */
private final UpdateEndTimeCallBack updateLatestFlushTimeCallback;
private WriteLogNode logNode;
@@ -220,9 +224,9 @@ public class TsFileProcessor {
* the range [start, end)
*
* @param insertTabletPlan insert a tablet of a device
- * @param start start index of rows to be inserted in insertTabletPlan
- * @param end end index of rows to be inserted in insertTabletPlan
- * @param results result array
+ * @param start start index of rows to be inserted in insertTabletPlan
+ * @param end end index of rows to be inserted in insertTabletPlan
+ * @param results result array
*/
public void insertTablet(
InsertTabletPlan insertTabletPlan, int start, int end, TSStatus[] results)
@@ -637,7 +641,9 @@ public class TsFileProcessor {
}
}
- /** put the working memtable into flushing list and set the working memtable to null */
+ /**
+ * put the working memtable into flushing list and set the working memtable to null
+ */
public void asyncFlush() {
flushQueryLock.writeLock().lock();
if (logger.isDebugEnabled()) {
@@ -704,7 +710,9 @@ public class TsFileProcessor {
FlushManager.getInstance().registerTsFileProcessor(this);
}
- /** put back the memtable to MemTablePool and make metadata in writer visible */
+ /**
+ * put back the memtable to MemTablePool and make metadata in writer visible
+ */
private void releaseFlushedMemTable(IMemTable memTable) {
flushQueryLock.writeLock().lock();
if (logger.isDebugEnabled()) {
@@ -1043,18 +1051,14 @@ public class TsFileProcessor {
* memtables and then compact them into one TimeValuePairSorter). Then get the related
* ChunkMetadata of data on disk.
*
- * @param deviceId device id
+ * @param deviceId device id
* @param measurementId measurements id
- * @param dataType data type
- * @param encoding encoding
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void query(
String deviceId,
String measurementId,
- TSDataType dataType,
- TSEncoding encoding,
- Map<String, String> props,
+ IMeasurementSchema schema,
QueryContext context,
List<TsFileResource> tsfileResourcesForQuery)
throws IOException, MetadataException {
@@ -1078,9 +1082,7 @@ public class TsFileProcessor {
flushingMemTable.query(
deviceId,
measurementId,
- dataType,
- encoding,
- props,
+ schema,
context.getQueryTimeLowerBound(),
deletionList);
if (memChunk != null) {
@@ -1092,9 +1094,7 @@ public class TsFileProcessor {
workMemTable.query(
deviceId,
measurementId,
- dataType,
- encoding,
- props,
+ schema,
context.getQueryTimeLowerBound(),
null);
if (memChunk != null) {
@@ -1108,8 +1108,33 @@ public class TsFileProcessor {
modificationFile,
new PartialPath(deviceId + IoTDBConstant.PATH_SEPARATOR + measurementId));
- List<ChunkMetadata> chunkMetadataList =
- writer.getVisibleMetadataList(deviceId, measurementId, dataType);
+ List<IChunkMetadata> chunkMetadataList = new ArrayList<>();
+ if (schema instanceof VectorMeasurementSchema) {
+ List<ChunkMetadata> timeChunkMetadataList =
+ writer.getVisibleMetadataList(deviceId, measurementId, schema.getType());
+ List<List<ChunkMetadata>> valueChunkMetadataList = new ArrayList<>();
+ List<String> valueMeasurementIdList = schema.getValueMeasurementIdList();
+ List<TSDataType> valueDataTypeList = schema.getValueTSDataTypeList();
+ for (int i = 0; i < valueMeasurementIdList.size(); i++) {
+ valueChunkMetadataList.add(writer
+ .getVisibleMetadataList(deviceId, valueMeasurementIdList.get(i),
+ valueDataTypeList.get(i)));
+ }
+
+
+ for (int i = 0; i < timeChunkMetadataList.size(); i++) {
+ List<IChunkMetadata> valueChunkMetadata = new ArrayList<>();
+ for (List<ChunkMetadata> chunkMetadata : valueChunkMetadataList) {
+ valueChunkMetadata.add(chunkMetadata.get(i));
+ }
+ chunkMetadataList
+ .add(new VectorChunkMetadata(timeChunkMetadataList.get(i), valueChunkMetadata));
+ }
+ } else {
+ chunkMetadataList =
+ new ArrayList<>(writer.getVisibleMetadataList(deviceId, measurementId, schema.getType()));
+ }
+
QueryUtils.modifyChunkMetaData(chunkMetadataList, modifications);
chunkMetadataList.removeIf(context::chunkNotSatisfy);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 6d4f58b..57472f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -18,6 +18,20 @@
*/
package org.apache.iotdb.db.engine.storagegroup;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.modification.ModificationFile;
@@ -32,7 +46,6 @@ import org.apache.iotdb.db.service.UpgradeSevice;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -41,25 +54,9 @@ import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.file.FileAlreadyExistsException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Random;
-import java.util.Set;
-
@SuppressWarnings("java:S1135") // ignore todos
public class TsFileResource {
@@ -103,7 +100,7 @@ public class TsFileResource {
* Chunk metadata list of unsealed tsfile. Only be set in a temporal TsFileResource in a query
* process.
*/
- private List<ChunkMetadata> chunkMetadataList;
+ private List<IChunkMetadata> chunkMetadataList;
/** Mem chunk data. Only be set in a temporal TsFileResource in a query process. */
private List<ReadOnlyMemChunk> readOnlyMemChunk;
@@ -184,7 +181,7 @@ public class TsFileResource {
/** unsealed TsFile */
public TsFileResource(
List<ReadOnlyMemChunk> readOnlyMemChunk,
- List<ChunkMetadata> chunkMetadataList,
+ List<IChunkMetadata> chunkMetadataList,
TsFileResource originTsFileResource)
throws IOException {
this.file = originTsFileResource.file;
@@ -206,7 +203,6 @@ public class TsFileResource {
}
private void generateTimeSeriesMetadata() throws IOException {
- timeSeriesMetadata = new TimeseriesMetadata();
timeSeriesMetadata.setOffsetOfChunkMetaDataList(-1);
timeSeriesMetadata.setDataSizeOfChunkMetaDataList(-1);
@@ -223,7 +219,7 @@ public class TsFileResource {
Statistics<?> seriesStatistics =
Statistics.getStatsByType(timeSeriesMetadata.getTSDataType());
// flush chunkMetadataList one by one
- for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+ for (IChunkMetadata chunkMetadata : chunkMetadataList) {
seriesStatistics.mergeStatistics(chunkMetadata.getStatistics());
}
@@ -234,7 +230,7 @@ public class TsFileResource {
}
timeSeriesMetadata.setStatistics(seriesStatistics);
} else {
- timeSeriesMetadata = null;
+ this.timeSeriesMetadata = null;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 76703a8..f6c6974 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -18,6 +18,32 @@
*/
package org.apache.iotdb.db.metadata;
+import static java.util.stream.Collectors.toList;
+import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
@@ -73,37 +99,9 @@ import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static java.util.stream.Collectors.toList;
-import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
-
/**
* This class takes the responsibility of serialization of all the metadata info and persistent it
* into files. This class contains all the interfaces to modify the metadata for delta system. All
@@ -1015,6 +1013,17 @@ public class MManager {
return null;
}
+ // TODO BY ZESONG SUN
+ public IMeasurementSchema getSeriesSchema(PartialPath fullPath)
+ throws MetadataException {
+ MNode node = mtree.getNodeByPath(fullPath.getDevicePath());
+ MNode leaf = node.getChild(fullPath.getMeasurement());
+ if (leaf != null) {
+ return ((MeasurementMNode) leaf).getSchema();
+ }
+ return null;
+ }
+
/**
* Get child node path in the next level of the given path.
*
diff --git a/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java b/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
index 3febfa9..0c53d08 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
@@ -19,16 +19,15 @@
package org.apache.iotdb.db.query.context;
-import org.apache.iotdb.db.engine.modification.Modification;
-import org.apache.iotdb.db.engine.modification.ModificationFile;
-import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
-
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
/** QueryContext contains the shared information with in a query. */
public class QueryContext {
@@ -95,7 +94,7 @@ public class QueryContext {
this.queryTimeLowerBound = queryTimeLowerBound;
}
- public boolean chunkNotSatisfy(ChunkMetadata chunkMetaData) {
+ public boolean chunkNotSatisfy(IChunkMetadata chunkMetaData) {
return chunkMetaData.getEndTime() < queryTimeLowerBound;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java
index 3a6fc2a..4ac0080 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java
@@ -72,6 +72,11 @@ public class DiskChunkMetadataLoader implements IChunkMetadataLoader {
return chunkMetadataList;
}
+ @Override
+ public boolean isMemChunkMetadataLoader() {
+ return false;
+ }
+
public static void setDiskChunkLoader(
List<IChunkMetadata> chunkMetadataList,
TsFileResource resource,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java
index 8a22d0b..cf022e3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java
@@ -73,4 +73,9 @@ public class MemChunkMetadataLoader implements IChunkMetadataLoader {
}
return chunkMetadataList;
}
+
+ @Override
+ public boolean isMemChunkMetadataLoader() {
+ return true;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 050de31..0f9297a 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -157,7 +157,6 @@ public class FileLoaderUtils {
*/
public static List<IPageReader> loadPageReaderList(
IChunkMetadata chunkMetaData, Filter timeFilter) throws IOException {
- // TODO memory Vector chunk metadata
if (chunkMetaData == null) {
throw new IOException("Can't init null chunkMeta");
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index e68c4af..6b64b13 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -125,9 +126,7 @@ public class StorageGroupProcessorTest {
tsfileProcessor.query(
deviceId,
measurementId,
- TSDataType.INT32,
- TSEncoding.RLE,
- Collections.emptyMap(),
+ new MeasurementSchema(measurementId, TSDataType.INT32, TSEncoding.RLE, CompressionType.UNCOMPRESSED, Collections.emptyMap()),
new QueryContext(),
tsfileResourcesForQuery);
}
@@ -162,7 +161,7 @@ public class StorageGroupProcessorTest {
}
processor.syncCloseAllWorkingTsFileProcessors();
QueryDataSource queryDataSource =
- processor.query(new PartialPath(deviceId), measurementId, context, null, null);
+ processor.query(new PartialPath(deviceId, measurementId), context, null, null);
Assert.assertEquals(10, queryDataSource.getSeqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
Assert.assertTrue(resource.isClosed());
@@ -191,7 +190,7 @@ public class StorageGroupProcessorTest {
processor.syncCloseAllWorkingTsFileProcessors();
QueryDataSource queryDataSource =
- processor.query(new PartialPath(deviceId), measurementId, context, null, null);
+ processor.query(new PartialPath(deviceId, measurementId), context, null, null);
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
}
@@ -252,7 +251,7 @@ public class StorageGroupProcessorTest {
processor.syncCloseAllWorkingTsFileProcessors();
QueryDataSource queryDataSource =
- processor.query(new PartialPath(deviceId), measurementId, context, null, null);
+ processor.query(new PartialPath(deviceId, measurementId), context, null, null);
Assert.assertEquals(2, queryDataSource.getSeqResources().size());
Assert.assertEquals(1, queryDataSource.getUnseqResources().size());
@@ -282,7 +281,7 @@ public class StorageGroupProcessorTest {
processor.syncCloseAllWorkingTsFileProcessors();
QueryDataSource queryDataSource =
- processor.query(new PartialPath(deviceId), measurementId, context, null, null);
+ processor.query(new PartialPath(deviceId, measurementId), context, null, null);
Assert.assertEquals(10, queryDataSource.getSeqResources().size());
Assert.assertEquals(10, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
@@ -322,7 +321,7 @@ public class StorageGroupProcessorTest {
}
QueryDataSource queryDataSource =
- processor.query(new PartialPath(deviceId), measurementId, context, null, null);
+ processor.query(new PartialPath(deviceId, measurementId), context, null, null);
Assert.assertEquals(10, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
@@ -404,7 +403,7 @@ public class StorageGroupProcessorTest {
}
QueryDataSource queryDataSource =
- processor.query(new PartialPath(deviceId), measurementId, context, null, null);
+ processor.query(new PartialPath(deviceId, measurementId), context, null, null);
Assert.assertEquals(2, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
@@ -486,7 +485,7 @@ public class StorageGroupProcessorTest {
}
QueryDataSource queryDataSource =
- processor.query(new PartialPath(deviceId), measurementId, context, null, null);
+ processor.query(new PartialPath(deviceId, measurementId), context, null, null);
Assert.assertEquals(2, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
@@ -568,7 +567,7 @@ public class StorageGroupProcessorTest {
}
QueryDataSource queryDataSource =
- processor.query(new PartialPath(deviceId), measurementId, context, null, null);
+ processor.query(new PartialPath(deviceId, measurementId), context, null, null);
Assert.assertEquals(2, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
@@ -606,7 +605,7 @@ public class StorageGroupProcessorTest {
}
QueryDataSource queryDataSource =
- processor.query(new PartialPath(deviceId), measurementId, context, null, null);
+ processor.query(new PartialPath(deviceId, measurementId), context, null, null);
Assert.assertEquals(10, queryDataSource.getSeqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
Assert.assertTrue(resource.isClosed());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index 1943ef3..a76f09c 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -20,6 +20,19 @@
package org.apache.iotdb.db.engine.storagegroup;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
@@ -54,25 +67,10 @@ import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
public class TTLTest {
private String sg1 = "root.TTL_SG1";
@@ -223,7 +221,7 @@ public class TTLTest {
// files before ttl
QueryDataSource dataSource =
storageGroupProcessor.query(
- new PartialPath(sg1), s1, EnvironmentUtils.TEST_QUERY_CONTEXT, null, null);
+ new PartialPath(sg1, s1), EnvironmentUtils.TEST_QUERY_CONTEXT, null, null);
List<TsFileResource> seqResource = dataSource.getSeqResources();
List<TsFileResource> unseqResource = dataSource.getUnseqResources();
assertEquals(4, seqResource.size());
@@ -234,7 +232,7 @@ public class TTLTest {
// files after ttl
dataSource =
storageGroupProcessor.query(
- new PartialPath(sg1), s1, EnvironmentUtils.TEST_QUERY_CONTEXT, null, null);
+ new PartialPath(sg1, s1), EnvironmentUtils.TEST_QUERY_CONTEXT, null, null);
seqResource = dataSource.getSeqResources();
unseqResource = dataSource.getUnseqResources();
assertTrue(seqResource.size() < 4);
@@ -269,7 +267,7 @@ public class TTLTest {
storageGroupProcessor.setDataTTL(0);
dataSource =
storageGroupProcessor.query(
- new PartialPath(sg1), s1, EnvironmentUtils.TEST_QUERY_CONTEXT, null, null);
+ new PartialPath(sg1, s1), EnvironmentUtils.TEST_QUERY_CONTEXT, null, null);
seqResource = dataSource.getSeqResources();
unseqResource = dataSource.getUnseqResources();
assertEquals(0, seqResource.size());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
index f972a83..276cacd 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
@@ -18,6 +18,15 @@
*/
package org.apache.iotdb.db.engine.storagegroup;
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
@@ -30,30 +39,21 @@ import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
-
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import static junit.framework.TestCase.assertTrue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
public class TsFileProcessorTest {
private TsFileProcessor processor;
@@ -104,7 +104,7 @@ public class TsFileProcessorTest {
SystemInfo.getInstance().reportStorageGroupStatus(sgInfo);
List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
processor.query(
- deviceId, measurementId, dataType, encoding, props, context, tsfileResourcesForQuery);
+ deviceId, measurementId, new MeasurementSchema(measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props), context, tsfileResourcesForQuery);
assertTrue(tsfileResourcesForQuery.isEmpty());
for (int i = 1; i <= 100; i++) {
@@ -116,7 +116,7 @@ public class TsFileProcessorTest {
// query data in memory
tsfileResourcesForQuery.clear();
processor.query(
- deviceId, measurementId, dataType, encoding, props, context, tsfileResourcesForQuery);
+ deviceId, measurementId, new MeasurementSchema(measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props), context, tsfileResourcesForQuery);
assertFalse(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk().isEmpty());
List<ReadOnlyMemChunk> memChunks = tsfileResourcesForQuery.get(0).getReadOnlyMemChunk();
for (ReadOnlyMemChunk chunk : memChunks) {
@@ -134,7 +134,7 @@ public class TsFileProcessorTest {
tsfileResourcesForQuery.clear();
processor.query(
- deviceId, measurementId, dataType, encoding, props, context, tsfileResourcesForQuery);
+ deviceId, measurementId, new MeasurementSchema(measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props), context, tsfileResourcesForQuery);
assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk().isEmpty());
assertEquals(1, tsfileResourcesForQuery.get(0).getChunkMetadataList().size());
assertEquals(
@@ -166,7 +166,7 @@ public class TsFileProcessorTest {
SystemInfo.getInstance().reportStorageGroupStatus(sgInfo);
List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
processor.query(
- deviceId, measurementId, dataType, encoding, props, context, tsfileResourcesForQuery);
+ deviceId, measurementId, new MeasurementSchema(measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props), context, tsfileResourcesForQuery);
assertTrue(tsfileResourcesForQuery.isEmpty());
for (int i = 1; i <= 100; i++) {
@@ -178,7 +178,7 @@ public class TsFileProcessorTest {
// query data in memory
tsfileResourcesForQuery.clear();
processor.query(
- deviceId, measurementId, dataType, encoding, props, context, tsfileResourcesForQuery);
+ deviceId, measurementId, new MeasurementSchema(measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props), context, tsfileResourcesForQuery);
assertFalse(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk().isEmpty());
int num = 1;
List<ReadOnlyMemChunk> memChunks = tsfileResourcesForQuery.get(0).getReadOnlyMemChunk();
@@ -197,7 +197,7 @@ public class TsFileProcessorTest {
tsfileResourcesForQuery.clear();
processor.query(
- deviceId, measurementId, dataType, encoding, props, context, tsfileResourcesForQuery);
+ deviceId, measurementId, new MeasurementSchema(measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props), context, tsfileResourcesForQuery);
assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk().isEmpty());
assertEquals(1, tsfileResourcesForQuery.get(0).getChunkMetadataList().size());
assertEquals(
@@ -254,7 +254,7 @@ public class TsFileProcessorTest {
SystemInfo.getInstance().reportStorageGroupStatus(sgInfo);
List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
processor.query(
- deviceId, measurementId, dataType, encoding, props, context, tsfileResourcesForQuery);
+ deviceId, measurementId, new MeasurementSchema(measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props), context, tsfileResourcesForQuery);
assertTrue(tsfileResourcesForQuery.isEmpty());
for (int flushId = 0; flushId < 10; flushId++) {
@@ -269,7 +269,7 @@ public class TsFileProcessorTest {
tsfileResourcesForQuery.clear();
processor.query(
- deviceId, measurementId, dataType, encoding, props, context, tsfileResourcesForQuery);
+ deviceId, measurementId, new MeasurementSchema(measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props), context, tsfileResourcesForQuery);
assertFalse(tsfileResourcesForQuery.isEmpty());
assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk().isEmpty());
assertEquals(10, tsfileResourcesForQuery.get(0).getChunkMetadataList().size());
@@ -302,7 +302,7 @@ public class TsFileProcessorTest {
List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
processor.query(
- deviceId, measurementId, dataType, encoding, props, context, tsfileResourcesForQuery);
+ deviceId, measurementId, new MeasurementSchema(measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props), context, tsfileResourcesForQuery);
assertTrue(tsfileResourcesForQuery.isEmpty());
for (int i = 1; i <= 100; i++) {
@@ -314,7 +314,7 @@ public class TsFileProcessorTest {
// query data in memory
tsfileResourcesForQuery.clear();
processor.query(
- deviceId, measurementId, dataType, encoding, props, context, tsfileResourcesForQuery);
+ deviceId, measurementId, new MeasurementSchema(measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props), context, tsfileResourcesForQuery);
assertFalse(tsfileResourcesForQuery.isEmpty());
assertFalse(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk().isEmpty());
List<ReadOnlyMemChunk> memChunks = tsfileResourcesForQuery.get(0).getReadOnlyMemChunk();
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
index 55eea5c..a713157 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
@@ -19,6 +19,15 @@
package org.apache.iotdb.db.query.reader.series;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -33,19 +42,10 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static org.junit.Assert.*;
-
public class SeriesAggregateReaderTest {
private static final String SERIES_READER_TEST_SG = "root.seriesReaderTest";
@@ -71,7 +71,7 @@ public class SeriesAggregateReaderTest {
PartialPath path = new PartialPath(SERIES_READER_TEST_SG + ".device0.sensor0");
Set<String> allSensors = new HashSet<>();
allSensors.add("sensor0");
- QueryDataSource queryDataSource = new QueryDataSource(path, seqResources, unseqResources);
+ QueryDataSource queryDataSource = new QueryDataSource(seqResources, unseqResources);
SeriesAggregateReader seriesReader =
new SeriesAggregateReader(
path,
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
index 4e71afa..129f18d 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
@@ -63,7 +63,6 @@ public class SeriesReaderByTimestampTest {
public void test() throws IOException, IllegalPathException {
QueryDataSource dataSource =
new QueryDataSource(
- new PartialPath(SERIES_READER_TEST_SG + ".device0.sensor0"),
seqResources,
unseqResources);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
index 10efcb7..e86fcb2 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
@@ -189,6 +189,10 @@ public class TimeseriesMetadata implements Accountable, ITimeSeriesMetadata {
this.chunkMetadataLoader = chunkMetadataLoader;
}
+ public IChunkMetadataLoader getChunkMetadataLoader() {
+ return chunkMetadataLoader;
+ }
+
@Override
public List<IChunkMetadata> loadChunkMetadataList() throws IOException {
return chunkMetadataLoader.loadChunkMetadataList(this);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java
index 20a25df..bfa8e74 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java
@@ -62,23 +62,26 @@ public class VectorTimeSeriesMetadata implements ITimeSeriesMetadata {
@Override
public List<IChunkMetadata> loadChunkMetadataList() throws IOException {
- List<IChunkMetadata> timeChunkMetadata = timeseriesMetadata.loadChunkMetadataList();
- List<List<IChunkMetadata>> valueChunkMetadataList = new ArrayList<>();
- for (TimeseriesMetadata metadata : valueTimeseriesMetadataList) {
- valueChunkMetadataList.add(metadata.loadChunkMetadataList());
- }
+ if (timeseriesMetadata.getChunkMetadataLoader().isMemChunkMetadataLoader()) {
+ return timeseriesMetadata.loadChunkMetadataList();
+ } else {
+ List<IChunkMetadata> timeChunkMetadata = timeseriesMetadata.loadChunkMetadataList();
+ List<List<IChunkMetadata>> valueChunkMetadataList = new ArrayList<>();
+ for (TimeseriesMetadata metadata : valueTimeseriesMetadataList) {
+ valueChunkMetadataList.add(metadata.loadChunkMetadataList());
+ }
- List<IChunkMetadata> res = new ArrayList<>();
+ List<IChunkMetadata> res = new ArrayList<>();
- for (int i = 0; i < timeChunkMetadata.size(); i++) {
- List<IChunkMetadata> chunkMetadataList = new ArrayList<>();
- for (List<IChunkMetadata> chunkMetadata : valueChunkMetadataList) {
- chunkMetadataList.add(chunkMetadata.get(i));
+ for (int i = 0; i < timeChunkMetadata.size(); i++) {
+ List<IChunkMetadata> chunkMetadataList = new ArrayList<>();
+ for (List<IChunkMetadata> chunkMetadata : valueChunkMetadataList) {
+ chunkMetadataList.add(chunkMetadata.get(i));
+ }
+ res.add(new VectorChunkMetadata(timeChunkMetadata.get(i), chunkMetadataList));
}
- res.add(new VectorChunkMetadata(timeChunkMetadata.get(i), chunkMetadataList));
+ return res;
}
-
- return res;
}
@Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/IChunkMetadataLoader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/IChunkMetadataLoader.java
index 093219d..71371a4 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/IChunkMetadataLoader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/IChunkMetadataLoader.java
@@ -29,4 +29,6 @@ public interface IChunkMetadataLoader {
/** read all chunk metadata of one time series in one file. */
List<IChunkMetadata> loadChunkMetadataList(ITimeSeriesMetadata timeseriesMetadata)
throws IOException;
+
+ boolean isMemChunkMetadataLoader();
}