You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/09/09 07:18:42 UTC

[iotdb] branch nested-operations updated: clean RawQueryInputLayer

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch nested-operations
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/nested-operations by this push:
     new 549f2e1  clean RawQueryInputLayer
549f2e1 is described below

commit 549f2e1da68e1602aa5450840ea4ea79a2fd6c26
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu Sep 9 15:18:02 2021 +0800

    clean RawQueryInputLayer
---
 .../db/query/dataset/UDTFAlignByTimeDataSet.java   |   4 +-
 .../apache/iotdb/db/query/dataset/UDTFDataSet.java |  14 +-
 .../db/query/dataset/UDTFNonAlignDataSet.java      |   2 +-
 .../iotdb/db/query/expression/Expression.java      |   4 +-
 .../query/expression/binary/BinaryExpression.java  |   4 +-
 .../query/expression/unary/FunctionExpression.java |   6 +-
 .../query/expression/unary/NegationExpression.java |   4 +-
 .../query/expression/unary/TimeSeriesOperand.java  |   4 +-
 .../iotdb/db/query/udf/core/layer/DAGBuilder.java  |   5 +-
 .../query/udf/core/layer/RawQueryInputLayer.java   | 197 +++++++
 .../iotdb/db/query/udf/core/layer/UDFLayer.java    | 575 ---------------------
 11 files changed, 221 insertions(+), 598 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java
index c7e8af4..beec5d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java
@@ -193,7 +193,7 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
       }
 
       // todo: control upper bound here
