You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/05/16 04:17:14 UTC

[iotdb] 07/08: bind input types

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch trasnsform-operator-bugfix
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 666ee799e867848c507023ee59f47f9c4505c9d9
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon May 16 12:15:01 2022 +0800

    bind input types
---
 .../mpp/execution/operator/process/FilterOperator.java  |  6 ++++--
 .../execution/operator/process/TransformOperator.java   | 17 ++++++++---------
 .../db/mpp/plan/planner/LocalExecutionPlanner.java      | 12 ++++++++++--
 .../db/query/udf/core/layer/EvaluationDAGBuilder.java   |  7 ++++---
 4 files changed, 26 insertions(+), 16 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
index 54023374cf..8522d065cc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
@@ -75,9 +75,11 @@ public class FilterOperator extends TransformOperator {
 
   @Override
   protected void initTransformers(
-      Map<String, List<InputLocation>> inputLocations, TypeProvider typeProvider)
+      Map<String, List<InputLocation>> inputLocations,
+      Expression[] outputExpressions,
+      TypeProvider typeProvider)
       throws QueryProcessException, IOException {
-    super.initTransformers(inputLocations, typeProvider);
+    super.initTransformers(inputLocations, outputExpressions, typeProvider);
 
     filterPointReader = transformers[transformers.length - 1];
     if (filterPointReader.getDataType() != TSDataType.BOOLEAN) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
index de3cedcc3c..784573cd1b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
@@ -63,8 +63,6 @@ public class TransformOperator implements ProcessOperator {
 
   protected final OperatorContext operatorContext;
   protected final Operator inputOperator;
-  protected final List<TSDataType> inputDataTypes;
-  protected final Expression[] outputExpressions;
   protected final boolean keepNull;
 
   protected boolean isFirstIteration;
@@ -87,15 +85,13 @@ public class TransformOperator implements ProcessOperator {
       throws QueryProcessException, IOException {
     this.operatorContext = operatorContext;
     this.inputOperator = inputOperator;
-    this.inputDataTypes = inputDataTypes;
-    this.outputExpressions = outputExpressions;
     this.keepNull = keepNull;
 
     isFirstIteration = true;
 
     initInputLayer(inputDataTypes);
-    initUdtfContext(zoneId);
-    initTransformers(inputLocations, typeProvider);
+    initUdtfContext(outputExpressions, zoneId);
+    initTransformers(inputLocations, outputExpressions, typeProvider);
   }
 
   private void initInputLayer(List<TSDataType> inputDataTypes) throws QueryProcessException {
@@ -106,13 +102,15 @@ public class TransformOperator implements ProcessOperator {
             new TsBlockInputDataSet(inputOperator, inputDataTypes));
   }
 
-  private void initUdtfContext(ZoneId zoneId) {
+  private void initUdtfContext(Expression[] outputExpressions, ZoneId zoneId) {
     udtfContext = new UDTFContext(zoneId);
     udtfContext.constructUdfExecutors(outputExpressions);
   }
 
   protected void initTransformers(
-      Map<String, List<InputLocation>> inputLocations, TypeProvider typeProvider)
+      Map<String, List<InputLocation>> inputLocations,
+      Expression[] outputExpressions,
+      TypeProvider typeProvider)
       throws QueryProcessException, IOException {
     UDFRegistrationService.getInstance().acquireRegistrationLock();
     try {
@@ -125,10 +123,11 @@ public class TransformOperator implements ProcessOperator {
                   inputLayer,
                   inputLocations,
                   outputExpressions,
-                  udtfContext,
                   typeProvider,
+                  udtfContext,
                   udfTransformerMemoryBudgetInMB + udfCollectorMemoryBudgetInMB)
               .buildLayerMemoryAssigner()
+              .bindInputLayerColumnIndexWithExpression()
               .buildResultColumnPointReaders()
               .getOutputPointReaders();
     } finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 492ab8f029..6a8c5956e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -454,7 +454,7 @@ public class LocalExecutionPlanner {
               node.getPlanNodeId(),
               TransformNode.class.getSimpleName());
       final Operator inputOperator = generateOnlyChildOperator(node, context);
-      final List<TSDataType> inputDataTypes = getOutputColumnTypes(node, context.getTypeProvider());
+      final List<TSDataType> inputDataTypes = getInputColumnTypes(node, context.getTypeProvider());
       final Map<String, List<InputLocation>> inputLocations = makeLayout(node);
 
       try {
@@ -478,7 +478,7 @@ public class LocalExecutionPlanner {
           context.instanceContext.addOperatorContext(
               context.getNextOperatorId(), node.getPlanNodeId(), FilterNode.class.getSimpleName());
       final Operator inputOperator = generateOnlyChildOperator(node, context);
-      final List<TSDataType> inputDataTypes = getOutputColumnTypes(node, context.getTypeProvider());
+      final List<TSDataType> inputDataTypes = getInputColumnTypes(node, context.getTypeProvider());
       final Map<String, List<InputLocation>> inputLocations = makeLayout(node);
 
       try {
@@ -680,6 +680,14 @@ public class LocalExecutionPlanner {
       return outputMappings;
     }
 
+    private List<TSDataType> getInputColumnTypes(PlanNode node, TypeProvider typeProvider) {
+      return node.getChildren().stream()
+          .map(PlanNode::getOutputColumnNames)
+          .flatMap(List::stream)
+          .map(typeProvider::getType)
+          .collect(Collectors.toList());
+    }
+
     private List<TSDataType> getOutputColumnTypes(PlanNode node, TypeProvider typeProvider) {
       return node.getOutputColumnNames().stream()
           .map(typeProvider::getType)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java
index 80479da2d1..003447f0e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java
@@ -41,6 +41,8 @@ public class EvaluationDAGBuilder {
   private final Expression[] outputExpressions;
   private final LayerPointReader[] outputPointReaders;
 
+  private final TypeProvider typeProvider;
+
   private final UDTFContext udtfContext;
 
   private final LayerMemoryAssigner memoryAssigner;
@@ -50,22 +52,21 @@ public class EvaluationDAGBuilder {
   // sub-expressions, but they can share the same point reader. we cache the point reader here to
   // make sure that only one point reader will be built for one expression.
   private final Map<Expression, IntermediateLayer> expressionIntermediateLayerMap;
-  private final TypeProvider typeProvider;
 
   public EvaluationDAGBuilder(
       long queryId,
       RawQueryInputLayer inputLayer,
       Map<String, List<InputLocation>> inputLocations,
       Expression[] outputExpressions,
-      UDTFContext udtfContext,
       TypeProvider typeProvider,
+      UDTFContext udtfContext,
       float memoryBudgetInMB) {
     this.queryId = queryId;
     this.inputLayer = inputLayer;
     this.inputLocations = inputLocations;
     this.outputExpressions = outputExpressions;
-    this.udtfContext = udtfContext;
     this.typeProvider = typeProvider;
+    this.udtfContext = udtfContext;
 
     int size = inputLayer.getInputColumnCount();
     outputPointReaders = new LayerPointReader[size];