You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/09/03 09:07:41 UTC

[iotdb] 02/03: refactor: inputLayer -> udfLayer

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch nested-operations
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6dfba2c72dd5d1899f989f88780bed03f1bf51f8
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Fri Sep 3 09:33:18 2021 +0800

    refactor: inputLayer -> udfLayer
---
 .../db/query/dataset/UDTFAlignByTimeDataSet.java   |  4 +--
 .../apache/iotdb/db/query/dataset/UDTFDataSet.java | 18 +++++------
 .../db/query/dataset/UDTFNonAlignDataSet.java      |  2 +-
 .../iotdb/db/query/expression/Expression.java      |  4 +--
 .../query/expression/binary/BinaryExpression.java  |  8 ++---
 .../query/expression/unary/FunctionExpression.java |  7 +++--
 .../query/expression/unary/NegationExpression.java |  6 ++--
 .../query/expression/unary/TimeSeriesOperand.java  |  7 +++--
 .../iotdb/db/query/udf/core/layer/DAGBuilder.java  | 35 ++++++++++++----------
 .../core/layer/{InputLayer.java => UDFLayer.java}  |  6 ++--
 10 files changed, 52 insertions(+), 45 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java
index 6c065c2..c7e8af4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java
@@ -193,7 +193,7 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
       }
 
       // todo: control upper bound here
