You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/10/19 11:13:10 UTC
[iotdb] 09/12: OperatorTreeGenerator visitDeviceViewInto
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 41f8243a0d09e66aed2e3b8496f608c0dda2de01
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Oct 19 14:45:15 2022 +0800
OperatorTreeGenerator visitDeviceViewInto
---
.../operator/process/AbstractIntoOperator.java | 3 +-
.../operator/process/DeviceViewIntoOperator.java | 4 +
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 106 +++++++++++++++++----
.../plan/node/process/DeviceViewIntoNode.java | 4 +
4 files changed, 98 insertions(+), 19 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index d038874d0b..28b90daae7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -82,7 +82,8 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
}
protected void insertMultiTabletsInternally(boolean needCheck) {
- if ((needCheck && !insertTabletStatementGenerators.get(0).isFull())
+ if (insertTabletStatementGenerators == null
+ || (needCheck && !insertTabletStatementGenerators.get(0).isFull())
|| insertTabletStatementGenerators.get(0).isEmpty()) {
return;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index bf1a0920fa..f0eab0b71a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -105,6 +105,10 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
}
private void updateResultTsBlock() {
+ if (currentDevice == null) {
+ return;
+ }
+
TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
for (Pair<String, PartialPath> sourceTargetPathPair :
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index e91d67b973..8fb6a0387e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.execution.operator.process.AggregationOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewIntoOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.FilterAndProjectOperator;
@@ -132,6 +133,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesCo
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewIntoNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
@@ -157,6 +159,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSc
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
@@ -1351,17 +1354,95 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
IntoOperator.class.getSimpleName());
IntoPathDescriptor intoPathDescriptor = node.getIntoPathDescriptor();
- Map<String, List<InputLocation>> layout = makeLayout(node);
+ Map<String, InputLocation> sourceColumnToInputLocationMap =
+ constructSourceColumnToInputLocationMap(node);
+
+ Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap =
+ new HashMap<>();
+ Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap = new HashMap<>();
+ processTargetPathToSourceMap(
+ intoPathDescriptor.getTargetPathToSourceMap(),
+ targetPathToSourceInputLocationMap,
+ targetPathToDataTypeMap,
+ sourceColumnToInputLocationMap,
+ context.getTypeProvider());
+
+ context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+ return new IntoOperator(
+ operatorContext,
+ child,
+ targetPathToSourceInputLocationMap,
+ targetPathToDataTypeMap,
+ intoPathDescriptor.getTargetDeviceToAlignedMap(),
+ intoPathDescriptor.getSourceTargetPathPairList(),
+ sourceColumnToInputLocationMap);
+ }
+
+ @Override
+ public Operator visitDeviceViewInto(DeviceViewIntoNode node, LocalExecutionPlanContext context) {
+ Operator child = node.getChild().accept(this, context);
+ OperatorContext operatorContext =
+ context
+ .getInstanceContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ DeviceViewIntoOperator.class.getSimpleName());
+
+ DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor =
+ node.getDeviceViewIntoPathDescriptor();
+ Map<String, InputLocation> sourceColumnToInputLocationMap =
+ constructSourceColumnToInputLocationMap(node);
+
+ Map<String, Map<PartialPath, Map<String, InputLocation>>>
+ deviceToTargetPathSourceInputLocationMap = new HashMap<>();
+ Map<String, Map<PartialPath, Map<String, TSDataType>>> deviceToTargetPathDataTypeMap =
+ new HashMap<>();
+ Map<String, Map<PartialPath, Map<String, String>>> sourceDeviceToTargetPathMap =
+ deviceViewIntoPathDescriptor.getSourceDeviceToTargetPathMap();
+ for (Map.Entry<String, Map<PartialPath, Map<String, String>>> deviceEntry :
+ sourceDeviceToTargetPathMap.entrySet()) {
+ String sourceDevice = deviceEntry.getKey();
+ Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap =
+ new HashMap<>();
+ Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap = new HashMap<>();
+ processTargetPathToSourceMap(
+ deviceEntry.getValue(),
+ targetPathToSourceInputLocationMap,
+ targetPathToDataTypeMap,
+ sourceColumnToInputLocationMap,
+ context.getTypeProvider());
+ deviceToTargetPathSourceInputLocationMap.put(
+ sourceDevice, targetPathToSourceInputLocationMap);
+ deviceToTargetPathDataTypeMap.put(sourceDevice, targetPathToDataTypeMap);
+ }
+
+ context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+ return new DeviceViewIntoOperator(
+ operatorContext,
+ child,
+ deviceToTargetPathSourceInputLocationMap,
+ deviceToTargetPathDataTypeMap,
+ deviceViewIntoPathDescriptor.getTargetDeviceToAlignedMap(),
+ deviceViewIntoPathDescriptor.getDeviceToSourceTargetPathPairListMap(),
+ sourceColumnToInputLocationMap);
+ }
+
+ private Map<String, InputLocation> constructSourceColumnToInputLocationMap(PlanNode node) {
Map<String, InputLocation> sourceColumnToInputLocationMap = new HashMap<>();
+ Map<String, List<InputLocation>> layout = makeLayout(node);
for (Map.Entry<String, List<InputLocation>> layoutEntry : layout.entrySet()) {
sourceColumnToInputLocationMap.put(layoutEntry.getKey(), layoutEntry.getValue().get(0));
}
+ return sourceColumnToInputLocationMap;
+ }
- Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap =
- new HashMap<>();
- Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap = new HashMap<>();
- Map<PartialPath, Map<String, String>> targetPathToSourceMap =
- intoPathDescriptor.getTargetPathToSourceMap();
+ private void processTargetPathToSourceMap(
+ Map<PartialPath, Map<String, String>> targetPathToSourceMap,
+ Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap,
+ Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
+ Map<String, InputLocation> sourceColumnToInputLocationMap,
+ TypeProvider typeProvider) {
for (Map.Entry<PartialPath, Map<String, String>> entry : targetPathToSourceMap.entrySet()) {
PartialPath targetDevice = entry.getKey();
Map<String, InputLocation> measurementToInputLocationMap = new HashMap<>();
@@ -1371,22 +1452,11 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
String sourceColumn = measurementEntry.getValue();
measurementToInputLocationMap.put(
targetMeasurement, sourceColumnToInputLocationMap.get(sourceColumn));
- measurementToDataTypeMap.put(
- targetMeasurement, context.getTypeProvider().getType(sourceColumn));
+ measurementToDataTypeMap.put(targetMeasurement, typeProvider.getType(sourceColumn));
}
targetPathToSourceInputLocationMap.put(targetDevice, measurementToInputLocationMap);
targetPathToDataTypeMap.put(targetDevice, measurementToDataTypeMap);
}
-
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
- return new IntoOperator(
- operatorContext,
- child,
- targetPathToSourceInputLocationMap,
- targetPathToDataTypeMap,
- intoPathDescriptor.getTargetDeviceToAlignedMap(),
- intoPathDescriptor.getSourceTargetPathPairList(),
- sourceColumnToInputLocationMap);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewIntoNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewIntoNode.java
index 987cbc18d2..8c5055982c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewIntoNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewIntoNode.java
@@ -50,6 +50,10 @@ public class DeviceViewIntoNode extends SingleChildProcessNode {
this.deviceViewIntoPathDescriptor = deviceViewIntoPathDescriptor;
}
+ public DeviceViewIntoPathDescriptor getDeviceViewIntoPathDescriptor() {
+ return deviceViewIntoPathDescriptor;
+ }
+
@Override
public PlanNode clone() {
return new DeviceViewIntoNode(getPlanNodeId(), this.deviceViewIntoPathDescriptor);