-      udfLayer.updateRowRecordListEvictionUpperBound();
+      rawQueryInputLayer.updateRowRecordListEvictionUpperBound();
     }
 
     /*
@@ -296,7 +296,7 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
     }
 
     // todo: control upper bound here
-    udfLayer.updateRowRecordListEvictionUpperBound();
+    rawQueryInputLayer.updateRowRecordListEvictionUpperBound();
 
     return rowRecord;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
index 3f69f62..f58a8de 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
 import org.apache.iotdb.db.query.udf.core.layer.DAGBuilder;
-import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
+import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
@@ -48,7 +48,7 @@ public abstract class UDTFDataSet extends QueryDataSet {
 
   protected final long queryId;
   protected final UDTFPlan udtfPlan;
-  protected final UDFLayer udfLayer;
+  protected final RawQueryInputLayer rawQueryInputLayer;
 
   protected LayerPointReader[] transformers;
 
@@ -65,8 +65,8 @@ public abstract class UDTFDataSet extends QueryDataSet {
     super(new ArrayList<>(deduplicatedPaths), deduplicatedDataTypes);
     queryId = queryContext.getQueryId();
     this.udtfPlan = udtfPlan;
-    udfLayer =
-        new UDFLayer(
+    rawQueryInputLayer =
+        new RawQueryInputLayer(
             queryId,
             UDF_READER_MEMORY_BUDGET_IN_MB,
             deduplicatedPaths,
@@ -89,8 +89,8 @@ public abstract class UDTFDataSet extends QueryDataSet {
     super(new ArrayList<>(deduplicatedPaths), deduplicatedDataTypes);
     queryId = queryContext.getQueryId();
     this.udtfPlan = udtfPlan;
-    udfLayer =
-        new UDFLayer(
+    rawQueryInputLayer =
+        new RawQueryInputLayer(
             queryId,
             UDF_READER_MEMORY_BUDGET_IN_MB,
             deduplicatedPaths,
@@ -102,7 +102,7 @@ public abstract class UDTFDataSet extends QueryDataSet {
 
   protected void initTransformers() throws QueryProcessException, IOException {
     transformers =
-        new DAGBuilder(queryId, udtfPlan, udfLayer, UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB)
+        new DAGBuilder(queryId, udtfPlan, rawQueryInputLayer, UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB)
             .buildLayerMemoryAssigner()
             .buildResultColumnPointReaders()
             .getResultColumnPointReaders();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java
index e3810bb..05f954c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java
@@ -114,7 +114,7 @@ public class UDTFNonAlignDataSet extends UDTFDataSet implements DirectNonAlignDa
     }
 
     // todo: control upper bound here
-    udfLayer.updateRowRecordListEvictionUpperBound();
+    rawQueryInputLayer.updateRowRecordListEvictionUpperBound();
 
     tsQueryNonAlignDataSet.setTimeList(timeBufferList);
     tsQueryNonAlignDataSet.setValueList(valueBufferList);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index 738cbea..cea4875 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
-import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
+import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer;
 
 import java.io.IOException;
 import java.time.ZoneId;
@@ -63,7 +63,7 @@ public abstract class Expression {
   public abstract IntermediateLayer constructIntermediateLayer(
       long queryId,
       UDTFPlan udtfPlan,
-      UDFLayer rawTimeSeriesInputLayer,
+      RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
       LayerMemoryAssigner memoryAssigner)
       throws QueryProcessException, IOException;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index 4b76daa..99cfffc 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -28,9 +28,9 @@ import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
+import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer;
 import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnMultiReferenceIntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnSingleReferenceIntermediateLayer;
-import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticBinaryTransformer;
 import org.apache.iotdb.db.query.udf.core.transformer.Transformer;
@@ -134,7 +134,7 @@ public abstract class BinaryExpression extends Expression {
   public IntermediateLayer constructIntermediateLayer(
       long queryId,
       UDTFPlan udtfPlan,
-      UDFLayer rawTimeSeriesInputLayer,
+      RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
       LayerMemoryAssigner memoryAssigner)
       throws QueryProcessException, IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index 4fa6bbb..adb703b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -32,9 +32,9 @@ import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
 import org.apache.iotdb.db.query.udf.core.layer.MultiInputColumnIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer;
 import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnMultiReferenceIntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnSingleReferenceIntermediateLayer;
-import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
 import org.apache.iotdb.db.query.udf.core.transformer.Transformer;
 import org.apache.iotdb.db.query.udf.core.transformer.UDFQueryRowTransformer;
 import org.apache.iotdb.db.query.udf.core.transformer.UDFQueryRowWindowTransformer;
@@ -185,7 +185,7 @@ public class FunctionExpression extends Expression {
   public IntermediateLayer constructIntermediateLayer(
       long queryId,
       UDTFPlan udtfPlan,
-      UDFLayer rawTimeSeriesInputLayer,
+      RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
       LayerMemoryAssigner memoryAssigner)
       throws QueryProcessException, IOException {
@@ -217,7 +217,7 @@ public class FunctionExpression extends Expression {
   private IntermediateLayer constructUdfInputIntermediateLayer(
       long queryId,
       UDTFPlan udtfPlan,
-      UDFLayer rawTimeSeriesInputLayer,
+      RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
       LayerMemoryAssigner memoryAssigner)
       throws QueryProcessException, IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index a85b386..0e33cd8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -28,9 +28,9 @@ import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
+import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer;
 import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnMultiReferenceIntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnSingleReferenceIntermediateLayer;
-import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
 import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticNegationTransformer;
 import org.apache.iotdb.db.query.udf.core.transformer.Transformer;
 
@@ -98,7 +98,7 @@ public class NegationExpression extends Expression {
   public IntermediateLayer constructIntermediateLayer(
       long queryId,
       UDTFPlan udtfPlan,
-      UDFLayer rawTimeSeriesInputLayer,
+      RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
       LayerMemoryAssigner memoryAssigner)
       throws QueryProcessException, IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index ec33153..55a79b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -28,9 +28,9 @@ import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
+import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer;
 import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnMultiReferenceIntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnSingleReferenceIntermediateLayer;
-import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
@@ -91,7 +91,7 @@ public class TimeSeriesOperand extends Expression {
   public IntermediateLayer constructIntermediateLayer(
       long queryId,
       UDTFPlan udtfPlan,
-      UDFLayer rawTimeSeriesInputLayer,
+      RawQueryInputLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
       LayerMemoryAssigner memoryAssigner)
       throws QueryProcessException {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
index e03ab9a..1a33c17 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
@@ -32,7 +32,7 @@ public class DAGBuilder {
 
   private final long queryId;
   private final UDTFPlan udtfPlan;
-  private final UDFLayer rawTimeSeriesInputLayer;
+  private final RawQueryInputLayer rawTimeSeriesInputLayer;
 
   // input
   private final Expression[] resultColumnExpressions;
@@ -47,7 +47,8 @@ public class DAGBuilder {
   // make sure that only one point reader will be built for one expression.
   private final Map<Expression, IntermediateLayer> expressionIntermediateLayerMap;
 
-  public DAGBuilder(long queryId, UDTFPlan udtfPlan, UDFLayer inputLayer, float memoryBudgetInMB) {
+  public DAGBuilder(
+      long queryId, UDTFPlan udtfPlan, RawQueryInputLayer inputLayer, float memoryBudgetInMB) {
     this.queryId = queryId;
     this.udtfPlan = udtfPlan;
     this.rawTimeSeriesInputLayer = inputLayer;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
new file mode 100644
index 0000000..b67923c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.layer;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.dataset.RawQueryDataSetWithValueFilter;
+import org.apache.iotdb.db.query.dataset.RawQueryDataSetWithoutValueFilter;
+import org.apache.iotdb.db.query.dataset.UDFInputDataSet;
+import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
+import org.apache.iotdb.db.query.udf.core.layer.SafetyLine.SafetyPile;
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.datastructure.row.ElasticSerializableRowRecordList;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import java.io.IOException;
+import java.util.List;
+
+public class RawQueryInputLayer {
+
+  private UDFInputDataSet queryDataSet;
+  private TSDataType[] dataTypes;
+  private int timestampIndex;
+
+  private ElasticSerializableRowRecordList rowRecordList;
+  private SafetyLine safetyLine;
+
+  /** InputLayerWithoutValueFilter */
+  public RawQueryInputLayer(
+      long queryId,
+      float memoryBudgetInMB,
+      List<PartialPath> paths,
+      List<TSDataType> dataTypes,
+      List<ManagedSeriesReader> readers)
+      throws QueryProcessException, IOException, InterruptedException {
+    construct(
+        queryId,
+        memoryBudgetInMB,
+        new RawQueryDataSetWithoutValueFilter(queryId, paths, dataTypes, readers, true));
+  }
+
+  /** InputLayerWithValueFilter */
+  public RawQueryInputLayer(
+      long queryId,
+      float memoryBudgetInMB,
+      List<PartialPath> paths,
+      List<TSDataType> dataTypes,
+      TimeGenerator timeGenerator,
+      List<IReaderByTimestamp> readers,
+      List<Boolean> cached)
+      throws QueryProcessException {
+    construct(
+        queryId,
+        memoryBudgetInMB,
+        new RawQueryDataSetWithValueFilter(paths, dataTypes, timeGenerator, readers, cached, true));
+  }
+
+  private void construct(long queryId, float memoryBudgetInMB, UDFInputDataSet queryDataSet)
+      throws QueryProcessException {
+    this.queryDataSet = queryDataSet;
+    dataTypes = queryDataSet.getDataTypes().toArray(new TSDataType[0]);
+    timestampIndex = dataTypes.length;
+    rowRecordList =
+        new ElasticSerializableRowRecordList(
+            dataTypes, queryId, memoryBudgetInMB, 1 + dataTypes.length / 2);
+    safetyLine = new SafetyLine();
+  }
+
+  public void updateRowRecordListEvictionUpperBound() {
+    rowRecordList.setEvictionUpperBound(safetyLine.getSafetyLine());
+  }
+
+  public LayerPointReader constructPointReader(int columnIndex) {
+    return new InputLayerPointReader(columnIndex);
+  }
+
+  private class InputLayerPointReader implements LayerPointReader {
+
+    private final SafetyPile safetyPile;
+
+    private final int columnIndex;
+    private int currentRowIndex;
+
+    private boolean hasCachedRowRecord;
+    private Object[] cachedRowRecord;
+
+    InputLayerPointReader(int columnIndex) {
+      safetyPile = safetyLine.addSafetyPile();
+
+      this.columnIndex = columnIndex;
+      currentRowIndex = -1;
+
+      hasCachedRowRecord = false;
+      cachedRowRecord = null;
+    }
+
+    @Override
+    public boolean next() throws IOException, QueryProcessException {
+      if (hasCachedRowRecord) {
+        return true;
+      }
+
+      for (int i = currentRowIndex + 1; i < rowRecordList.size(); ++i) {
+        Object[] rowRecordCandidate = rowRecordList.getRowRecord(i);
+        if (rowRecordCandidate[columnIndex] != null) {
+          hasCachedRowRecord = true;
+          cachedRowRecord = rowRecordCandidate;
+          currentRowIndex = i;
+          break;
+        }
+      }
+
+      if (!hasCachedRowRecord) {
+        while (queryDataSet.hasNextRowInObjects()) {
+          Object[] rowRecordCandidate = queryDataSet.nextRowInObjects();
+          rowRecordList.put(rowRecordCandidate);
+          if (rowRecordCandidate[columnIndex] != null) {
+            hasCachedRowRecord = true;
+            cachedRowRecord = rowRecordCandidate;
+            currentRowIndex = rowRecordList.size() - 1;
+            break;
+          }
+        }
+      }
+
+      return hasCachedRowRecord;
+    }
+
+    @Override
+    public void readyForNext() {
+      hasCachedRowRecord = false;
+      cachedRowRecord = null;
+
+      safetyPile.moveForwardTo(currentRowIndex + 1);
+    }
+
+    @Override
+    public TSDataType getDataType() {
+      return dataTypes[columnIndex];
+    }
+
+    @Override
+    public long currentTime() {
+      return (long) cachedRowRecord[timestampIndex];
+    }
+
+    @Override
+    public int currentInt() {
+      return (int) cachedRowRecord[columnIndex];
+    }
+
+    @Override
+    public long currentLong() {
+      return (long) cachedRowRecord[columnIndex];
+    }
+
+    @Override
+    public float currentFloat() {
+      return (float) cachedRowRecord[columnIndex];
+    }
+
+    @Override
+    public double currentDouble() {
+      return (double) cachedRowRecord[columnIndex];
+    }
+
+    @Override
+    public boolean currentBoolean() {
+      return (boolean) cachedRowRecord[columnIndex];
+    }
+
+    @Override
+    public Binary currentBinary() {
+      return (Binary) cachedRowRecord[columnIndex];
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/UDFLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/UDFLayer.java
deleted file mode 100644
index 3148dd4..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/UDFLayer.java
+++ /dev/null
@@ -1,575 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.query.udf.core.layer;
-
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.query.dataset.RawQueryDataSetWithValueFilter;
-import org.apache.iotdb.db.query.dataset.RawQueryDataSetWithoutValueFilter;
-import org.apache.iotdb.db.query.dataset.UDFInputDataSet;
-import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
-import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
-import org.apache.iotdb.db.query.udf.api.access.Row;
-import org.apache.iotdb.db.query.udf.api.access.RowWindow;
-import org.apache.iotdb.db.query.udf.api.customizer.strategy.AccessStrategy;
-import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
-import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
-import org.apache.iotdb.db.query.udf.core.access.MultiColumnRow;
-import org.apache.iotdb.db.query.udf.core.access.MultiColumnWindow;
-import org.apache.iotdb.db.query.udf.core.layer.SafetyLine.SafetyPile;
-import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
-import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader;
-import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader;
-import org.apache.iotdb.db.query.udf.datastructure.primitive.ElasticSerializableIntList;
-import org.apache.iotdb.db.query.udf.datastructure.primitive.IntList;
-import org.apache.iotdb.db.query.udf.datastructure.primitive.SerializableIntList;
-import org.apache.iotdb.db.query.udf.datastructure.primitive.WrappedIntArray;
-import org.apache.iotdb.db.query.udf.datastructure.row.ElasticSerializableRowRecordList;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
-import org.apache.iotdb.tsfile.utils.Binary;
-
-import java.io.IOException;
-import java.util.List;
-
-public class UDFLayer {
-
-  private long queryId;
-
-  private UDFInputDataSet queryDataSet;
-  private TSDataType[] dataTypes;
-  private int timestampIndex;
-
-  private ElasticSerializableRowRecordList rowRecordList;
-  private SafetyLine safetyLine;
-
-  /** InputLayerWithoutValueFilter */
-  public UDFLayer(
-      long queryId,
-      float memoryBudgetInMB,
-      List<PartialPath> paths,
-      List<TSDataType> dataTypes,
-      List<ManagedSeriesReader> readers)
-      throws QueryProcessException, IOException, InterruptedException {
-    construct(
-        queryId,
-        memoryBudgetInMB,
-        new RawQueryDataSetWithoutValueFilter(queryId, paths, dataTypes, readers, true));
-  }
-
-  /** InputLayerWithValueFilter */
-  public UDFLayer(
-      long queryId,
-      float memoryBudgetInMB,
-      List<PartialPath> paths,
-      List<TSDataType> dataTypes,
-      TimeGenerator timeGenerator,
-      List<IReaderByTimestamp> readers,
-      List<Boolean> cached)
-      throws QueryProcessException {
-    construct(
-        queryId,
-        memoryBudgetInMB,
-        new RawQueryDataSetWithValueFilter(paths, dataTypes, timeGenerator, readers, cached, true));
-  }
-
-  public UDFLayer(long queryId, float memoryBudgetInMB, UDFInputDataSet queryDataSet)
-      throws QueryProcessException {
-    construct(queryId, memoryBudgetInMB, queryDataSet);
-  }
-
-  private void construct(long queryId, float memoryBudgetInMB, UDFInputDataSet queryDataSet)
-      throws QueryProcessException {
-    this.queryId = queryId;
-    this.queryDataSet = queryDataSet;
-    dataTypes = queryDataSet.getDataTypes().toArray(new TSDataType[0]);
-    timestampIndex = dataTypes.length;
-    rowRecordList =
-        new ElasticSerializableRowRecordList(
-            dataTypes, queryId, memoryBudgetInMB, 1 + dataTypes.length / 2);
-    safetyLine = new SafetyLine();
-  }
-
-  public void updateRowRecordListEvictionUpperBound() {
-    rowRecordList.setEvictionUpperBound(safetyLine.getSafetyLine());
-  }
-
-  public LayerPointReader constructPointReader(int columnIndex) {
-    return new InputLayerPointReader(columnIndex);
-  }
-
-  public LayerRowReader constructRowReader(int[] columnIndexes) {
-    return new InputLayerRowReader(columnIndexes);
-  }
-
-  public LayerRowWindowReader constructRowWindowReader(
-      int[] columnIndexes, AccessStrategy strategy, float memoryBudgetInMB)
-      throws QueryProcessException, IOException {
-    switch (strategy.getAccessStrategyType()) {
-      case SLIDING_TIME_WINDOW:
-        return new InputLayerRowSlidingTimeWindowReader(
-            columnIndexes, (SlidingTimeWindowAccessStrategy) strategy, memoryBudgetInMB);
-      case SLIDING_SIZE_WINDOW:
-        return new InputLayerRowSlidingSizeWindowReader(
-            columnIndexes, (SlidingSizeWindowAccessStrategy) strategy, memoryBudgetInMB);
-      default:
-        throw new IllegalStateException(
-            "Unexpected access strategy: " + strategy.getAccessStrategyType());
-    }
-  }
-
-  private class InputLayerPointReader implements LayerPointReader {
-
-    private final SafetyPile safetyPile;
-
-    private final int columnIndex;
-    private int currentRowIndex;
-
-    private boolean hasCachedRowRecord;
-    private Object[] cachedRowRecord;
-
-    InputLayerPointReader(int columnIndex) {
-      safetyPile = safetyLine.addSafetyPile();
-
-      this.columnIndex = columnIndex;
-      currentRowIndex = -1;
-
-      hasCachedRowRecord = false;
-      cachedRowRecord = null;
-    }
-
-    @Override
-    public boolean next() throws IOException, QueryProcessException {
-      if (hasCachedRowRecord) {
-        return true;
-      }
-
-      for (int i = currentRowIndex + 1; i < rowRecordList.size(); ++i) {
-        Object[] rowRecordCandidate = rowRecordList.getRowRecord(i);
-        if (rowRecordCandidate[columnIndex] != null) {
-          hasCachedRowRecord = true;
-          cachedRowRecord = rowRecordCandidate;
-          currentRowIndex = i;
-          break;
-        }
-      }
-
-      if (!hasCachedRowRecord) {
-        while (queryDataSet.hasNextRowInObjects()) {
-          Object[] rowRecordCandidate = queryDataSet.nextRowInObjects();
-          rowRecordList.put(rowRecordCandidate);
-          if (rowRecordCandidate[columnIndex] != null) {
-            hasCachedRowRecord = true;
-            cachedRowRecord = rowRecordCandidate;
-            currentRowIndex = rowRecordList.size() - 1;
-            break;
-          }
-        }
-      }
-
-      return hasCachedRowRecord;
-    }
-
-    @Override
-    public void readyForNext() {
-      hasCachedRowRecord = false;
-      cachedRowRecord = null;
-
-      safetyPile.moveForwardTo(currentRowIndex + 1);
-    }
-
-    @Override
-    public TSDataType getDataType() {
-      return dataTypes[columnIndex];
-    }
-
-    @Override
-    public long currentTime() {
-      return (long) cachedRowRecord[timestampIndex];
-    }
-
-    @Override
-    public int currentInt() {
-      return (int) cachedRowRecord[columnIndex];
-    }
-
-    @Override
-    public long currentLong() {
-      return (long) cachedRowRecord[columnIndex];
-    }
-
-    @Override
-    public float currentFloat() {
-      return (float) cachedRowRecord[columnIndex];
-    }
-
-    @Override
-    public double currentDouble() {
-      return (double) cachedRowRecord[columnIndex];
-    }
-
-    @Override
-    public boolean currentBoolean() {
-      return (boolean) cachedRowRecord[columnIndex];
-    }
-
-    @Override
-    public Binary currentBinary() {
-      return (Binary) cachedRowRecord[columnIndex];
-    }
-  }
-
-  /**
-   * Row-by-row reader over the input layer that only surfaces rows in which at least one of the
-   * selected columns is not null.
-   *
-   * <p>Rows already held by {@code rowRecordList} are examined first; after that, rows are pulled
-   * from {@code queryDataSet}. Every pulled row is appended to {@code rowRecordList}, whether or
-   * not it is surfaced by this reader.
-   */
-  private class InputLayerRowReader implements LayerRowReader {
-
-    private final SafetyPile safetyPile;
-    private final int[] columnIndexes;
-    private final MultiColumnRow row;
-
-    // index (in rowRecordList) of the row most recently surfaced, -1 before the first one
-    private int currentRowIndex;
-    private boolean hasCachedRowRecord;
-    private Object[] cachedRowRecord;
-
-    public InputLayerRowReader(int[] columnIndexes) {
-      this.safetyPile = safetyLine.addSafetyPile();
-      this.columnIndexes = columnIndexes;
-      this.currentRowIndex = -1;
-      this.hasCachedRowRecord = false;
-      this.cachedRowRecord = null;
-      this.row = new MultiColumnRow(columnIndexes, dataTypes);
-    }
-
-    /**
-     * Makes the next qualifying row current, if there is one. Calling it again before {@link
-     * #readyForNext()} keeps the same row current.
-     *
-     * @return true if a row is available through {@link #currentTime()} / {@link #currentRow()}
-     */
-    @Override
-    public boolean next() throws IOException, QueryProcessException {
-      if (hasCachedRowRecord) {
-        return true;
-      }
-
-      // first, look through the rows that are already buffered
-      int index = currentRowIndex + 1;
-      while (index < rowRecordList.size()) {
-        final Object[] buffered = rowRecordList.getRowRecord(index);
-        if (hasNotNullSelectedFields(buffered, columnIndexes)) {
-          cache(buffered, index);
-          return true;
-        }
-        ++index;
-      }
-
-      // then, keep pulling (and buffering) new rows until one qualifies
-      while (queryDataSet.hasNextRowInObjects()) {
-        final Object[] pulled = queryDataSet.nextRowInObjects();
-        rowRecordList.put(pulled);
-        if (hasNotNullSelectedFields(pulled, columnIndexes)) {
-          cache(pulled, rowRecordList.size() - 1);
-          return true;
-        }
-      }
-
-      return false;
-    }
-
-    /** Records the given row, located at {@code index} in rowRecordList, as the current row. */
-    private void cache(Object[] rowRecord, int index) {
-      hasCachedRowRecord = true;
-      cachedRowRecord = rowRecord;
-      currentRowIndex = index;
-    }
-
-    /** Drops the current row and moves this reader's safety pile past it. */
-    @Override
-    public void readyForNext() {
-      hasCachedRowRecord = false;
-      cachedRowRecord = null;
-
-      final int firstIndexStillNeeded = currentRowIndex + 1;
-      safetyPile.moveForwardTo(firstIndexStillNeeded);
-    }
-
-    @Override
-    public TSDataType[] getDataTypes() {
-      return dataTypes;
-    }
-
-    /** Returns the value stored at {@code timestampIndex} of the current row. */
-    @Override
-    public long currentTime() {
-      final Object time = cachedRowRecord[timestampIndex];
-      return (long) time;
-    }
-
-    /** Returns the shared row view, re-pointed at the current row. */
-    @Override
-    public Row currentRow() {
-      return row.setRowRecord(cachedRowRecord);
-    }
-  }
-
-  /**
-   * Window reader over the input layer that groups rows into windows of up to {@code windowSize}
-   * rows and advances by {@code slidingStep} rows between windows.
-   *
-   * <p>Only rows with at least one non-null selected column are counted, both when filling a
-   * window and when sliding. Rows are taken from {@code rowRecordList} first and then pulled from
-   * {@code queryDataSet}; every pulled row is appended to {@code rowRecordList}.
-   */
-  private class InputLayerRowSlidingSizeWindowReader implements LayerRowWindowReader {
-
-    private final SafetyPile safetyPile;
-
-    private final int[] columnIndexes;
-    // data types of the selected columns only, in the order given by columnIndexes
-    private final TSDataType[] columnDataTypes;
-
-    private final int windowSize;
-    // indexes (into rowRecordList) of the rows that make up the current window
-    private final IntList rowIndexes;
-    // window view handed out by currentWindow(); it is built on top of rowIndexes
-    private final MultiColumnWindow rowWindow;
-
-    private final int slidingStep;
-
-    // next() starts scanning for the next window at the row right after this index
-    private int maxIndexInLastWindow;
-
-    private InputLayerRowSlidingSizeWindowReader(
-        int[] columnIndexes, SlidingSizeWindowAccessStrategy accessStrategy, float memoryBudgetInMB)
-        throws QueryProcessException {
-      safetyPile = safetyLine.addSafetyPile();
-
-      this.columnIndexes = columnIndexes;
-      columnDataTypes = new TSDataType[columnIndexes.length];
-      for (int i = 0; i < columnIndexes.length; ++i) {
-        columnDataTypes[i] = dataTypes[columnIndexes[i]];
-      }
-
-      windowSize = accessStrategy.getWindowSize();
-      // use a WrappedIntArray when the window size is below the capacity calculated from the
-      // memory budget, otherwise fall back to an ElasticSerializableIntList
-      rowIndexes =
-          windowSize < SerializableIntList.calculateCapacity(memoryBudgetInMB)
-              ? new WrappedIntArray(windowSize)
-              : new ElasticSerializableIntList(queryId, memoryBudgetInMB, 2);
-      rowWindow = new MultiColumnWindow(rowRecordList, columnIndexes, dataTypes, rowIndexes);
-
-      slidingStep = accessStrategy.getSlidingStep();
-
-      maxIndexInLastWindow = -1;
-    }
-
-    /**
-     * Fills the current window, unless it still holds rows from a previous call.
-     *
-     * @return true if the window holds at least one row; the last window may hold fewer than
-     *     {@code windowSize} rows
-     */
-    @Override
-    public boolean next() throws IOException, QueryProcessException {
-      // the window stays filled until readyForNext() clears it
-      if (0 < rowIndexes.size()) {
-        return true;
-      }
-
-      int count = 0;
-
-      // rows that are already buffered
-      for (int i = maxIndexInLastWindow + 1; i < rowRecordList.size(); ++i) {
-        if (hasNotNullSelectedFields(rowRecordList.getRowRecord(i), columnIndexes)) {
-          rowIndexes.put(i);
-          ++count;
-          if (count == windowSize) {
-            return true;
-          }
-        }
-      }
-
-      // rows that still have to be pulled; each of them is buffered, qualifying or not
-      while (queryDataSet.hasNextRowInObjects()) {
-        Object[] rowRecordCandidate = queryDataSet.nextRowInObjects();
-        rowRecordList.put(rowRecordCandidate);
-        if (hasNotNullSelectedFields(rowRecordCandidate, columnIndexes)) {
-          rowIndexes.put(rowRecordList.size() - 1);
-          ++count;
-          if (count == windowSize) {
-            return true;
-          }
-        }
-      }
-
-      // the input is exhausted: report a partial window if there is one
-      return count != 0;
-    }
-
-    /** Slides forward by {@code slidingStep} rows and clears the current window. */
-    @Override
-    public void readyForNext() throws IOException, QueryProcessException {
-      // must run before rowIndexes is cleared, because it reads the current window
-      updateMaxIndexForLastWindow();
-
-      safetyPile.moveForwardTo(maxIndexInLastWindow + 1);
-
-      rowIndexes.clear();
-    }
-
-    /**
-     * Moves {@code maxIndexInLastWindow} so that the next scan in {@link #next()} begins at the
-     * qualifying row located {@code slidingStep} qualifying rows after the first row of the
-     * current window. Does nothing when the current window is empty.
-     */
-    private void updateMaxIndexForLastWindow() throws IOException, QueryProcessException {
-      if (rowIndexes.size() == 0) {
-        return;
-      }
-
-      // the step ends inside the current window: the scan resumes right after the row at
-      // position slidingStep - 1 of the window
-      if (slidingStep <= rowIndexes.size()) {
-        maxIndexInLastWindow = rowIndexes.get(slidingStep - 1);
-        return;
-      }
-
-      // the step reaches beyond the current window: keep counting qualifying rows after the
-      // last row of the window, starting from the position of that last row
-      int currentStep = rowIndexes.size() - 1;
-
-      for (int i = rowIndexes.get(rowIndexes.size() - 1) + 1; i < rowRecordList.size(); ++i) {
-        if (hasNotNullSelectedFields(rowRecordList.getRowRecord(i), columnIndexes)) {
-          ++currentStep;
-          if (currentStep == slidingStep) {
-            // row i is the first row of the next window, so the scan must resume at i
-            maxIndexInLastWindow = i - 1;
-            return;
-          }
-        }
-      }
-
-      while (queryDataSet.hasNextRowInObjects()) {
-        Object[] rowRecordCandidate = queryDataSet.nextRowInObjects();
-        rowRecordList.put(rowRecordCandidate);
-        if (hasNotNullSelectedFields(rowRecordCandidate, columnIndexes)) {
-          ++currentStep;
-          if (currentStep == slidingStep) {
-            // the row just appended (index size() - 1) is the first row of the next window
-            maxIndexInLastWindow = rowRecordList.size() - 2;
-            return;
-          }
-        }
-      }
-
-      // the input ran out before the step was completed: nothing is left to scan
-      maxIndexInLastWindow = rowRecordList.size() - 1;
-    }
-
-    @Override
-    public TSDataType[] getDataTypes() {
-      return columnDataTypes;
-    }
-
-    @Override
-    public RowWindow currentWindow() {
-      return rowWindow;
-    }
-  }
-
-  /**
-   * Window reader over the input layer that groups rows by time: each window covers the range
-   * from {@code nextWindowTimeBegin} (inclusive) to {@code nextWindowTimeBegin + timeInterval}
-   * (exclusive), capped at {@code displayWindowEnd}, and the window begin moves forward by
-   * {@code slidingStep} on every {@link #readyForNext()}.
-   *
-   * <p>Within a window, only rows with at least one non-null selected column are collected.
-   */
-  private class InputLayerRowSlidingTimeWindowReader implements LayerRowWindowReader {
-
-    private final SafetyPile safetyPile;
-
-    private final int[] columnIndexes;
-    // data types of the selected columns only, in the order given by columnIndexes
-    private final TSDataType[] columnDataTypes;
-
-    private final long timeInterval;
-    private final long slidingStep;
-    private final long displayWindowEnd;
-
-    // indexes (into rowRecordList) of the rows that make up the current window
-    private final IntList rowIndexes;
-    // window view handed out by currentWindow(); it is built on top of rowIndexes
-    private final MultiColumnWindow rowWindow;
-
-    // begin time (inclusive) of the window that next() will fill
-    private long nextWindowTimeBegin;
-    // index in rowRecordList from which next() searches for the first row of the window
-    private int nextIndexBegin;
-
-    // whether rowRecordList held at least one row once the constructor had finished
-    private final boolean hasAtLeastOneRow;
-
-    private InputLayerRowSlidingTimeWindowReader(
-        int[] columnIndexes, SlidingTimeWindowAccessStrategy accessStrategy, float memoryBudgetInMB)
-        throws QueryProcessException, IOException {
-      safetyPile = safetyLine.addSafetyPile();
-
-      this.columnIndexes = columnIndexes;
-      columnDataTypes = new TSDataType[columnIndexes.length];
-      for (int i = 0; i < columnIndexes.length; ++i) {
-        columnDataTypes[i] = dataTypes[columnIndexes[i]];
-      }
-
-      timeInterval = accessStrategy.getTimeInterval();
-      slidingStep = accessStrategy.getSlidingStep();
-      displayWindowEnd = accessStrategy.getDisplayWindowEnd();
-
-      rowIndexes = new ElasticSerializableIntList(queryId, memoryBudgetInMB, 2);
-      rowWindow = new MultiColumnWindow(rowRecordList, columnIndexes, dataTypes, rowIndexes);
-
-      nextWindowTimeBegin = accessStrategy.getDisplayWindowBegin();
-      nextIndexBegin = 0;
-
-      // when nothing is buffered yet, pull the first row so that its timestamp can be used below
-      if (rowRecordList.size() == 0 && queryDataSet.hasNextRowInObjects()) {
-        rowRecordList.put(queryDataSet.nextRowInObjects());
-
-        if (nextWindowTimeBegin == Long.MIN_VALUE) {
-          // display window begin should be set to the same as the min timestamp of the query result
-          // set
-          nextWindowTimeBegin = rowRecordList.getTime(0);
-        }
-      }
-      hasAtLeastOneRow = rowRecordList.size() != 0;
-    }
-
-    /**
-     * Fills the current window with the indexes of the qualifying rows that fall into its time
-     * range.
-     *
-     * @return false once the window begin has reached {@code displayWindowEnd}, or when {@code
-     *     displayWindowEnd} is {@code Long.MAX_VALUE}, the data set is exhausted and no row was
-     *     added during this call; true otherwise, in which case the window may be empty
-     */
-    @Override
-    public boolean next() throws IOException, QueryProcessException {
-      if (displayWindowEnd <= nextWindowTimeBegin) {
-        return false;
-      }
-      // NOTE(review): when there are no input rows at all, this reports an (empty) window on
-      // every call until nextWindowTimeBegin reaches displayWindowEnd -- confirm this is intended
-      if (!hasAtLeastOneRow || 0 < rowIndexes.size()) {
-        return true;
-      }
-
-      long nextWindowTimeEnd = Math.min(nextWindowTimeBegin + timeInterval, displayWindowEnd);
-      int oldRowRecordListSize = rowRecordList.size();
-      // buffer rows until the last buffered row is at or beyond the end of the window
-      while ((Long) rowRecordList.getRowRecord(rowRecordList.size() - 1)[timestampIndex]
-          < nextWindowTimeEnd) {
-        if (queryDataSet.hasNextRowInObjects()) {
-          rowRecordList.put(queryDataSet.nextRowInObjects());
-        } else if (displayWindowEnd == Long.MAX_VALUE
-            // display window end == the max timestamp of the query result set
-            && oldRowRecordListSize == rowRecordList.size()) {
-          return false;
-        } else {
-          break;
-        }
-      }
-
-      // move nextIndexBegin to the first row whose timestamp is not before the window begin, or
-      // to rowRecordList.size() when there is no such row
-      for (int i = nextIndexBegin; i < rowRecordList.size(); ++i) {
-        if (nextWindowTimeBegin <= (Long) rowRecordList.getRowRecord(i)[timestampIndex]) {
-          nextIndexBegin = i;
-          break;
-        }
-        if (i == rowRecordList.size() - 1) {
-          nextIndexBegin = rowRecordList.size();
-        }
-      }
-
-      // collect the qualifying rows up to (excluding) the first row at or beyond the window end
-      for (int i = nextIndexBegin; i < rowRecordList.size(); ++i) {
-        Object[] rowRecordCandidate = rowRecordList.getRowRecord(i);
-        if (nextWindowTimeEnd <= (Long) rowRecordCandidate[timestampIndex]) {
-          break;
-        }
-        if (hasNotNullSelectedFields(rowRecordCandidate, columnIndexes)) {
-          rowIndexes.put(i);
-        }
-      }
-
-      return true;
-    }
-
-    /** Moves the window begin forward by {@code slidingStep} and clears the current window. */
-    @Override
-    public void readyForNext() {
-      nextWindowTimeBegin += slidingStep;
-
-      safetyPile.moveForwardTo(nextIndexBegin);
-
-      rowIndexes.clear();
-    }
-
-    @Override
-    public TSDataType[] getDataTypes() {
-      return columnDataTypes;
-    }
-
-    @Override
-    public RowWindow currentWindow() {
-      return rowWindow;
-    }
-  }
-
-  /**
-   * Tells whether the given row holds a value in at least one of the selected columns.
-   *
-   * @param rowRecordCandidate the row to inspect
-   * @param columnIndexes positions of the selected columns within the row
-   * @return true if any selected position is not null; false otherwise, including when no column
-   *     is selected
-   */
-  private static boolean hasNotNullSelectedFields(
-      Object[] rowRecordCandidate, int[] columnIndexes) {
-    for (int i = 0; i < columnIndexes.length; ++i) {
-      final Object field = rowRecordCandidate[columnIndexes[i]];
-      if (field != null) {
-        return true;
-      }
-    }
-    return false;
-  }
-}