You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/01/29 02:22:08 UTC

[iotdb] 01/01: Refine the lock granularity of the query

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch QueryImprovement
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f9a678e4bf55d6aee8741f86352bf09566cf5551
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Sat Jan 29 10:21:01 2022 +0800

    Refine the lock granularity of the query
---
 .../org/apache/iotdb/db/metadata/MManager.java     | 18 ++---
 .../query/dataset/groupby/GroupByFillDataSet.java  | 23 +++---
 .../groupby/GroupByWithValueFilterDataSet.java     | 11 +--
 .../groupby/GroupByWithoutValueFilterDataSet.java  | 47 ++++++------
 .../db/query/executor/AggregationExecutor.java     | 38 ++++-----
 .../iotdb/db/query/executor/FillQueryExecutor.java | 89 +++++++++++-----------
 .../iotdb/db/query/executor/LastQueryExecutor.java | 32 ++++----
 .../db/query/executor/RawDataQueryExecutor.java    | 72 ++++++++---------
 .../query/timegenerator/ServerTimeGenerator.java   |  3 +-
 9 files changed, 169 insertions(+), 164 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 4c66193..ad55efd 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -843,18 +843,18 @@ public class MManager {
           // init QueryDataSource Cache
           QueryResourceManager.getInstance()
               .initQueryDataSourceCache(processorToSeriesMap, context, null);
-
-          allMatchedNodes =
-              allMatchedNodes.stream()
-                  .sorted(
-                      Comparator.comparingLong(
-                              (MeasurementMNode mNode) -> MTree.getLastTimeStamp(mNode, context))
-                          .reversed()
-                          .thenComparing(MNode::getFullPath))
-                  .collect(toList());
         } finally {
           StorageEngine.getInstance().mergeUnLock(list);
         }
+
+        allMatchedNodes =
+            allMatchedNodes.stream()
+                .sorted(
+                    Comparator.comparingLong(
+                            (MeasurementMNode mNode) -> MTree.getLastTimeStamp(mNode, context))
+                        .reversed()
+                        .thenComparing(MNode::getFullPath))
+                .collect(toList());
       } catch (StorageEngineException | QueryProcessException e) {
         throw new MetadataException(e);
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
index 23e6d0f..cf5c14c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
@@ -137,20 +137,21 @@ public class GroupByFillDataSet extends QueryDataSet {
       // init QueryDataSource Cache
       QueryResourceManager.getInstance()
           .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
-      for (int i = 0; i < paths.size(); i++) {
-        PreviousFill fill = previousFillExecutors[i];
-        firstNotNullTV[i] = fill.getFillResult();
-        TimeValuePair timeValuePair = firstNotNullTV[i];
-        previousValue[i] = null;
-        previousTime[i] = Long.MAX_VALUE;
-        if (ascending && timeValuePair != null && timeValuePair.getValue() != null) {
-          previousValue[i] = timeValuePair.getValue().getValue();
-          previousTime[i] = timeValuePair.getTimestamp();
-        }
-      }
     } finally {
       StorageEngine.getInstance().mergeUnLock(lockList);
     }
+
+    for (int i = 0; i < paths.size(); i++) {
+      PreviousFill fill = previousFillExecutors[i];
+      firstNotNullTV[i] = fill.getFillResult();
+      TimeValuePair timeValuePair = firstNotNullTV[i];
+      previousValue[i] = null;
+      previousTime[i] = Long.MAX_VALUE;
+      if (ascending && timeValuePair != null && timeValuePair.getValue() != null) {
+        previousValue[i] = timeValuePair.getValue().getValue();
+        previousTime[i] = timeValuePair.getTimestamp();
+      }
+    }
   }
 
   private void initLastTimeArray(QueryContext context, GroupByTimeFillPlan groupByFillPlan)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
index 339ba09..0eaa004 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
@@ -105,14 +105,15 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
       // init QueryDataSource Cache
       QueryResourceManager.getInstance()
           .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
-      for (int i = 0; i < paths.size(); i++) {
-        PartialPath path = (PartialPath) paths.get(i);
-        allDataReaderList.add(
-            getReaderByTime(path, groupByTimePlan, dataTypes.get(i), context, null));
-      }
     } finally {
       StorageEngine.getInstance().mergeUnLock(lockList);
     }
