You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/05/16 04:17:14 UTC
[iotdb] 07/08: bind input types
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch trasnsform-operator-bugfix
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 666ee799e867848c507023ee59f47f9c4505c9d9
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon May 16 12:15:01 2022 +0800
bind input types
---
.../mpp/execution/operator/process/FilterOperator.java | 6 ++++--
.../execution/operator/process/TransformOperator.java | 17 ++++++++---------
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 12 ++++++++++--
.../db/query/udf/core/layer/EvaluationDAGBuilder.java | 7 ++++---
4 files changed, 26 insertions(+), 16 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
index 54023374cf..8522d065cc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
@@ -75,9 +75,11 @@ public class FilterOperator extends TransformOperator {
@Override
protected void initTransformers(
- Map<String, List<InputLocation>> inputLocations, TypeProvider typeProvider)
+ Map<String, List<InputLocation>> inputLocations,
+ Expression[] outputExpressions,
+ TypeProvider typeProvider)
throws QueryProcessException, IOException {
- super.initTransformers(inputLocations, typeProvider);
+ super.initTransformers(inputLocations, outputExpressions, typeProvider);
filterPointReader = transformers[transformers.length - 1];
if (filterPointReader.getDataType() != TSDataType.BOOLEAN) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
index de3cedcc3c..784573cd1b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
@@ -63,8 +63,6 @@ public class TransformOperator implements ProcessOperator {
protected final OperatorContext operatorContext;
protected final Operator inputOperator;
- protected final List<TSDataType> inputDataTypes;
- protected final Expression[] outputExpressions;
protected final boolean keepNull;
protected boolean isFirstIteration;
@@ -87,15 +85,13 @@ public class TransformOperator implements ProcessOperator {
throws QueryProcessException, IOException {
this.operatorContext = operatorContext;
this.inputOperator = inputOperator;
- this.inputDataTypes = inputDataTypes;
- this.outputExpressions = outputExpressions;
this.keepNull = keepNull;
isFirstIteration = true;
initInputLayer(inputDataTypes);
- initUdtfContext(zoneId);
- initTransformers(inputLocations, typeProvider);
+ initUdtfContext(outputExpressions, zoneId);
+ initTransformers(inputLocations, outputExpressions, typeProvider);
}
private void initInputLayer(List<TSDataType> inputDataTypes) throws QueryProcessException {
@@ -106,13 +102,15 @@ public class TransformOperator implements ProcessOperator {
new TsBlockInputDataSet(inputOperator, inputDataTypes));
}
- private void initUdtfContext(ZoneId zoneId) {
+ private void initUdtfContext(Expression[] outputExpressions, ZoneId zoneId) {
udtfContext = new UDTFContext(zoneId);
udtfContext.constructUdfExecutors(outputExpressions);
}
protected void initTransformers(
- Map<String, List<InputLocation>> inputLocations, TypeProvider typeProvider)
+ Map<String, List<InputLocation>> inputLocations,
+ Expression[] outputExpressions,
+ TypeProvider typeProvider)
throws QueryProcessException, IOException {
UDFRegistrationService.getInstance().acquireRegistrationLock();
try {
@@ -125,10 +123,11 @@ public class TransformOperator implements ProcessOperator {
inputLayer,
inputLocations,
outputExpressions,
- udtfContext,
typeProvider,
+ udtfContext,
udfTransformerMemoryBudgetInMB + udfCollectorMemoryBudgetInMB)
.buildLayerMemoryAssigner()
+ .bindInputLayerColumnIndexWithExpression()
.buildResultColumnPointReaders()
.getOutputPointReaders();
} finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 492ab8f029..6a8c5956e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -454,7 +454,7 @@ public class LocalExecutionPlanner {
node.getPlanNodeId(),
TransformNode.class.getSimpleName());
final Operator inputOperator = generateOnlyChildOperator(node, context);
- final List<TSDataType> inputDataTypes = getOutputColumnTypes(node, context.getTypeProvider());
+ final List<TSDataType> inputDataTypes = getInputColumnTypes(node, context.getTypeProvider());
final Map<String, List<InputLocation>> inputLocations = makeLayout(node);
try {
@@ -478,7 +478,7 @@ public class LocalExecutionPlanner {
context.instanceContext.addOperatorContext(
context.getNextOperatorId(), node.getPlanNodeId(), FilterNode.class.getSimpleName());
final Operator inputOperator = generateOnlyChildOperator(node, context);
- final List<TSDataType> inputDataTypes = getOutputColumnTypes(node, context.getTypeProvider());
+ final List<TSDataType> inputDataTypes = getInputColumnTypes(node, context.getTypeProvider());
final Map<String, List<InputLocation>> inputLocations = makeLayout(node);
try {
@@ -680,6 +680,14 @@ public class LocalExecutionPlanner {
return outputMappings;
}
+ private List<TSDataType> getInputColumnTypes(PlanNode node, TypeProvider typeProvider) {
+ return node.getChildren().stream()
+ .map(PlanNode::getOutputColumnNames)
+ .flatMap(List::stream)
+ .map(typeProvider::getType)
+ .collect(Collectors.toList());
+ }
+
private List<TSDataType> getOutputColumnTypes(PlanNode node, TypeProvider typeProvider) {
return node.getOutputColumnNames().stream()
.map(typeProvider::getType)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java
index 80479da2d1..003447f0e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java
@@ -41,6 +41,8 @@ public class EvaluationDAGBuilder {
private final Expression[] outputExpressions;
private final LayerPointReader[] outputPointReaders;
+ private final TypeProvider typeProvider;
+
private final UDTFContext udtfContext;
private final LayerMemoryAssigner memoryAssigner;
@@ -50,22 +52,21 @@ public class EvaluationDAGBuilder {
// sub-expressions, but they can share the same point reader. we cache the point reader here to
// make sure that only one point reader will be built for one expression.
private final Map<Expression, IntermediateLayer> expressionIntermediateLayerMap;
- private final TypeProvider typeProvider;
public EvaluationDAGBuilder(
long queryId,
RawQueryInputLayer inputLayer,
Map<String, List<InputLocation>> inputLocations,
Expression[] outputExpressions,
- UDTFContext udtfContext,
TypeProvider typeProvider,
+ UDTFContext udtfContext,
float memoryBudgetInMB) {
this.queryId = queryId;
this.inputLayer = inputLayer;
this.inputLocations = inputLocations;
this.outputExpressions = outputExpressions;
- this.udtfContext = udtfContext;
this.typeProvider = typeProvider;
+ this.udtfContext = udtfContext;
int size = inputLayer.getInputColumnCount();
outputPointReaders = new LayerPointReader[size];