You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/09/08 10:14:34 UTC

[iotdb] 02/02: make layer connections

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch nested-operations
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1c6637dd523b4fcfff6aa4853033ea00ae0eb6c3
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed Sep 8 18:13:49 2021 +0800

    make layer connections
---
 .../apache/iotdb/db/qp/physical/crud/UDTFPlan.java |  23 +++--
 .../iotdb/db/query/expression/Expression.java      |  12 ++-
 .../query/expression/binary/BinaryExpression.java  |  69 ++++++++++---
 .../query/expression/unary/FunctionExpression.java | 115 ++++++++++++++++++---
 .../query/expression/unary/NegationExpression.java |  55 +++++++---
 .../query/expression/unary/TimeSeriesOperand.java  |  40 +++++--
 .../iotdb/db/query/udf/core/layer/DAGBuilder.java  |  26 ++++-
 .../query/udf/core/layer/LayerMemoryAssigner.java  |  67 ++++++++++++
 8 files changed, 340 insertions(+), 67 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
index aa20652..792802f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
@@ -51,7 +51,7 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
 
   protected final ZoneId zoneId;
 
-  protected Map<String, UDTFExecutor> columnName2Executor = new HashMap<>();
+  protected Map<String, UDTFExecutor> expressionName2Executor = new HashMap<>();
   protected Map<Integer, UDTFExecutor> originalOutputColumnIndex2Executor = new HashMap<>();
 
   protected Map<Integer, Integer> datasetOutputIndexToResultColumnIndex = new HashMap<>();