-      inputLayer.updateRowRecordListEvictionUpperBound();
+      udfLayer.updateRowRecordListEvictionUpperBound();
     }
 
     /*
@@ -296,7 +296,7 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
     }
 
     // todo: control upper bound here
-    inputLayer.updateRowRecordListEvictionUpperBound();
+    udfLayer.updateRowRecordListEvictionUpperBound();
 
     return rowRecord;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
index 0f9ef3d..6ed497d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
@@ -36,7 +36,7 @@ import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
 import org.apache.iotdb.db.query.udf.api.customizer.strategy.AccessStrategy;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
-import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
+import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticAdditionTransformer;
 import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticDivisionTransformer;
@@ -67,7 +67,7 @@ public abstract class UDTFDataSet extends QueryDataSet {
 
   protected final long queryId;
   protected final UDTFPlan udtfPlan;
-  protected final InputLayer inputLayer;
+  protected final UDFLayer udfLayer;
 
   protected LayerPointReader[] transformers;
 
@@ -84,8 +84,8 @@ public abstract class UDTFDataSet extends QueryDataSet {
     super(new ArrayList<>(deduplicatedPaths), deduplicatedDataTypes);
     queryId = queryContext.getQueryId();
     this.udtfPlan = udtfPlan;
-    inputLayer =
-        new InputLayer(
+    udfLayer =
+        new UDFLayer(
             queryId,
             UDF_READER_MEMORY_BUDGET_IN_MB,
             deduplicatedPaths,
@@ -108,8 +108,8 @@ public abstract class UDTFDataSet extends QueryDataSet {
     super(new ArrayList<>(deduplicatedPaths), deduplicatedDataTypes);
     queryId = queryContext.getQueryId();
     this.udtfPlan = udtfPlan;
-    inputLayer =
-        new InputLayer(
+    udfLayer =
+        new UDFLayer(
             queryId,
             UDF_READER_MEMORY_BUDGET_IN_MB,
             deduplicatedPaths,
@@ -167,13 +167,13 @@ public abstract class UDTFDataSet extends QueryDataSet {
     switch (accessStrategy.getAccessStrategyType()) {
       case ROW_BY_ROW:
         transformers[columnIndex] =
-            new UDFQueryRowTransformer(inputLayer.constructRowReader(readerIndexes), executor);
+            new UDFQueryRowTransformer(udfLayer.constructRowReader(readerIndexes), executor);
         break;
       case SLIDING_SIZE_WINDOW:
       case SLIDING_TIME_WINDOW:
         transformers[columnIndex] =
             new UDFQueryRowWindowTransformer(
-                inputLayer.constructRowWindowReader(
+                udfLayer.constructRowWindowReader(
                     readerIndexes, accessStrategy, memoryBudgetForSingleWindowTransformer),
                 executor);
         break;
@@ -254,7 +254,7 @@ public abstract class UDTFDataSet extends QueryDataSet {
   }
 
   private LayerPointReader constructPointReaderBySeriesName(String seriesName) {
-    return inputLayer.constructPointReader(udtfPlan.getReaderIndex(seriesName));
+    return udfLayer.constructPointReader(udtfPlan.getReaderIndex(seriesName));
   }
 
   public void finalizeUDFs(long queryId) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java
index 25dd871..e3810bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java
@@ -114,7 +114,7 @@ public class UDTFNonAlignDataSet extends UDTFDataSet implements DirectNonAlignDa
     }
 
     // todo: control upper bound here
-    inputLayer.updateRowRecordListEvictionUpperBound();
+    udfLayer.updateRowRecordListEvictionUpperBound();
 
     tsQueryNonAlignDataSet.setTimeList(timeBufferList);
     tsQueryNonAlignDataSet.setValueList(valueBufferList);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index 9dd75bd..09a15ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -24,8 +24,8 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
-import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
 
 import java.util.List;
 import java.util.Map;
@@ -53,7 +53,7 @@ public abstract class Expression {
 
   public abstract IntermediateLayer constructIntermediateLayer(
       UDTFPlan udtfPlan,
-      InputLayer inputLayer,
+      UDFLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
       throws QueryProcessException;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index e31496c..b793ef9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -25,9 +25,9 @@ import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
-import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.SingleInputMultiOutputIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticBinaryTransformer;
 
@@ -113,16 +113,16 @@ public abstract class BinaryExpression extends Expression {
   @Override
   public IntermediateLayer constructIntermediateLayer(
       UDTFPlan udtfPlan,
-      InputLayer inputLayer,
+      UDFLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
       throws QueryProcessException {
     if (!expressionIntermediateLayerMap.containsKey(this)) {
       IntermediateLayer leftParentIntermediateLayer =
           leftExpression.constructIntermediateLayer(
-              udtfPlan, inputLayer, expressionIntermediateLayerMap);
+              udtfPlan, rawTimeSeriesInputLayer, expressionIntermediateLayerMap);
       IntermediateLayer rightParentIntermediateLayer =
           rightExpression.constructIntermediateLayer(
-              udtfPlan, inputLayer, expressionIntermediateLayerMap);
+              udtfPlan, rawTimeSeriesInputLayer, expressionIntermediateLayerMap);
 
       expressionIntermediateLayerMap.put(
           this,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index 696e2d2..d484769 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -27,9 +27,9 @@ import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.strategy.optimizer.ConcatPathOptimizer;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
-import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.MultiInputMultiOutputIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 
 import java.util.ArrayList;
@@ -151,7 +151,7 @@ public class FunctionExpression extends Expression {
   @Override
   public IntermediateLayer constructIntermediateLayer(
       UDTFPlan udtfPlan,
-      InputLayer inputLayer,
+      UDFLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
       throws QueryProcessException {
     if (!expressionIntermediateLayerMap.containsKey(this)) {
@@ -159,7 +159,8 @@ public class FunctionExpression extends Expression {
       for (Expression expression : expressions) {
         parentLayerPointReaders.add(
             expression
-                .constructIntermediateLayer(udtfPlan, inputLayer, expressionIntermediateLayerMap)
+                .constructIntermediateLayer(
+                    udtfPlan, rawTimeSeriesInputLayer, expressionIntermediateLayerMap)
                 .constructPointReader());
       }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index 8dcdc73..326d1af 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -25,9 +25,9 @@ import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
-import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.SingleInputMultiOutputIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
 import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticNegationTransformer;
 
 import java.util.ArrayList;
@@ -79,13 +79,13 @@ public class NegationExpression extends Expression {
   @Override
   public IntermediateLayer constructIntermediateLayer(
       UDTFPlan udtfPlan,
-      InputLayer inputLayer,
+      UDFLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
       throws QueryProcessException {
     if (!expressionIntermediateLayerMap.containsKey(this)) {
       IntermediateLayer parentIntermediateLayer =
           expression.constructIntermediateLayer(
-              udtfPlan, inputLayer, expressionIntermediateLayerMap);
+              udtfPlan, rawTimeSeriesInputLayer, expressionIntermediateLayerMap);
 
       expressionIntermediateLayerMap.put(
           this,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index 9b17315..a67e621 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -25,9 +25,9 @@ import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
-import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.SingleInputMultiOutputIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.List;
@@ -74,14 +74,15 @@ public class TimeSeriesOperand extends Expression {
   @Override
   public IntermediateLayer constructIntermediateLayer(
       UDTFPlan udtfPlan,
-      InputLayer inputLayer,
+      UDFLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
       throws QueryProcessException {
     if (!expressionIntermediateLayerMap.containsKey(this)) {
       expressionIntermediateLayerMap.put(
           this,
           new SingleInputMultiOutputIntermediateLayer(
-              inputLayer.constructPointReader(udtfPlan.getReaderIndex(path.getFullPath())),
+              rawTimeSeriesInputLayer.constructPointReader(
+                  udtfPlan.getReaderIndex(path.getFullPath())),
               -1,
               -1));
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
index 472ca2d..6a107d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.expression.ResultColumn;
-import org.apache.iotdb.db.query.udf.core.transformer.Transformer;
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -33,42 +33,47 @@ import java.util.Map;
 public class DAGBuilder {
 
   private final UDTFPlan udtfPlan;
-  private final InputLayer inputLayer;
+  private final UDFLayer rawTimeSeriesInputLayer;
 
   // input
   private final List<Expression> resultColumnExpressions;
   // output
-  private final Transformer[] resultColumnTransformers;
+  private final LayerPointReader[] resultColumnPointReaders;
 
   // all result column expressions will be split into several sub-expressions, each expression has
-  // its own transformer. different result column expressions may have the same sub-expressions,
-  // but they can share the same transformer. we cache the transformer here to make sure that only
-  // one transformer will be built for one expression.
+  // its own result point reader. different result column expressions may have the same
+  // sub-expressions, but they can share the same point reader. we cache the point reader here to
+  // make sure that only one point reader will be built for one expression.
   private final Map<Expression, IntermediateLayer> expressionIntermediateLayerMap;
 
-  public DAGBuilder(UDTFPlan udtfPlan, InputLayer inputLayer) throws QueryProcessException {
+  public DAGBuilder(UDTFPlan udtfPlan, UDFLayer inputLayer) throws QueryProcessException {
     this.udtfPlan = udtfPlan;
-    this.inputLayer = inputLayer;
+    this.rawTimeSeriesInputLayer = inputLayer;
 
     resultColumnExpressions = new ArrayList<>();
     for (ResultColumn resultColumn : udtfPlan.getResultColumns()) {
       resultColumnExpressions.add(resultColumn.getExpression());
     }
-    resultColumnTransformers = new Transformer[resultColumnExpressions.size()];
+    resultColumnPointReaders = new LayerPointReader[resultColumnExpressions.size()];
 
     expressionIntermediateLayerMap = new HashMap<>();
 
     build();
   }
 
-  public void build() throws QueryProcessException {
-    for (Expression resultColumnExpression : resultColumnExpressions) {
-      resultColumnExpression.constructIntermediateLayer(
-          udtfPlan, inputLayer, expressionIntermediateLayerMap);
+  public DAGBuilder build() throws QueryProcessException {
+    for (int i = 0; i < resultColumnExpressions.size(); ++i) {
+      resultColumnPointReaders[i] =
+          resultColumnExpressions
+              .get(i)
+              .constructIntermediateLayer(
+                  udtfPlan, rawTimeSeriesInputLayer, expressionIntermediateLayerMap)
+              .constructPointReader();
     }
+    return this;
   }
 
-  public Transformer[] getResultColumnTransformers() {
-    return resultColumnTransformers;
+  public LayerPointReader[] getResultColumnPointReaders() {
+    return resultColumnPointReaders;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/InputLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/UDFLayer.java
similarity index 99%
rename from server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/InputLayer.java
rename to server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/UDFLayer.java
index f16a210..a613c4c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/InputLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/UDFLayer.java
@@ -49,7 +49,7 @@ import org.apache.iotdb.tsfile.utils.Binary;
 import java.io.IOException;
 import java.util.List;
 
-public class InputLayer {
+public class UDFLayer {
 
   private long queryId;
 
@@ -61,7 +61,7 @@ public class InputLayer {
   private SafetyLine safetyLine;
 
   /** InputLayerWithoutValueFilter */
-  public InputLayer(
+  public UDFLayer(
       long queryId,
       float memoryBudgetInMB,
       List<PartialPath> paths,
@@ -75,7 +75,7 @@ public class InputLayer {
   }
 
   /** InputLayerWithValueFilter */
-  public InputLayer(
+  public UDFLayer(
       long queryId,
       float memoryBudgetInMB,
       List<PartialPath> paths,