You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/01/29 02:22:07 UTC

[iotdb] branch QueryImprovement created (now f9a678e)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch QueryImprovement
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at f9a678e  Refine the lock granularity of the query

This branch includes the following new commits:

     new f9a678e  Refine the lock granularity of the query

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: Refine the lock granularity of the query

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch QueryImprovement
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f9a678e4bf55d6aee8741f86352bf09566cf5551
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Sat Jan 29 10:21:01 2022 +0800

    Refine the lock granularity of the query
---
 .../org/apache/iotdb/db/metadata/MManager.java     | 18 ++---
 .../query/dataset/groupby/GroupByFillDataSet.java  | 23 +++---
 .../groupby/GroupByWithValueFilterDataSet.java     | 11 +--
 .../groupby/GroupByWithoutValueFilterDataSet.java  | 47 ++++++------
 .../db/query/executor/AggregationExecutor.java     | 38 ++++-----
 .../iotdb/db/query/executor/FillQueryExecutor.java | 89 +++++++++++-----------
 .../iotdb/db/query/executor/LastQueryExecutor.java | 32 ++++----
 .../db/query/executor/RawDataQueryExecutor.java    | 72 ++++++++---------
 .../query/timegenerator/ServerTimeGenerator.java   |  3 +-
 9 files changed, 169 insertions(+), 164 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 4c66193..ad55efd 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -843,18 +843,18 @@ public class MManager {
           // init QueryDataSource Cache
           QueryResourceManager.getInstance()
               .initQueryDataSourceCache(processorToSeriesMap, context, null);
-
-          allMatchedNodes =
-              allMatchedNodes.stream()
-                  .sorted(
-                      Comparator.comparingLong(
-                              (MeasurementMNode mNode) -> MTree.getLastTimeStamp(mNode, context))
-                          .reversed()
-                          .thenComparing(MNode::getFullPath))
-                  .collect(toList());
         } finally {
           StorageEngine.getInstance().mergeUnLock(list);
         }
+
+        allMatchedNodes =
+            allMatchedNodes.stream()
+                .sorted(
+                    Comparator.comparingLong(
+                            (MeasurementMNode mNode) -> MTree.getLastTimeStamp(mNode, context))
+                        .reversed()
+                        .thenComparing(MNode::getFullPath))
+                .collect(toList());
       } catch (StorageEngineException | QueryProcessException e) {
         throw new MetadataException(e);
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
index 23e6d0f..cf5c14c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
@@ -137,20 +137,21 @@ public class GroupByFillDataSet extends QueryDataSet {
       // init QueryDataSource Cache
       QueryResourceManager.getInstance()
           .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
-      for (int i = 0; i < paths.size(); i++) {
-        PreviousFill fill = previousFillExecutors[i];
-        firstNotNullTV[i] = fill.getFillResult();
-        TimeValuePair timeValuePair = firstNotNullTV[i];
-        previousValue[i] = null;
-        previousTime[i] = Long.MAX_VALUE;
-        if (ascending && timeValuePair != null && timeValuePair.getValue() != null) {
-          previousValue[i] = timeValuePair.getValue().getValue();
-          previousTime[i] = timeValuePair.getTimestamp();
-        }
-      }
     } finally {
       StorageEngine.getInstance().mergeUnLock(lockList);
     }
+
+    for (int i = 0; i < paths.size(); i++) {
+      PreviousFill fill = previousFillExecutors[i];
+      firstNotNullTV[i] = fill.getFillResult();
+      TimeValuePair timeValuePair = firstNotNullTV[i];
+      previousValue[i] = null;
+      previousTime[i] = Long.MAX_VALUE;
+      if (ascending && timeValuePair != null && timeValuePair.getValue() != null) {
+        previousValue[i] = timeValuePair.getValue().getValue();
+        previousTime[i] = timeValuePair.getTimestamp();
+      }
+    }
   }
 
   private void initLastTimeArray(QueryContext context, GroupByTimeFillPlan groupByFillPlan)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
index 339ba09..0eaa004 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
@@ -105,14 +105,15 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
       // init QueryDataSource Cache
       QueryResourceManager.getInstance()
           .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
-      for (int i = 0; i < paths.size(); i++) {
-        PartialPath path = (PartialPath) paths.get(i);
-        allDataReaderList.add(
-            getReaderByTime(path, groupByTimePlan, dataTypes.get(i), context, null));
-      }
     } finally {
       StorageEngine.getInstance().mergeUnLock(lockList);
     }