+
+    for (int i = 0; i < paths.size(); i++) {
+      PartialPath path = (PartialPath) paths.get(i);
+      allDataReaderList.add(
+          getReaderByTime(path, groupByTimePlan, dataTypes.get(i), context, null));
+    }
   }
 
   protected TimeGenerator getTimeGenerator(QueryContext context, RawDataQueryPlan queryPlan)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index 5628666..65480a4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -102,32 +102,33 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
       // init QueryDataSource Cache
       QueryResourceManager.getInstance()
           .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
-      // init resultIndexes, group result indexes by path
-      for (int i = 0; i < paths.size(); i++) {
-        PartialPath path = (PartialPath) paths.get(i);
-        if (!pathExecutors.containsKey(path)) {
-          // init GroupByExecutor
-          pathExecutors.put(
-              path,
-              getGroupByExecutor(
-                  path,
-                  groupByTimePlan.getAllMeasurementsInDevice(path.getDevice()),
-                  dataTypes.get(i),
-                  context,
-                  timeFilter.copy(),
-                  null,
-                  groupByTimePlan.isAscending()));
-          resultIndexes.put(path, new ArrayList<>());
-        }
-        resultIndexes.get(path).add(i);
-        AggregateResult aggrResult =
-            AggregateResultFactory.getAggrResultByName(
-                groupByTimePlan.getDeduplicatedAggregations().get(i), dataTypes.get(i), ascending);
-        pathExecutors.get(path).addAggregateResult(aggrResult);
-      }
     } finally {
       StorageEngine.getInstance().mergeUnLock(lockList);
     }
+
+    // init resultIndexes, group result indexes by path
+    for (int i = 0; i < paths.size(); i++) {
+      PartialPath path = (PartialPath) paths.get(i);
+      if (!pathExecutors.containsKey(path)) {
+        // init GroupByExecutor
+        pathExecutors.put(
+            path,
+            getGroupByExecutor(
+                path,
+                groupByTimePlan.getAllMeasurementsInDevice(path.getDevice()),
+                dataTypes.get(i),
+                context,
+                timeFilter.copy(),
+                null,
+                groupByTimePlan.isAscending()));
+        resultIndexes.put(path, new ArrayList<>());
+      }
+      resultIndexes.get(path).add(i);
+      AggregateResult aggrResult =
+          AggregateResultFactory.getAggrResultByName(
+              groupByTimePlan.getDeduplicatedAggregations().get(i), dataTypes.get(i), ascending);
+      pathExecutors.get(path).addAggregateResult(aggrResult);
+    }
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 6ce3328..9bd6f9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -115,19 +115,19 @@ public class AggregationExecutor {
       // init QueryDataSource Cache
       QueryResourceManager.getInstance()
           .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
-
-      for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
-        aggregateOneSeries(
-            entry,
-            aggregateResultList,
-            aggregationPlan.getAllMeasurementsInDevice(entry.getKey().getDevice()),
-            timeFilter,
-            context);
-      }
     } finally {
       StorageEngine.getInstance().mergeUnLock(lockList);
     }
 
+    for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
+      aggregateOneSeries(
+          entry,
+          aggregateResultList,
+          aggregationPlan.getAllMeasurementsInDevice(entry.getKey().getDevice()),
+          timeFilter,
+          context);
+    }
+
     return constructDataSet(Arrays.asList(aggregateResultList), aggregationPlan);
   }
 
@@ -367,20 +367,20 @@ public class AggregationExecutor {
       QueryResourceManager.getInstance()
           .initQueryDataSourceCache(
               processorToSeriesMap, context, timestampGenerator.getTimeFilter());
-
-      for (int i = 0; i < selectedSeries.size(); i++) {
-        PartialPath path = selectedSeries.get(i);
-        List<Integer> indexes = pathToAggrIndexesMap.remove(path);
-        if (indexes != null) {
-          IReaderByTimestamp seriesReaderByTimestamp =
-              getReaderByTime(path, queryPlan, dataTypes.get(i), context);
-          readerToAggrIndexesMap.put(seriesReaderByTimestamp, indexes);
-        }
-      }
     } finally {
       StorageEngine.getInstance().mergeUnLock(lockList);
     }
 