@@ -114,21 +114,18 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
   public void constructUdfExecutors(List<ResultColumn> resultColumns) {
     for (int i = 0; i < resultColumns.size(); ++i) {
       Expression expression = resultColumns.get(i).getExpression();
-      if (!(expression instanceof FunctionExpression)) {
-        continue;
+      expression.constructUdfExecutors(expressionName2Executor, zoneId);
+      if (expression instanceof FunctionExpression) {
+        originalOutputColumnIndex2Executor.put(
+            i, expressionName2Executor.get(expression.toString()));
       }
-
-      String columnName = expression.toString();
-      columnName2Executor.computeIfAbsent(
-          columnName, k -> new UDTFExecutor((FunctionExpression) expression, zoneId));
-      originalOutputColumnIndex2Executor.put(i, columnName2Executor.get(columnName));
     }
   }
 
   @Override
   public void initializeUdfExecutors(long queryId, float collectorMemoryBudgetInMB)
       throws QueryProcessException {
-    Collection<UDTFExecutor> executors = columnName2Executor.values();
+    Collection<UDTFExecutor> executors = expressionName2Executor.values();
     collectorMemoryBudgetInMB /= executors.size();
 
     UDFRegistrationService.getInstance().acquireRegistrationLock();
@@ -146,7 +143,7 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
   @Override
   public void finalizeUDFExecutors(long queryId) {
     try {
-      for (UDTFExecutor executor : columnName2Executor.values()) {
+      for (UDTFExecutor executor : expressionName2Executor.values()) {
         executor.beforeDestroy();
       }
     } finally {
@@ -184,10 +181,14 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
   }
 
   public UDTFExecutor getExecutorByDataSetOutputColumnIndex(int datasetOutputIndex) {
-    return columnName2Executor.get(
+    return expressionName2Executor.get(
         getResultColumnByDatasetOutputIndex(datasetOutputIndex).getResultColumnName());
   }
 
+  public UDTFExecutor getExecutorByFunctionExpression(FunctionExpression functionExpression) {
+    return expressionName2Executor.get(functionExpression.getExpressionString());
+  }
+
   public String getRawQueryColumnNameByDatasetOutputColumnIndex(int datasetOutputIndex) {
     return getResultColumnByDatasetOutputIndex(datasetOutputIndex).getResultColumnName();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index 10b4117..3d4ce07 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -24,10 +24,13 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
 import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
 
 import java.io.IOException;
+import java.time.ZoneId;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -52,10 +55,17 @@ public abstract class Expression {
 
   public abstract void collectPaths(Set<PartialPath> pathSet);
 
+  public abstract void constructUdfExecutors(
+      Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId);
+
+  public abstract void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner);
+
   public abstract IntermediateLayer constructIntermediateLayer(
+      int queryId,
       UDTFPlan udtfPlan,
       UDFLayer rawTimeSeriesInputLayer,
-      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+      LayerMemoryAssigner memoryAssigner)
       throws QueryProcessException, IOException;
 
   public String getExpressionString() {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index 0ae46e0..383a9a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -25,11 +25,18 @@ import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnMultiReferenceIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnSingleReferenceIntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticBinaryTransformer;
+import org.apache.iotdb.db.query.udf.core.transformer.Transformer;
 
+import java.io.IOException;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -110,27 +117,55 @@ public abstract class BinaryExpression extends Expression {
   }
 
   @Override
+  public void constructUdfExecutors(
+      Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
+    leftExpression.constructUdfExecutors(expressionName2Executor, zoneId);
+    rightExpression.constructUdfExecutors(expressionName2Executor, zoneId);
+  }
+
+  @Override
+  public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
+    memoryAssigner.increaseExpressionReference(leftExpression);
+    memoryAssigner.increaseExpressionReference(rightExpression);
+  }
+
+  @Override
   public IntermediateLayer constructIntermediateLayer(
+      int queryId,
       UDTFPlan udtfPlan,
       UDFLayer rawTimeSeriesInputLayer,
-      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
-      throws QueryProcessException {
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+      LayerMemoryAssigner memoryAssigner)
+      throws QueryProcessException, IOException {
     if (!expressionIntermediateLayerMap.containsKey(this)) {
-      //      IntermediateLayer leftParentIntermediateLayer =
-      //          leftExpression.constructIntermediateLayer(
-      //              udtfPlan, rawTimeSeriesInputLayer, expressionIntermediateLayerMap);
-      //      IntermediateLayer rightParentIntermediateLayer =
-      //          rightExpression.constructIntermediateLayer(
-      //              udtfPlan, rawTimeSeriesInputLayer, expressionIntermediateLayerMap);
-      //
-      //      expressionIntermediateLayerMap.put(
-      //          this,
-      //          new SingleInputColumnMultiReferenceIntermediateLayer(
-      //              constructTransformer(
-      //                  leftParentIntermediateLayer.constructPointReader(),
-      //                  rightParentIntermediateLayer.constructPointReader()),
-      //              -1,
-      //              -1));
+      float memoryBudgetInMB = memoryAssigner.assign();
+
+      IntermediateLayer leftParentIntermediateLayer =
+          leftExpression.constructIntermediateLayer(
+              queryId,
+              udtfPlan,
+              rawTimeSeriesInputLayer,
+              expressionIntermediateLayerMap,
+              memoryAssigner);
+      IntermediateLayer rightParentIntermediateLayer =
+          rightExpression.constructIntermediateLayer(
+              queryId,
+              udtfPlan,
+              rawTimeSeriesInputLayer,
+              expressionIntermediateLayerMap,
+              memoryAssigner);
+      Transformer transformer =
+          constructTransformer(
+              leftParentIntermediateLayer.constructPointReader(),
+              rightParentIntermediateLayer.constructPointReader());
+
+      expressionIntermediateLayerMap.put(
+          this,
+          memoryAssigner.getReference(this) == 1
+              ? new SingleInputColumnSingleReferenceIntermediateLayer(
+                  queryId, memoryBudgetInMB, transformer)
+              : new SingleInputColumnMultiReferenceIntermediateLayer(
+                  queryId, memoryBudgetInMB, transformer));
     }
 
     return expressionIntermediateLayerMap.get(this);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index b1d0efb..c40a619 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -27,10 +27,21 @@ import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.strategy.optimizer.ConcatPathOptimizer;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.AccessStrategy;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
+import org.apache.iotdb.db.query.udf.core.layer.MultiInputColumnIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnMultiReferenceIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnSingleReferenceIntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
+import org.apache.iotdb.db.query.udf.core.transformer.Transformer;
+import org.apache.iotdb.db.query.udf.core.transformer.UDFQueryRowTransformer;
+import org.apache.iotdb.db.query.udf.core.transformer.UDFQueryRowWindowTransformer;
+import org.apache.iotdb.db.query.udf.core.transformer.UDFQueryTransformer;
 
 import java.io.IOException;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -38,6 +49,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 public class FunctionExpression extends Expression {
 
@@ -148,28 +160,107 @@ public class FunctionExpression extends Expression {
   }
 
   @Override
+  public void constructUdfExecutors(
+      Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
+    String expressionString = getExpressionString();
+    if (expressionName2Executor.containsKey(expressionString)) {
+      return;
+    }
+
+    for (Expression expression : expressions) {
+      expression.constructUdfExecutors(expressionName2Executor, zoneId);
+    }
+    expressionName2Executor.put(expressionString, new UDTFExecutor(this, zoneId));
+  }
+
+  @Override
+  public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
+    for (Expression expression : expressions) {
+      memoryAssigner.increaseExpressionReference(expression);
+    }
+  }
+
+  @Override
   public IntermediateLayer constructIntermediateLayer(
+      int queryId,
       UDTFPlan udtfPlan,
       UDFLayer rawTimeSeriesInputLayer,
-      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+      LayerMemoryAssigner memoryAssigner)
       throws QueryProcessException, IOException {
     if (!expressionIntermediateLayerMap.containsKey(this)) {
-      //      List<LayerPointReader> parentLayerPointReaders = new ArrayList<>();
-      //      for (Expression expression : expressions) {
-      //        parentLayerPointReaders.add(
-      //            expression
-      //                .constructIntermediateLayer(
-      //                    udtfPlan, rawTimeSeriesInputLayer, expressionIntermediateLayerMap)
-      //                .constructPointReader());
-      //      }
-      //
-      //      expressionIntermediateLayerMap.put(
-      //          this, new MultiInputColumnIntermediateLayer(-1, -1, parentLayerPointReaders));
+      float memoryBudgetInMB = memoryAssigner.assign();
+
+      IntermediateLayer udfInputIntermediateLayer =
+          constructUdfInputIntermediateLayer(
+              queryId,
+              udtfPlan,
+              rawTimeSeriesInputLayer,
+              expressionIntermediateLayerMap,
+              memoryAssigner);
+      Transformer transformer =
+          constructUdfTransformer(udtfPlan, memoryAssigner, udfInputIntermediateLayer);
+
+      expressionIntermediateLayerMap.put(
+          this,
+          memoryAssigner.getReference(this) == 1
+              ? new SingleInputColumnSingleReferenceIntermediateLayer(
+                  queryId, memoryBudgetInMB, transformer)
+              : new SingleInputColumnMultiReferenceIntermediateLayer(
+                  queryId, memoryBudgetInMB, transformer));
     }
 
     return expressionIntermediateLayerMap.get(this);
   }
 
+  private IntermediateLayer constructUdfInputIntermediateLayer(
+      int queryId,
+      UDTFPlan udtfPlan,
+      UDFLayer rawTimeSeriesInputLayer,
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+      LayerMemoryAssigner memoryAssigner)
+      throws QueryProcessException, IOException {
+    List<IntermediateLayer> intermediateLayers = new ArrayList<>();
+    for (Expression expression : expressions) {
+      intermediateLayers.add(
+          expression.constructIntermediateLayer(
+              queryId,
+              udtfPlan,
+              rawTimeSeriesInputLayer,
+              expressionIntermediateLayerMap,
+              memoryAssigner));
+    }
+    return intermediateLayers.size() == 1
+        ? intermediateLayers.get(0)
+        : new MultiInputColumnIntermediateLayer(
+            queryId,
+            memoryAssigner.assign(),
+            intermediateLayers.stream()
+                .map(IntermediateLayer::constructPointReader)
+                .collect(Collectors.toList()));
+  }
+
+  private UDFQueryTransformer constructUdfTransformer(
+      UDTFPlan udtfPlan,
+      LayerMemoryAssigner memoryAssigner,
+      IntermediateLayer udfInputIntermediateLayer)
+      throws QueryProcessException, IOException {
+    UDTFExecutor executor = udtfPlan.getExecutorByFunctionExpression(this);
+    AccessStrategy accessStrategy = executor.getConfigurations().getAccessStrategy();
+    switch (accessStrategy.getAccessStrategyType()) {
+      case ROW_BY_ROW:
+        return new UDFQueryRowTransformer(udfInputIntermediateLayer.constructRowReader(), executor);
+      case SLIDING_SIZE_WINDOW:
+      case SLIDING_TIME_WINDOW:
+        return new UDFQueryRowWindowTransformer(
+            udfInputIntermediateLayer.constructRowWindowReader(
+                accessStrategy, memoryAssigner.assign()),
+            executor);
+      default:
+        throw new UnsupportedOperationException("Unsupported transformer access strategy");
+    }
+  }
+
   public List<PartialPath> getPaths() {
     if (paths == null) {
       paths = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index 8a13b27..eade5be 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -25,9 +25,17 @@ import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnMultiReferenceIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnSingleReferenceIntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
+import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticNegationTransformer;
+import org.apache.iotdb.db.query.udf.core.transformer.Transformer;
 
+import java.io.IOException;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -75,23 +83,44 @@ public class NegationExpression extends Expression {
   }
 
   @Override
+  public void constructUdfExecutors(
+      Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
+    expression.constructUdfExecutors(expressionName2Executor, zoneId);
+  }
+
+  @Override
+  public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
+    memoryAssigner.increaseExpressionReference(expression); // referrers count this node
+  }
+
+  @Override
   public IntermediateLayer constructIntermediateLayer(
+      int queryId,
       UDTFPlan udtfPlan,
       UDFLayer rawTimeSeriesInputLayer,
-      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
-      throws QueryProcessException {
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+      LayerMemoryAssigner memoryAssigner)
+      throws QueryProcessException, IOException {
     if (!expressionIntermediateLayerMap.containsKey(this)) {
-      //      IntermediateLayer parentIntermediateLayer =
-      //          expression.constructIntermediateLayer(
-      //              udtfPlan, rawTimeSeriesInputLayer, expressionIntermediateLayerMap);
-      //
-      //      expressionIntermediateLayerMap.put(
-      //          this,
-      //          new SingleInputColumnMultiReferenceIntermediateLayer(
-      //              new
-      // ArithmeticNegationTransformer(parentIntermediateLayer.constructPointReader()),
-      //              -1,
-      //              -1));
+      float memoryBudgetInMB = memoryAssigner.assign();
+
+      IntermediateLayer parentLayerPointReader =
+          expression.constructIntermediateLayer(
+              queryId,
+              udtfPlan,
+              rawTimeSeriesInputLayer,
+              expressionIntermediateLayerMap,
+              memoryAssigner);
+      Transformer transformer =
+          new ArithmeticNegationTransformer(parentLayerPointReader.constructPointReader());
+
+      expressionIntermediateLayerMap.put(
+          this,
+          memoryAssigner.getReference(this) == 1
+              ? new SingleInputColumnSingleReferenceIntermediateLayer(
+                  queryId, memoryBudgetInMB, transformer)
+              : new SingleInputColumnMultiReferenceIntermediateLayer(
+                  queryId, memoryBudgetInMB, transformer));
     }
 
     return expressionIntermediateLayerMap.get(this);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index a8db711..6ad0ac3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -25,10 +25,16 @@ import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnMultiReferenceIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnSingleReferenceIntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
+import java.time.ZoneId;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -71,19 +77,37 @@ public class TimeSeriesOperand extends Expression {
   }
 
   @Override
+  public void constructUdfExecutors(
+      Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
+    // nothing to do
+  }
+
+  @Override
+  public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
+    memoryAssigner.increaseExpressionReference(this);
+  }
+
+  @Override
   public IntermediateLayer constructIntermediateLayer(
+      int queryId,
       UDTFPlan udtfPlan,
       UDFLayer rawTimeSeriesInputLayer,
-      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+      LayerMemoryAssigner memoryAssigner)
       throws QueryProcessException {
     if (!expressionIntermediateLayerMap.containsKey(this)) {
-      //      expressionIntermediateLayerMap.put(
-      //          this,
-      //          new SingleInputColumnMultiReferenceIntermediateLayer(
-      //              rawTimeSeriesInputLayer.constructPointReader(
-      //                  udtfPlan.getReaderIndex(path.getFullPath())),
-      //              -1,
-      //              -1));
+      float memoryBudgetInMB = memoryAssigner.assign();
+
+      LayerPointReader parentLayerPointReader =
+          rawTimeSeriesInputLayer.constructPointReader(udtfPlan.getReaderIndex(path.getFullPath()));
+
+      expressionIntermediateLayerMap.put(
+          this,
+          memoryAssigner.getReference(this) == 1
+              ? new SingleInputColumnSingleReferenceIntermediateLayer(
+                  queryId, memoryBudgetInMB, parentLayerPointReader)
+              : new SingleInputColumnMultiReferenceIntermediateLayer(
+                  queryId, memoryBudgetInMB, parentLayerPointReader));
     }
 
     return expressionIntermediateLayerMap.get(this);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
index 5f398b4..3e021e5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
@@ -33,6 +33,7 @@ import java.util.Map;
 
 public class DAGBuilder {
 
+  private final int queryId;
   private final UDTFPlan udtfPlan;
   private final UDFLayer rawTimeSeriesInputLayer;
 
@@ -41,14 +42,16 @@ public class DAGBuilder {
   // output
   private final LayerPointReader[] resultColumnPointReaders;
 
+  private final LayerMemoryAssigner memoryAssigner;
+
   // all result column expressions will be split into several sub-expressions, each expression has
   // its own result point reader. different result column expressions may have the same
   // sub-expressions, but they can share the same point reader. we cache the point reader here to
   // make sure that only one point reader will be built for one expression.
   private final Map<Expression, IntermediateLayer> expressionIntermediateLayerMap;
 
-  public DAGBuilder(UDTFPlan udtfPlan, UDFLayer inputLayer)
-      throws QueryProcessException, IOException {
+  public DAGBuilder(int queryId, UDTFPlan udtfPlan, UDFLayer inputLayer, float memoryBudgetInMB) {
+    this.queryId = queryId;
     this.udtfPlan = udtfPlan;
     this.rawTimeSeriesInputLayer = inputLayer;
 
@@ -58,18 +61,31 @@ public class DAGBuilder {
     }
     resultColumnPointReaders = new LayerPointReader[resultColumnExpressions.size()];
 
+    memoryAssigner = new LayerMemoryAssigner(memoryBudgetInMB);
+
     expressionIntermediateLayerMap = new HashMap<>();
+  }
 
-    build();
+  public DAGBuilder buildLayerMemoryAssigner() {
+    for (Expression expression : resultColumnExpressions) {
+      memoryAssigner.increaseExpressionReference(expression);
+      expression.updateStatisticsForMemoryAssigner(memoryAssigner);
+    }
+    memoryAssigner.build();
+    return this;
   }
 
-  public DAGBuilder build() throws QueryProcessException, IOException {
+  public DAGBuilder buildResultColumnPointReaders() throws QueryProcessException, IOException {
     for (int i = 0; i < resultColumnExpressions.size(); ++i) {
       resultColumnPointReaders[i] =
           resultColumnExpressions
               .get(i)
               .constructIntermediateLayer(
-                  udtfPlan, rawTimeSeriesInputLayer, expressionIntermediateLayerMap)
+                  queryId,
+                  udtfPlan,
+                  rawTimeSeriesInputLayer,
+                  expressionIntermediateLayerMap,
+                  memoryAssigner)
               .constructPointReader();
     }
     return this;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerMemoryAssigner.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerMemoryAssigner.java
new file mode 100644
index 0000000..afa2298
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerMemoryAssigner.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.layer;
+
+import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/** Counts expression references and derives an equal per-reference share of a memory budget. */
+public class LayerMemoryAssigner {
+
+  private final float memoryBudgetInMB;
+
+  private final Map<Expression, Integer> expressionReferenceCount;
+
+  private float memoryBudgetForSingleReference;
+
+  public LayerMemoryAssigner(float memoryBudgetInMB) {
+    expressionReferenceCount = new HashMap<>();
+    this.memoryBudgetInMB = memoryBudgetInMB;
+  }
+
+  /** Records one reference: the first call for an expression stores 1, later calls add 1. */
+  public void increaseExpressionReference(Expression expression) {
+    expressionReferenceCount.merge(expression, 1, Integer::sum);
+  }
+
+  /** Splits the budget evenly over all references; a function expression weighs double. */
+  public void build() {
+    int memoryPartitions = 0;
+    for (Entry<Expression, Integer> entry : expressionReferenceCount.entrySet()) {
+      int weight = entry.getKey() instanceof FunctionExpression ? 2 : 1;
+      memoryPartitions += entry.getValue() * weight;
+    }
+    memoryBudgetForSingleReference = memoryBudgetInMB / memoryPartitions;
+  }
+
+  /** Returns the recorded count; unboxing fails with an NPE if nothing was recorded. */
+  public int getReference(Expression expression) {
+    return expressionReferenceCount.get(expression);
+  }
+
+  /** Returns the per-reference share computed by {@link #build()}. */
+  public float assign() {
+    return memoryBudgetForSingleReference;
+  }
+}