+
+    for (int i = 0; i < paths.size(); i++) {
+      PartialPath path = (PartialPath) paths.get(i);
+      allDataReaderList.add(
+          getReaderByTime(path, groupByTimePlan, dataTypes.get(i), context, null));
+    }
   }
 
   protected TimeGenerator getTimeGenerator(QueryContext context, RawDataQueryPlan queryPlan)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index 5628666..65480a4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -102,32 +102,33 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
       // init QueryDataSource Cache
       QueryResourceManager.getInstance()
           .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
-      // init resultIndexes, group result indexes by path
-      for (int i = 0; i < paths.size(); i++) {
-        PartialPath path = (PartialPath) paths.get(i);
-        if (!pathExecutors.containsKey(path)) {
-          // init GroupByExecutor
-          pathExecutors.put(
-              path,
-              getGroupByExecutor(
-                  path,
-                  groupByTimePlan.getAllMeasurementsInDevice(path.getDevice()),
-                  dataTypes.get(i),
-                  context,
-                  timeFilter.copy(),
-                  null,
-                  groupByTimePlan.isAscending()));
-          resultIndexes.put(path, new ArrayList<>());
-        }
-        resultIndexes.get(path).add(i);
-        AggregateResult aggrResult =
-            AggregateResultFactory.getAggrResultByName(
-                groupByTimePlan.getDeduplicatedAggregations().get(i), dataTypes.get(i), ascending);
-        pathExecutors.get(path).addAggregateResult(aggrResult);
-      }
     } finally {
       StorageEngine.getInstance().mergeUnLock(lockList);
     }
+
+    // init resultIndexes, group result indexes by path
+    for (int i = 0; i < paths.size(); i++) {
+      PartialPath path = (PartialPath) paths.get(i);
+      if (!pathExecutors.containsKey(path)) {
+        // init GroupByExecutor
+        pathExecutors.put(
+            path,
+            getGroupByExecutor(
+                path,
+                groupByTimePlan.getAllMeasurementsInDevice(path.getDevice()),
+                dataTypes.get(i),
+                context,
+                timeFilter.copy(),
+                null,
+                groupByTimePlan.isAscending()));
+        resultIndexes.put(path, new ArrayList<>());
+      }
+      resultIndexes.get(path).add(i);
+      AggregateResult aggrResult =
+          AggregateResultFactory.getAggrResultByName(
+              groupByTimePlan.getDeduplicatedAggregations().get(i), dataTypes.get(i), ascending);
+      pathExecutors.get(path).addAggregateResult(aggrResult);
+    }
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 6ce3328..9bd6f9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -115,19 +115,19 @@ public class AggregationExecutor {
       // init QueryDataSource Cache
       QueryResourceManager.getInstance()
           .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
-
-      for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
-        aggregateOneSeries(
-            entry,
-            aggregateResultList,
-            aggregationPlan.getAllMeasurementsInDevice(entry.getKey().getDevice()),
-            timeFilter,
-            context);
-      }
     } finally {
       StorageEngine.getInstance().mergeUnLock(lockList);
     }
 
+    for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
+      aggregateOneSeries(
+          entry,
+          aggregateResultList,
+          aggregationPlan.getAllMeasurementsInDevice(entry.getKey().getDevice()),
+          timeFilter,
+          context);
+    }
+
     return constructDataSet(Arrays.asList(aggregateResultList), aggregationPlan);
   }
 
@@ -367,20 +367,20 @@ public class AggregationExecutor {
       QueryResourceManager.getInstance()
           .initQueryDataSourceCache(
               processorToSeriesMap, context, timestampGenerator.getTimeFilter());
-
-      for (int i = 0; i < selectedSeries.size(); i++) {
-        PartialPath path = selectedSeries.get(i);
-        List<Integer> indexes = pathToAggrIndexesMap.remove(path);
-        if (indexes != null) {
-          IReaderByTimestamp seriesReaderByTimestamp =
-              getReaderByTime(path, queryPlan, dataTypes.get(i), context);
-          readerToAggrIndexesMap.put(seriesReaderByTimestamp, indexes);
-        }
-      }
     } finally {
       StorageEngine.getInstance().mergeUnLock(lockList);
     }
 