+    for (int i = 0; i < selectedSeries.size(); i++) {
+      PartialPath path = selectedSeries.get(i);
+      List<Integer> indexes = pathToAggrIndexesMap.remove(path);
+      if (indexes != null) {
+        IReaderByTimestamp seriesReaderByTimestamp =
+            getReaderByTime(path, queryPlan, dataTypes.get(i), context);
+        readerToAggrIndexesMap.put(seriesReaderByTimestamp, indexes);
+      }
+    }
+
     List<AggregateResult> aggregateResults = new ArrayList<>();
     for (int i = 0; i < selectedSeries.size(); i++) {
       AggregateResult result =
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
index 662267e..20681d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
@@ -89,53 +89,54 @@ public class FillQueryExecutor {
       // init QueryDataSource Cache
       QueryResourceManager.getInstance()
           .initQueryDataSourceCache(processorToSeriesMap, context, contructTimeFilter());
-      List<TimeValuePair> timeValuePairs = getTimeValuePairs(context);
-      long defaultFillInterval = IoTDBDescriptor.getInstance().getConfig().getDefaultFillInterval();
-      for (int i = 0; i < selectedSeries.size(); i++) {
-        PartialPath path = selectedSeries.get(i);
-        TSDataType dataType = dataTypes.get(i);
-
-        if (timeValuePairs.get(i) != null) {
-          // No need to fill
-          record.addField(timeValuePairs.get(i).getValue().getValue(), dataType);
-          continue;
-        }
+    } finally {
+      StorageEngine.getInstance().mergeUnLock(lockList);
+    }
 
-        IFill fill;
-        if (!typeIFillMap.containsKey(dataType)) {
-          switch (dataType) {
-            case INT32:
-            case INT64:
-            case FLOAT:
-            case DOUBLE:
-            case BOOLEAN:
-            case TEXT:
-              fill = new PreviousFill(dataType, queryTime, defaultFillInterval);
-              break;
-            default:
-              throw new UnsupportedDataTypeException("unsupported data type " + dataType);
-          }
-        } else {
-          fill = typeIFillMap.get(dataType).copy();
-        }
-        fill =
-            configureFill(
-                fill,
-                path,
-                dataType,
-                queryTime,
-                plan.getAllMeasurementsInDevice(path.getDevice()),
-                context);
-
-        TimeValuePair timeValuePair = fill.getFillResult();
-        if (timeValuePair == null || timeValuePair.getValue() == null) {
-          record.addField(null);
-        } else {
-          record.addField(timeValuePair.getValue().getValue(), dataType);
+    List<TimeValuePair> timeValuePairs = getTimeValuePairs(context);
+    long defaultFillInterval = IoTDBDescriptor.getInstance().getConfig().getDefaultFillInterval();
+    for (int i = 0; i < selectedSeries.size(); i++) {
+      PartialPath path = selectedSeries.get(i);
+      TSDataType dataType = dataTypes.get(i);
+
+      if (timeValuePairs.get(i) != null) {
+        // No need to fill
+        record.addField(timeValuePairs.get(i).getValue().getValue(), dataType);
+        continue;
+      }
+
+      IFill fill;
+      if (!typeIFillMap.containsKey(dataType)) {
+        switch (dataType) {
+          case INT32:
+          case INT64:
+          case FLOAT:
+          case DOUBLE:
+          case BOOLEAN:
+          case TEXT:
+            fill = new PreviousFill(dataType, queryTime, defaultFillInterval);
+            break;
+          default:
+            throw new UnsupportedDataTypeException("unsupported data type " + dataType);
         }
+      } else {
+        fill = typeIFillMap.get(dataType).copy();
+      }
+      fill =
+          configureFill(
+              fill,
+              path,
+              dataType,
+              queryTime,
+              plan.getAllMeasurementsInDevice(path.getDevice()),
+              context);
+
+      TimeValuePair timeValuePair = fill.getFillResult();
+      if (timeValuePair == null || timeValuePair.getValue() == null) {
+        record.addField(null);
+      } else {
+        record.addField(timeValuePair.getValue().getValue(), dataType);
       }
-    } finally {
-      StorageEngine.getInstance().mergeUnLock(lockList);
     }
 
     SingleDataSet dataSet = new SingleDataSet(selectedSeries, dataTypes);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
index f34f9f6..dfa9ed0 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
@@ -183,26 +183,26 @@ public class LastQueryExecutor {
       // init QueryDataSource Cache
       QueryResourceManager.getInstance()
           .initQueryDataSourceCache(processorToSeriesMap, context, filter);
-
-      for (int i = 0; i < nonCachedPaths.size(); i++) {
-        QueryDataSource dataSource =
-            QueryResourceManager.getInstance()
-                .getQueryDataSource(nonCachedPaths.get(i), context, filter);
-        LastPointReader lastReader =
-            new LastPointReader(
-                nonCachedPaths.get(i),
-                nonCachedDataTypes.get(i),
-                deviceMeasurementsMap.get(nonCachedPaths.get(i).getDevice()),
-                context,
-                dataSource,
-                Long.MAX_VALUE,
-                filter);
-        readerList.add(lastReader);
-      }
     } finally {
       StorageEngine.getInstance().mergeUnLock(lockList);
     }
 
+    for (int i = 0; i < nonCachedPaths.size(); i++) {
+      QueryDataSource dataSource =
+          QueryResourceManager.getInstance()
+              .getQueryDataSource(nonCachedPaths.get(i), context, filter);
+      LastPointReader lastReader =
+          new LastPointReader(
+              nonCachedPaths.get(i),
+              nonCachedDataTypes.get(i),
+              deviceMeasurementsMap.get(nonCachedPaths.get(i).getDevice()),
+              context,
+              dataSource,
+              Long.MAX_VALUE,
+              filter);
+      readerList.add(lastReader);
+    }
+
     // Compute Last result for the rest series paths by scanning Tsfiles
     int index = 0;
     for (int i = 0; i < resultContainer.size(); i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
index 92372be..0dc5ac8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
@@ -112,30 +112,31 @@ public class RawDataQueryExecutor {
       // init QueryDataSource cache
       QueryResourceManager.getInstance()
           .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
-      for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
-        PartialPath path = queryPlan.getDeduplicatedPaths().get(i);
-        TSDataType dataType = queryPlan.getDeduplicatedDataTypes().get(i);
-
-        QueryDataSource queryDataSource =
-            QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter);
-        timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
-
-        ManagedSeriesReader reader =
-            new SeriesRawDataBatchReader(
-                path,
-                queryPlan.getAllMeasurementsInDevice(path.getDevice()),
-                dataType,
-                context,
-                queryDataSource,
-                timeFilter,
-                null,
-                null,
-                queryPlan.isAscending());
-        readersOfSelectedSeries.add(reader);
-      }
     } finally {
       StorageEngine.getInstance().mergeUnLock(lockList);
     }
+
+    for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
+      PartialPath path = queryPlan.getDeduplicatedPaths().get(i);
+      TSDataType dataType = queryPlan.getDeduplicatedDataTypes().get(i);
+
+      QueryDataSource queryDataSource =
+          QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter);
+      timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+
+      ManagedSeriesReader reader =
+          new SeriesRawDataBatchReader(
+              path,
+              queryPlan.getAllMeasurementsInDevice(path.getDevice()),
+              dataType,
+              context,
+              queryDataSource,
+              timeFilter,
+              null,
+              null,
+              queryPlan.isAscending());
+      readersOfSelectedSeries.add(reader);
+    }
     return readersOfSelectedSeries;
   }
 
