You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/03/24 12:01:01 UTC

[incubator-iotdb] branch master updated: Delete dataTypeMapping etc fields in QueryPlan (#934)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 94411e0  Delete dataTypeMapping etc fields in QueryPlan (#934)
94411e0 is described below

commit 94411e01a123cdc83cb0353c31ccd0fd008fdea7
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Tue Mar 24 20:00:52 2020 +0800

    Delete dataTypeMapping etc fields in QueryPlan (#934)
    
    * Remove the dataTypeMapping field from QueryPlan to save memory
---
 .../db/qp/physical/crud/AlignByDevicePlan.java     | 12 ++--
 .../iotdb/db/qp/physical/crud/QueryPlan.java       | 11 ----
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    | 65 ++++++++--------------
 .../db/query/dataset/AlignByDeviceDataSet.java     | 53 ++++++++++--------
 4 files changed, 58 insertions(+), 83 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
index 5faf7b3..297c7b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.qp.physical.crud;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -29,8 +28,8 @@ import org.apache.iotdb.tsfile.read.expression.IExpression;
 public class AlignByDevicePlan extends QueryPlan {
 
   private List<String> measurements; // to record result measurement columns, e.g. temperature, status, speed
-  private Map<String, Set<String>> deviceToMeasurementsMap; // e.g. root.ln.d1 -> temperature
   // to check data type consistency for the same name sensor of different devices
+  private List<String> devices;
   private Map<String, TSDataType> measurementDataTypeMap;
   private Map<String, IExpression> deviceToFilterMap;
   // to record different kinds of measurement
@@ -52,13 +51,12 @@ public class AlignByDevicePlan extends QueryPlan {
     return measurements;
   }
 
-  public void setDeviceToMeasurementsMap(
-      Map<String, Set<String>> deviceToMeasurementsMap) {
-    this.deviceToMeasurementsMap = deviceToMeasurementsMap;
+  public void setDevices(List<String> devices) {
+    this.devices = devices;
   }
 
-  public Map<String, Set<String>> getDeviceToMeasurementsMap() {
-    return deviceToMeasurementsMap;
+  public List<String> getDevices() {
+    return devices;
   }
 
   public void setMeasurementDataTypeMap(
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index 5c6aab7..85c238d 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -18,9 +18,7 @@
  */
 package org.apache.iotdb.db.qp.physical.crud;
 
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -30,7 +28,6 @@ public abstract class QueryPlan extends PhysicalPlan {
 
   private List<Path> paths = null;
   private List<TSDataType> dataTypes = null;
-  private Map<Path, TSDataType> dataTypeMapping = new HashMap<>();
   private boolean alignByTime = true; // for disable align sql
 
   private int rowLimit = 0;
@@ -90,12 +87,4 @@ public abstract class QueryPlan extends PhysicalPlan {
     alignByTime = align;
   }
 
-  public Map<Path, TSDataType> getDataTypeMapping() {
-    return dataTypeMapping;
-  }
-
-  public void addTypeMapping(Path path, TSDataType dataType) {
-    dataTypeMapping.put(path, dataType);
-  }
-
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 4e06220..af386aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.qp.strategy;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -275,7 +274,6 @@ public class PhysicalGenerator {
 
       // to record result measurement columns
       List<String> measurements = new ArrayList<>();
-      Map<String, Set<String>> deviceToMeasurementsMap = new LinkedHashMap<>();
       // to check the same measurement of different devices having the same datatype
       Map<String, TSDataType> measurementDataTypeMap = new HashMap<>();
       Map<String, MeasurementType> measurementTypeMap = new HashMap<>();
@@ -340,11 +338,6 @@ public class PhysicalGenerator {
                   || measurementTypeMap.get(measurementChecked) != MeasurementType.Exist) {
                 measurementTypeMap.put(measurementChecked, MeasurementType.Exist);
               }
-              // update deviceToMeasurementsMap
-              if (!deviceToMeasurementsMap.containsKey(device)) {
-                deviceToMeasurementsMap.put(device, new HashSet<>());
-              }
-              deviceToMeasurementsMap.get(device).add(measurementChecked);
               // update paths
               paths.add(path);
             }
@@ -375,7 +368,7 @@ public class PhysicalGenerator {
 
       // assigns to alignByDevicePlan
       alignByDevicePlan.setMeasurements(measurements);
-      alignByDevicePlan.setDeviceToMeasurementsMap(deviceToMeasurementsMap);
+      alignByDevicePlan.setDevices(devices);
       alignByDevicePlan.setMeasurementDataTypeMap(measurementDataTypeMap);
       alignByDevicePlan.setMeasurementTypeMap(measurementTypeMap);
       alignByDevicePlan.setPaths(paths);
@@ -411,11 +404,10 @@ public class PhysicalGenerator {
       }
     }
     try {
-      generateDataTypes(queryPlan);
+      deduplicate(queryPlan);
     } catch (MetadataException e) {
       throw new QueryProcessException(e);
     }
-    deduplicate(queryPlan);
 
     queryPlan.setRowLimit(queryOperator.getRowLimit());
     queryPlan.setRowOffset(queryOperator.getRowOffset());
@@ -490,35 +482,42 @@ public class PhysicalGenerator {
     basicOperator.setSinglePath(concatPath);
   }
 
-  private void generateDataTypes(QueryPlan queryPlan) throws MetadataException {
+  private void deduplicate(QueryPlan queryPlan) throws MetadataException {
+    // generate dataType first
     List<Path> paths = queryPlan.getPaths();
     List<TSDataType> dataTypes = getSeriesTypes(paths);
-    for (int i = 0; i < paths.size(); i++) {
-      Path path = paths.get(i);
-      TSDataType dataType = dataTypes.get(i);
-      queryPlan.addTypeMapping(path, dataType);
-    }
     queryPlan.setDataTypes(dataTypes);
-  }
 
-  private void deduplicate(QueryPlan queryPlan) {
+    // deduplicate from here
     if (queryPlan instanceof AlignByDevicePlan) {
       return;
     }
     if (queryPlan instanceof AggregationPlan) {
       AggregationPlan aggregationPlan = (AggregationPlan) queryPlan;
-      deduplicateAggregation(aggregationPlan);
+      List<String> aggregations = aggregationPlan.getAggregations();
+
+      Set<String> columnSet = new HashSet<>();
+      for (int i = 0; i < paths.size(); i++) {
+        Path path = paths.get(i);
+        String column = aggregations.get(i) + "(" + path.toString() + ")";
+        if (!columnSet.contains(column)) {
+          aggregationPlan.addDeduplicatedPaths(path);
+          TSDataType seriesType = dataTypes.get(i);
+          aggregationPlan.addDeduplicatedDataTypes(seriesType);
+          aggregationPlan.addDeduplicatedAggregations(aggregations.get(i));
+          columnSet.add(column);
+        }
+      }
       return;
     }
     RawDataQueryPlan rawDataQueryPlan = (RawDataQueryPlan) queryPlan;
-    List<Path> paths = queryPlan.getPaths();
 
     Set<String> columnSet = new HashSet<>();
-    Map<Path, TSDataType> dataTypeMapping = queryPlan.getDataTypeMapping();
-    for (Path path : paths) {
+    for (int i = 0; i < paths.size(); i++) {
+      Path path = paths.get(i);
       String column = path.toString();
       if (!columnSet.contains(column)) {
-        TSDataType seriesType = dataTypeMapping.get(path);
+        TSDataType seriesType = dataTypes.get(i);
         rawDataQueryPlan.addDeduplicatedPaths(path);
         rawDataQueryPlan.addDeduplicatedDataTypes(seriesType);
         columnSet.add(column);
@@ -543,26 +542,6 @@ public class PhysicalGenerator {
     return new ArrayList<>(columnList.subList(seriesOffset, endPosition));
   }
 
-
-  private void deduplicateAggregation(AggregationPlan queryPlan) {
-    List<Path> paths = queryPlan.getPaths();
-    List<String> aggregations = queryPlan.getAggregations();
-
-    Set<String> columnSet = new HashSet<>();
-    Map<Path, TSDataType> dataTypeMapping = queryPlan.getDataTypeMapping();
-    for (int i = 0; i < paths.size(); i++) {
-      Path path = paths.get(i);
-      String column = aggregations.get(i) + "(" + path.toString() + ")";
-      if (!columnSet.contains(column)) {
-        queryPlan.addDeduplicatedPaths(path);
-        TSDataType seriesType = dataTypeMapping.get(path);
-        queryPlan.addDeduplicatedDataTypes(seriesType);
-        queryPlan.addDeduplicatedAggregations(aggregations.get(i));
-        columnSet.add(column);
-      }
-    }
-  }
-
   protected List<String> getMatchedTimeseries(String path) throws MetadataException {
     return MManager.getInstance().getAllTimeseriesName(path);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
index a2c3897..5b4e9b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
@@ -26,7 +26,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.metadata.mnode.MNode;
 import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
 import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
@@ -56,10 +59,10 @@ public class AlignByDeviceDataSet extends QueryDataSet {
   private IExpression expression;
 
   private List<String> measurements;
-  private Map<String, Set<String>> deviceToMeasurementsMap;
+  private List<String> devices;
   private Map<String, IExpression> deviceToFilterMap;
   private Map<String, MeasurementType> measurementTypeMap;
-
+  private Map<String, TSDataType> measurementDataTpeMap;
 
   private GroupByPlan groupByPlan;
   private FillQueryPlan fillQueryPlan;
@@ -67,10 +70,9 @@ public class AlignByDeviceDataSet extends QueryDataSet {
   private RawDataQueryPlan rawDataQueryPlan;
 
   private boolean curDataSetInitialized;
-  private Iterator<String> deviceIterator;
   private String currentDevice;
   private QueryDataSet currentDataSet;
-  private Map<Path, TSDataType> tsDataTypeMap;
+  private Iterator<String> deviceIterator;
   private List<String> executeColumns;
 
   public AlignByDeviceDataSet(AlignByDevicePlan alignByDevicePlan, QueryContext context,
@@ -78,10 +80,10 @@ public class AlignByDeviceDataSet extends QueryDataSet {
     super(null, alignByDevicePlan.getDataTypes());
 
     this.measurements = alignByDevicePlan.getMeasurements();
-    this.tsDataTypeMap = alignByDevicePlan.getDataTypeMapping();
+    this.devices = alignByDevicePlan.getDevices();
+    this.measurementDataTpeMap = alignByDevicePlan.getMeasurementDataTypeMap();
     this.queryRouter = queryRouter;
     this.context = context;
-    this.deviceToMeasurementsMap = alignByDevicePlan.getDeviceToMeasurementsMap();
     this.deviceToFilterMap = alignByDevicePlan.getDeviceToFilterMap();
     this.measurementTypeMap = alignByDevicePlan.getMeasurementTypeMap();
 
@@ -104,7 +106,7 @@ public class AlignByDeviceDataSet extends QueryDataSet {
     }
 
     this.curDataSetInitialized = false;
-    this.deviceIterator = deviceToMeasurementsMap.keySet().iterator();
+    this.deviceIterator = devices.iterator();
   }
 
   protected boolean hasNextWithoutConstraint() throws IOException {
@@ -116,25 +118,32 @@ public class AlignByDeviceDataSet extends QueryDataSet {
 
     while (deviceIterator.hasNext()) {
       currentDevice = deviceIterator.next();
-      Set<String> measurementColumnsOfGivenDevice = deviceToMeasurementsMap
-          .get(currentDevice);
-      executeColumns = new ArrayList<>(measurementColumnsOfGivenDevice);
-
-      // extract paths and aggregations if exist from executeColumns
+      // get all measurements of current device
+      Set<String> measurementOfGivenDevice;
+      try {
+        MNode deviceNode = MManager.getInstance().getNodeByPath(currentDevice);
+        measurementOfGivenDevice = deviceNode.getChildren().keySet();
+      } catch (MetadataException e) {
+        throw new IOException("Cannot get node from " + currentDevice);
+      }
+      // extract paths and aggregations queried from all measurements
+      // executeColumns is for calculating rowRecord
+      executeColumns = new ArrayList<>();
       List<Path> executePaths = new ArrayList<>();
       List<TSDataType> tsDataTypes = new ArrayList<>();
       List<String> executeAggregations = new ArrayList<>();
-      for (String column : executeColumns) {
+      for (String column : measurementDataTpeMap.keySet()) {
+        String measurement = column;
         if (dataSetType == DataSetType.GROUPBY || dataSetType == DataSetType.AGGREGATE) {
-          Path path = new Path(currentDevice,
-              column.substring(column.indexOf('(') + 1, column.indexOf(')')));
-          tsDataTypes.add(tsDataTypeMap.get(path));
-          executePaths.add(path);
-          executeAggregations.add(column.substring(0, column.indexOf('(')));
-        } else {
-          Path path = new Path(currentDevice, column);
-          tsDataTypes.add(tsDataTypeMap.get(path));
-          executePaths.add(path);
+          measurement = column.substring(column.indexOf('(') + 1, column.indexOf(')'));
+          if (measurementOfGivenDevice.contains(measurement)) {
+            executeAggregations.add(column.substring(0, column.indexOf('(')));
+          }
+        }
+        if (measurementOfGivenDevice.contains(measurement)) {
+          executeColumns.add(column);
+          executePaths.add(new Path(currentDevice, measurement));
+          tsDataTypes.add(measurementDataTpeMap.get(column));
         }
       }