You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/01/29 02:22:08 UTC
[iotdb] 01/01: Refine the lock granularity of the query
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch QueryImprovement
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f9a678e4bf55d6aee8741f86352bf09566cf5551
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Sat Jan 29 10:21:01 2022 +0800
Refine the lock granularity of the query
---
.../org/apache/iotdb/db/metadata/MManager.java | 18 ++---
.../query/dataset/groupby/GroupByFillDataSet.java | 23 +++---
.../groupby/GroupByWithValueFilterDataSet.java | 11 +--
.../groupby/GroupByWithoutValueFilterDataSet.java | 47 ++++++------
.../db/query/executor/AggregationExecutor.java | 38 ++++-----
.../iotdb/db/query/executor/FillQueryExecutor.java | 89 +++++++++++-----------
.../iotdb/db/query/executor/LastQueryExecutor.java | 32 ++++----
.../db/query/executor/RawDataQueryExecutor.java | 72 ++++++++---------
.../query/timegenerator/ServerTimeGenerator.java | 3 +-
9 files changed, 169 insertions(+), 164 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 4c66193..ad55efd 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -843,18 +843,18 @@ public class MManager {
// init QueryDataSource Cache
QueryResourceManager.getInstance()
.initQueryDataSourceCache(processorToSeriesMap, context, null);
-
- allMatchedNodes =
- allMatchedNodes.stream()
- .sorted(
- Comparator.comparingLong(
- (MeasurementMNode mNode) -> MTree.getLastTimeStamp(mNode, context))
- .reversed()
- .thenComparing(MNode::getFullPath))
- .collect(toList());
} finally {
StorageEngine.getInstance().mergeUnLock(list);
}
+
+ allMatchedNodes =
+ allMatchedNodes.stream()
+ .sorted(
+ Comparator.comparingLong(
+ (MeasurementMNode mNode) -> MTree.getLastTimeStamp(mNode, context))
+ .reversed()
+ .thenComparing(MNode::getFullPath))
+ .collect(toList());
} catch (StorageEngineException | QueryProcessException e) {
throw new MetadataException(e);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
index 23e6d0f..cf5c14c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
@@ -137,20 +137,21 @@ public class GroupByFillDataSet extends QueryDataSet {
// init QueryDataSource Cache
QueryResourceManager.getInstance()
.initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
- for (int i = 0; i < paths.size(); i++) {
- PreviousFill fill = previousFillExecutors[i];
- firstNotNullTV[i] = fill.getFillResult();
- TimeValuePair timeValuePair = firstNotNullTV[i];
- previousValue[i] = null;
- previousTime[i] = Long.MAX_VALUE;
- if (ascending && timeValuePair != null && timeValuePair.getValue() != null) {
- previousValue[i] = timeValuePair.getValue().getValue();
- previousTime[i] = timeValuePair.getTimestamp();
- }
- }
} finally {
StorageEngine.getInstance().mergeUnLock(lockList);
}
+
+ for (int i = 0; i < paths.size(); i++) {
+ PreviousFill fill = previousFillExecutors[i];
+ firstNotNullTV[i] = fill.getFillResult();
+ TimeValuePair timeValuePair = firstNotNullTV[i];
+ previousValue[i] = null;
+ previousTime[i] = Long.MAX_VALUE;
+ if (ascending && timeValuePair != null && timeValuePair.getValue() != null) {
+ previousValue[i] = timeValuePair.getValue().getValue();
+ previousTime[i] = timeValuePair.getTimestamp();
+ }
+ }
}
private void initLastTimeArray(QueryContext context, GroupByTimeFillPlan groupByFillPlan)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
index 339ba09..0eaa004 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
@@ -105,14 +105,15 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
// init QueryDataSource Cache
QueryResourceManager.getInstance()
.initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
- for (int i = 0; i < paths.size(); i++) {
- PartialPath path = (PartialPath) paths.get(i);
- allDataReaderList.add(
- getReaderByTime(path, groupByTimePlan, dataTypes.get(i), context, null));
- }
} finally {
StorageEngine.getInstance().mergeUnLock(lockList);
}
+
+ for (int i = 0; i < paths.size(); i++) {
+ PartialPath path = (PartialPath) paths.get(i);
+ allDataReaderList.add(
+ getReaderByTime(path, groupByTimePlan, dataTypes.get(i), context, null));
+ }
}
protected TimeGenerator getTimeGenerator(QueryContext context, RawDataQueryPlan queryPlan)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index 5628666..65480a4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -102,32 +102,33 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
// init QueryDataSource Cache
QueryResourceManager.getInstance()
.initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
- // init resultIndexes, group result indexes by path
- for (int i = 0; i < paths.size(); i++) {
- PartialPath path = (PartialPath) paths.get(i);
- if (!pathExecutors.containsKey(path)) {
- // init GroupByExecutor
- pathExecutors.put(
- path,
- getGroupByExecutor(
- path,
- groupByTimePlan.getAllMeasurementsInDevice(path.getDevice()),
- dataTypes.get(i),
- context,
- timeFilter.copy(),
- null,
- groupByTimePlan.isAscending()));
- resultIndexes.put(path, new ArrayList<>());
- }
- resultIndexes.get(path).add(i);
- AggregateResult aggrResult =
- AggregateResultFactory.getAggrResultByName(
- groupByTimePlan.getDeduplicatedAggregations().get(i), dataTypes.get(i), ascending);
- pathExecutors.get(path).addAggregateResult(aggrResult);
- }
} finally {
StorageEngine.getInstance().mergeUnLock(lockList);
}
+
+ // init resultIndexes, group result indexes by path
+ for (int i = 0; i < paths.size(); i++) {
+ PartialPath path = (PartialPath) paths.get(i);
+ if (!pathExecutors.containsKey(path)) {
+ // init GroupByExecutor
+ pathExecutors.put(
+ path,
+ getGroupByExecutor(
+ path,
+ groupByTimePlan.getAllMeasurementsInDevice(path.getDevice()),
+ dataTypes.get(i),
+ context,
+ timeFilter.copy(),
+ null,
+ groupByTimePlan.isAscending()));
+ resultIndexes.put(path, new ArrayList<>());
+ }
+ resultIndexes.get(path).add(i);
+ AggregateResult aggrResult =
+ AggregateResultFactory.getAggrResultByName(
+ groupByTimePlan.getDeduplicatedAggregations().get(i), dataTypes.get(i), ascending);
+ pathExecutors.get(path).addAggregateResult(aggrResult);
+ }
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 6ce3328..9bd6f9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -115,19 +115,19 @@ public class AggregationExecutor {
// init QueryDataSource Cache
QueryResourceManager.getInstance()
.initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
-
- for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
- aggregateOneSeries(
- entry,
- aggregateResultList,
- aggregationPlan.getAllMeasurementsInDevice(entry.getKey().getDevice()),
- timeFilter,
- context);
- }
} finally {
StorageEngine.getInstance().mergeUnLock(lockList);
}
+ for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
+ aggregateOneSeries(
+ entry,
+ aggregateResultList,
+ aggregationPlan.getAllMeasurementsInDevice(entry.getKey().getDevice()),
+ timeFilter,
+ context);
+ }
+
return constructDataSet(Arrays.asList(aggregateResultList), aggregationPlan);
}
@@ -367,20 +367,20 @@ public class AggregationExecutor {
QueryResourceManager.getInstance()
.initQueryDataSourceCache(
processorToSeriesMap, context, timestampGenerator.getTimeFilter());
-
- for (int i = 0; i < selectedSeries.size(); i++) {
- PartialPath path = selectedSeries.get(i);
- List<Integer> indexes = pathToAggrIndexesMap.remove(path);
- if (indexes != null) {
- IReaderByTimestamp seriesReaderByTimestamp =
- getReaderByTime(path, queryPlan, dataTypes.get(i), context);
- readerToAggrIndexesMap.put(seriesReaderByTimestamp, indexes);
- }
- }
} finally {
StorageEngine.getInstance().mergeUnLock(lockList);
}
+ for (int i = 0; i < selectedSeries.size(); i++) {
+ PartialPath path = selectedSeries.get(i);
+ List<Integer> indexes = pathToAggrIndexesMap.remove(path);
+ if (indexes != null) {
+ IReaderByTimestamp seriesReaderByTimestamp =
+ getReaderByTime(path, queryPlan, dataTypes.get(i), context);
+ readerToAggrIndexesMap.put(seriesReaderByTimestamp, indexes);
+ }
+ }
+
List<AggregateResult> aggregateResults = new ArrayList<>();
for (int i = 0; i < selectedSeries.size(); i++) {
AggregateResult result =
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
index 662267e..20681d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
@@ -89,53 +89,54 @@ public class FillQueryExecutor {
// init QueryDataSource Cache
QueryResourceManager.getInstance()
.initQueryDataSourceCache(processorToSeriesMap, context, contructTimeFilter());
- List<TimeValuePair> timeValuePairs = getTimeValuePairs(context);
- long defaultFillInterval = IoTDBDescriptor.getInstance().getConfig().getDefaultFillInterval();
- for (int i = 0; i < selectedSeries.size(); i++) {
- PartialPath path = selectedSeries.get(i);
- TSDataType dataType = dataTypes.get(i);
-
- if (timeValuePairs.get(i) != null) {
- // No need to fill
- record.addField(timeValuePairs.get(i).getValue().getValue(), dataType);
- continue;
- }
+ } finally {
+ StorageEngine.getInstance().mergeUnLock(lockList);
+ }
- IFill fill;
- if (!typeIFillMap.containsKey(dataType)) {
- switch (dataType) {
- case INT32:
- case INT64:
- case FLOAT:
- case DOUBLE:
- case BOOLEAN:
- case TEXT:
- fill = new PreviousFill(dataType, queryTime, defaultFillInterval);
- break;
- default:
- throw new UnsupportedDataTypeException("unsupported data type " + dataType);
- }
- } else {
- fill = typeIFillMap.get(dataType).copy();
- }
- fill =
- configureFill(
- fill,
- path,
- dataType,
- queryTime,
- plan.getAllMeasurementsInDevice(path.getDevice()),
- context);
-
- TimeValuePair timeValuePair = fill.getFillResult();
- if (timeValuePair == null || timeValuePair.getValue() == null) {
- record.addField(null);
- } else {
- record.addField(timeValuePair.getValue().getValue(), dataType);
+ List<TimeValuePair> timeValuePairs = getTimeValuePairs(context);
+ long defaultFillInterval = IoTDBDescriptor.getInstance().getConfig().getDefaultFillInterval();
+ for (int i = 0; i < selectedSeries.size(); i++) {
+ PartialPath path = selectedSeries.get(i);
+ TSDataType dataType = dataTypes.get(i);
+
+ if (timeValuePairs.get(i) != null) {
+ // No need to fill
+ record.addField(timeValuePairs.get(i).getValue().getValue(), dataType);
+ continue;
+ }
+
+ IFill fill;
+ if (!typeIFillMap.containsKey(dataType)) {
+ switch (dataType) {
+ case INT32:
+ case INT64:
+ case FLOAT:
+ case DOUBLE:
+ case BOOLEAN:
+ case TEXT:
+ fill = new PreviousFill(dataType, queryTime, defaultFillInterval);
+ break;
+ default:
+ throw new UnsupportedDataTypeException("unsupported data type " + dataType);
}
+ } else {
+ fill = typeIFillMap.get(dataType).copy();
+ }
+ fill =
+ configureFill(
+ fill,
+ path,
+ dataType,
+ queryTime,
+ plan.getAllMeasurementsInDevice(path.getDevice()),
+ context);
+
+ TimeValuePair timeValuePair = fill.getFillResult();
+ if (timeValuePair == null || timeValuePair.getValue() == null) {
+ record.addField(null);
+ } else {
+ record.addField(timeValuePair.getValue().getValue(), dataType);
}
- } finally {
- StorageEngine.getInstance().mergeUnLock(lockList);
}
SingleDataSet dataSet = new SingleDataSet(selectedSeries, dataTypes);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
index f34f9f6..dfa9ed0 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
@@ -183,26 +183,26 @@ public class LastQueryExecutor {
// init QueryDataSource Cache
QueryResourceManager.getInstance()
.initQueryDataSourceCache(processorToSeriesMap, context, filter);
-
- for (int i = 0; i < nonCachedPaths.size(); i++) {
- QueryDataSource dataSource =
- QueryResourceManager.getInstance()
- .getQueryDataSource(nonCachedPaths.get(i), context, filter);
- LastPointReader lastReader =
- new LastPointReader(
- nonCachedPaths.get(i),
- nonCachedDataTypes.get(i),
- deviceMeasurementsMap.get(nonCachedPaths.get(i).getDevice()),
- context,
- dataSource,
- Long.MAX_VALUE,
- filter);
- readerList.add(lastReader);
- }
} finally {
StorageEngine.getInstance().mergeUnLock(lockList);
}
+ for (int i = 0; i < nonCachedPaths.size(); i++) {
+ QueryDataSource dataSource =
+ QueryResourceManager.getInstance()
+ .getQueryDataSource(nonCachedPaths.get(i), context, filter);
+ LastPointReader lastReader =
+ new LastPointReader(
+ nonCachedPaths.get(i),
+ nonCachedDataTypes.get(i),
+ deviceMeasurementsMap.get(nonCachedPaths.get(i).getDevice()),
+ context,
+ dataSource,
+ Long.MAX_VALUE,
+ filter);
+ readerList.add(lastReader);
+ }
+
// Compute Last result for the rest series paths by scanning Tsfiles
int index = 0;
for (int i = 0; i < resultContainer.size(); i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
index 92372be..0dc5ac8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
@@ -112,30 +112,31 @@ public class RawDataQueryExecutor {
// init QueryDataSource cache
QueryResourceManager.getInstance()
.initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
- for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
- PartialPath path = queryPlan.getDeduplicatedPaths().get(i);
- TSDataType dataType = queryPlan.getDeduplicatedDataTypes().get(i);
-
- QueryDataSource queryDataSource =
- QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter);
- timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
-
- ManagedSeriesReader reader =
- new SeriesRawDataBatchReader(
- path,
- queryPlan.getAllMeasurementsInDevice(path.getDevice()),
- dataType,
- context,
- queryDataSource,
- timeFilter,
- null,
- null,
- queryPlan.isAscending());
- readersOfSelectedSeries.add(reader);
- }
} finally {
StorageEngine.getInstance().mergeUnLock(lockList);
}
+
+ for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
+ PartialPath path = queryPlan.getDeduplicatedPaths().get(i);
+ TSDataType dataType = queryPlan.getDeduplicatedDataTypes().get(i);
+
+ QueryDataSource queryDataSource =
+ QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter);
+ timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+
+ ManagedSeriesReader reader =
+ new SeriesRawDataBatchReader(
+ path,
+ queryPlan.getAllMeasurementsInDevice(path.getDevice()),
+ dataType,
+ context,
+ queryDataSource,
+ timeFilter,
+ null,
+ null,
+ queryPlan.isAscending());
+ readersOfSelectedSeries.add(reader);
+ }
return readersOfSelectedSeries;
}
@@ -185,23 +186,24 @@ public class RawDataQueryExecutor {
// init QueryDataSource Cache
QueryResourceManager.getInstance()
.initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
- for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
- if (cached.get(i)) {
- readersOfSelectedSeries.add(null);
- continue;
- }
- PartialPath path = queryPlan.getDeduplicatedPaths().get(i);
- IReaderByTimestamp seriesReaderByTimestamp =
- getReaderByTimestamp(
- path,
- queryPlan.getAllMeasurementsInDevice(path.getDevice()),
- queryPlan.getDeduplicatedDataTypes().get(i),
- context);
- readersOfSelectedSeries.add(seriesReaderByTimestamp);
- }
} finally {
StorageEngine.getInstance().mergeUnLock(lockList);
}
+
+ for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
+ if (cached.get(i)) {
+ readersOfSelectedSeries.add(null);
+ continue;
+ }
+ PartialPath path = queryPlan.getDeduplicatedPaths().get(i);
+ IReaderByTimestamp seriesReaderByTimestamp =
+ getReaderByTimestamp(
+ path,
+ queryPlan.getAllMeasurementsInDevice(path.getDevice()),
+ queryPlan.getDeduplicatedDataTypes().get(i),
+ context);
+ readersOfSelectedSeries.add(seriesReaderByTimestamp);
+ }
return readersOfSelectedSeries;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java b/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java
index 8e4acc4..287483b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java
@@ -90,11 +90,10 @@ public class ServerTimeGenerator extends TimeGenerator {
// init QueryDataSource Cache
QueryResourceManager.getInstance()
.initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
-
- operatorNode = construct(expression);
} finally {
StorageEngine.getInstance().mergeUnLock(lockList);
}
+ operatorNode = construct(expression);
}
private Filter getPathListAndConstructTimeFilterFromExpression(