@@ -185,23 +186,24 @@ public class RawDataQueryExecutor {
       // init QueryDataSource Cache
       QueryResourceManager.getInstance()
           .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
-      for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
-        if (cached.get(i)) {
-          readersOfSelectedSeries.add(null);
-          continue;
-        }
-        PartialPath path = queryPlan.getDeduplicatedPaths().get(i);
-        IReaderByTimestamp seriesReaderByTimestamp =
-            getReaderByTimestamp(
-                path,
-                queryPlan.getAllMeasurementsInDevice(path.getDevice()),
-                queryPlan.getDeduplicatedDataTypes().get(i),
-                context);
-        readersOfSelectedSeries.add(seriesReaderByTimestamp);
-      }
     } finally {
       StorageEngine.getInstance().mergeUnLock(lockList);
     }
+
+    for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
+      if (cached.get(i)) {
+        readersOfSelectedSeries.add(null);
+        continue;
+      }
+      PartialPath path = queryPlan.getDeduplicatedPaths().get(i);
+      IReaderByTimestamp seriesReaderByTimestamp =
+          getReaderByTimestamp(
+              path,
+              queryPlan.getAllMeasurementsInDevice(path.getDevice()),
+              queryPlan.getDeduplicatedDataTypes().get(i),
+              context);
+      readersOfSelectedSeries.add(seriesReaderByTimestamp);
+    }
     return readersOfSelectedSeries;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java b/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java
index 8e4acc4..287483b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java
@@ -90,11 +90,10 @@ public class ServerTimeGenerator extends TimeGenerator {
       // init QueryDataSource Cache
       QueryResourceManager.getInstance()
           .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
-
-      operatorNode = construct(expression);
     } finally {
       StorageEngine.getInstance().mergeUnLock(lockList);
     }
+    operatorNode = construct(expression);
   }
 
   private Filter getPathListAndConstructTimeFilterFromExpression(