You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/01/02 05:11:41 UTC
[iotdb] branch rel/0.12 updated: [To rel/0.12] [IOTDB-2221] Accelerate query by reducing the number of files in QueryDataSource (#4650)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new 75894dc [To rel/0.12] [IOTDB-2221] Accelerate query by reducing the number of files in QueryDataSource (#4650)
75894dc is described below
commit 75894dc0ce5d7e5198c7274766592e24115078b6
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Sun Jan 2 13:11:00 2022 +0800
[To rel/0.12] [IOTDB-2221] Accelerate query by reducing the number of files in QueryDataSource (#4650)
---
.../cluster/query/ClusterUDTFQueryExecutor.java | 4 +-
.../iotdb/cluster/query/LocalQueryExecutor.java | 10 +-
.../cluster/server/member/DataGroupMemberTest.java | 18 ++-
.../org/apache/iotdb/db/engine/StorageEngine.java | 31 +-----
.../engine/storagegroup/StorageGroupProcessor.java | 20 +++-
.../db/engine/storagegroup/TsFileResource.java | 46 ++++++--
.../org/apache/iotdb/db/metadata/MManager.java | 20 +++-
.../db/query/control/QueryResourceManager.java | 21 +++-
.../query/dataset/groupby/GroupByFillDataSet.java | 21 ++--
.../groupby/GroupByWithValueFilterDataSet.java | 20 ++--
.../groupby/GroupByWithoutValueFilterDataSet.java | 20 ++--
.../dataset/groupby/LocalGroupByExecutor.java | 5 +-
.../db/query/executor/AggregationExecutor.java | 35 ++++--
.../iotdb/db/query/executor/FillQueryExecutor.java | 15 ++-
.../iotdb/db/query/executor/LastQueryExecutor.java | 16 ++-
.../db/query/executor/RawDataQueryExecutor.java | 38 +++++--
.../iotdb/db/query/executor/UDTFQueryExecutor.java | 4 +-
.../iotdb/db/query/reader/series/SeriesReader.java | 28 +----
.../query/timegenerator/ServerTimeGenerator.java | 54 +++++++--
.../iotdb/db/engine/merge/ConcurrentMergeTest.java | 1 +
.../engine/modification/DeletionFileNodeTest.java | 40 ++++---
.../storagegroup/StorageGroupProcessorTest.java | 9 ++
.../iotdb/db/engine/storagegroup/TTLTest.java | 3 +
.../IoTDBQueryWithComplexValueFilterIT.java | 122 +++++++++++++++++++++
.../read/query/timegenerator/TimeGenerator.java | 3 +
.../query/timegenerator/TsFileTimeGenerator.java | 6 +
.../tsfile/read/reader/FakedTimeGenerator.java | 6 +
27 files changed, 459 insertions(+), 157 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterUDTFQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterUDTFQueryExecutor.java
index 764437e..ad05be8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterUDTFQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterUDTFQueryExecutor.java
@@ -67,7 +67,7 @@ public class ClusterUDTFQueryExecutor extends ClusterDataQueryExecutor {
new ArrayList<>(udtfPlan.getDeduplicatedPaths()),
timestampGenerator.hasOrNode());
List<IReaderByTimestamp> readersOfSelectedSeries =
- initSeriesReaderByTimestamp(context, udtfPlan, cached);
+ initSeriesReaderByTimestamp(context, udtfPlan, cached, timestampGenerator.getTimeFilter());
return new UDTFAlignByTimeDataSet(
context,
udtfPlan,
@@ -98,7 +98,7 @@ public class ClusterUDTFQueryExecutor extends ClusterDataQueryExecutor {
new ArrayList<>(udtfPlan.getDeduplicatedPaths()),
timestampGenerator.hasOrNode());
List<IReaderByTimestamp> readersOfSelectedSeries =
- initSeriesReaderByTimestamp(context, udtfPlan, cached);
+ initSeriesReaderByTimestamp(context, udtfPlan, cached, timestampGenerator.getTimeFilter());
return new UDTFNonAlignDataSet(
context,
udtfPlan,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index a2fc36a..67f2a0b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -60,6 +60,7 @@ import org.apache.iotdb.db.query.factory.AggregateResultFactory;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.SerializeUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -773,7 +774,14 @@ public class LocalQueryExecutor {
aggregationTypeOrdinals,
queryContext,
ascending);
- if (!executor.isEmpty()) {
+ boolean isEmpty;
+ try {
+ isEmpty = executor.isEmpty();
+ } catch (IOException e) {
+ logger.error("Something wrong happened", e);
+ throw new QueryProcessException(e, TSStatusCode.INTERNAL_SERVER_ERROR.ordinal());
+ }
+ if (!isEmpty) {
long executorId = queryManager.registerGroupByExecutor(executor);
logger.debug(
"{}: Build a GroupByExecutor of {} for {}, executorId: {}",
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index ceff919..0bdf5d3 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -1164,18 +1164,16 @@ public class DataGroupMemberTest extends BaseMember {
request.timeFilterBytes.position(0);
new DataAsyncService(dataGroupMember).getGroupByExecutor(request, handler);
executorId = resultRef.get();
- // TODO: This test is uncompleted because of shared QueryDataSource (IOTDB-2101)
- // assertEquals(-1L, (long) executorId);
+ assertEquals(-1L, (long) executorId);
// fetch result
- // aggrResultRef = new AtomicReference<>();
- // aggrResultHandler = new GenericHandler<>(TestUtils.getNode(0), aggrResultRef);
- // new DataAsyncService(dataGroupMember)
- // .getGroupByResult(TestUtils.getNode(30), executorId, 0, 20,
- // aggrResultHandler);
- //
- // byteBuffers = aggrResultRef.get();
- // assertNull(byteBuffers);
+ aggrResultRef = new AtomicReference<>();
+ aggrResultHandler = new GenericHandler<>(TestUtils.getNode(0), aggrResultRef);
+ new DataAsyncService(dataGroupMember)
+ .getGroupByResult(TestUtils.getNode(30), executorId, 0, 20, aggrResultHandler);
+
+ byteBuffers = aggrResultRef.get();
+ assertNull(byteBuffers);
} finally {
dataGroupMember.closeLogManager();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 33d48ca..a83e2fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -45,8 +45,6 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.IoTDB;
@@ -56,7 +54,6 @@ import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.FilePathUtils;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -71,7 +68,6 @@ import java.util.Arrays;
import java.util.Comparator;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -1041,26 +1037,8 @@ public class StorageEngine implements IService {
}
/** get all merge lock of the storage group processor related to the query */
- public List<StorageGroupProcessor> mergeLock(List<PartialPath> pathList)
- throws StorageEngineException {
- Set<StorageGroupProcessor> set = new HashSet<>();
- for (PartialPath path : pathList) {
- set.add(getProcessor(path.getDevicePath()));
- }
- List<StorageGroupProcessor> list =
- set.stream()
- .sorted(Comparator.comparing(StorageGroupProcessor::getVirtualStorageGroupId))
- .collect(Collectors.toList());
- list.forEach(StorageGroupProcessor::readLock);
- return list;
- }
-
- /**
- * get all merge lock of the storage group processor related to the query and init QueryDataSource
- */
- public List<StorageGroupProcessor> mergeLockAndInitQueryDataSource(
- List<PartialPath> pathList, QueryContext context, Filter timeFilter)
- throws StorageEngineException, QueryProcessException {
+ public Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>> mergeLock(
+ List<PartialPath> pathList) throws StorageEngineException, QueryProcessException {
Map<StorageGroupProcessor, List<PartialPath>> map = new HashMap<>();
for (PartialPath path : pathList) {
map.computeIfAbsent(getProcessor(path.getDevicePath()), key -> new ArrayList<>()).add(path);
@@ -1071,10 +1049,7 @@ public class StorageEngine implements IService {
.collect(Collectors.toList());
list.forEach(StorageGroupProcessor::readLock);
- // init QueryDataSource
- QueryResourceManager.getInstance().initQueryDataSource(map, context, timeFilter);
-
- return list;
+ return new Pair<>(list, map);
}
/** unlock all merge lock of the storage group processor related to the query */
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index d0b21fd..42d20ed 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1572,8 +1572,19 @@ public class StorageGroupProcessor {
}
}
+ /**
+ * build query data source by searching all tsfile which fit in query filter
+ *
+ * @param pathList data paths
+ * @param context query context
+ * @param timeFilter time filter
+ * @param singleDeviceId selected deviceId (not null only when all the selected series are under
+ * the same device)
+ * @return query data source
+ */
public QueryDataSource query(
List<PartialPath> pathList,
+ String singleDeviceId,
QueryContext context,
QueryFileManager filePathsManager,
Filter timeFilter)
@@ -1585,6 +1596,7 @@ public class StorageGroupProcessor {
tsFileManagement.getTsFileList(true),
upgradeSeqFileList,
pathList,
+ singleDeviceId,
context,
timeFilter,
true);
@@ -1593,6 +1605,7 @@ public class StorageGroupProcessor {
tsFileManagement.getTsFileList(false),
upgradeUnseqFileList,
pathList,
+ singleDeviceId,
context,
timeFilter,
false);
@@ -1642,6 +1655,7 @@ public class StorageGroupProcessor {
Collection<TsFileResource> tsFileResources,
List<TsFileResource> upgradeTsFileResources,
List<PartialPath> pathList,
+ String singleDeviceId,
QueryContext context,
Filter timeFilter,
boolean isSeq)
@@ -1664,7 +1678,8 @@ public class StorageGroupProcessor {
// for upgrade files and old files must be closed
for (TsFileResource tsFileResource : upgradeTsFileResources) {
- if (!tsFileResource.isSatisfied(timeFilter, isSeq, dataTTL, context.isDebug())) {
+ if (!tsFileResource.isSatisfied(
+ singleDeviceId, timeFilter, isSeq, dataTTL, context.isDebug())) {
continue;
}
closeQueryLock.readLock().lock();
@@ -1676,7 +1691,8 @@ public class StorageGroupProcessor {
}
for (TsFileResource tsFileResource : tsFileResources) {
- if (!tsFileResource.isSatisfied(timeFilter, isSeq, dataTTL, context.isDebug())) {
+ if (!tsFileResource.isSatisfied(
+ singleDeviceId, timeFilter, isSeq, dataTTL, context.isDebug())) {
continue;
}
closeQueryLock.readLock().lock();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 5ba3272..2ce4cbf 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -608,8 +608,43 @@ public class TsFileResource {
return timeIndex.stillLives(timeLowerBound);
}
+ public boolean isSatisfied(
+ String deviceId, Filter timeFilter, boolean isSeq, long ttl, boolean debug) {
+ if (deviceId == null) {
+ return isSatisfied(timeFilter, isSeq, ttl, debug);
+ }
+
+ if (!getDevices().contains(deviceId)) {
+ if (debug) {
+ DEBUG_LOGGER.info(
+ "Path: {} file {} is not satisfied because of no device!", deviceId, file);
+ }
+ return false;
+ }
+
+ long startTime = getStartTime(deviceId);
+ long endTime = closed || !isSeq ? getEndTime(deviceId) : Long.MAX_VALUE;
+
+ if (!isAlive(endTime, ttl)) {
+ if (debug) {
+ DEBUG_LOGGER.info("file {} is not satisfied because of ttl!", file);
+ }
+ return false;
+ }
+
+ if (timeFilter != null) {
+ boolean res = timeFilter.satisfyStartEndTime(startTime, endTime);
+ if (debug && !res) {
+ DEBUG_LOGGER.info(
+ "Path: {} file {} is not satisfied because of time filter!", deviceId, fsFactory);
+ }
+ return res;
+ }
+ return true;
+ }
+
/** @return true if the TsFile lives beyond TTL */
- public boolean isSatisfied(Filter timeFilter, boolean isSeq, long ttl, boolean debug) {
+ private boolean isSatisfied(Filter timeFilter, boolean isSeq, long ttl, boolean debug) {
long startTime = getFileStartTime();
long endTime = closed || !isSeq ? getFileEndTime() : Long.MAX_VALUE;
@@ -630,14 +665,9 @@ public class TsFileResource {
return true;
}
- /** @return true if the device is contained in the TsFile and it lives beyond TTL */
+ /** @return true if the device is contained in the TsFile */
public boolean isSatisfied(
- String deviceId,
- Filter timeFilter,
- TsFileFilter fileFilter,
- boolean isSeq,
- long ttl,
- boolean debug) {
+ String deviceId, Filter timeFilter, TsFileFilter fileFilter, boolean isSeq, boolean debug) {
if (fileFilter != null && fileFilter.fileNotSatisfy(this)) {
if (debug) {
DEBUG_LOGGER.info(
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 051ac26..089511d 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.*;
import org.apache.iotdb.db.qp.physical.sys.*;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
import org.apache.iotdb.db.rescon.MemTableManager;
@@ -821,13 +822,20 @@ public class MManager {
if (plan.isOrderByHeat()) {
List<StorageGroupProcessor> list;
try {
- list =
- StorageEngine.getInstance()
- .mergeLockAndInitQueryDataSource(
- allMatchedNodes.stream().map(MNode::getPartialPath).collect(toList()),
- context,
- null);
+ Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>>
+ lockListAndProcessorToSeriesMapPair =
+ StorageEngine.getInstance()
+ .mergeLock(
+ allMatchedNodes.stream().map(MNode::getPartialPath).collect(toList()));
+ list = lockListAndProcessorToSeriesMapPair.left;
+ Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+ lockListAndProcessorToSeriesMapPair.right;
+
try {
+ // init QueryDataSource Cache
+ QueryResourceManager.getInstance()
+ .initQueryDataSourceCache(processorToSeriesMap, context, null);
+
allMatchedNodes =
allMatchedNodes.stream()
.sorted(
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index 858d7ef..0c2ddc5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -41,6 +41,7 @@ import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
/**
* QueryResourceManager manages resource (file streams) used by each query job, and assign Ids to
@@ -110,7 +111,7 @@ public class QueryResourceManager {
* @param processorToSeriesMap Key: processor of the virtual storage group. Value: selected series
* under the virtual storage group
*/
- public void initQueryDataSource(
+ public void initQueryDataSourceCache(
Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap,
QueryContext context,
Filter timeFilter)
@@ -120,11 +121,21 @@ public class QueryResourceManager {
StorageGroupProcessor processor = entry.getKey();
List<PartialPath> pathList = entry.getValue();
+ // when all the selected series are under the same device, the QueryDataSource will be
+ // filtered according to timeIndex
+ Set<String> selectedDeviceIdSet =
+ pathList.stream().map(PartialPath::getDevice).collect(Collectors.toSet());
+
long queryId = context.getQueryId();
String storageGroupPath = processor.getStorageGroupPath();
QueryDataSource cachedQueryDataSource =
- processor.query(pathList, context, filePathsManager, timeFilter);
+ processor.query(
+ pathList,
+ selectedDeviceIdSet.size() == 1 ? selectedDeviceIdSet.iterator().next() : null,
+ context,
+ filePathsManager,
+ timeFilter);
cachedQueryDataSourcesMap
.computeIfAbsent(queryId, k -> new HashMap<>())
.put(storageGroupPath, cachedQueryDataSource);
@@ -150,7 +161,11 @@ public class QueryResourceManager {
StorageEngine.getInstance().getProcessor(selectedPath.getDevicePath());
cachedQueryDataSource =
processor.query(
- Collections.singletonList(selectedPath), context, filePathsManager, timeFilter);
+ Collections.singletonList(selectedPath),
+ selectedPath.getDevice(),
+ context,
+ filePathsManager,
+ timeFilter);
}
// construct QueryDataSource for selectedPath
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
index 6640adb..23e6d0f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.executor.LastQueryExecutor;
import org.apache.iotdb.db.query.executor.fill.IFill;
import org.apache.iotdb.db.query.executor.fill.PreviousFill;
@@ -123,13 +124,19 @@ public class GroupByFillDataSet extends QueryDataSet {
FilterFactory.and(
TimeFilter.gtEq(lowerBound), TimeFilter.ltEq(groupByEngineDataSet.getStartTime()));
- List<StorageGroupProcessor> list =
- StorageEngine.getInstance()
- .mergeLockAndInitQueryDataSource(
- paths.stream().map(path -> (PartialPath) path).collect(Collectors.toList()),
- context,
- timeFilter);
+ Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>>
+ lockListAndProcessorToSeriesMapPair =
+ StorageEngine.getInstance()
+ .mergeLock(
+ paths.stream().map(path -> (PartialPath) path).collect(Collectors.toList()));
+ List<StorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+ Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+ lockListAndProcessorToSeriesMapPair.right;
+
try {
+ // init QueryDataSource Cache
+ QueryResourceManager.getInstance()
+ .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
for (int i = 0; i < paths.size(); i++) {
PreviousFill fill = previousFillExecutors[i];
firstNotNullTV[i] = fill.getFillResult();
@@ -142,7 +149,7 @@ public class GroupByFillDataSet extends QueryDataSet {
}
}
} finally {
- StorageEngine.getInstance().mergeUnLock(list);
+ StorageEngine.getInstance().mergeUnLock(lockList);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
index 4711cbb..339ba09 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
@@ -49,6 +49,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
@@ -92,20 +93,25 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
TimeFilter.gtEq(groupByTimePlan.getStartTime()),
TimeFilter.lt(groupByTimePlan.getEndTime()));
- List<StorageGroupProcessor> list =
- StorageEngine.getInstance()
- .mergeLockAndInitQueryDataSource(
- paths.stream().map(p -> (PartialPath) p).collect(Collectors.toList()),
- context,
- timeFilter);
+ Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>>
+ lockListAndProcessorToSeriesMapPair =
+ StorageEngine.getInstance()
+ .mergeLock(paths.stream().map(p -> (PartialPath) p).collect(Collectors.toList()));
+ List<StorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+ Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+ lockListAndProcessorToSeriesMapPair.right;
+
try {
+ // init QueryDataSource Cache
+ QueryResourceManager.getInstance()
+ .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
for (int i = 0; i < paths.size(); i++) {
PartialPath path = (PartialPath) paths.get(i);
allDataReaderList.add(
getReaderByTime(path, groupByTimePlan, dataTypes.get(i), context, null));
}
} finally {
- StorageEngine.getInstance().mergeUnLock(list);
+ StorageEngine.getInstance().mergeUnLock(lockList);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index 424915f..5628666 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.factory.AggregateResultFactory;
import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -89,13 +90,18 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
throw new QueryProcessException("TimeFilter cannot be null in GroupBy query.");
}
- List<StorageGroupProcessor> list =
- StorageEngine.getInstance()
- .mergeLockAndInitQueryDataSource(
- paths.stream().map(p -> (PartialPath) p).collect(Collectors.toList()),
- context,
- timeFilter);
+ Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>>
+ lockListAndProcessorToSeriesMapPair =
+ StorageEngine.getInstance()
+ .mergeLock(paths.stream().map(p -> (PartialPath) p).collect(Collectors.toList()));
+ List<StorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+ Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+ lockListAndProcessorToSeriesMapPair.right;
+
try {
+ // init QueryDataSource Cache
+ QueryResourceManager.getInstance()
+ .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
// init resultIndexes, group result indexes by path
for (int i = 0; i < paths.size(); i++) {
PartialPath path = (PartialPath) paths.get(i);
@@ -120,7 +126,7 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
pathExecutors.get(path).addAggregateResult(aggrResult);
}
} finally {
- StorageEngine.getInstance().mergeUnLock(list);
+ StorageEngine.getInstance().mergeUnLock(lockList);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
index b8cec3d..f31acd2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
@@ -88,9 +88,8 @@ public class LocalGroupByExecutor implements GroupByExecutor {
this.ascending = ascending;
}
- public boolean isEmpty() {
- return queryDataSource.getSeqResources().isEmpty()
- && queryDataSource.getUnseqResources().isEmpty();
+ public boolean isEmpty() throws IOException {
+ return !reader.hasNextFile();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index d7339a8..6ce3328 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -51,6 +51,7 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+import org.apache.iotdb.tsfile.utils.Pair;
import java.io.IOException;
import java.util.ArrayList;
@@ -103,11 +104,18 @@ public class AggregationExecutor {
groupAggregationsBySeries(selectedSeries);
AggregateResult[] aggregateResultList = new AggregateResult[selectedSeries.size()];
// TODO-Cluster: group the paths by storage group to reduce communications
- List<StorageGroupProcessor> list =
- StorageEngine.getInstance()
- .mergeLockAndInitQueryDataSource(
- new ArrayList<>(pathToAggrIndexesMap.keySet()), context, timeFilter);
+ Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>>
+ lockListAndProcessorToSeriesMapPair =
+ StorageEngine.getInstance().mergeLock(new ArrayList<>(pathToAggrIndexesMap.keySet()));
+ List<StorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+ Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+ lockListAndProcessorToSeriesMapPair.right;
+
try {
+ // init QueryDataSource Cache
+ QueryResourceManager.getInstance()
+ .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
+
for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
aggregateOneSeries(
entry,
@@ -117,7 +125,7 @@ public class AggregationExecutor {
context);
}
} finally {
- StorageEngine.getInstance().mergeUnLock(list);
+ StorageEngine.getInstance().mergeUnLock(lockList);
}
return constructDataSet(Arrays.asList(aggregateResultList), aggregationPlan);
@@ -346,9 +354,20 @@ public class AggregationExecutor {
Map<PartialPath, List<Integer>> pathToAggrIndexesMap =
groupAggregationsBySeries(selectedSeries);
Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap = new HashMap<>();
- List<StorageGroupProcessor> list =
- StorageEngine.getInstance().mergeLockAndInitQueryDataSource(selectedSeries, context, null);
+
+ Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>>
+ lockListAndProcessorToSeriesMapPair =
+ StorageEngine.getInstance().mergeLock(new ArrayList<>(pathToAggrIndexesMap.keySet()));
+ List<StorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+ Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+ lockListAndProcessorToSeriesMapPair.right;
+
try {
+ // init QueryDataSource Cache
+ QueryResourceManager.getInstance()
+ .initQueryDataSourceCache(
+ processorToSeriesMap, context, timestampGenerator.getTimeFilter());
+
for (int i = 0; i < selectedSeries.size(); i++) {
PartialPath path = selectedSeries.get(i);
List<Integer> indexes = pathToAggrIndexesMap.remove(path);
@@ -359,7 +378,7 @@ public class AggregationExecutor {
}
}
} finally {
- StorageEngine.getInstance().mergeUnLock(list);
+ StorageEngine.getInstance().mergeUnLock(lockList);
}
List<AggregateResult> aggregateResults = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
index e43c094..662267e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
import javax.activation.UnsupportedDataTypeException;
@@ -78,10 +79,16 @@ public class FillQueryExecutor {
throws StorageEngineException, QueryProcessException, IOException {
RowRecord record = new RowRecord(queryTime);
- List<StorageGroupProcessor> list =
- StorageEngine.getInstance()
- .mergeLockAndInitQueryDataSource(selectedSeries, context, contructTimeFilter());
+ Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>>
+ lockListAndProcessorToSeriesMapPair = StorageEngine.getInstance().mergeLock(selectedSeries);
+ List<StorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+ Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+ lockListAndProcessorToSeriesMapPair.right;
+
try {
+ // init QueryDataSource Cache
+ QueryResourceManager.getInstance()
+ .initQueryDataSourceCache(processorToSeriesMap, context, contructTimeFilter());
List<TimeValuePair> timeValuePairs = getTimeValuePairs(context);
long defaultFillInterval = IoTDBDescriptor.getInstance().getConfig().getDefaultFillInterval();
for (int i = 0; i < selectedSeries.size(); i++) {
@@ -128,7 +135,7 @@ public class FillQueryExecutor {
}
}
} finally {
- StorageEngine.getInstance().mergeUnLock(list);
+ StorageEngine.getInstance().mergeUnLock(lockList);
}
SingleDataSet dataSet = new SingleDataSet(selectedSeries, dataTypes);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
index 72b8cf7..f34f9f6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
@@ -172,10 +172,18 @@ public class LastQueryExecutor {
// Acquire query resources for the rest series paths
List<LastPointReader> readerList = new ArrayList<>();
- List<StorageGroupProcessor> list =
- StorageEngine.getInstance()
- .mergeLockAndInitQueryDataSource(nonCachedPaths, context, filter);
+
+ Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>>
+ lockListAndProcessorToSeriesMapPair = StorageEngine.getInstance().mergeLock(nonCachedPaths);
+ List<StorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+ Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+ lockListAndProcessorToSeriesMapPair.right;
+
try {
+ // init QueryDataSource Cache
+ QueryResourceManager.getInstance()
+ .initQueryDataSourceCache(processorToSeriesMap, context, filter);
+
for (int i = 0; i < nonCachedPaths.size(); i++) {
QueryDataSource dataSource =
QueryResourceManager.getInstance()
@@ -192,7 +200,7 @@ public class LastQueryExecutor {
readerList.add(lastReader);
}
} finally {
- StorageEngine.getInstance().mergeUnLock(list);
+ StorageEngine.getInstance().mergeUnLock(lockList);
}
// Compute Last result for the rest series paths by scanning Tsfiles
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
index 03f8e37..92372be 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
@@ -40,10 +40,12 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+import org.apache.iotdb.tsfile.utils.Pair;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths;
@@ -98,10 +100,18 @@ public class RawDataQueryExecutor {
}
List<ManagedSeriesReader> readersOfSelectedSeries = new ArrayList<>();
- List<StorageGroupProcessor> list =
- StorageEngine.getInstance()
- .mergeLockAndInitQueryDataSource(queryPlan.getDeduplicatedPaths(), context, timeFilter);
+
+ Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>>
+ lockListAndProcessorToSeriesMapPair =
+ StorageEngine.getInstance().mergeLock(queryPlan.getDeduplicatedPaths());
+ List<StorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+ Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+ lockListAndProcessorToSeriesMapPair.right;
+
try {
+ // init QueryDataSource cache
+ QueryResourceManager.getInstance()
+ .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
PartialPath path = queryPlan.getDeduplicatedPaths().get(i);
TSDataType dataType = queryPlan.getDeduplicatedDataTypes().get(i);
@@ -124,7 +134,7 @@ public class RawDataQueryExecutor {
readersOfSelectedSeries.add(reader);
}
} finally {
- StorageEngine.getInstance().mergeUnLock(list);
+ StorageEngine.getInstance().mergeUnLock(lockList);
}
return readersOfSelectedSeries;
}
@@ -149,7 +159,7 @@ public class RawDataQueryExecutor {
new ArrayList<>(queryPlan.getDeduplicatedPaths()),
timestampGenerator.hasOrNode());
List<IReaderByTimestamp> readersOfSelectedSeries =
- initSeriesReaderByTimestamp(context, queryPlan, cached);
+ initSeriesReaderByTimestamp(context, queryPlan, cached, timestampGenerator.getTimeFilter());
return new RawQueryDataSetWithValueFilter(
queryPlan.getDeduplicatedPaths(),
queryPlan.getDeduplicatedDataTypes(),
@@ -160,13 +170,21 @@ public class RawDataQueryExecutor {
}
protected List<IReaderByTimestamp> initSeriesReaderByTimestamp(
- QueryContext context, RawDataQueryPlan queryPlan, List<Boolean> cached)
+ QueryContext context, RawDataQueryPlan queryPlan, List<Boolean> cached, Filter timeFilter)
throws QueryProcessException, StorageEngineException {
List<IReaderByTimestamp> readersOfSelectedSeries = new ArrayList<>();
- List<StorageGroupProcessor> list =
- StorageEngine.getInstance()
- .mergeLockAndInitQueryDataSource(queryPlan.getDeduplicatedPaths(), context, null);
+
+ Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>>
+ lockListAndProcessorToSeriesMapPair =
+ StorageEngine.getInstance().mergeLock(queryPlan.getDeduplicatedPaths());
+ List<StorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+ Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+ lockListAndProcessorToSeriesMapPair.right;
+
try {
+ // init QueryDataSource Cache
+ QueryResourceManager.getInstance()
+ .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
if (cached.get(i)) {
readersOfSelectedSeries.add(null);
@@ -182,7 +200,7 @@ public class RawDataQueryExecutor {
readersOfSelectedSeries.add(seriesReaderByTimestamp);
}
} finally {
- StorageEngine.getInstance().mergeUnLock(list);
+ StorageEngine.getInstance().mergeUnLock(lockList);
}
return readersOfSelectedSeries;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/UDTFQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/UDTFQueryExecutor.java
index b0ce81a..6c3156b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/UDTFQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/UDTFQueryExecutor.java
@@ -65,7 +65,7 @@ public class UDTFQueryExecutor extends RawDataQueryExecutor {
new ArrayList<>(udtfPlan.getDeduplicatedPaths()),
timestampGenerator.hasOrNode());
List<IReaderByTimestamp> readersOfSelectedSeries =
- initSeriesReaderByTimestamp(context, udtfPlan, cached);
+ initSeriesReaderByTimestamp(context, udtfPlan, cached, timestampGenerator.getTimeFilter());
return new UDTFAlignByTimeDataSet(
context,
udtfPlan,
@@ -96,7 +96,7 @@ public class UDTFQueryExecutor extends RawDataQueryExecutor {
new ArrayList<>(udtfPlan.getDeduplicatedPaths()),
timestampGenerator.hasOrNode());
List<IReaderByTimestamp> readersOfSelectedSeries =
- initSeriesReaderByTimestamp(context, udtfPlan, cached);
+ initSeriesReaderByTimestamp(context, udtfPlan, cached, timestampGenerator.getTimeFilter());
return new UDTFNonAlignDataSet(
context,
udtfPlan,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index b76d64b..62e9518 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -1157,12 +1157,7 @@ public class SeriesReader {
TsFileResource tsFileResource = dataSource.getSeqResourceByIndex(curSeqFileIndex);
if (tsFileResource != null
&& tsFileResource.isSatisfied(
- seriesPath.getDevice(),
- timeFilter,
- fileFilter,
- true,
- dataSource.getDataTTL(),
- context.isDebug())) {
+ seriesPath.getDevice(), timeFilter, fileFilter, true, context.isDebug())) {
break;
}
curSeqFileIndex--;
@@ -1176,12 +1171,7 @@ public class SeriesReader {
TsFileResource tsFileResource = dataSource.getUnseqResourceByIndex(curUnseqFileIndex);
if (tsFileResource != null
&& tsFileResource.isSatisfied(
- seriesPath.getDevice(),
- timeFilter,
- fileFilter,
- false,
- dataSource.getDataTTL(),
- context.isDebug())) {
+ seriesPath.getDevice(), timeFilter, fileFilter, false, context.isDebug())) {
break;
}
curUnseqFileIndex++;
@@ -1290,12 +1280,7 @@ public class SeriesReader {
TsFileResource tsFileResource = dataSource.getSeqResourceByIndex(curSeqFileIndex);
if (tsFileResource != null
&& tsFileResource.isSatisfied(
- seriesPath.getDevice(),
- timeFilter,
- fileFilter,
- true,
- dataSource.getDataTTL(),
- context.isDebug())) {
+ seriesPath.getDevice(), timeFilter, fileFilter, true, context.isDebug())) {
break;
}
curSeqFileIndex++;
@@ -1309,12 +1294,7 @@ public class SeriesReader {
TsFileResource tsFileResource = dataSource.getUnseqResourceByIndex(curUnseqFileIndex);
if (tsFileResource != null
&& tsFileResource.isSatisfied(
- seriesPath.getDevice(),
- timeFilter,
- fileFilter,
- false,
- dataSource.getDataTTL(),
- context.isDebug())) {
+ seriesPath.getDevice(), timeFilter, fileFilter, false, context.isDebug())) {
break;
}
curUnseqFileIndex++;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java b/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java
index 8bba620..8e4acc4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java
@@ -36,14 +36,17 @@ import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
import org.apache.iotdb.tsfile.read.filter.factory.FilterType;
import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+import org.apache.iotdb.tsfile.utils.Pair;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
/**
* A timestamp generator for query with filter. e.g. For query clause "select s1, s2 from root where
@@ -54,6 +57,8 @@ public class ServerTimeGenerator extends TimeGenerator {
protected QueryContext context;
protected RawDataQueryPlan queryPlan;
+ private Filter timeFilter;
+
public ServerTimeGenerator(QueryContext context) {
this.context = context;
}
@@ -73,22 +78,53 @@ public class ServerTimeGenerator extends TimeGenerator {
public void serverConstructNode(IExpression expression)
throws IOException, StorageEngineException, QueryProcessException {
List<PartialPath> pathList = new ArrayList<>();
- getPartialPathFromExpression(expression, pathList);
- List<StorageGroupProcessor> list =
- StorageEngine.getInstance().mergeLockAndInitQueryDataSource(pathList, context, null);
+ timeFilter = getPathListAndConstructTimeFilterFromExpression(expression, pathList);
+
+ Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>>
+ lockListAndProcessorToSeriesMapPair = StorageEngine.getInstance().mergeLock(pathList);
+ List<StorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+ Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+ lockListAndProcessorToSeriesMapPair.right;
+
try {
+ // init QueryDataSource Cache
+ QueryResourceManager.getInstance()
+ .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
+
operatorNode = construct(expression);
} finally {
- StorageEngine.getInstance().mergeUnLock(list);
+ StorageEngine.getInstance().mergeUnLock(lockList);
}
}
- private void getPartialPathFromExpression(IExpression expression, List<PartialPath> pathList) {
+ private Filter getPathListAndConstructTimeFilterFromExpression(
+ IExpression expression, List<PartialPath> pathList) {
if (expression.getType() == ExpressionType.SERIES) {
pathList.add((PartialPath) ((SingleSeriesExpression) expression).getSeriesPath());
+ return getTimeFilter(((SingleSeriesExpression) expression).getFilter());
} else {
- getPartialPathFromExpression(((IBinaryExpression) expression).getLeft(), pathList);
- getPartialPathFromExpression(((IBinaryExpression) expression).getRight(), pathList);
+ Filter leftTimeFilter =
+ getTimeFilter(
+ getPathListAndConstructTimeFilterFromExpression(
+ ((IBinaryExpression) expression).getLeft(), pathList));
+ Filter rightTimeFilter =
+ getTimeFilter(
+ getPathListAndConstructTimeFilterFromExpression(
+ ((IBinaryExpression) expression).getRight(), pathList));
+
+ if (expression instanceof AndFilter) {
+ if (leftTimeFilter != null && rightTimeFilter != null) {
+ return FilterFactory.and(leftTimeFilter, rightTimeFilter);
+ } else if (leftTimeFilter != null) {
+ return leftTimeFilter;
+ } else return rightTimeFilter;
+ } else {
+ if (leftTimeFilter != null && rightTimeFilter != null) {
+ return FilterFactory.or(leftTimeFilter, rightTimeFilter);
+ } else {
+ return null;
+ }
+ }
}
}
@@ -148,4 +184,8 @@ public class ServerTimeGenerator extends TimeGenerator {
protected boolean isAscending() {
return queryPlan.isAscending();
}
+
+ public Filter getTimeFilter() {
+ return timeFilter;
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/ConcurrentMergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/ConcurrentMergeTest.java
index 4e369fc..13ee263 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/ConcurrentMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/ConcurrentMergeTest.java
@@ -117,6 +117,7 @@ public class ConcurrentMergeTest {
QueryDataSource queryDataSource =
processor.query(
Collections.singletonList(new PartialPath(deviceId, measurementId)),
+ deviceId,
context,
null,
null);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
index 1f48367..7886b43 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DoubleDataPoint;
@@ -55,6 +56,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_CONTEXT;
import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_JOB_ID;
@@ -125,13 +127,19 @@ public class DeletionFileNodeTest {
SingleSeriesExpression expression =
new SingleSeriesExpression(
new PartialPath(processorName + TsFileConstant.PATH_SEPARATOR + measurements[5]), null);
- List<StorageGroupProcessor> list =
- StorageEngine.getInstance()
- .mergeLockAndInitQueryDataSource(
- Collections.singletonList((PartialPath) expression.getSeriesPath()),
- TEST_QUERY_CONTEXT,
- null);
+
+ Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>>
+ lockListAndProcessorToSeriesMapPair =
+ StorageEngine.getInstance()
+ .mergeLock(Collections.singletonList((PartialPath) expression.getSeriesPath()));
+ List<StorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+ Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+ lockListAndProcessorToSeriesMapPair.right;
+
try {
+ // init QueryDataSource Cache
+ QueryResourceManager.getInstance()
+ .initQueryDataSourceCache(processorToSeriesMap, TEST_QUERY_CONTEXT, null);
QueryDataSource dataSource =
QueryResourceManager.getInstance()
.getQueryDataSource(
@@ -150,7 +158,7 @@ public class DeletionFileNodeTest {
assertEquals(50, count);
QueryResourceManager.getInstance().endQuery(TEST_QUERY_JOB_ID);
} finally {
- StorageEngine.getInstance().mergeUnLock(list);
+ StorageEngine.getInstance().mergeUnLock(lockList);
}
}
@@ -253,14 +261,18 @@ public class DeletionFileNodeTest {
new SingleSeriesExpression(
new PartialPath(processorName + TsFileConstant.PATH_SEPARATOR + measurements[5]), null);
- List<StorageGroupProcessor> list =
- StorageEngine.getInstance()
- .mergeLockAndInitQueryDataSource(
- Collections.singletonList((PartialPath) expression.getSeriesPath()),
- TEST_QUERY_CONTEXT,
- null);
+ Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>>
+ lockListAndProcessorToSeriesMapPair =
+ StorageEngine.getInstance()
+ .mergeLock(Collections.singletonList((PartialPath) expression.getSeriesPath()));
+ List<StorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+ Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+ lockListAndProcessorToSeriesMapPair.right;
try {
+ // init QueryDataSource Cache
+ QueryResourceManager.getInstance()
+ .initQueryDataSourceCache(processorToSeriesMap, TEST_QUERY_CONTEXT, null);
QueryDataSource dataSource =
QueryResourceManager.getInstance()
.getQueryDataSource(
@@ -280,7 +292,7 @@ public class DeletionFileNodeTest {
QueryResourceManager.getInstance().endQuery(TEST_QUERY_JOB_ID);
} finally {
- StorageEngine.getInstance().mergeUnLock(list);
+ StorageEngine.getInstance().mergeUnLock(lockList);
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index b632143..7214a7b 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -168,6 +168,7 @@ public class StorageGroupProcessorTest {
QueryDataSource queryDataSource =
processor.query(
Collections.singletonList(new PartialPath(deviceId, measurementId)),
+ deviceId,
context,
null,
null);
@@ -201,6 +202,7 @@ public class StorageGroupProcessorTest {
QueryDataSource queryDataSource =
processor.query(
Collections.singletonList(new PartialPath(deviceId, measurementId)),
+ deviceId,
context,
null,
null);
@@ -266,6 +268,7 @@ public class StorageGroupProcessorTest {
QueryDataSource queryDataSource =
processor.query(
Collections.singletonList(new PartialPath(deviceId, measurementId)),
+ deviceId,
context,
null,
null);
@@ -300,6 +303,7 @@ public class StorageGroupProcessorTest {
QueryDataSource queryDataSource =
processor.query(
Collections.singletonList(new PartialPath(deviceId, measurementId)),
+ deviceId,
context,
null,
null);
@@ -343,6 +347,7 @@ public class StorageGroupProcessorTest {
QueryDataSource queryDataSource =
processor.query(
Collections.singletonList(new PartialPath(deviceId, measurementId)),
+ deviceId,
context,
null,
null);
@@ -428,6 +433,7 @@ public class StorageGroupProcessorTest {
QueryDataSource queryDataSource =
processor.query(
Collections.singletonList(new PartialPath(deviceId, measurementId)),
+ deviceId,
context,
null,
null);
@@ -513,6 +519,7 @@ public class StorageGroupProcessorTest {
QueryDataSource queryDataSource =
processor.query(
Collections.singletonList(new PartialPath(deviceId, measurementId)),
+ deviceId,
context,
null,
null);
@@ -598,6 +605,7 @@ public class StorageGroupProcessorTest {
QueryDataSource queryDataSource =
processor.query(
Collections.singletonList(new PartialPath(deviceId, measurementId)),
+ deviceId,
context,
null,
null);
@@ -640,6 +648,7 @@ public class StorageGroupProcessorTest {
QueryDataSource queryDataSource =
processor.query(
Collections.singletonList(new PartialPath(deviceId, measurementId)),
+ deviceId,
context,
null,
null);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index 651f290..f906c03 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -224,6 +224,7 @@ public class TTLTest {
QueryDataSource dataSource =
storageGroupProcessor.query(
Collections.singletonList(new PartialPath(sg1, s1)),
+ sg1,
EnvironmentUtils.TEST_QUERY_CONTEXT,
null,
null);
@@ -238,6 +239,7 @@ public class TTLTest {
dataSource =
storageGroupProcessor.query(
Collections.singletonList(new PartialPath(sg1, s1)),
+ sg1,
EnvironmentUtils.TEST_QUERY_CONTEXT,
null,
null);
@@ -276,6 +278,7 @@ public class TTLTest {
dataSource =
storageGroupProcessor.query(
Collections.singletonList(new PartialPath(sg1, s1)),
+ sg1,
EnvironmentUtils.TEST_QUERY_CONTEXT,
null,
null);
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryWithComplexValueFilterIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryWithComplexValueFilterIT.java
new file mode 100644
index 0000000..4185775
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryWithComplexValueFilterIT.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static org.junit.Assert.fail;
+
+public class IoTDBQueryWithComplexValueFilterIT {
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvironmentUtils.closeStatMonitor();
+ EnvironmentUtils.envSetUp();
+ prepareData();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvironmentUtils.cleanEnv();
+ }
+
+ @Test
+ public void testRawQuery1() throws ClassNotFoundException {
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select s1 from root.sg1.d1 where (time > 400 and s1 <= 600) or (s2 > 300 and time <= 500)");
+ Assert.assertTrue(hasResultSet);
+
+ ResultSet resultSet = statement.getResultSet();
+ int cnt = 0;
+
+ while (resultSet.next()) {
+ cnt++;
+ }
+
+ Assert.assertEquals(300, cnt);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRawQuery2() throws ClassNotFoundException {
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "select s1 from root.sg1.d1 where (time > 400 and s1 <= 600) and (s2 > 300 and time <= 500)");
+ Assert.assertTrue(hasResultSet);
+
+ ResultSet resultSet = statement.getResultSet();
+ int cnt = 0;
+
+ while (resultSet.next()) {
+ cnt++;
+ }
+
+ Assert.assertEquals(100, cnt);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private static void prepareData() throws ClassNotFoundException {
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ statement.execute("create storage group root.sg1");
+ statement.execute("create timeseries root.sg1.d1.s1 with datatype=INT32,encoding=PLAIN");
+ statement.execute("create timeseries root.sg1.d1.s2 with datatype=DOUBLE,encoding=PLAIN");
+ for (int i = 0; i < 1000; i++) {
+ statement.execute(
+ String.format(
+ "insert into root.sg1.d1(time,s1,s2) values(%d,%d,%f)", i, i, (double) i));
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGenerator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGenerator.java
index 0da8b94..344e564 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGenerator.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGenerator.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.tsfile.read.expression.ExpressionType;
import org.apache.iotdb.tsfile.read.expression.IBinaryExpression;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.timegenerator.node.AndNode;
import org.apache.iotdb.tsfile.read.query.timegenerator.node.LeafNode;
import org.apache.iotdb.tsfile.read.query.timegenerator.node.Node;
@@ -132,4 +133,6 @@ public abstract class TimeGenerator {
}
protected abstract boolean isAscending();
+
+ public abstract Filter getTimeFilter();
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TsFileTimeGenerator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TsFileTimeGenerator.java
index c68fc74..d38e7f7 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TsFileTimeGenerator.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TsFileTimeGenerator.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
import org.apache.iotdb.tsfile.read.controller.IMetadataQuerier;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader;
@@ -55,4 +56,9 @@ public class TsFileTimeGenerator extends TimeGenerator {
protected boolean isAscending() {
return true;
}
+
+ @Override
+ public Filter getTimeFilter() {
+ return null;
+ }
}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/FakedTimeGenerator.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/FakedTimeGenerator.java
index 85b30c0..288c53b 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/FakedTimeGenerator.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/FakedTimeGenerator.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
@@ -61,6 +62,11 @@ public class FakedTimeGenerator extends TimeGenerator {
return true;
}
+ @Override
+ public Filter getTimeFilter() {
+ return null;
+ }
+
@Test
public void testTimeGenerator() throws IOException {
FakedTimeGenerator fakedTimeGenerator = new FakedTimeGenerator();