+    for (int i = 0; i < selectedSeries.size(); i++) {
+      PartialPath path = selectedSeries.get(i);
+      List<Integer> indexes = pathToAggrIndexesMap.remove(path);
+      if (indexes != null) {
+        IReaderByTimestamp seriesReaderByTimestamp =
+            getReaderByTime(path, queryPlan, dataTypes.get(i), context);
+        readerToAggrIndexesMap.put(seriesReaderByTimestamp, indexes);
+      }
+    }
+
     List<AggregateResult> aggregateResults = new ArrayList<>();
     for (int i = 0; i < selectedSeries.size(); i++) {
       AggregateResult result =
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
index 662267e..20681d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
@@ -89,53 +89,54 @@ public class FillQueryExecutor {
       // init QueryDataSource Cache
       QueryResourceManager.getInstance()
           .initQueryDataSourceCache(processorToSeriesMap, context, contructTimeFilter());
-      List<TimeValuePair> timeValuePairs = getTimeValuePairs(context);
-      long defaultFillInterval = IoTDBDescriptor.getInstance().getConfig().getDefaultFillInterval();
-      for (int i = 0; i < selectedSeries.size(); i++) {
-        PartialPath path = selectedSeries.get(i);
-        TSDataType dataType = dataTypes.get(i);
-
-        if (timeValuePairs.get(i) != null) {
-          // No need to fill
-          record.addField(timeValuePairs.get(i).getValue().getValue(), dataType);
-          continue;
-        }
+    } finally {
+      StorageEngine.getInstance().mergeUnLock(lockList);
+    }
 
-        IFill fill;
-        if (!typeIFillMap.containsKey(dataType)) {
-          switch (dataType) {
-            case INT32:
-            case INT64:
-            case FLOAT:
-            case DOUBLE:
-            case BOOLEAN:
-            case TEXT:
-              fill = new PreviousFill(dataType, queryTime, defaultFillInterval);
-              break;
-            default:
-              throw new UnsupportedDataTypeException("unsupported data type " + dataType);
-          }
-        } else {
-          fill = typeIFillMap.get(dataType).copy();
-        }
-        fill =
-            configureFill(
-                fill,
-                path,
-                dataType,
-                queryTime,
-                plan.getAllMeasurementsInDevice(path.getDevice()),
-                context);
-
-        TimeValuePair timeValuePair = fill.getFillResult();
-        if (timeValuePair == null || timeValuePair.getValue() == null) {
-          record.addField(null);
-        } else {
-          record.addField(timeValuePair.getValue().getValue(), dataType);
+    List<TimeValuePair> timeValuePairs = getTimeValuePairs(context);
+    long defaultFillInterval = IoTDBDescriptor.getInstance().getConfig().getDefaultFillInterval();
+    for (int i = 0; i < selectedSeries.size(); i++) {
+      PartialPath path = selectedSeries.get(i);
+      TSDataType dataType = dataTypes.get(i);
+
+      if (timeValuePairs.get(i) != null) {
+        // No need to fill
+        record.addField(timeValuePairs.get(i).getValue().getValue(), dataType);
+        continue;
+      }
+
+      IFill fill;
+      if (!typeIFillMap.containsKey(dataType)) {
+        switch (dataType) {
+          case INT32:
+          case INT64:
+          case FLOAT:
+          case DOUBLE:
+          case BOOLEAN:
+          case TEXT:
+            fill = new PreviousFill(dataType, queryTime, defaultFillInterval);
+            break;
+          default:
+            throw new UnsupportedDataTypeException("unsupported data type " + dataType);
         }
+      } else {
+        fill = typeIFillMap.get(dataType).copy();
+      }
+      fill =
+          configureFill(
+              fill,
+              path,
+              dataType,
+              queryTime,
+              plan.getAllMeasurementsInDevice(path.getDevice()),
+              context);
+
+      TimeValuePair timeValuePair = fill.getFillResult();
+      if (timeValuePair == null || timeValuePair.getValue() == null) {
+        record.addField(null);
+      } else {
+        record.addField(timeValuePair.getValue().getValue(), dataType);
       }
-    } finally {
-      StorageEngine.getInstance().mergeUnLock(lockList);
     }
 
     SingleDataSet dataSet = new SingleDataSet(selectedSeries, dataTypes);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
index f34f9f6..dfa9ed0 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
@@ -183,26 +183,26 @@ public class LastQueryExecutor {
       // init QueryDataSource Cache
       QueryResourceManager.getInstance()
           .initQueryDataSourceCache(processorToSeriesMap, context, filter);
-
-      for (int i = 0; i < nonCachedPaths.size(); i++) {
-        QueryDataSource dataSource =
-            QueryResourceManager.getInstance()
-                .getQueryDataSource(nonCachedPaths.get(i), context, filter);
-        LastPointReader lastReader =
-            new LastPointReader(
-                nonCachedPaths.get(i),
-                nonCachedDataTypes.get(i),
-                deviceMeasurementsMap.get(nonCachedPaths.get(i).getDevice()),
-                context,
-                dataSource,
-                Long.MAX_VALUE,
-                filter);
-        readerList.add(lastReader);
-      }
     } finally {
       StorageEngine.getInstance().mergeUnLock(lockList);
     }
 
+    for (int i = 0; i < nonCachedPaths.size(); i++) {
+      QueryDataSource dataSource =
+          QueryResourceManager.getInstance()
+              .getQueryDataSource(nonCachedPaths.get(i), context, filter);
+      LastPointReader lastReader =
+          new LastPointReader(
+              nonCachedPaths.get(i),
+              nonCachedDataTypes.get(i),
+              deviceMeasurementsMap.get(nonCachedPaths.get(i).getDevice()),
+              context,
+              dataSource,
+              Long.MAX_VALUE,
+              filter);
+      readerList.add(lastReader);
+    }
+
     // Compute Last result for the rest series paths by scanning Tsfiles
     int index = 0;
     for (int i = 0; i < resultContainer.size(); i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
index 92372be..0dc5ac8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
@@ -112,30 +112,31 @@ public class RawDataQueryExecutor {
       // init QueryDataSource cache
       QueryResourceManager.getInstance()
           .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
-      for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
-        PartialPath path = queryPlan.getDeduplicatedPaths().get(i);
-        TSDataType dataType = queryPlan.getDeduplicatedDataTypes().get(i);
-
-        QueryDataSource queryDataSource =
-            QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter);
-        timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
-
-        ManagedSeriesReader reader =
-            new SeriesRawDataBatchReader(
-                path,
-                queryPlan.getAllMeasurementsInDevice(path.getDevice()),
-                dataType,
-                context,
-                queryDataSource,
-                timeFilter,
-                null,
-                null,
-                queryPlan.isAscending());
-        readersOfSelectedSeries.add(reader);
-      }
     } finally {
       StorageEngine.getInstance().mergeUnLock(lockList);
     }
+
+    for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
+      PartialPath path = queryPlan.getDeduplicatedPaths().get(i);
+      TSDataType dataType = queryPlan.getDeduplicatedDataTypes().get(i);
+
+      QueryDataSource queryDataSource =
+          QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter);
+      timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+
+      ManagedSeriesReader reader =
+          new SeriesRawDataBatchReader(
+              path,
+              queryPlan.getAllMeasurementsInDevice(path.getDevice()),
+              dataType,
+              context,
+              queryDataSource,
+              timeFilter,
+              null,
+              null,
+              queryPlan.isAscending());
+      readersOfSelectedSeries.add(reader);
+    }
     return readersOfSelectedSeries;
   }
 
