You are viewing a plain text version of this content. The canonical link for it was a hyperlink on the word "here", whose target is not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/09/02 08:13:37 UTC

[iotdb] branch nested-operations updated (1c36448 -> c8e960b)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a change to branch nested-operations
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


    from 1c36448  struct of DAGBuilder & TransformerBuilder
     new 9731a89  IntermediateLayer
     new 8a492c2  DAG builder
     new de1c509  introduce MultiInputIntermediateLayer
     new c8e960b  SingleInputSingleOutputIntermediateLayer & SingleInputMultiOutputIntermediateLayer

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../db/query/dataset/UDTFAlignByTimeDataSet.java   |   2 +
 .../apache/iotdb/db/query/dataset/UDTFDataSet.java |   2 +-
 .../db/query/dataset/UDTFNonAlignDataSet.java      |   1 +
 .../iotdb/db/query/expression/Expression.java      |  12 +-
 .../expression/binary/AdditionExpression.java      |  10 ++
 .../query/expression/binary/BinaryExpression.java  |  44 ++++--
 .../expression/binary/DivisionExpression.java      |  10 ++
 .../query/expression/binary/ModuloExpression.java  |   9 ++
 .../binary/MultiplicationExpression.java           |  10 ++
 .../expression/binary/SubtractionExpression.java   |  10 ++
 .../query/expression/unary/FunctionExpression.java |  35 +++--
 .../query/expression/unary/NegationExpression.java |  33 +++--
 .../query/expression/unary/TimeSeriesOperand.java  |  24 ++-
 .../query/udf/core/builder/TransformerBuilder.java |  41 -----
 .../udf/core/{builder => layer}/DAGBuilder.java    |  35 ++---
 .../udf/core/{input => layer}/InputLayer.java      |   4 +-
 .../udf/core/layer/IntermediateLayer.java}         |   9 +-
 .../MultiInputIntermediateLayer.java}              |  16 +-
 .../udf/core/{input => layer}/SafetyLine.java      |   2 +-
 .../SingleInputMultiOutputIntermediateLayer.java   | 165 +++++++++++++++++++++
 .../SingleInputSingleOutputIntermediateLayer.java} |  15 +-
 .../udf/core/transformer/UDFQueryTransformer.java  |   2 +-
 .../tv/ElasticSerializableTVList.java              |   5 +-
 .../ElasticSerializableTVListTest.java             |   2 +-
 24 files changed, 369 insertions(+), 129 deletions(-)
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/udf/core/builder/TransformerBuilder.java
 rename server/src/main/java/org/apache/iotdb/db/query/udf/core/{builder => layer}/DAGBuilder.java (72%)
 rename server/src/main/java/org/apache/iotdb/db/query/udf/core/{input => layer}/InputLayer.java (99%)
 copy server/src/main/java/org/apache/iotdb/db/{metrics/sink/Sink.java => query/udf/core/layer/IntermediateLayer.java} (80%)
 copy server/src/main/java/org/apache/iotdb/db/query/udf/core/{transformer/ArithmeticModuloTransformer.java => layer/MultiInputIntermediateLayer.java} (68%)
 rename server/src/main/java/org/apache/iotdb/db/query/udf/core/{input => layer}/SafetyLine.java (97%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java
 copy server/src/main/java/org/apache/iotdb/db/query/udf/core/{transformer/ArithmeticAdditionTransformer.java => layer/SingleInputSingleOutputIntermediateLayer.java} (67%)

[iotdb] 04/04: SingleInputSingleOutputIntermediateLayer & SingleInputMultiOutputIntermediateLayer

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch nested-operations
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c8e960bcb190baf6b398475156bf938852bd2865
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu Sep 2 16:12:44 2021 +0800

    SingleInputSingleOutputIntermediateLayer & SingleInputMultiOutputIntermediateLayer
---
 .../query/expression/binary/BinaryExpression.java  |  4 +--
 .../query/expression/unary/NegationExpression.java |  4 +--
 .../query/expression/unary/TimeSeriesOperand.java  |  4 +--
 ...> SingleInputMultiOutputIntermediateLayer.java} |  6 ++--
 .../SingleInputSingleOutputIntermediateLayer.java  | 36 ++++++++++++++++++++++
 5 files changed, 45 insertions(+), 9 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index 69dd13c..e31496c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
-import org.apache.iotdb.db.query.udf.core.layer.SingleInputIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputMultiOutputIntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticBinaryTransformer;
 
