You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/10 01:18:29 UTC

[iotdb] branch master updated: [IOTDB-3134] Calculating allSensors fields in LocalExecutionPlanner (#5845)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 825583eb2e [IOTDB-3134] Calculating allSensors fields in LocalExecutionPlanner (#5845)
825583eb2e is described below

commit 825583eb2e548b522682efc904a2ec9ba847293d
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Tue May 10 09:18:24 2022 +0800

    [IOTDB-3134] Calculating allSensors fields in LocalExecutionPlanner (#5845)
---
 .../db/mpp/plan/planner/LocalExecutionPlanner.java      | 17 +++++++++++++++--
 1 file changed, 15 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 3a8919af44..3f3c1515f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -106,9 +106,12 @@ import org.apache.commons.lang3.Validate;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -189,7 +192,7 @@ public class LocalExecutionPlanner {
           new SeriesScanOperator(
               node.getPlanNodeId(),
               seriesPath,
-              node.getAllSensors(),
+              context.getAllSensors(seriesPath.getDeviceIdString(), seriesPath.getMeasurement()),
               seriesPath.getSeriesType(),
               operatorContext,
               node.getTimeFilter(),
@@ -375,7 +378,7 @@ public class LocalExecutionPlanner {
           new SeriesAggregateScanOperator(
               node.getPlanNodeId(),
               seriesPath,
-              node.getAllSensors(),
+              context.getAllSensors(seriesPath.getDeviceIdString(), seriesPath.getMeasurement()),
               operatorContext,
               aggregators,
               node.getTimeFilter(),
@@ -693,6 +696,8 @@ public class LocalExecutionPlanner {
   private static class LocalExecutionPlanContext {
     private final FragmentInstanceContext instanceContext;
     private final List<PartialPath> paths;
+    // deviceId -> sensorId Set
+    private final Map<String, Set<String>> allSensorsMap;
     // Used to lock corresponding query resources
     private final List<DataSourceOperator> sourceOperators;
     private ISinkHandle sinkHandle;
@@ -706,12 +711,14 @@ public class LocalExecutionPlanner {
       this.typeProvider = typeProvider;
       this.instanceContext = instanceContext;
       this.paths = new ArrayList<>();
+      this.allSensorsMap = new HashMap<>();
       this.sourceOperators = new ArrayList<>();
     }
 
     public LocalExecutionPlanContext(FragmentInstanceContext instanceContext) {
       this.instanceContext = instanceContext;
       this.paths = new ArrayList<>();
+      this.allSensorsMap = new HashMap<>();
       this.sourceOperators = new ArrayList<>();
     }
 
@@ -723,6 +730,12 @@ public class LocalExecutionPlanner {
       return paths;
     }
 
+    public Set<String> getAllSensors(String deviceId, String sensorId) {
+      Set<String> allSensors = allSensorsMap.computeIfAbsent(deviceId, k -> new HashSet<>());
+      allSensors.add(sensorId);
+      return allSensors;
+    }
+
     public List<DataSourceOperator> getSourceOperators() {
       return sourceOperators;
     }