You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/10/19 11:13:06 UTC
[iotdb] 05/12: add OperatorTreeGenerator for IntoOperator
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit aeff0ab7a9c32d2469a1f8362543b358dd025003
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Oct 18 10:00:12 2022 +0800
add OperatorTreeGenerator for IntoOperator
---
.../execution/operator/process/IntoOperator.java | 26 +++++------
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 52 ++++++++++++++++++++++
.../plan/planner/plan/node/process/IntoNode.java | 4 ++
3 files changed, 69 insertions(+), 13 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index 1251bb3e50..7dc26ddc28 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -55,39 +55,39 @@ public class IntoOperator implements ProcessOperator {
private final Operator child;
private final List<InsertTabletStatementGenerator> insertTabletStatementGenerators;
- private final List<Pair<String, String>> sourceTargetPathPairList;
+ private final List<Pair<String, PartialPath>> sourceTargetPathPairList;
private final Map<String, InputLocation> sourceColumnToInputLocationMap;
public IntoOperator(
OperatorContext operatorContext,
Operator child,
- Map<PartialPath, Map<String, InputLocation>> targetPathToSourceMap,
+ Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap,
Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
- Map<PartialPath, Boolean> targetDeviceToAlignedMap,
- List<Pair<String, String>> sourceTargetPathPairList,
+ Map<String, Boolean> targetDeviceToAlignedMap,
+ List<Pair<String, PartialPath>> sourceTargetPathPairList,
Map<String, InputLocation> sourceColumnToInputLocationMap) {
this.operatorContext = operatorContext;
this.child = child;
this.insertTabletStatementGenerators =
constructInsertTabletStatementGenerators(
- targetPathToSourceMap, targetPathToDataTypeMap, targetDeviceToAlignedMap);
+ targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap);
this.sourceTargetPathPairList = sourceTargetPathPairList;
this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap;
}
private List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators(
- Map<PartialPath, Map<String, InputLocation>> targetPathToSourceMap,
+ Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap,
Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
- Map<PartialPath, Boolean> targetDeviceToAlignedMap) {
+ Map<String, Boolean> targetDeviceToAlignedMap) {
List<InsertTabletStatementGenerator> insertTabletStatementGenerators =
- new ArrayList<>(targetPathToSourceMap.size());
- for (PartialPath targetDevice : targetPathToSourceMap.keySet()) {
+ new ArrayList<>(targetPathToSourceInputLocationMap.size());
+ for (PartialPath targetDevice : targetPathToSourceInputLocationMap.keySet()) {
InsertTabletStatementGenerator generator =
new InsertTabletStatementGenerator(
targetDevice,
- targetPathToSourceMap.get(targetDevice),
+ targetPathToSourceInputLocationMap.get(targetDevice),
targetPathToDataTypeMap.get(targetDevice),
- targetDeviceToAlignedMap.get(targetDevice));
+ targetDeviceToAlignedMap.get(targetDevice.toString()));
insertTabletStatementGenerators.add(generator);
}
return insertTabletStatementGenerators;
@@ -150,10 +150,10 @@ public class IntoOperator implements ProcessOperator {
TsBlockBuilder resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
- for (Pair<String, String> sourceTargetPathPair : sourceTargetPathPairList) {
+ for (Pair<String, PartialPath> sourceTargetPathPair : sourceTargetPathPairList) {
timeColumnBuilder.writeLong(0);
columnBuilders[0].writeBinary(new Binary(sourceTargetPathPair.left));
- columnBuilders[1].writeBinary(new Binary(sourceTargetPathPair.right));
+ columnBuilders[1].writeBinary(new Binary(sourceTargetPathPair.right.toString()));
columnBuilders[2].writeInt(findWritten(sourceTargetPathPair.left));
resultTsBlockBuilder.declarePosition();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index f3de5e72d4..1dcf871373 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.FilterAndProjectOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.IntoOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
@@ -137,6 +138,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.IntoNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
@@ -158,6 +160,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregatio
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoPathDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OutputColumn;
import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
@@ -1336,6 +1339,55 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
return super.visitSort(node, context);
}
+ @Override
+ public Operator visitInto(IntoNode node, LocalExecutionPlanContext context) {
+ Operator child = node.getChild().accept(this, context);
+ OperatorContext operatorContext =
+ context
+ .getInstanceContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ IntoOperator.class.getSimpleName());
+
+ IntoPathDescriptor intoPathDescriptor = node.getIntoPathDescriptor();
+ Map<String, List<InputLocation>> layout = makeLayout(node);
+ Map<String, InputLocation> sourceColumnToInputLocationMap = new HashMap<>();
+ for (Map.Entry<String, List<InputLocation>> layoutEntry : layout.entrySet()) {
+ sourceColumnToInputLocationMap.put(layoutEntry.getKey(), layoutEntry.getValue().get(0));
+ }
+
+ Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap =
+ new HashMap<>();
+ Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap = new HashMap<>();
+ Map<PartialPath, Map<String, String>> targetPathToSourceMap =
+ intoPathDescriptor.getTargetPathToSourceMap();
+ for (Map.Entry<PartialPath, Map<String, String>> entry : targetPathToSourceMap.entrySet()) {
+ PartialPath targetDevice = entry.getKey();
+ Map<String, InputLocation> measurementToInputLocationMap = new HashMap<>();
+ Map<String, TSDataType> measurementToDataTypeMap = new HashMap<>();
+ for (Map.Entry<String, String> measurementEntry : entry.getValue().entrySet()) {
+ String targetMeasurement = measurementEntry.getKey();
+ String sourceColumn = measurementEntry.getValue();
+ measurementToInputLocationMap.put(
+ targetMeasurement, sourceColumnToInputLocationMap.get(sourceColumn));
+ measurementToDataTypeMap.put(
+ targetMeasurement, context.getTypeProvider().getType(sourceColumn));
+ }
+ targetPathToSourceInputLocationMap.put(targetDevice, measurementToInputLocationMap);
+ targetPathToDataTypeMap.put(targetDevice, measurementToDataTypeMap);
+ }
+
+ return new IntoOperator(
+ operatorContext,
+ child,
+ targetPathToSourceInputLocationMap,
+ targetPathToDataTypeMap,
+ intoPathDescriptor.getTargetDeviceToAlignedMap(),
+ intoPathDescriptor.getSourceTargetPathPairList(),
+ sourceColumnToInputLocationMap);
+ }
+
@Override
public Operator visitTimeJoin(TimeJoinNode node, LocalExecutionPlanContext context) {
List<Operator> children =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/IntoNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/IntoNode.java
index b68e862a7a..73926bd8cc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/IntoNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/IntoNode.java
@@ -48,6 +48,10 @@ public class IntoNode extends SingleChildProcessNode {
this.intoPathDescriptor = intoPathDescriptor;
}
+ public IntoPathDescriptor getIntoPathDescriptor() {
+ return intoPathDescriptor;
+ }
+
@Override
public PlanNode clone() {
return new IntoNode(getPlanNodeId(), this.intoPathDescriptor);