You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/01/22 02:58:17 UTC
[incubator-iotdb] branch delete_dev4 updated: add QueryContext to
cache Modifications in the same query fix default value of deletedAt
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch delete_dev4
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/delete_dev4 by this push:
new faae7a2 add QueryContext to cache Modifications in the same query fix default value of deletedAt
faae7a2 is described below
commit faae7a25174098e29d1fcbbcef47b70860c60787
Author: 江天 <jt...@163.com>
AuthorDate: Tue Jan 22 10:57:37 2019 +0800
add QueryContext to cache Modifications in the same query
fix default value of deletedAt
---
.../iotdb/db/engine/filenode/FileNodeManager.java | 5 +-
.../db/engine/filenode/FileNodeProcessor.java | 13 ++--
.../db/engine/overflow/ioV2/OverflowProcessor.java | 26 ++++---
.../db/engine/overflow/ioV2/OverflowResource.java | 5 +-
.../iotdb/db/query/context/QueryContext.java | 84 ++++++++++++++++++++++
.../db/query/control/QueryDataSourceManager.java | 6 +-
.../executor/EngineExecutorWithTimeGenerator.java | 16 +++--
.../EngineExecutorWithoutTimeGenerator.java | 16 +++--
.../iotdb/db/query/executor/EngineQueryRouter.java | 9 ++-
.../db/query/factory/SeriesReaderFactory.java | 13 ++--
.../query/reader/sequence/SealedTsFilesReader.java | 22 ++++--
.../query/reader/sequence/SequenceDataReader.java | 7 +-
.../query/timegenerator/EngineNodeConstructor.java | 21 +++---
.../query/timegenerator/EngineTimeGenerator.java | 9 +--
.../java/org/apache/iotdb/db/utils/QueryUtils.java | 16 -----
.../engine/modification/DeletionFileNodeTest.java | 7 +-
.../overflow/ioV2/OverflowProcessorTest.java | 25 ++++---
.../engine/overflow/ioV2/OverflowResourceTest.java | 8 ++-
.../db/integration/IoTDBEngineTimeGeneratorIT.java | 12 +++-
.../iotdb/tsfile/file/metadata/ChunkMetaData.java | 2 +-
.../org/apache/iotdb/tsfile/read/common/Chunk.java | 2 +-
.../tsfile/read/reader/chunk/ChunkReader.java | 2 +-
.../iotdb/tsfile/read/reader/page/PageReader.java | 2 +-
23 files changed, 226 insertions(+), 102 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index 0e0c92e..db99896 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -58,6 +58,7 @@ import org.apache.iotdb.db.monitor.StatMonitor;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.ServiceType;
@@ -640,7 +641,7 @@ public class FileNodeManager implements IStatistic, IService {
/**
* query data.
*/
- public QueryDataSource query(SingleSeriesExpression seriesExpression)
+ public QueryDataSource query(SingleSeriesExpression seriesExpression, QueryContext context)
throws FileNodeManagerException {
String deviceId = seriesExpression.getSeriesPath().getDevice();
String measurementId = seriesExpression.getSeriesPath().getMeasurement();
@@ -661,7 +662,7 @@ public class FileNodeManager implements IStatistic, IService {
}
try {
queryDataSource = fileNodeProcessor
- .query(deviceId, measurementId, seriesExpression.getFilter());
+ .query(deviceId, measurementId, seriesExpression.getFilter(), context);
} catch (FileNodeProcessorException e) {
LOGGER.error("Query error: the deviceId {}, the measurementId {}", deviceId, measurementId,
e);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index d9b4bd5..442992c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -73,6 +73,7 @@ import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.monitor.IStatistic;
import org.apache.iotdb.db.monitor.MonitorConstants;
import org.apache.iotdb.db.monitor.StatMonitor;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
import org.apache.iotdb.db.query.reader.IReader;
import org.apache.iotdb.db.utils.MemUtils;
@@ -750,7 +751,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
* query data.
*/
public <T extends Comparable<T>> QueryDataSource query(String deviceId, String measurementId,
- Filter filter)
+ Filter filter, QueryContext context)
throws FileNodeProcessorException {
// query overflow data
TSDataType dataType = null;
@@ -761,7 +762,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
OverflowSeriesDataSource overflowSeriesDataSource;
try {
- overflowSeriesDataSource = overflowProcessor.query(deviceId, measurementId, filter, dataType);
+ overflowSeriesDataSource = overflowProcessor.query(deviceId, measurementId, filter, dataType,
+ context);
} catch (IOException e) {
e.printStackTrace();
throw new FileNodeProcessorException(e);
@@ -797,7 +799,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
.queryBufferWriteData(deviceId, measurementId, dataType);
try {
- List<Modification> pathModifications = QueryUtils.getPathModifications(
+ List<Modification> pathModifications = context.getPathModifications(
currentIntervalFileNode.getModFile(), deviceId
+ IoTDBConstant.PATH_SEPARATOR + measurementId
);
@@ -1510,6 +1512,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
// modifications are blocked before mergeModification is created to avoid
// losing some modification.
mergeDeleteLock.lock();
+ QueryContext context = new QueryContext();
try {
for (String deviceId : backupIntervalFile.getStartTimeMap().keySet()) {
// query one deviceId
@@ -1537,14 +1540,14 @@ public class FileNodeProcessor extends Processor implements IStatistic {
String measurementId = path.getMeasurement();
TSDataType dataType = mManager.getSeriesType(path.getFullPath());
OverflowSeriesDataSource overflowSeriesDataSource = overflowProcessor.queryMerge(deviceId,
- measurementId, dataType, true);
+ measurementId, dataType, true, context);
Filter timeFilter = FilterFactory
.and(TimeFilter.gtEq(backupIntervalFile.getStartTime(deviceId)),
TimeFilter.ltEq(backupIntervalFile.getEndTime(deviceId)));
SingleSeriesExpression seriesFilter = new SingleSeriesExpression(path, timeFilter);
IReader seriesReader = SeriesReaderFactory.getInstance()
.createSeriesReaderForMerge(backupIntervalFile,
- overflowSeriesDataSource, seriesFilter);
+ overflowSeriesDataSource, seriesFilter, context);
try {
if (!seriesReader.hasNext()) {
LOGGER.debug(
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowProcessor.java
index f258b5f..27817fc 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowProcessor.java
@@ -49,6 +49,7 @@ import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.utils.FlushStatus;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.OverflowProcessorException;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
@@ -265,7 +266,7 @@ public class OverflowProcessor extends Processor {
* @throws IOException
*/
public OverflowSeriesDataSource query(String deviceId, String measurementId, Filter filter,
- TSDataType dataType)
+ TSDataType dataType, QueryContext context)
throws IOException {
queryFlushLock.lock();
try {
@@ -277,14 +278,14 @@ public class OverflowProcessor extends Processor {
// work file
Pair<String, List<ChunkMetaData>> insertInDiskWork = queryWorkDataInOverflowInsert(deviceId,
measurementId,
- dataType);
+ dataType, context);
if (insertInDiskWork.left != null) {
overflowInsertFileList
.add(0, new OverflowInsertFile(insertInDiskWork.left, insertInDiskWork.right));
}
// merge file
Pair<String, List<ChunkMetaData>> insertInDiskMerge = queryMergeDataInOverflowInsert(deviceId,
- measurementId, dataType);
+ measurementId, dataType, context);
if (insertInDiskMerge.left != null) {
overflowInsertFileList
.add(0, new OverflowInsertFile(insertInDiskMerge.left, insertInDiskMerge.right));
@@ -345,10 +346,11 @@ public class OverflowProcessor extends Processor {
*/
private Pair<String, List<ChunkMetaData>> queryWorkDataInOverflowInsert(String deviceId,
String measurementId,
- TSDataType dataType) {
+ TSDataType dataType,
+ QueryContext context) {
Pair<String, List<ChunkMetaData>> pair = new Pair<String, List<ChunkMetaData>>(
workResource.getInsertFilePath(),
- workResource.getInsertMetadatas(deviceId, measurementId, dataType));
+ workResource.getInsertMetadatas(deviceId, measurementId, dataType, context));
return pair;
}
@@ -361,19 +363,20 @@ public class OverflowProcessor extends Processor {
* @return MergeSeriesDataSource
*/
public MergeSeriesDataSource queryMerge(String deviceId, String measurementId,
- TSDataType dataType) {
+ TSDataType dataType, QueryContext context) {
Pair<String, List<ChunkMetaData>> mergeInsert = queryMergeDataInOverflowInsert(deviceId,
measurementId,
- dataType);
+ dataType, context);
return new MergeSeriesDataSource(new OverflowInsertFile(mergeInsert.left, mergeInsert.right));
}
public OverflowSeriesDataSource queryMerge(String deviceId, String measurementId,
TSDataType dataType,
- boolean isMerge) {
+ boolean isMerge,
+ QueryContext context) {
Pair<String, List<ChunkMetaData>> mergeInsert = queryMergeDataInOverflowInsert(deviceId,
measurementId,
- dataType);
+ dataType, context);
OverflowSeriesDataSource overflowSeriesDataSource = new OverflowSeriesDataSource(
new Path(deviceId + "." + measurementId));
overflowSeriesDataSource.setReadableMemChunk(null);
@@ -393,13 +396,14 @@ public class OverflowProcessor extends Processor {
*/
private Pair<String, List<ChunkMetaData>> queryMergeDataInOverflowInsert(String deviceId,
String measurementId,
- TSDataType dataType) {
+ TSDataType dataType,
+ QueryContext context) {
if (!isMerge) {
return new Pair<String, List<ChunkMetaData>>(null, null);
}
Pair<String, List<ChunkMetaData>> pair = new Pair<String, List<ChunkMetaData>>(
mergeResource.getInsertFilePath(),
- mergeResource.getInsertMetadatas(deviceId, measurementId, dataType));
+ mergeResource.getInsertMetadatas(deviceId, measurementId, dataType, context));
return pair;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowResource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowResource.java
index 2231917..f7e8575 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowResource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowResource.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.version.VersionController;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
@@ -166,7 +167,7 @@ public class OverflowResource {
}
public List<ChunkMetaData> getInsertMetadatas(String deviceId, String measurementId,
- TSDataType dataType) {
+ TSDataType dataType, QueryContext context) {
List<ChunkMetaData> chunkMetaDatas = new ArrayList<>();
if (insertMetadatas.containsKey(deviceId)) {
if (insertMetadatas.get(deviceId).containsKey(measurementId)) {
@@ -179,7 +180,7 @@ public class OverflowResource {
}
}
try {
- List<Modification> modifications = QueryUtils.getPathModifications(modificationFile,
+ List<Modification> modifications = context.getPathModifications(modificationFile,
deviceId + IoTDBConstant.PATH_SEPARATOR + measurementId);
QueryUtils.modifyChunkMetaData(chunkMetaDatas, modifications);
} catch (IOException e) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java b/iotdb/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
new file mode 100644
index 0000000..e05d850
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.context;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+
+/**
+ * QueryContext contains the shared information with in a query.
+ */
+public class QueryContext {
+
+ /**
+ * The outer key is the path of a ModificationFile, the inner key in the name of a timeseries and
+ * the value is the Modifications of a timeseries in this file.
+ */
+ private Map<String, Map<String, List<Modification>>> filePathModCache = new HashMap<>();
+ /**
+ * The key is the path of a ModificationFile and the value is all Modifications in this file.
+ * We use this field because each call of Modification.getModifications() return a copy of the
+ * Modifications, and we do not want it to create multiple copies within a query.
+ */
+ private Map<String, List<Modification>> fileModCache = new HashMap<>();
+
+ /**
+ * Find the modifications of timeseries 'path' in 'modFile'. If they are not in the cache, read
+ * them from 'modFile' and put then into the cache.
+ * @param modFile
+ * @param path
+ * @return
+ * @throws IOException
+ */
+ public List<Modification> getPathModifications(ModificationFile modFile, String path)
+ throws IOException {
+
+ Map<String, List<Modification>> fileModifications =
+ filePathModCache.computeIfAbsent(modFile.getFilePath(), k -> new HashMap<>());
+ List<Modification> pathModifications = fileModifications.get(path);
+
+ if (pathModifications == null) {
+ List<Modification> allModifications = fileModCache.get(modFile.getFilePath());
+ if (allModifications == null) {
+ allModifications = (List<Modification>) modFile.getModifications();
+ fileModCache.put(modFile.getFilePath(), allModifications);
+ }
+ pathModifications = new ArrayList<>();
+ if (allModifications.size() > 0) {
+ List<Modification> finalPathModifications = pathModifications;
+ allModifications.forEach(modification -> {
+ if (modification.getPath().equals(path)) {
+ finalPathModifications.add(modification);
+ }
+ });
+ }
+ fileModifications.put(path, pathModifications);
+ }
+
+ return pathModifications;
+ }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryDataSourceManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryDataSourceManager.java
index 2b1ff81..c2e8505 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryDataSourceManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryDataSourceManager.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.control;
import org.apache.iotdb.db.engine.filenode.FileNodeManager;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
@@ -32,11 +33,12 @@ public class QueryDataSourceManager {
private static FileNodeManager fileNodeManager = FileNodeManager.getInstance();
- public static QueryDataSource getQueryDataSource(long jobId, Path selectedPath)
+ public static QueryDataSource getQueryDataSource(long jobId, Path selectedPath,
+ QueryContext context)
throws FileNodeManagerException {
SingleSeriesExpression singleSeriesExpression = new SingleSeriesExpression(selectedPath, null);
- QueryDataSource queryDataSource = fileNodeManager.query(singleSeriesExpression);
+ QueryDataSource queryDataSource = fileNodeManager.query(singleSeriesExpression, context);
// add used files to current thread request cached map
OpenedFilePathsManager.getInstance()
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
index 53d73d1..224aed7 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryDataSourceManager;
import org.apache.iotdb.db.query.control.QueryTokenManager;
import org.apache.iotdb.db.query.dataset.EngineDataSetWithTimeGenerator;
@@ -58,8 +59,9 @@ public class EngineExecutorWithTimeGenerator {
* @return QueryDataSet object
* @throws IOException IOException
* @throws FileNodeManagerException FileNodeManagerException
+ * @param context
*/
- public QueryDataSet execute() throws IOException, FileNodeManagerException {
+ public QueryDataSet execute(QueryContext context) throws IOException, FileNodeManagerException {
QueryTokenManager.getInstance()
.beginQueryOfGivenQueryPaths(jobId, queryExpression.getSelectedSeries());
@@ -67,10 +69,10 @@ public class EngineExecutorWithTimeGenerator {
.beginQueryOfGivenExpression(jobId, queryExpression.getExpression());
EngineTimeGenerator timestampGenerator = new EngineTimeGenerator(jobId,
- queryExpression.getExpression());
+ queryExpression.getExpression(), context);
List<EngineReaderByTimeStamp> readersOfSelectedSeries = getReadersOfSelectedPaths(
- queryExpression.getSelectedSeries());
+ queryExpression.getSelectedSeries(), context);
List<TSDataType> dataTypes = new ArrayList<>();
@@ -87,20 +89,22 @@ public class EngineExecutorWithTimeGenerator {
readersOfSelectedSeries);
}
- private List<EngineReaderByTimeStamp> getReadersOfSelectedPaths(List<Path> paths)
+ private List<EngineReaderByTimeStamp> getReadersOfSelectedPaths(List<Path> paths,
+ QueryContext context)
throws IOException, FileNodeManagerException {
List<EngineReaderByTimeStamp> readersOfSelectedSeries = new ArrayList<>();
for (Path path : paths) {
- QueryDataSource queryDataSource = QueryDataSourceManager.getQueryDataSource(jobId, path);
+ QueryDataSource queryDataSource = QueryDataSourceManager.getQueryDataSource(jobId, path,
+ context);
PriorityMergeReaderByTimestamp mergeReaderByTimestamp = new PriorityMergeReaderByTimestamp();
// reader for sequence data
SequenceDataReader tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
- null);
+ null, context);
mergeReaderByTimestamp.addReaderWithPriority(tsFilesReader, 1);
// reader for unSequence data
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
index 231cd86..36eb2b1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryDataSourceManager;
import org.apache.iotdb.db.query.control.QueryTokenManager;
import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator;
@@ -54,8 +55,9 @@ public class EngineExecutorWithoutTimeGenerator {
/**
* with global time filter.
+ * @param context
*/
- public QueryDataSet executeWithGlobalTimeFilter()
+ public QueryDataSet executeWithGlobalTimeFilter(QueryContext context)
throws IOException, FileNodeManagerException, PathErrorException {
Filter timeFilter = ((GlobalTimeExpression) queryExpression.getExpression()).getFilter();
@@ -68,7 +70,8 @@ public class EngineExecutorWithoutTimeGenerator {
for (Path path : queryExpression.getSelectedSeries()) {
- QueryDataSource queryDataSource = QueryDataSourceManager.getQueryDataSource(jobId, path);
+ QueryDataSource queryDataSource = QueryDataSourceManager.getQueryDataSource(jobId, path,
+ context);
// add data type
dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
@@ -77,7 +80,7 @@ public class EngineExecutorWithoutTimeGenerator {
// sequence reader for one sealed tsfile
SequenceDataReader tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
- timeFilter);
+ timeFilter, context);
priorityReader.addReaderWithPriority(tsFilesReader, 1);
// unseq reader for all chunk groups in unSeqFile
@@ -96,7 +99,7 @@ public class EngineExecutorWithoutTimeGenerator {
/**
* without filter.
*/
- public QueryDataSet executeWithoutFilter()
+ public QueryDataSet executeWithoutFilter(QueryContext context)
throws IOException, FileNodeManagerException, PathErrorException {
List<IReader> readersOfSelectedSeries = new ArrayList<>();
@@ -107,7 +110,8 @@ public class EngineExecutorWithoutTimeGenerator {
for (Path path : queryExpression.getSelectedSeries()) {
- QueryDataSource queryDataSource = QueryDataSourceManager.getQueryDataSource(jobId, path);
+ QueryDataSource queryDataSource = QueryDataSourceManager.getQueryDataSource(jobId, path,
+ context);
// add data type
dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
@@ -116,7 +120,7 @@ public class EngineExecutorWithoutTimeGenerator {
// sequence insert data
SequenceDataReader tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
- null);
+ null, context);
priorityReader.addReaderWithPriority(tsFilesReader, 1);
// unseq insert data
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
index 187b47a..c72ba91 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.OpenedFilePathsManager;
import org.apache.iotdb.db.query.control.QueryTokenManager;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
@@ -59,6 +60,8 @@ public class EngineQueryRouter {
QueryTokenManager.getInstance().setJobIdForCurrentRequestThread(jobId);
OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(jobId);
+ QueryContext context = new QueryContext();
+
if (queryExpression.hasQueryFilter()) {
try {
IExpression optimizedExpression = ExpressionOptimizer.getInstance()
@@ -69,12 +72,12 @@ public class EngineQueryRouter {
EngineExecutorWithoutTimeGenerator engineExecutor =
new EngineExecutorWithoutTimeGenerator(
jobId, queryExpression);
- return engineExecutor.executeWithGlobalTimeFilter();
+ return engineExecutor.executeWithGlobalTimeFilter(context);
} else {
EngineExecutorWithTimeGenerator engineExecutor = new EngineExecutorWithTimeGenerator(
jobId,
queryExpression);
- return engineExecutor.execute();
+ return engineExecutor.execute(context);
}
} catch (QueryFilterOptimizationException | PathErrorException e) {
@@ -85,7 +88,7 @@ public class EngineQueryRouter {
EngineExecutorWithoutTimeGenerator engineExecutor = new EngineExecutorWithoutTimeGenerator(
jobId,
queryExpression);
- return engineExecutor.executeWithoutFilter();
+ return engineExecutor.executeWithoutFilter(context);
} catch (PathErrorException e) {
throw new IOException(e);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
index 40ba72d..9c473b2 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.engine.filenode.IntervalFileNode;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.reader.IReader;
import org.apache.iotdb.db.query.reader.mem.MemChunkReaderWithFilter;
@@ -142,7 +143,8 @@ public class SeriesReaderFactory {
*/
public IReader createSeriesReaderForMerge(IntervalFileNode intervalFileNode,
OverflowSeriesDataSource overflowSeriesDataSource,
- SingleSeriesExpression singleSeriesExpression)
+ SingleSeriesExpression singleSeriesExpression,
+ QueryContext context)
throws IOException {
logger.debug("Create seriesReaders for merge. SeriesFilter = {}. TsFilePath = {}",
@@ -153,7 +155,7 @@ public class SeriesReaderFactory {
// Sequence reader
IReader seriesInTsFileReader = createSealedTsFileReaderForMerge(intervalFileNode,
- singleSeriesExpression);
+ singleSeriesExpression, context);
priorityMergeReader.addReaderWithPriority(seriesInTsFileReader, 1);
// UnSequence merge reader
@@ -165,7 +167,8 @@ public class SeriesReaderFactory {
}
private IReader createSealedTsFileReaderForMerge(IntervalFileNode fileNode,
- SingleSeriesExpression singleSeriesExpression)
+ SingleSeriesExpression singleSeriesExpression,
+ QueryContext context)
throws IOException {
TsFileSequenceReader tsFileSequenceReader = FileReaderManager.getInstance()
.get(fileNode.getFilePath(), false);
@@ -174,14 +177,14 @@ public class SeriesReaderFactory {
List<ChunkMetaData> metaDataList = metadataQuerier
.getChunkMetaDataList(singleSeriesExpression.getSeriesPath());
- List<Modification> modifications = QueryUtils.getPathModifications(fileNode.getModFile(),
+ List<Modification> modifications = context.getPathModifications(fileNode.getModFile(),
singleSeriesExpression.getSeriesPath().getFullPath());
QueryUtils.modifyChunkMetaData(metaDataList, modifications);
FileSeriesReader seriesInTsFileReader = new FileSeriesReaderWithFilter(chunkLoader,
metaDataList,
singleSeriesExpression.getFilter());
- return new SealedTsFilesReader(seriesInTsFileReader);
+ return new SealedTsFilesReader(seriesInTsFileReader, context);
}
private static class SeriesReaderFactoryHelper {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java
index 0d4ca1a..f32cc14 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java
@@ -28,6 +28,7 @@ import java.util.List;
import org.apache.iotdb.db.engine.filenode.IntervalFileNode;
import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.reader.IReader;
import org.apache.iotdb.db.utils.QueryUtils;
@@ -54,26 +55,32 @@ public class SealedTsFilesReader implements IReader {
private Filter filter;
private BatchData data;
private boolean hasCachedData;
+ private QueryContext context;
- public SealedTsFilesReader(Path seriesPath, List<IntervalFileNode> sealedTsFiles, Filter filter) {
- this(seriesPath, sealedTsFiles);
+ public SealedTsFilesReader(Path seriesPath, List<IntervalFileNode> sealedTsFiles, Filter filter,
+ QueryContext context) {
+ this(seriesPath, sealedTsFiles, context);
this.filter = filter;
+
}
/**
* init with seriesPath and sealedTsFiles.
*/
- public SealedTsFilesReader(Path seriesPath, List<IntervalFileNode> sealedTsFiles) {
+ public SealedTsFilesReader(Path seriesPath, List<IntervalFileNode> sealedTsFiles,
+ QueryContext context) {
this.seriesPath = seriesPath;
this.sealedTsFiles = sealedTsFiles;
this.usedIntervalFileIndex = 0;
this.seriesReader = null;
this.hasCachedData = false;
+ this.context = context;
}
- public SealedTsFilesReader(FileSeriesReader seriesReader) {
+ public SealedTsFilesReader(FileSeriesReader seriesReader, QueryContext context) {
this.seriesReader = seriesReader;
sealedTsFiles = new ArrayList<>();
+ this.context = context;
}
@Override
@@ -107,7 +114,7 @@ public class SealedTsFilesReader implements IReader {
if (seriesReader == null || !seriesReader.hasNextBatch()) {
IntervalFileNode fileNode = sealedTsFiles.get(usedIntervalFileIndex++);
if (singleTsFileSatisfied(fileNode)) {
- initSingleTsFileReader(fileNode);
+ initSingleTsFileReader(fileNode, context);
} else {
continue;
}
@@ -166,7 +173,8 @@ public class SealedTsFilesReader implements IReader {
return true;
}
- private void initSingleTsFileReader(IntervalFileNode fileNode) throws IOException {
+ private void initSingleTsFileReader(IntervalFileNode fileNode, QueryContext context)
+ throws IOException {
// to avoid too many opened files
TsFileSequenceReader tsFileReader = FileReaderManager.getInstance()
@@ -175,7 +183,7 @@ public class SealedTsFilesReader implements IReader {
MetadataQuerierByFileImpl metadataQuerier = new MetadataQuerierByFileImpl(tsFileReader);
List<ChunkMetaData> metaDataList = metadataQuerier.getChunkMetaDataList(seriesPath);
- List<Modification> pathModifications = QueryUtils.getPathModifications(fileNode.getModFile(),
+ List<Modification> pathModifications = context.getPathModifications(fileNode.getModFile(),
seriesPath.getFullPath());
if (pathModifications.size() > 0) {
QueryUtils.modifyChunkMetaData(metaDataList, pathModifications);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReader.java
index c4d549d..5d694c0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReader.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReader.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.reader.IReader;
import org.apache.iotdb.db.query.reader.mem.MemChunkReaderWithFilter;
import org.apache.iotdb.db.query.reader.mem.MemChunkReaderWithoutFilter;
@@ -45,7 +46,8 @@ public class SequenceDataReader implements IReader {
/**
* init with globalSortedSeriesDataSource and filter.
*/
- public SequenceDataReader(GlobalSortedSeriesDataSource sources, Filter filter)
+ public SequenceDataReader(GlobalSortedSeriesDataSource sources, Filter filter,
+ QueryContext context)
throws IOException {
seriesReaders = new ArrayList<>();
@@ -55,7 +57,8 @@ public class SequenceDataReader implements IReader {
// add reader for sealed TsFiles
if (sources.hasSealedTsFiles()) {
seriesReaders.add(
- new SealedTsFilesReader(sources.getSeriesPath(), sources.getSealedTsFiles(), filter));
+ new SealedTsFilesReader(sources.getSeriesPath(), sources.getSealedTsFiles(), filter,
+ context));
}
// add reader for unSealed TsFile
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
index 7e17ac2..8ee11d4 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
@@ -25,6 +25,7 @@ import static org.apache.iotdb.tsfile.read.expression.ExpressionType.SERIES;
import java.io.IOException;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryDataSourceManager;
import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
import org.apache.iotdb.db.query.reader.IReader;
@@ -55,19 +56,20 @@ public class EngineNodeConstructor {
* @throws IOException IOException
* @throws FileNodeManagerException FileNodeManagerException
*/
- public Node construct(IExpression expression) throws IOException, FileNodeManagerException {
+ public Node construct(IExpression expression, QueryContext context)
+ throws IOException, FileNodeManagerException {
if (expression.getType() == SERIES) {
- return new EngineLeafNode(generateSeriesReader((SingleSeriesExpression) expression));
+ return new EngineLeafNode(generateSeriesReader((SingleSeriesExpression) expression, context));
} else {
Node leftChild;
Node rightChild;
if (expression.getType() == OR) {
- leftChild = this.construct(((IBinaryExpression) expression).getLeft());
- rightChild = this.construct(((IBinaryExpression) expression).getRight());
+ leftChild = this.construct(((IBinaryExpression) expression).getLeft(), context);
+ rightChild = this.construct(((IBinaryExpression) expression).getRight(), context);
return new OrNode(leftChild, rightChild);
} else if (expression.getType() == AND) {
- leftChild = this.construct(((IBinaryExpression) expression).getLeft());
- rightChild = this.construct(((IBinaryExpression) expression).getRight());
+ leftChild = this.construct(((IBinaryExpression) expression).getLeft(), context);
+ rightChild = this.construct(((IBinaryExpression) expression).getRight(), context);
return new AndNode(leftChild, rightChild);
} else {
throw new UnSupportedDataTypeException(
@@ -76,11 +78,12 @@ public class EngineNodeConstructor {
}
}
- private IReader generateSeriesReader(SingleSeriesExpression singleSeriesExpression)
+ private IReader generateSeriesReader(SingleSeriesExpression singleSeriesExpression,
+ QueryContext context)
throws IOException, FileNodeManagerException {
QueryDataSource queryDataSource = QueryDataSourceManager.getQueryDataSource(jobId,
- singleSeriesExpression.getSeriesPath());
+ singleSeriesExpression.getSeriesPath(), context);
Filter filter = singleSeriesExpression.getFilter();
@@ -88,7 +91,7 @@ public class EngineNodeConstructor {
// reader for all sequence data
SequenceDataReader tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
- filter);
+ filter, context);
priorityReader.addReaderWithPriority(tsFilesReader, 1);
// reader for all unSequence data
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineTimeGenerator.java
index a3374df..6ad11e6 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineTimeGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineTimeGenerator.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.timegenerator;
import java.io.IOException;
import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
@@ -38,16 +39,16 @@ public class EngineTimeGenerator implements TimeGenerator {
/**
* Constructor of EngineTimeGenerator.
*/
- public EngineTimeGenerator(long jobId, IExpression expression)
+ public EngineTimeGenerator(long jobId, IExpression expression, QueryContext context)
throws IOException, FileNodeManagerException {
this.jobId = jobId;
this.expression = expression;
- initNode();
+ initNode(context);
}
- private void initNode() throws IOException, FileNodeManagerException {
+ private void initNode(QueryContext context) throws IOException, FileNodeManagerException {
EngineNodeConstructor engineNodeConstructor = new EngineNodeConstructor(jobId);
- this.operatorNode = engineNodeConstructor.construct(expression);
+ this.operatorNode = engineNodeConstructor.construct(expression, context);
}
@Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
index 98c9df6..bd3b633 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
@@ -65,20 +65,4 @@ public class QueryUtils {
// remove chunks that are completely deleted
chunkMetaData.removeIf(metaData -> metaData.getDeletedAt() >= metaData.getEndTime());
}
-
- public static List<Modification> getPathModifications(ModificationFile modFile, String path)
- throws IOException {
- // TODO: use query context to avoid multiple calls of getModifications()
- Collection<Modification> allModifications = modFile.getModifications();
- List<Modification> pathModifications = new ArrayList<>();
- if (allModifications.size() > 0) {
- allModifications.forEach(modification -> {
- if (modification.getPath().equals(path)) {
- pathModifications.add(modification);
- }
- });
- }
- return pathModifications;
- }
-
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
index 8b833d0..0f645be 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.MetadataArgsErrorException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -96,7 +97,8 @@ public class DeletionFileNodeTest {
SingleSeriesExpression expression = new SingleSeriesExpression(new Path(processorName,
measurements[5]), null);
- QueryDataSource dataSource = FileNodeManager.getInstance().query(expression);
+ QueryContext context = new QueryContext();
+ QueryDataSource dataSource = FileNodeManager.getInstance().query(expression, context);
Iterator<TimeValuePair> timeValuePairs =
dataSource.getSeqDataSource().getReadableChunk().getIterator();
int count = 0;
@@ -173,7 +175,8 @@ public class DeletionFileNodeTest {
SingleSeriesExpression expression = new SingleSeriesExpression(new Path(processorName,
measurements[5]), null);
- QueryDataSource dataSource = FileNodeManager.getInstance().query(expression);
+ QueryContext context = new QueryContext();
+ QueryDataSource dataSource = FileNodeManager.getInstance().query(expression, context);
Iterator<TimeValuePair> timeValuePairs =
dataSource.getOverflowSeriesDataSource().getReadableMemChunk().getIterator();
int count = 0;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowProcessorTest.java
index 887c5b5..cf14ee7 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowProcessorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowProcessorTest.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.engine.querycontext.MergeSeriesDataSource;
import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
import org.apache.iotdb.db.engine.version.SysTimeVersionController;
import org.apache.iotdb.db.exception.OverflowProcessorException;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.junit.After;
@@ -82,9 +83,10 @@ public class OverflowProcessorTest {
"0").exists());
assertEquals(false, processor.isFlush());
assertEquals(false, processor.isMerge());
+ QueryContext context = new QueryContext();
// write update data
OverflowSeriesDataSource overflowSeriesDataSource = processor.query(OverflowTestUtils.deviceId1,
- OverflowTestUtils.measurementId1, null, OverflowTestUtils.dataType1);
+ OverflowTestUtils.measurementId1, null, OverflowTestUtils.dataType1, context);
assertEquals(OverflowTestUtils.dataType1, overflowSeriesDataSource.getDataType());
Assert.assertEquals(true, overflowSeriesDataSource.getReadableMemChunk().isEmpty());
assertEquals(1, overflowSeriesDataSource.getOverflowInsertFileList().size());
@@ -99,7 +101,7 @@ public class OverflowProcessorTest {
assertEquals(false, processor.isFlush());
overflowSeriesDataSource = processor
.query(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1, null,
- OverflowTestUtils.dataType1);
+ OverflowTestUtils.dataType1, context);
assertEquals(OverflowTestUtils.dataType1, overflowSeriesDataSource.getDataType());
Assert.assertEquals(false, overflowSeriesDataSource.getReadableMemChunk().isEmpty());
assertEquals(1, overflowSeriesDataSource.getOverflowInsertFileList().size());
@@ -114,7 +116,7 @@ public class OverflowProcessorTest {
processor.close();
overflowSeriesDataSource = processor
.query(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1, null,
- OverflowTestUtils.dataType1);
+ OverflowTestUtils.dataType1, context);
Assert.assertEquals(true, overflowSeriesDataSource.getReadableMemChunk().isEmpty());
assertEquals(1, overflowSeriesDataSource.getOverflowInsertFileList().size());
assertEquals(1,
@@ -122,7 +124,7 @@ public class OverflowProcessorTest {
processor.switchWorkToMerge();
overflowSeriesDataSource = processor
.query(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1, null,
- OverflowTestUtils.dataType1);
+ OverflowTestUtils.dataType1, context);
assertEquals(2, overflowSeriesDataSource.getOverflowInsertFileList().size());
assertEquals(1,
overflowSeriesDataSource.getOverflowInsertFileList().get(0).getChunkMetaDataList().size());
@@ -131,12 +133,12 @@ public class OverflowProcessorTest {
assertEquals(true, processor.isMerge());
assertEquals(false, processor.canBeClosed());
MergeSeriesDataSource mergeSeriesDataSource = processor.queryMerge(OverflowTestUtils.deviceId1,
- OverflowTestUtils.measurementId1, OverflowTestUtils.dataType1);
+ OverflowTestUtils.measurementId1, OverflowTestUtils.dataType1, context);
assertEquals(1, mergeSeriesDataSource.getInsertFile().getChunkMetaDataList().size());
processor.switchMergeToWork();
overflowSeriesDataSource = processor
.query(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1, null,
- OverflowTestUtils.dataType1);
+ OverflowTestUtils.dataType1, context);
processor.close();
processor.clear();
}
@@ -147,9 +149,10 @@ public class OverflowProcessorTest {
SysTimeVersionController.INSTANCE);
OverflowTestUtils.produceInsertData(processor);
processor.close();
+ QueryContext context = new QueryContext();
// test query
OverflowSeriesDataSource overflowSeriesDataSource = processor.query(OverflowTestUtils.deviceId1,
- OverflowTestUtils.measurementId1, null, OverflowTestUtils.dataType2);
+ OverflowTestUtils.measurementId1, null, OverflowTestUtils.dataType2, context);
Assert.assertEquals(true, overflowSeriesDataSource.getReadableMemChunk().isEmpty());
assertEquals(0,
overflowSeriesDataSource.getOverflowInsertFileList().get(0).getChunkMetaDataList().size());
@@ -166,11 +169,12 @@ public class OverflowProcessorTest {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
+ QueryContext context = new QueryContext();
processor.query(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1, null,
- OverflowTestUtils.dataType1);
+ OverflowTestUtils.dataType1, context);
OverflowTestUtils.produceInsertData(processor);
processor.query(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1, null,
- OverflowTestUtils.dataType2);
+ OverflowTestUtils.dataType2, context);
processor.close();
processor.clear();
}
@@ -188,9 +192,10 @@ public class OverflowProcessorTest {
// recovery query
assertEquals(false, overflowProcessor.isMerge());
overflowProcessor.switchWorkToMerge();
+ QueryContext context = new QueryContext();
OverflowSeriesDataSource overflowSeriesDataSource = overflowProcessor
.query(OverflowTestUtils.deviceId1,
- OverflowTestUtils.measurementId1, null, OverflowTestUtils.dataType1);
+ OverflowTestUtils.measurementId1, null, OverflowTestUtils.dataType1, context);
Assert.assertEquals(true, overflowSeriesDataSource.getReadableMemChunk().isEmpty());
assertEquals(2, overflowSeriesDataSource.getOverflowInsertFileList().size());
overflowProcessor.switchMergeToWork();
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowResourceTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowResourceTest.java
index 6a7d26d..2fac518 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowResourceTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowResourceTest.java
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.util.List;
import org.apache.iotdb.db.engine.version.SysTimeVersionController;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.junit.After;
@@ -63,14 +64,15 @@ public class OverflowResourceTest {
@Test
public void testOverflowInsert() throws IOException {
OverflowTestUtils.produceInsertData(support);
+ QueryContext context = new QueryContext();
work.flush(OverflowTestUtils.getFileSchema(), support.getMemTabale(), null, "processorName");
List<ChunkMetaData> chunkMetaDatas = work.getInsertMetadatas(OverflowTestUtils.deviceId1,
- OverflowTestUtils.measurementId1, OverflowTestUtils.dataType2);
+ OverflowTestUtils.measurementId1, OverflowTestUtils.dataType2, context);
assertEquals(0, chunkMetaDatas.size());
work.appendMetadatas();
chunkMetaDatas = work
.getInsertMetadatas(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1,
- OverflowTestUtils.dataType1);
+ OverflowTestUtils.dataType1, context);
assertEquals(1, chunkMetaDatas.size());
ChunkMetaData chunkMetaData = chunkMetaDatas.get(0);
assertEquals(OverflowTestUtils.dataType1, chunkMetaData.getTsDataType());
@@ -86,7 +88,7 @@ public class OverflowResourceTest {
work = new OverflowResource(filePath, dataPath, SysTimeVersionController.INSTANCE);
chunkMetaDatas = work
.getInsertMetadatas(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1,
- OverflowTestUtils.dataType1);
+ OverflowTestUtils.dataType1, context);
assertEquals(1, chunkMetaDatas.size());
chunkMetaData = chunkMetaDatas.get(0);
assertEquals(OverflowTestUtils.dataType1, chunkMetaData.getTsDataType());
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java
index c7ca523..ef5ba95 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java
@@ -28,6 +28,7 @@ import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.OpenedFilePathsManager;
import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
import org.apache.iotdb.db.service.IoTDB;
@@ -203,7 +204,9 @@ public class IoTDBEngineTimeGeneratorIT {
SingleSeriesExpression singleSeriesExpression = new SingleSeriesExpression(pd0s0,
FilterFactory.and(valueGtEq, timeGt));
OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(0);
- EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, singleSeriesExpression);
+ QueryContext context = new QueryContext();
+ EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, singleSeriesExpression,
+ context);
int cnt = 0;
while (timeGenerator.hasNext()) {
@@ -227,7 +230,9 @@ public class IoTDBEngineTimeGeneratorIT {
OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(0);
IExpression singleSeriesExpression = new SingleSeriesExpression(pd1s0, valueGtEq);
- EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, singleSeriesExpression);
+ QueryContext context = new QueryContext();
+ EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, singleSeriesExpression,
+ context);
int cnt = 0;
while (timeGenerator.hasNext()) {
@@ -260,7 +265,8 @@ public class IoTDBEngineTimeGeneratorIT {
.and(singleSeriesExpression1, singleSeriesExpression2);
OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(0);
- EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, andExpression);
+ QueryContext context = new QueryContext();
+ EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, andExpression, context);
int cnt = 0;
while (timeGenerator.hasNext()) {
long time = timeGenerator.next();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java
index afe14d8..a2b4c1a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java
@@ -57,7 +57,7 @@ public class ChunkMetaData {
/**
* All data with timestamp <= deletedAt are considered deleted.
*/
- private long deletedAt;
+ private long deletedAt = -1;
private TsDigest valuesStatistics;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
index 9ac1ef6..f4ad125 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
@@ -28,7 +28,7 @@ public class Chunk {
private ChunkHeader chunkHeader;
private ByteBuffer chunkData;
- private long deletedAt;
+ private long deletedAt = -1;
public Chunk(ChunkHeader header, ByteBuffer buffer) {
this.chunkHeader = header;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
index d531645..ec5d1aa 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
@@ -47,7 +47,7 @@ public abstract class ChunkReader {
private Filter filter;
private BatchData data;
- protected long deletedAt;
+ protected long deletedAt = -1;
public ChunkReader(Chunk chunk) {
this(chunk, null);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
index 82944ac..0166849 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
@@ -48,7 +48,7 @@ public class PageReader {
private Filter filter = null;
- private long deletedAt;
+ private long deletedAt = -1;
public PageReader(ByteBuffer pageData, TSDataType dataType, Decoder valueDecoder,
Decoder timeDecoder,