You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/10/19 11:13:06 UTC

[iotdb] 05/12: add OperatorTreeGenerator for IntoOperator

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit aeff0ab7a9c32d2469a1f8362543b358dd025003
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Oct 18 10:00:12 2022 +0800

    add OperatorTreeGenerator for IntoOperator
---
 .../execution/operator/process/IntoOperator.java   | 26 +++++------
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 52 ++++++++++++++++++++++
 .../plan/planner/plan/node/process/IntoNode.java   |  4 ++
 3 files changed, 69 insertions(+), 13 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index 1251bb3e50..7dc26ddc28 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -55,39 +55,39 @@ public class IntoOperator implements ProcessOperator {
   private final Operator child;
 
   private final List<InsertTabletStatementGenerator> insertTabletStatementGenerators;
-  private final List<Pair<String, String>> sourceTargetPathPairList;
+  private final List<Pair<String, PartialPath>> sourceTargetPathPairList;
   private final Map<String, InputLocation> sourceColumnToInputLocationMap;
 
   public IntoOperator(
       OperatorContext operatorContext,
       Operator child,
-      Map<PartialPath, Map<String, InputLocation>> targetPathToSourceMap,
+      Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap,
       Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
-      Map<PartialPath, Boolean> targetDeviceToAlignedMap,
-      List<Pair<String, String>> sourceTargetPathPairList,
+      Map<String, Boolean> targetDeviceToAlignedMap,
+      List<Pair<String, PartialPath>> sourceTargetPathPairList,
       Map<String, InputLocation> sourceColumnToInputLocationMap) {
     this.operatorContext = operatorContext;
     this.child = child;
     this.insertTabletStatementGenerators =
         constructInsertTabletStatementGenerators(
-            targetPathToSourceMap, targetPathToDataTypeMap, targetDeviceToAlignedMap);
+            targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap);
     this.sourceTargetPathPairList = sourceTargetPathPairList;
     this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap;
   }
 
   private List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators(
-      Map<PartialPath, Map<String, InputLocation>> targetPathToSourceMap,
+      Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap,
       Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
-      Map<PartialPath, Boolean> targetDeviceToAlignedMap) {
+      Map<String, Boolean> targetDeviceToAlignedMap) {
     List<InsertTabletStatementGenerator> insertTabletStatementGenerators =
-        new ArrayList<>(targetPathToSourceMap.size());
-    for (PartialPath targetDevice : targetPathToSourceMap.keySet()) {
+        new ArrayList<>(targetPathToSourceInputLocationMap.size());
+    for (PartialPath targetDevice : targetPathToSourceInputLocationMap.keySet()) {
       InsertTabletStatementGenerator generator =
           new InsertTabletStatementGenerator(
               targetDevice,
-              targetPathToSourceMap.get(targetDevice),
+              targetPathToSourceInputLocationMap.get(targetDevice),
               targetPathToDataTypeMap.get(targetDevice),
-              targetDeviceToAlignedMap.get(targetDevice));
+              targetDeviceToAlignedMap.get(targetDevice.toString()));
       insertTabletStatementGenerators.add(generator);
     }
     return insertTabletStatementGenerators;
@@ -150,10 +150,10 @@ public class IntoOperator implements ProcessOperator {
     TsBlockBuilder resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
     TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
     ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
-    for (Pair<String, String> sourceTargetPathPair : sourceTargetPathPairList) {
+    for (Pair<String, PartialPath> sourceTargetPathPair : sourceTargetPathPairList) {
       timeColumnBuilder.writeLong(0);
       columnBuilders[0].writeBinary(new Binary(sourceTargetPathPair.left));
-      columnBuilders[1].writeBinary(new Binary(sourceTargetPathPair.right));
+      columnBuilders[1].writeBinary(new Binary(sourceTargetPathPair.right.toString()));
       columnBuilders[2].writeInt(findWritten(sourceTargetPathPair.left));
       resultTsBlockBuilder.declarePosition();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index f3de5e72d4..1dcf871373 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.FilterAndProjectOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.IntoOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
@@ -137,6 +138,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.IntoNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
@@ -158,6 +160,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregatio
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoPathDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OutputColumn;
 import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
@@ -1336,6 +1339,55 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     return super.visitSort(node, context);
   }
 
+  @Override
+  public Operator visitInto(IntoNode node, LocalExecutionPlanContext context) {
+    Operator child = node.getChild().accept(this, context);
+    OperatorContext operatorContext =
+        context
+            .getInstanceContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                IntoOperator.class.getSimpleName());
+
+    IntoPathDescriptor intoPathDescriptor = node.getIntoPathDescriptor();
+    Map<String, List<InputLocation>> layout = makeLayout(node);
+    Map<String, InputLocation> sourceColumnToInputLocationMap = new HashMap<>();
+    for (Map.Entry<String, List<InputLocation>> layoutEntry : layout.entrySet()) {
+      sourceColumnToInputLocationMap.put(layoutEntry.getKey(), layoutEntry.getValue().get(0));
+    }
+
+    Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap =
+        new HashMap<>();
+    Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap = new HashMap<>();
+    Map<PartialPath, Map<String, String>> targetPathToSourceMap =
+        intoPathDescriptor.getTargetPathToSourceMap();
+    for (Map.Entry<PartialPath, Map<String, String>> entry : targetPathToSourceMap.entrySet()) {
+      PartialPath targetDevice = entry.getKey();
+      Map<String, InputLocation> measurementToInputLocationMap = new HashMap<>();
+      Map<String, TSDataType> measurementToDataTypeMap = new HashMap<>();
+      for (Map.Entry<String, String> measurementEntry : entry.getValue().entrySet()) {
+        String targetMeasurement = measurementEntry.getKey();
+        String sourceColumn = measurementEntry.getValue();
+        measurementToInputLocationMap.put(
+            targetMeasurement, sourceColumnToInputLocationMap.get(sourceColumn));
+        measurementToDataTypeMap.put(
+            targetMeasurement, context.getTypeProvider().getType(sourceColumn));
+      }
+      targetPathToSourceInputLocationMap.put(targetDevice, measurementToInputLocationMap);
+      targetPathToDataTypeMap.put(targetDevice, measurementToDataTypeMap);
+    }
+
+    return new IntoOperator(
+        operatorContext,
+        child,
+        targetPathToSourceInputLocationMap,
+        targetPathToDataTypeMap,
+        intoPathDescriptor.getTargetDeviceToAlignedMap(),
+        intoPathDescriptor.getSourceTargetPathPairList(),
+        sourceColumnToInputLocationMap);
+  }
+
   @Override
   public Operator visitTimeJoin(TimeJoinNode node, LocalExecutionPlanContext context) {
     List<Operator> children =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/IntoNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/IntoNode.java
index b68e862a7a..73926bd8cc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/IntoNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/IntoNode.java
@@ -48,6 +48,10 @@ public class IntoNode extends SingleChildProcessNode {
     this.intoPathDescriptor = intoPathDescriptor;
   }
 
+  public IntoPathDescriptor getIntoPathDescriptor() {
+    return intoPathDescriptor;
+  }
+
   @Override
   public PlanNode clone() {
     return new IntoNode(getPlanNodeId(), this.intoPathDescriptor);