@@ -185,23 +186,24 @@ public class RawDataQueryExecutor {
       // init QueryDataSource Cache
       QueryResourceManager.getInstance()
           .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
-      for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
-        if (cached.get(i)) {
-          readersOfSelectedSeries.add(null);
-          continue;
-        }
-        PartialPath path = queryPlan.getDeduplicatedPaths().get(i);
-        IReaderByTimestamp seriesReaderByTimestamp =
-            getReaderByTimestamp(
-                path,
-                queryPlan.getAllMeasurementsInDevice(path.getDevice()),
-                queryPlan.getDeduplicatedDataTypes().get(i),
-                context);
-        readersOfSelectedSeries.add(seriesReaderByTimestamp);
-      }
     } finally {
       StorageEngine.getInstance().mergeUnLock(lockList);
     }
+
+    for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
+      if (cached.get(i)) {
+        readersOfSelectedSeries.add(null);
+        continue;
+      }
+      PartialPath path = queryPlan.getDeduplicatedPaths().get(i);
+      IReaderByTimestamp seriesReaderByTimestamp =
+          getReaderByTimestamp(
+              path,
+              queryPlan.getAllMeasurementsInDevice(path.getDevice()),
+              queryPlan.getDeduplicatedDataTypes().get(i),
+              context);
+      readersOfSelectedSeries.add(seriesReaderByTimestamp);
+    }
     return readersOfSelectedSeries;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java b/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java
index 8e4acc4..287483b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java
@@ -90,11 +90,10 @@ public class ServerTimeGenerator extends TimeGenerator {
       // init QueryDataSource Cache
       QueryResourceManager.getInstance()
           .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
-
-      operatorNode = construct(expression);
     } finally {
       StorageEngine.getInstance().mergeUnLock(lockList);
     }
+    operatorNode = construct(expression);
   }
 
   private Filter getPathListAndConstructTimeFilterFromExpression(