@@ -126,7 +126,7 @@ public abstract class BinaryExpression extends Expression {
 
       expressionIntermediateLayerMap.put(
           this,
-          new SingleInputIntermediateLayer(
+          new SingleInputMultiOutputIntermediateLayer(
               constructTransformer(
                   leftParentIntermediateLayer.constructPointReader(),
                   rightParentIntermediateLayer.constructPointReader()),
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index 813ee5f..8dcdc73 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
-import org.apache.iotdb.db.query.udf.core.layer.SingleInputIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputMultiOutputIntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticNegationTransformer;
 
 import java.util.ArrayList;
@@ -89,7 +89,7 @@ public class NegationExpression extends Expression {
 
       expressionIntermediateLayerMap.put(
           this,
-          new SingleInputIntermediateLayer(
+          new SingleInputMultiOutputIntermediateLayer(
               new ArithmeticNegationTransformer(parentIntermediateLayer.constructPointReader()),
               -1,
               -1));
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index c1e6eb8..9b17315 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
-import org.apache.iotdb.db.query.udf.core.layer.SingleInputIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputMultiOutputIntermediateLayer;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.List;
@@ -80,7 +80,7 @@ public class TimeSeriesOperand extends Expression {
     if (!expressionIntermediateLayerMap.containsKey(this)) {
       expressionIntermediateLayerMap.put(
           this,
-          new SingleInputIntermediateLayer(
+          new SingleInputMultiOutputIntermediateLayer(
               inputLayer.constructPointReader(udtfPlan.getReaderIndex(path.getFullPath())),
               -1,
               -1));
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java
similarity index 96%
rename from server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputIntermediateLayer.java
rename to server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java
index 04697c9..a711ced 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java
@@ -28,16 +28,16 @@ import org.apache.iotdb.tsfile.utils.Binary;
 
 import java.io.IOException;
 
-public class SingleInputIntermediateLayer implements IntermediateLayer {
+public class SingleInputMultiOutputIntermediateLayer implements IntermediateLayer {
 
-  private static final int CACHE_BLOCK_SIZE = 1;
+  private static final int CACHE_BLOCK_SIZE = 2;
 
   private final TSDataType dataType;
   private final LayerPointReader parentLayerPointReader;
   private final ElasticSerializableTVList tvList;
   private final SafetyLine safetyLine;
 
-  public SingleInputIntermediateLayer(
+  public SingleInputMultiOutputIntermediateLayer(
       LayerPointReader parentLayerPointReader, long queryId, float memoryBudgetInMB)
       throws QueryProcessException {
     this.parentLayerPointReader = parentLayerPointReader;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java
new file mode 100644
index 0000000..131472a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.layer;
+
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+
+public class SingleInputSingleOutputIntermediateLayer implements IntermediateLayer {
+
+  private final LayerPointReader parentLayerPointReader;
+
+  public SingleInputSingleOutputIntermediateLayer(LayerPointReader parentLayerPointReader) {
+    this.parentLayerPointReader = parentLayerPointReader;
+  }
+
+  @Override
+  public LayerPointReader constructPointReader() {
+    return parentLayerPointReader;
+  }
+}

[iotdb] 02/04: DAG builder

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch nested-operations
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8a492c2fc4dfbff72abba73f4ffc7d9336d3fa2b
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed Sep 1 18:26:17 2021 +0800

    DAG builder
---
 .../iotdb/db/query/expression/Expression.java      | 12 ++++--
 .../expression/binary/AdditionExpression.java      | 10 +++++
 .../query/expression/binary/BinaryExpression.java  | 43 ++++++++++++++++------
 .../expression/binary/DivisionExpression.java      | 10 +++++
 .../query/expression/binary/ModuloExpression.java  |  9 +++++
 .../binary/MultiplicationExpression.java           | 10 +++++
 .../expression/binary/SubtractionExpression.java   | 10 +++++
 .../query/expression/unary/FunctionExpression.java | 28 +++++++-------
 .../query/expression/unary/NegationExpression.java | 32 +++++++++++-----
 .../query/expression/unary/TimeSeriesOperand.java  | 23 +++++++++---
 .../udf/core/builder/LayerPointReaderBuilder.java  | 22 -----------
 .../query/udf/core/builder/TransformerBuilder.java | 41 ---------------------
 .../udf/core/{builder => layer}/DAGBuilder.java    | 32 +++++++---------
 13 files changed, 158 insertions(+), 124 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index 43f792a..9dd75bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -20,9 +20,12 @@
 package org.apache.iotdb.db.query.expression;
 
 import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
-import org.apache.iotdb.db.query.udf.core.builder.TransformerBuilder;
+import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
+import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
 
 import java.util.List;
 import java.util.Map;
@@ -48,8 +51,11 @@ public abstract class Expression {
 
   public abstract void collectPaths(Set<PartialPath> pathSet);
 
-  public abstract void constructTransformerBuilder(
-      Map<Expression, TransformerBuilder> expressionTransformerBuilderMap);
+  public abstract IntermediateLayer constructIntermediateLayer(
+      UDTFPlan udtfPlan,
+      InputLayer inputLayer,
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
+      throws QueryProcessException;
 
   public String getExpressionString() {
     if (expressionString == null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/AdditionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/AdditionExpression.java
index 5500ba4..3aca060 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/AdditionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/AdditionExpression.java
@@ -20,6 +20,9 @@
 package org.apache.iotdb.db.query.expression.binary;
 
 import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticAdditionTransformer;
+import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticBinaryTransformer;
 
 public class AdditionExpression extends BinaryExpression {
 
@@ -28,6 +31,13 @@ public class AdditionExpression extends BinaryExpression {
   }
 
   @Override
+  protected ArithmeticBinaryTransformer constructTransformer(
+      LayerPointReader leftParentLayerPointReader, LayerPointReader rightParentLayerPointReader) {
+    return new ArithmeticAdditionTransformer(
+        leftParentLayerPointReader, rightParentLayerPointReader);
+  }
+
+  @Override
   protected String operator() {
     return "+";
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index 36e31d5..4d52793 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -20,10 +20,15 @@
 package org.apache.iotdb.db.query.expression.binary;
 
 import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
-import org.apache.iotdb.db.query.udf.core.builder.TransformerBuilder;
+import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
+import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticBinaryTransformer;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -105,21 +110,35 @@ public abstract class BinaryExpression extends Expression {
   }
 
   @Override
-  public void constructTransformerBuilder(
-      Map<Expression, TransformerBuilder> expressionTransformerBuilderMap) {
-    if (expressionTransformerBuilderMap.containsKey(this)) {
-      return;
+  public IntermediateLayer constructIntermediateLayer(
+      UDTFPlan udtfPlan,
+      InputLayer inputLayer,
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
+      throws QueryProcessException {
+    if (!expressionIntermediateLayerMap.containsKey(this)) {
+      IntermediateLayer leftParentIntermediateLayer =
+          leftExpression.constructIntermediateLayer(
+              udtfPlan, inputLayer, expressionIntermediateLayerMap);
+      IntermediateLayer rightParentIntermediateLayer =
+          rightExpression.constructIntermediateLayer(
+              udtfPlan, inputLayer, expressionIntermediateLayerMap);
+
+      expressionIntermediateLayerMap.put(
+          this,
+          new IntermediateLayer(
+              constructTransformer(
+                  leftParentIntermediateLayer.constructPointReader(),
+                  rightParentIntermediateLayer.constructPointReader()),
+              -1,
+              -1));
     }
 
-    leftExpression.constructTransformerBuilder(expressionTransformerBuilderMap);
-    rightExpression.constructTransformerBuilder(expressionTransformerBuilderMap);
-
-    TransformerBuilder transformerBuilder = new TransformerBuilder(this);
-    transformerBuilder.addDependentExpression(leftExpression);
-    transformerBuilder.addDependentExpression(rightExpression);
-    expressionTransformerBuilderMap.put(this, transformerBuilder);
+    return expressionIntermediateLayerMap.get(this);
   }
 
+  protected abstract ArithmeticBinaryTransformer constructTransformer(
+      LayerPointReader leftParentLayerPointReader, LayerPointReader rightParentLayerPointReader);
+
   public Expression getLeftExpression() {
     return leftExpression;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/DivisionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/DivisionExpression.java
index e611887..9d8a613 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/DivisionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/DivisionExpression.java
@@ -20,6 +20,9 @@
 package org.apache.iotdb.db.query.expression.binary;
 
 import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticBinaryTransformer;
+import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticDivisionTransformer;
 
 public class DivisionExpression extends BinaryExpression {
 
@@ -28,6 +31,13 @@ public class DivisionExpression extends BinaryExpression {
   }
 
   @Override
+  protected ArithmeticBinaryTransformer constructTransformer(
+      LayerPointReader leftParentLayerPointReader, LayerPointReader rightParentLayerPointReader) {
+    return new ArithmeticDivisionTransformer(
+        leftParentLayerPointReader, rightParentLayerPointReader);
+  }
+
+  @Override
   protected String operator() {
     return "/";
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/ModuloExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/ModuloExpression.java
index 94dd905..464e623 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/ModuloExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/ModuloExpression.java
@@ -20,6 +20,9 @@
 package org.apache.iotdb.db.query.expression.binary;
 
 import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticBinaryTransformer;
+import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticModuloTransformer;
 
 public class ModuloExpression extends BinaryExpression {
 
@@ -28,6 +31,12 @@ public class ModuloExpression extends BinaryExpression {
   }
 
   @Override
+  protected ArithmeticBinaryTransformer constructTransformer(
+      LayerPointReader leftParentLayerPointReader, LayerPointReader rightParentLayerPointReader) {
+    return new ArithmeticModuloTransformer(leftParentLayerPointReader, rightParentLayerPointReader);
+  }
+
+  @Override
   protected String operator() {
     return "%";
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/MultiplicationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/MultiplicationExpression.java
index ae3d17b..420e2c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/MultiplicationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/MultiplicationExpression.java
@@ -20,6 +20,9 @@
 package org.apache.iotdb.db.query.expression.binary;
 
 import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticBinaryTransformer;
+import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticMultiplicationTransformer;
 
 public class MultiplicationExpression extends BinaryExpression {
 
@@ -28,6 +31,13 @@ public class MultiplicationExpression extends BinaryExpression {
   }
 
   @Override
+  protected ArithmeticBinaryTransformer constructTransformer(
+      LayerPointReader leftParentLayerPointReader, LayerPointReader rightParentLayerPointReader) {
+    return new ArithmeticMultiplicationTransformer(
+        leftParentLayerPointReader, rightParentLayerPointReader);
+  }
+
+  @Override
   protected String operator() {
     return "*";
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/SubtractionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/SubtractionExpression.java
index 65bcf50..3eb918e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/SubtractionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/SubtractionExpression.java
@@ -20,6 +20,9 @@
 package org.apache.iotdb.db.query.expression.binary;
 
 import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticBinaryTransformer;
+import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticSubtractionTransformer;
 
 public class SubtractionExpression extends BinaryExpression {
 
@@ -28,6 +31,13 @@ public class SubtractionExpression extends BinaryExpression {
   }
 
   @Override
+  protected ArithmeticBinaryTransformer constructTransformer(
+      LayerPointReader leftParentLayerPointReader, LayerPointReader rightParentLayerPointReader) {
+    return new ArithmeticSubtractionTransformer(
+        leftParentLayerPointReader, rightParentLayerPointReader);
+  }
+
+  @Override
   protected String operator() {
     return "-";
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index 2bb05b0..4773ae4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -20,12 +20,15 @@
 package org.apache.iotdb.db.query.expression.unary;
 
 import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
+import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.strategy.optimizer.ConcatPathOptimizer;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
-import org.apache.iotdb.db.query.udf.core.builder.TransformerBuilder;
+import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
+import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
 
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -144,21 +147,20 @@ public class FunctionExpression extends Expression {
   }
 
   @Override
-  public void constructTransformerBuilder(
-      Map<Expression, TransformerBuilder> expressionTransformerBuilderMap) {
-    if (expressionTransformerBuilderMap.containsKey(this)) {
-      return;
-    }
+  public IntermediateLayer constructIntermediateLayer(
+      UDTFPlan udtfPlan,
+      InputLayer inputLayer,
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
+      throws QueryProcessException {
+    if (!expressionIntermediateLayerMap.containsKey(this)) {
+      for (Expression expression : expressions) {
+        expression.constructIntermediateLayer(udtfPlan, inputLayer, expressionIntermediateLayerMap);
+      }
 
-    for (Expression expression : expressions) {
-      expression.constructTransformerBuilder(expressionTransformerBuilderMap);
+      // todo!
     }
 
-    TransformerBuilder transformerBuilder = new TransformerBuilder(this);
-    for (Expression expression : expressions) {
-      transformerBuilder.addDependentExpression(expression);
-    }
-    expressionTransformerBuilderMap.put(this, transformerBuilder);
+    return expressionIntermediateLayerMap.get(this);
   }
 
   public List<PartialPath> getPaths() {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index 2c6fb45..355eca0 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -20,10 +20,14 @@
 package org.apache.iotdb.db.query.expression.unary;
 
 import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
-import org.apache.iotdb.db.query.udf.core.builder.TransformerBuilder;
+import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
+import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticNegationTransformer;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -72,17 +76,25 @@ public class NegationExpression extends Expression {
   }
 
   @Override
-  public void constructTransformerBuilder(
-      Map<Expression, TransformerBuilder> expressionTransformerBuilderMap) {
-    if (expressionTransformerBuilderMap.containsKey(this)) {
-      return;
-    }
+  public IntermediateLayer constructIntermediateLayer(
+      UDTFPlan udtfPlan,
+      InputLayer inputLayer,
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
+      throws QueryProcessException {
+    if (!expressionIntermediateLayerMap.containsKey(this)) {
+      IntermediateLayer parentIntermediateLayer =
+          expression.constructIntermediateLayer(
+              udtfPlan, inputLayer, expressionIntermediateLayerMap);
 
-    expression.constructTransformerBuilder(expressionTransformerBuilderMap);
+      expressionIntermediateLayerMap.put(
+          this,
+          new IntermediateLayer(
+              new ArithmeticNegationTransformer(parentIntermediateLayer.constructPointReader()),
+              -1,
+              -1));
+    }
 
-    TransformerBuilder transformerBuilder = new TransformerBuilder(this);
-    transformerBuilder.addDependentExpression(expression);
-    expressionTransformerBuilderMap.put(this, transformerBuilder);
+    return expressionIntermediateLayerMap.get(this);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index 478e214..208a760 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -20,10 +20,13 @@
 package org.apache.iotdb.db.query.expression.unary;
 
 import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
-import org.apache.iotdb.db.query.udf.core.builder.TransformerBuilder;
+import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
+import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.List;
@@ -68,11 +71,21 @@ public class TimeSeriesOperand extends Expression {
   }
 
   @Override
-  public void constructTransformerBuilder(
-      Map<Expression, TransformerBuilder> expressionTransformerBuilderMap) {
-    if (!expressionTransformerBuilderMap.containsKey(this)) {
-      expressionTransformerBuilderMap.put(this, new TransformerBuilder(this));
+  public IntermediateLayer constructIntermediateLayer(
+      UDTFPlan udtfPlan,
+      InputLayer inputLayer,
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
+      throws QueryProcessException {
+    if (!expressionIntermediateLayerMap.containsKey(this)) {
+      expressionIntermediateLayerMap.put(
+          this,
+          new IntermediateLayer(
+              inputLayer.constructPointReader(udtfPlan.getReaderIndex(path.getFullPath())),
+              -1,
+              -1));
     }
+
+    return expressionIntermediateLayerMap.get(this);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/builder/LayerPointReaderBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/builder/LayerPointReaderBuilder.java
deleted file mode 100644
index ed96576..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/builder/LayerPointReaderBuilder.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.query.udf.core.builder;
-
-public class LayerPointReaderBuilder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/builder/TransformerBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/builder/TransformerBuilder.java
deleted file mode 100644
index f365876..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/builder/TransformerBuilder.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.query.udf.core.builder;
-
-import org.apache.iotdb.db.query.expression.Expression;
-
-import java.util.LinkedList;
-import java.util.List;
-
-public class TransformerBuilder {
-
-  private final Expression expression;
-
-  private final List<Expression> dependencies;
-
-  public TransformerBuilder(Expression expression) {
-    this.expression = expression;
-    dependencies = new LinkedList<>();
-  }
-
-  public void addDependentExpression(Expression dependentExpression) {
-    dependencies.add(dependentExpression);
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/builder/DAGBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
similarity index 75%
rename from server/src/main/java/org/apache/iotdb/db/query/udf/core/builder/DAGBuilder.java
rename to server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
index d8dc64e..472ca2d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/builder/DAGBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
@@ -17,8 +17,9 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.query.udf.core.builder;
+package org.apache.iotdb.db.query.udf.core.layer;
 
+import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.expression.ResultColumn;
@@ -32,6 +33,7 @@ import java.util.Map;
 public class DAGBuilder {
 
   private final UDTFPlan udtfPlan;
+  private final InputLayer inputLayer;
 
   // input
   private final List<Expression> resultColumnExpressions;
@@ -40,38 +42,32 @@ public class DAGBuilder {
 
   // all result column expressions will be split into several sub-expressions, each expression has
   // its own transformer. different result column expressions may have the same sub-expressions,
-  // but they can share the same transformer. we cache the transformer builder here to make sure
-  // that only one transformer will be built for one expression.
-  private final Map<Expression, TransformerBuilder> expressionTransformerBuilderMap;
+  // but they can share the same transformer. we cache the transformer here to make sure that only
+  // one transformer will be built for one expression.
+  private final Map<Expression, IntermediateLayer> expressionIntermediateLayerMap;
 
-  public DAGBuilder(UDTFPlan udtfPlan) {
+  public DAGBuilder(UDTFPlan udtfPlan, InputLayer inputLayer) throws QueryProcessException {
     this.udtfPlan = udtfPlan;
+    this.inputLayer = inputLayer;
+
     resultColumnExpressions = new ArrayList<>();
     for (ResultColumn resultColumn : udtfPlan.getResultColumns()) {
       resultColumnExpressions.add(resultColumn.getExpression());
     }
     resultColumnTransformers = new Transformer[resultColumnExpressions.size()];
-    expressionTransformerBuilderMap = new HashMap<>();
 
-    build();
-  }
+    expressionIntermediateLayerMap = new HashMap<>();
 
-  public void build() {
-    constructTransformerBuilder();
-    buildTransformer();
-    buildDAG();
+    build();
   }
 
-  private void constructTransformerBuilder() {
+  public void build() throws QueryProcessException {
     for (Expression resultColumnExpression : resultColumnExpressions) {
-      resultColumnExpression.constructTransformerBuilder(expressionTransformerBuilderMap);
+      resultColumnExpression.constructIntermediateLayer(
+          udtfPlan, inputLayer, expressionIntermediateLayerMap);
     }
   }
 
-  private void buildTransformer() {}
-
-  private void buildDAG() {}
-
   public Transformer[] getResultColumnTransformers() {
     return resultColumnTransformers;
   }

[iotdb] 01/04: IntermediateLayer

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch nested-operations
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9731a895152d26da2acf561d23f9e26c064b753d
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed Sep 1 12:34:28 2021 +0800

    IntermediateLayer
---
 .../db/query/dataset/UDTFAlignByTimeDataSet.java   |   2 +
 .../apache/iotdb/db/query/dataset/UDTFDataSet.java |   2 +-
 .../db/query/dataset/UDTFNonAlignDataSet.java      |   1 +
 .../db/query/udf/core/builder/DAGBuilder.java      |   5 +
 .../udf/core/builder/LayerPointReaderBuilder.java  |  22 +++
 .../udf/core/{input => layer}/InputLayer.java      |   4 +-
 .../db/query/udf/core/layer/IntermediateLayer.java | 164 +++++++++++++++++++++
 .../udf/core/{input => layer}/SafetyLine.java      |   2 +-
 .../udf/core/transformer/UDFQueryTransformer.java  |   2 +-
 .../tv/ElasticSerializableTVList.java              |   5 +-
 .../ElasticSerializableTVListTest.java             |   2 +-
 11 files changed, 203 insertions(+), 8 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java
index 41ea61a..6c065c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java
@@ -192,6 +192,7 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
         --rowOffset;
       }
 
+      // todo: control upper bound here
       inputLayer.updateRowRecordListEvictionUpperBound();
     }
 
@@ -294,6 +295,7 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
       throw new IOException(e.getMessage());
     }
 
+    // todo: control upper bound here
     inputLayer.updateRowRecordListEvictionUpperBound();
 
     return rowRecord;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
index b7741ef..0f9ef3d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
@@ -36,7 +36,7 @@ import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
 import org.apache.iotdb.db.query.udf.api.customizer.strategy.AccessStrategy;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
-import org.apache.iotdb.db.query.udf.core.input.InputLayer;
+import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticAdditionTransformer;
 import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticDivisionTransformer;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java
index 7f90cae..25dd871 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java
@@ -113,6 +113,7 @@ public class UDTFNonAlignDataSet extends UDTFDataSet implements DirectNonAlignDa
       valueBufferList.add(timeValueByteBufferPair.right);
     }
 
+    // todo: control upper bound here
     inputLayer.updateRowRecordListEvictionUpperBound();
 
     tsQueryNonAlignDataSet.setTimeList(timeBufferList);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/builder/DAGBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/builder/DAGBuilder.java
index eaaaa14..d8dc64e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/builder/DAGBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/builder/DAGBuilder.java
@@ -31,6 +31,8 @@ import java.util.Map;
 
 public class DAGBuilder {
 
+  private final UDTFPlan udtfPlan;
+
   // input
   private final List<Expression> resultColumnExpressions;
   // output
@@ -43,12 +45,15 @@ public class DAGBuilder {
   private final Map<Expression, TransformerBuilder> expressionTransformerBuilderMap;
 
   public DAGBuilder(UDTFPlan udtfPlan) {
+    this.udtfPlan = udtfPlan;
     resultColumnExpressions = new ArrayList<>();
     for (ResultColumn resultColumn : udtfPlan.getResultColumns()) {
       resultColumnExpressions.add(resultColumn.getExpression());
     }
     resultColumnTransformers = new Transformer[resultColumnExpressions.size()];
     expressionTransformerBuilderMap = new HashMap<>();
+
+    build();
   }
 
   public void build() {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/builder/LayerPointReaderBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/builder/LayerPointReaderBuilder.java
new file mode 100644
index 0000000..ed96576
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/builder/LayerPointReaderBuilder.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.builder;
+
+public class LayerPointReaderBuilder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/input/InputLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/InputLayer.java
similarity index 99%
rename from server/src/main/java/org/apache/iotdb/db/query/udf/core/input/InputLayer.java
rename to server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/InputLayer.java
index de08f96..f16a210 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/input/InputLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/InputLayer.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.query.udf.core.input;
+package org.apache.iotdb.db.query.udf.core.layer;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
@@ -33,7 +33,7 @@ import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAc
 import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
 import org.apache.iotdb.db.query.udf.core.access.RowImpl;
 import org.apache.iotdb.db.query.udf.core.access.RowWindowImpl;
-import org.apache.iotdb.db.query.udf.core.input.SafetyLine.SafetyPile;
+import org.apache.iotdb.db.query.udf.core.layer.SafetyLine.SafetyPile;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader;
 import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
new file mode 100644
index 0000000..f4ccd03
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.layer;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.udf.core.layer.SafetyLine.SafetyPile;
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.datastructure.tv.ElasticSerializableTVList;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import java.io.IOException;
+
+public class IntermediateLayer {
+
+  private static final int CACHE_BLOCK_SIZE = 1;
+
+  private final TSDataType dataType;
+  private final LayerPointReader parentLayerPointReader;
+  private final ElasticSerializableTVList tvList;
+  private final SafetyLine safetyLine;
+
+  public IntermediateLayer(
+      LayerPointReader parentLayerPointReader, long queryId, float memoryBudgetInMB)
+      throws QueryProcessException {
+    this.parentLayerPointReader = parentLayerPointReader;
+    dataType = parentLayerPointReader.getDataType();
+    tvList =
+        ElasticSerializableTVList.newElasticSerializableTVList(
+            dataType, queryId, memoryBudgetInMB, CACHE_BLOCK_SIZE);
+    safetyLine = new SafetyLine();
+  }
+
+  public LayerPointReader constructPointReader() {
+
+    return new LayerPointReader() {
+
+      private final SafetyPile safetyPile = safetyLine.addSafetyPile();
+
+      private boolean hasCached = false;
+      private int currentPointIndex = -1;
+
+      @Override
+      public boolean next() throws QueryProcessException, IOException {
+        if (hasCached) {
+          return true;
+        }
+
+        if (currentPointIndex < tvList.size() - 1) {
+          ++currentPointIndex;
+          hasCached = true;
+        }
+
+        // tvList.size() - 1 <= currentPointIndex
+        if (!hasCached && parentLayerPointReader.next()) {
+          cachePoint();
+          parentLayerPointReader.readyForNext();
+
+          ++currentPointIndex;
+          hasCached = true;
+        }
+
+        return hasCached;
+      }
+
+      private void cachePoint() throws IOException, QueryProcessException {
+        switch (dataType) {
+          case INT32:
+            tvList.putInt(
+                parentLayerPointReader.currentTime(), parentLayerPointReader.currentInt());
+            break;
+          case INT64:
+            tvList.putLong(
+                parentLayerPointReader.currentTime(), parentLayerPointReader.currentLong());
+            break;
+          case FLOAT:
+            tvList.putFloat(
+                parentLayerPointReader.currentTime(), parentLayerPointReader.currentFloat());
+            break;
+          case DOUBLE:
+            tvList.putDouble(
+                parentLayerPointReader.currentTime(), parentLayerPointReader.currentDouble());
+            break;
+          case BOOLEAN:
+            tvList.putBoolean(
+                parentLayerPointReader.currentTime(), parentLayerPointReader.currentBoolean());
+            break;
+          case TEXT:
+            tvList.putBinary(
+                parentLayerPointReader.currentTime(), parentLayerPointReader.currentBinary());
+            break;
+          default:
+            throw new UnsupportedOperationException(dataType.name());
+        }
+      }
+
+      @Override
+      public void readyForNext() {
+        hasCached = false;
+
+        safetyPile.moveForwardTo(currentPointIndex + 1);
+        // todo: reduce the update rate
+        tvList.setEvictionUpperBound(safetyLine.getSafetyLine());
+      }
+
+      @Override
+      public TSDataType getDataType() {
+        return dataType;
+      }
+
+      @Override
+      public long currentTime() throws IOException {
+        return tvList.getTime(currentPointIndex);
+      }
+
+      @Override
+      public int currentInt() throws IOException {
+        return tvList.getInt(currentPointIndex);
+      }
+
+      @Override
+      public long currentLong() throws IOException {
+        return tvList.getLong(currentPointIndex);
+      }
+
+      @Override
+      public float currentFloat() throws IOException {
+        return tvList.getFloat(currentPointIndex);
+      }
+
+      @Override
+      public double currentDouble() throws IOException {
+        return tvList.getDouble(currentPointIndex);
+      }
+
+      @Override
+      public boolean currentBoolean() throws IOException {
+        return tvList.getBoolean(currentPointIndex);
+      }
+
+      @Override
+      public Binary currentBinary() throws IOException {
+        return tvList.getBinary(currentPointIndex);
+      }
+    };
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/input/SafetyLine.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SafetyLine.java
similarity index 97%
rename from server/src/main/java/org/apache/iotdb/db/query/udf/core/input/SafetyLine.java
rename to server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SafetyLine.java
index 7f21c90..35be04d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/input/SafetyLine.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SafetyLine.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.query.udf.core.input;
+package org.apache.iotdb.db.query.udf.core.layer;
 
 public class SafetyLine {
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/UDFQueryTransformer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/UDFQueryTransformer.java
index 7bb5b60..5254a50 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/UDFQueryTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/UDFQueryTransformer.java
@@ -39,7 +39,7 @@ public abstract class UDFQueryTransformer extends Transformer {
   protected UDFQueryTransformer(UDTFExecutor executor) {
     this.executor = executor;
     udfOutputDataType = executor.getConfigurations().getOutputDataType();
-    udfOutput = executor.getCollector().getPointReaderUsingEvictionStrategy();
+    udfOutput = executor.getCollector().constructPointReaderUsingTrivialEvictionStrategy();
     terminated = false;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/datastructure/tv/ElasticSerializableTVList.java b/server/src/main/java/org/apache/iotdb/db/query/udf/datastructure/tv/ElasticSerializableTVList.java
index 6dd0a9f..8d36911 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/datastructure/tv/ElasticSerializableTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/datastructure/tv/ElasticSerializableTVList.java
@@ -224,7 +224,8 @@ public class ElasticSerializableTVList implements PointCollector {
     }
   }
 
-  public LayerPointReader getPointReaderUsingEvictionStrategy() {
+  // todo: remove it
+  public LayerPointReader constructPointReaderUsingTrivialEvictionStrategy() {
 
     return new LayerPointReader() {
 
@@ -290,7 +291,7 @@ public class ElasticSerializableTVList implements PointCollector {
    * @param evictionUpperBound the index of the first element that cannot be evicted. in other
    *     words, elements whose index are <b>less than</b> the evictionUpperBound can be evicted.
    */
-  private void setEvictionUpperBound(int evictionUpperBound) {
+  public void setEvictionUpperBound(int evictionUpperBound) {
     this.evictionUpperBound = evictionUpperBound;
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/query/udf/datastructure/ElasticSerializableTVListTest.java b/server/src/test/java/org/apache/iotdb/db/query/udf/datastructure/ElasticSerializableTVListTest.java
index db287b4..280e340 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/udf/datastructure/ElasticSerializableTVListTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/udf/datastructure/ElasticSerializableTVListTest.java
@@ -202,7 +202,7 @@ public class ElasticSerializableTVListTest extends SerializableListTest {
                 generateRandomString(
                     byteLengthMin + random.nextInt(byteLengthMax - byteLengthMin))));
       }
-      LayerPointReader reader = tvList.getPointReaderUsingEvictionStrategy();
+      LayerPointReader reader = tvList.constructPointReaderUsingTrivialEvictionStrategy();
       while (reader.next()) {
         int length = reader.currentBinary().getLength();
         assertTrue(byteLengthMin <= length && length < byteLengthMax);

[iotdb] 03/04: introduce MultiInputIntermediateLayer

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch nested-operations
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit de1c509c853145f8217f7ef6afee03fd9c21bfdd
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed Sep 1 18:50:33 2021 +0800

    introduce MultiInputIntermediateLayer
---
 .../query/expression/binary/BinaryExpression.java  |   3 +-
 .../query/expression/unary/FunctionExpression.java |  11 +-
 .../query/expression/unary/NegationExpression.java |   3 +-
 .../query/expression/unary/TimeSeriesOperand.java  |   3 +-
 .../db/query/udf/core/layer/IntermediateLayer.java | 141 +--------------------
 .../core/layer/MultiInputIntermediateLayer.java    |  35 +++++
 ...ayer.java => SingleInputIntermediateLayer.java} |   5 +-
 7 files changed, 55 insertions(+), 146 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index 4d52793..69dd13c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputIntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticBinaryTransformer;
 
@@ -125,7 +126,7 @@ public abstract class BinaryExpression extends Expression {
 
       expressionIntermediateLayerMap.put(
           this,
-          new IntermediateLayer(
+          new SingleInputIntermediateLayer(
               constructTransformer(
                   leftParentIntermediateLayer.constructPointReader(),
                   rightParentIntermediateLayer.constructPointReader()),
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index 4773ae4..7f55995 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -29,6 +29,8 @@ import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.MultiInputIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -153,11 +155,16 @@ public class FunctionExpression extends Expression {
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
       throws QueryProcessException {
     if (!expressionIntermediateLayerMap.containsKey(this)) {
+      List<LayerPointReader> parentLayerPointReaders = new ArrayList<>();
       for (Expression expression : expressions) {
-        expression.constructIntermediateLayer(udtfPlan, inputLayer, expressionIntermediateLayerMap);
+        parentLayerPointReaders.add(
+            expression
+                .constructIntermediateLayer(udtfPlan, inputLayer, expressionIntermediateLayerMap)
+                .constructPointReader());
       }
 
-      // todo!
+      expressionIntermediateLayerMap.put(
+          this, new MultiInputIntermediateLayer(parentLayerPointReaders, -1, -1));
     }
 
     return expressionIntermediateLayerMap.get(this);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index 355eca0..813ee5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputIntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticNegationTransformer;
 
 import java.util.ArrayList;
@@ -88,7 +89,7 @@ public class NegationExpression extends Expression {
 
       expressionIntermediateLayerMap.put(
           this,
-          new IntermediateLayer(
+          new SingleInputIntermediateLayer(
               new ArithmeticNegationTransformer(parentIntermediateLayer.constructPointReader()),
               -1,
               -1));
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index 208a760..c1e6eb8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputIntermediateLayer;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.List;
@@ -79,7 +80,7 @@ public class TimeSeriesOperand extends Expression {
     if (!expressionIntermediateLayerMap.containsKey(this)) {
       expressionIntermediateLayerMap.put(
           this,
-          new IntermediateLayer(
+          new SingleInputIntermediateLayer(
               inputLayer.constructPointReader(udtfPlan.getReaderIndex(path.getFullPath())),
               -1,
               -1));
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
index f4ccd03..cbac211 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
@@ -19,146 +19,9 @@
 
 package org.apache.iotdb.db.query.udf.core.layer;
 
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.query.udf.core.layer.SafetyLine.SafetyPile;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
-import org.apache.iotdb.db.query.udf.datastructure.tv.ElasticSerializableTVList;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Binary;
 
-import java.io.IOException;
+public interface IntermediateLayer {
 
-public class IntermediateLayer {
-
-  private static final int CACHE_BLOCK_SIZE = 1;
-
-  private final TSDataType dataType;
-  private final LayerPointReader parentLayerPointReader;
-  private final ElasticSerializableTVList tvList;
-  private final SafetyLine safetyLine;
-
-  public IntermediateLayer(
-      LayerPointReader parentLayerPointReader, long queryId, float memoryBudgetInMB)
-      throws QueryProcessException {
-    this.parentLayerPointReader = parentLayerPointReader;
-    dataType = parentLayerPointReader.getDataType();
-    tvList =
-        ElasticSerializableTVList.newElasticSerializableTVList(
-            dataType, queryId, memoryBudgetInMB, CACHE_BLOCK_SIZE);
-    safetyLine = new SafetyLine();
-  }
-
-  public LayerPointReader constructPointReader() {
-
-    return new LayerPointReader() {
-
-      private final SafetyPile safetyPile = safetyLine.addSafetyPile();
-
-      private boolean hasCached = false;
-      private int currentPointIndex = -1;
-
-      @Override
-      public boolean next() throws QueryProcessException, IOException {
-        if (hasCached) {
-          return true;
-        }
-
-        if (currentPointIndex < tvList.size() - 1) {
-          ++currentPointIndex;
-          hasCached = true;
-        }
-
-        // tvList.size() - 1 <= currentPointIndex
-        if (!hasCached && parentLayerPointReader.next()) {
-          cachePoint();
-          parentLayerPointReader.readyForNext();
-
-          ++currentPointIndex;
-          hasCached = true;
-        }
-
-        return hasCached;
-      }
-
-      private void cachePoint() throws IOException, QueryProcessException {
-        switch (dataType) {
-          case INT32:
-            tvList.putInt(
-                parentLayerPointReader.currentTime(), parentLayerPointReader.currentInt());
-            break;
-          case INT64:
-            tvList.putLong(
-                parentLayerPointReader.currentTime(), parentLayerPointReader.currentLong());
-            break;
-          case FLOAT:
-            tvList.putFloat(
-                parentLayerPointReader.currentTime(), parentLayerPointReader.currentFloat());
-            break;
-          case DOUBLE:
-            tvList.putDouble(
-                parentLayerPointReader.currentTime(), parentLayerPointReader.currentDouble());
-            break;
-          case BOOLEAN:
-            tvList.putBoolean(
-                parentLayerPointReader.currentTime(), parentLayerPointReader.currentBoolean());
-            break;
-          case TEXT:
-            tvList.putBinary(
-                parentLayerPointReader.currentTime(), parentLayerPointReader.currentBinary());
-            break;
-          default:
-            throw new UnsupportedOperationException(dataType.name());
-        }
-      }
-
-      @Override
-      public void readyForNext() {
-        hasCached = false;
-
-        safetyPile.moveForwardTo(currentPointIndex + 1);
-        // todo: reduce the update rate
-        tvList.setEvictionUpperBound(safetyLine.getSafetyLine());
-      }
-
-      @Override
-      public TSDataType getDataType() {
-        return dataType;
-      }
-
-      @Override
-      public long currentTime() throws IOException {
-        return tvList.getTime(currentPointIndex);
-      }
-
-      @Override
-      public int currentInt() throws IOException {
-        return tvList.getInt(currentPointIndex);
-      }
-
-      @Override
-      public long currentLong() throws IOException {
-        return tvList.getLong(currentPointIndex);
-      }
-
-      @Override
-      public float currentFloat() throws IOException {
-        return tvList.getFloat(currentPointIndex);
-      }
-
-      @Override
-      public double currentDouble() throws IOException {
-        return tvList.getDouble(currentPointIndex);
-      }
-
-      @Override
-      public boolean currentBoolean() throws IOException {
-        return tvList.getBoolean(currentPointIndex);
-      }
-
-      @Override
-      public Binary currentBinary() throws IOException {
-        return tvList.getBinary(currentPointIndex);
-      }
-    };
-  }
+  LayerPointReader constructPointReader();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputIntermediateLayer.java
new file mode 100644
index 0000000..260c308
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputIntermediateLayer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.layer;
+
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+
+import java.util.List;
+
+public class MultiInputIntermediateLayer implements IntermediateLayer { // multi-parent variant
+  // The constructor below has an empty body: none of its arguments are stored or used.
+  public MultiInputIntermediateLayer(
+      List<LayerPointReader> parentLayerPointReaders, long queryId, float memoryBudgetInMB) {}
+  // NOTE(review): presumably a placeholder implementation -- confirm before relying on it.
+  @Override
+  public LayerPointReader constructPointReader() {
+    return null; // always null: no reader is built from the parent readers
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputIntermediateLayer.java
similarity index 97%
copy from server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
copy to server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputIntermediateLayer.java
index f4ccd03..04697c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputIntermediateLayer.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.tsfile.utils.Binary;
 
 import java.io.IOException;
 
-public class IntermediateLayer {
+public class SingleInputIntermediateLayer implements IntermediateLayer {
 
   private static final int CACHE_BLOCK_SIZE = 1;
 
@@ -37,7 +37,7 @@ public class IntermediateLayer {
   private final ElasticSerializableTVList tvList;
   private final SafetyLine safetyLine;
 
-  public IntermediateLayer(
+  public SingleInputIntermediateLayer(
       LayerPointReader parentLayerPointReader, long queryId, float memoryBudgetInMB)
       throws QueryProcessException {
     this.parentLayerPointReader = parentLayerPointReader;
@@ -48,6 +48,7 @@ public class IntermediateLayer {
     safetyLine = new SafetyLine();
   }
 
+  @Override
   public LayerPointReader constructPointReader() {
 
     return new LayerPointReader() {