You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/10/19 11:13:10 UTC

[iotdb] 09/12: OperatorTreeGenerator visitDeviceViewInto

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 41f8243a0d09e66aed2e3b8496f608c0dda2de01
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Oct 19 14:45:15 2022 +0800

    OperatorTreeGenerator visitDeviceViewInto
---
 .../operator/process/AbstractIntoOperator.java     |   3 +-
 .../operator/process/DeviceViewIntoOperator.java   |   4 +
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 106 +++++++++++++++++----
 .../plan/node/process/DeviceViewIntoNode.java      |   4 +
 4 files changed, 98 insertions(+), 19 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index d038874d0b..28b90daae7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -82,7 +82,8 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
   }
 
   protected void insertMultiTabletsInternally(boolean needCheck) {
-    if ((needCheck && !insertTabletStatementGenerators.get(0).isFull())
+    if (insertTabletStatementGenerators == null
+        || (needCheck && !insertTabletStatementGenerators.get(0).isFull())
         || insertTabletStatementGenerators.get(0).isEmpty()) {
       return;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index bf1a0920fa..f0eab0b71a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -105,6 +105,10 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
   }
 
   private void updateResultTsBlock() {
+    if (currentDevice == null) {
+      return;
+    }
+
     TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
     ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
     for (Pair<String, PartialPath> sourceTargetPathPair :
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index e91d67b973..8fb6a0387e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.operator.process.AggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewIntoOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.FilterAndProjectOperator;
@@ -132,6 +133,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesCo
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewIntoNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
@@ -157,6 +159,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSc
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
@@ -1351,17 +1354,95 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                 IntoOperator.class.getSimpleName());
 
     IntoPathDescriptor intoPathDescriptor = node.getIntoPathDescriptor();
-    Map<String, List<InputLocation>> layout = makeLayout(node);
+    Map<String, InputLocation> sourceColumnToInputLocationMap =
+        constructSourceColumnToInputLocationMap(node);
+
+    Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap =
+        new HashMap<>();
+    Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap = new HashMap<>();
+    processTargetPathToSourceMap(
+        intoPathDescriptor.getTargetPathToSourceMap(),
+        targetPathToSourceInputLocationMap,
+        targetPathToDataTypeMap,
+        sourceColumnToInputLocationMap,
+        context.getTypeProvider());
+
+    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+    return new IntoOperator(
+        operatorContext,
+        child,
+        targetPathToSourceInputLocationMap,
+        targetPathToDataTypeMap,
+        intoPathDescriptor.getTargetDeviceToAlignedMap(),
+        intoPathDescriptor.getSourceTargetPathPairList(),
+        sourceColumnToInputLocationMap);
+  }
+
+  @Override
+  public Operator visitDeviceViewInto(DeviceViewIntoNode node, LocalExecutionPlanContext context) {
+    Operator child = node.getChild().accept(this, context);
+    OperatorContext operatorContext =
+        context
+            .getInstanceContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                DeviceViewIntoOperator.class.getSimpleName());
+
+    DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor =
+        node.getDeviceViewIntoPathDescriptor();
+    Map<String, InputLocation> sourceColumnToInputLocationMap =
+        constructSourceColumnToInputLocationMap(node);
+
+    Map<String, Map<PartialPath, Map<String, InputLocation>>>
+        deviceToTargetPathSourceInputLocationMap = new HashMap<>();
+    Map<String, Map<PartialPath, Map<String, TSDataType>>> deviceToTargetPathDataTypeMap =
+        new HashMap<>();
+    Map<String, Map<PartialPath, Map<String, String>>> sourceDeviceToTargetPathMap =
+        deviceViewIntoPathDescriptor.getSourceDeviceToTargetPathMap();
+    for (Map.Entry<String, Map<PartialPath, Map<String, String>>> deviceEntry :
+        sourceDeviceToTargetPathMap.entrySet()) {
+      String sourceDevice = deviceEntry.getKey();
+      Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap =
+          new HashMap<>();
+      Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap = new HashMap<>();
+      processTargetPathToSourceMap(
+          deviceEntry.getValue(),
+          targetPathToSourceInputLocationMap,
+          targetPathToDataTypeMap,
+          sourceColumnToInputLocationMap,
+          context.getTypeProvider());
+      deviceToTargetPathSourceInputLocationMap.put(
+          sourceDevice, targetPathToSourceInputLocationMap);
+      deviceToTargetPathDataTypeMap.put(sourceDevice, targetPathToDataTypeMap);
+    }
+
+    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+    return new DeviceViewIntoOperator(
+        operatorContext,
+        child,
+        deviceToTargetPathSourceInputLocationMap,
+        deviceToTargetPathDataTypeMap,
+        deviceViewIntoPathDescriptor.getTargetDeviceToAlignedMap(),
+        deviceViewIntoPathDescriptor.getDeviceToSourceTargetPathPairListMap(),
+        sourceColumnToInputLocationMap);
+  }
+
+  private Map<String, InputLocation> constructSourceColumnToInputLocationMap(PlanNode node) {
     Map<String, InputLocation> sourceColumnToInputLocationMap = new HashMap<>();
+    Map<String, List<InputLocation>> layout = makeLayout(node);
     for (Map.Entry<String, List<InputLocation>> layoutEntry : layout.entrySet()) {
       sourceColumnToInputLocationMap.put(layoutEntry.getKey(), layoutEntry.getValue().get(0));
     }
+    return sourceColumnToInputLocationMap;
+  }
 
-    Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap =
-        new HashMap<>();
-    Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap = new HashMap<>();
-    Map<PartialPath, Map<String, String>> targetPathToSourceMap =
-        intoPathDescriptor.getTargetPathToSourceMap();
+  private void processTargetPathToSourceMap(
+      Map<PartialPath, Map<String, String>> targetPathToSourceMap,
+      Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap,
+      Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
+      Map<String, InputLocation> sourceColumnToInputLocationMap,
+      TypeProvider typeProvider) {
     for (Map.Entry<PartialPath, Map<String, String>> entry : targetPathToSourceMap.entrySet()) {
       PartialPath targetDevice = entry.getKey();
       Map<String, InputLocation> measurementToInputLocationMap = new HashMap<>();
@@ -1371,22 +1452,11 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         String sourceColumn = measurementEntry.getValue();
         measurementToInputLocationMap.put(
             targetMeasurement, sourceColumnToInputLocationMap.get(sourceColumn));
-        measurementToDataTypeMap.put(
-            targetMeasurement, context.getTypeProvider().getType(sourceColumn));
+        measurementToDataTypeMap.put(targetMeasurement, typeProvider.getType(sourceColumn));
       }
       targetPathToSourceInputLocationMap.put(targetDevice, measurementToInputLocationMap);
       targetPathToDataTypeMap.put(targetDevice, measurementToDataTypeMap);
     }
-
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
-    return new IntoOperator(
-        operatorContext,
-        child,
-        targetPathToSourceInputLocationMap,
-        targetPathToDataTypeMap,
-        intoPathDescriptor.getTargetDeviceToAlignedMap(),
-        intoPathDescriptor.getSourceTargetPathPairList(),
-        sourceColumnToInputLocationMap);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewIntoNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewIntoNode.java
index 987cbc18d2..8c5055982c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewIntoNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewIntoNode.java
@@ -50,6 +50,10 @@ public class DeviceViewIntoNode extends SingleChildProcessNode {
     this.deviceViewIntoPathDescriptor = deviceViewIntoPathDescriptor;
   }
 
+  public DeviceViewIntoPathDescriptor getDeviceViewIntoPathDescriptor() {
+    return deviceViewIntoPathDescriptor;
+  }
+
   @Override
   public PlanNode clone() {
     return new DeviceViewIntoNode(getPlanNodeId(), this.deviceViewIntoPathDescriptor);