You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/09/07 08:32:23 UTC

[iotdb] branch nested-operations updated: MultiInputColumnIntermediateLayer

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch nested-operations
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/nested-operations by this push:
     new 73b717b  MultiInputColumnIntermediateLayer
73b717b is described below

commit 73b717b0d571c0e4b8a789f685d0cff894c2484d
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Sep 7 16:31:46 2021 +0800

    MultiInputColumnIntermediateLayer
---
 .../iotdb/db/query/expression/Expression.java      |   3 +-
 .../query/expression/unary/FunctionExpression.java |   8 +-
 ...ializableRowRecordListBackedMultiColumnRow.java |  97 ++++++
 ...zableRowRecordListBackedMultiColumnWindow.java} |  39 +--
 ...RecordListBackedMultiColumnWindowIterator.java} |  20 +-
 ...SerializableTVListBackedSingleColumnWindow.java |   4 -
 ...ableTVListBackedSingleColumnWindowIterator.java |   2 +
 .../db/query/udf/core/layer/LayerCacheUtils.java   |  23 ++
 .../layer/MultiInputColumnIntermediateLayer.java   | 337 +++++++++++++++++++++
 ...InputColumnMultiReferenceIntermediateLayer.java |  58 ----
 ...nputColumnSingleReferenceIntermediateLayer.java |  58 ----
 .../row/ElasticSerializableRowRecordList.java      |   9 +-
 12 files changed, 504 insertions(+), 154 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index 09a15ad..10b4117 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -55,7 +56,7 @@ public abstract class Expression {
       UDTFPlan udtfPlan,
       UDFLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
-      throws QueryProcessException;
+      throws QueryProcessException, IOException;
 
   public String getExpressionString() {
     if (expressionString == null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index 10b3811..462ba07 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -28,10 +28,11 @@ import org.apache.iotdb.db.qp.strategy.optimizer.ConcatPathOptimizer;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
-import org.apache.iotdb.db.query.udf.core.layer.MultiInputColumnMultiReferenceIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.MultiInputColumnIntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -153,7 +154,7 @@ public class FunctionExpression extends Expression {
       UDTFPlan udtfPlan,
       UDFLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
-      throws QueryProcessException {
+      throws QueryProcessException, IOException {
     if (!expressionIntermediateLayerMap.containsKey(this)) {
       List<LayerPointReader> parentLayerPointReaders = new ArrayList<>();
       for (Expression expression : expressions) {
@@ -165,8 +166,7 @@ public class FunctionExpression extends Expression {
       }
 
       expressionIntermediateLayerMap.put(
-          this,
-          new MultiInputColumnMultiReferenceIntermediateLayer(parentLayerPointReaders, -1, -1));
+          this, new MultiInputColumnIntermediateLayer(-1, -1, parentLayerPointReaders));
     }
 
     return expressionIntermediateLayerMap.get(this);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableRowRecordListBackedMultiColumnRow.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableRowRecordListBackedMultiColumnRow.java
new file mode 100644
index 0000000..d29b0d5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableRowRecordListBackedMultiColumnRow.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.access;
+
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+public class ElasticSerializableRowRecordListBackedMultiColumnRow implements Row {
+
+  private final TSDataType[] dataTypes;
+  private final int size;
+
+  private Object[] rowRecord;
+
+  public ElasticSerializableRowRecordListBackedMultiColumnRow(TSDataType[] dataTypes) {
+    this.dataTypes = dataTypes;
+    size = dataTypes.length;
+  }
+
+  @Override
+  public long getTime() {
+    return (long) rowRecord[size];
+  }
+
+  @Override
+  public int getInt(int columnIndex) {
+    return (int) rowRecord[columnIndex];
+  }
+
+  @Override
+  public long getLong(int columnIndex) {
+    return (long) rowRecord[columnIndex];
+  }
+
+  @Override
+  public float getFloat(int columnIndex) {
+    return (float) rowRecord[columnIndex];
+  }
+
+  @Override
+  public double getDouble(int columnIndex) {
+    return (double) rowRecord[columnIndex];
+  }
+
+  @Override
+  public boolean getBoolean(int columnIndex) {
+    return (boolean) rowRecord[columnIndex];
+  }
+
+  @Override
+  public Binary getBinary(int columnIndex) {
+    return (Binary) rowRecord[columnIndex];
+  }
+
+  @Override
+  public String getString(int columnIndex) {
+    return ((Binary) rowRecord[columnIndex]).getStringValue();
+  }
+
+  @Override
+  public TSDataType getDataType(int columnIndex) {
+    return dataTypes[columnIndex];
+  }
+
+  @Override
+  public boolean isNull(int columnIndex) {
+    return rowRecord[columnIndex] == null;
+  }
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  public Row setRowRecord(Object[] rowRecord) {
+    this.rowRecord = rowRecord;
+    return this;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableRowRecordListBackedMultiColumnWindow.java
similarity index 61%
copy from server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java
copy to server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableRowRecordListBackedMultiColumnWindow.java
index 4157f90..486120a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableRowRecordListBackedMultiColumnWindow.java
@@ -22,27 +22,33 @@ package org.apache.iotdb.db.query.udf.core.access;
 import org.apache.iotdb.db.query.udf.api.access.Row;
 import org.apache.iotdb.db.query.udf.api.access.RowIterator;
 import org.apache.iotdb.db.query.udf.api.access.RowWindow;
-import org.apache.iotdb.db.query.udf.datastructure.tv.ElasticSerializableTVList;
+import org.apache.iotdb.db.query.udf.datastructure.row.ElasticSerializableRowRecordList;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
-public class ElasticSerializableTVListBackedSingleColumnWindow implements RowWindow {
+import java.io.IOException;
+
+public class ElasticSerializableRowRecordListBackedMultiColumnWindow implements RowWindow {
+
+  private final ElasticSerializableRowRecordList rowRecordList;
+  private final TSDataType[] dataTypes;
 
-  private final ElasticSerializableTVList tvList;
   private int beginIndex;
   private int endIndex;
   private int size;
 
-  private final ElasticSerializableTVListBackedSingleColumnRow row;
-  private ElasticSerializableTVListBackedSingleColumnWindowIterator rowIterator;
+  private final ElasticSerializableRowRecordListBackedMultiColumnRow row;
+  private ElasticSerializableRowRecordListBackedMultiColumnWindowIterator rowIterator;
+
+  public ElasticSerializableRowRecordListBackedMultiColumnWindow(
+      ElasticSerializableRowRecordList rowRecordList) {
+    this.rowRecordList = rowRecordList;
+    this.dataTypes = rowRecordList.getDataTypes();
 
-  // [beginIndex, endIndex)
-  public ElasticSerializableTVListBackedSingleColumnWindow(ElasticSerializableTVList tvList) {
-    this.tvList = tvList;
     beginIndex = 0;
     endIndex = 0;
     size = 0;
 
-    row = new ElasticSerializableTVListBackedSingleColumnRow(tvList, beginIndex);
+    row = new ElasticSerializableRowRecordListBackedMultiColumnRow(dataTypes);
   }
 
   @Override
@@ -51,25 +57,21 @@ public class ElasticSerializableTVListBackedSingleColumnWindow implements RowWin
   }
 
   @Override
-  public Row getRow(int rowIndex) {
-    if (rowIndex < beginIndex || endIndex <= rowIndex) {
-      throw new ArrayIndexOutOfBoundsException(
-          String.format("Array index(%d) out of range [%d, %d).", rowIndex, beginIndex, endIndex));
-    }
-    return row.seek(beginIndex + rowIndex);
+  public Row getRow(int rowIndex) throws IOException {
+    return row.setRowRecord(rowRecordList.getRowRecord(beginIndex + rowIndex));
   }
 
   @Override
   public TSDataType getDataType(int columnIndex) {
-    return tvList.getDataType();
+    return dataTypes[columnIndex];
   }
 
   @Override
   public RowIterator getRowIterator() {
     if (rowIterator == null) {
       rowIterator =
-          new ElasticSerializableTVListBackedSingleColumnWindowIterator(
-              tvList, beginIndex, endIndex);
+          new ElasticSerializableRowRecordListBackedMultiColumnWindowIterator(
+              rowRecordList, beginIndex, endIndex);
     }
 
     rowIterator.reset();
@@ -81,7 +83,6 @@ public class ElasticSerializableTVListBackedSingleColumnWindow implements RowWin
     this.endIndex = endIndex;
     size = endIndex - beginIndex;
 
-    row.seek(beginIndex);
     rowIterator = null;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindowIterator.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableRowRecordListBackedMultiColumnWindowIterator.java
similarity index 65%
copy from server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindowIterator.java
copy to server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableRowRecordListBackedMultiColumnWindowIterator.java
index 5991d50..801e43f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindowIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableRowRecordListBackedMultiColumnWindowIterator.java
@@ -21,23 +21,27 @@ package org.apache.iotdb.db.query.udf.core.access;
 
 import org.apache.iotdb.db.query.udf.api.access.Row;
 import org.apache.iotdb.db.query.udf.api.access.RowIterator;
-import org.apache.iotdb.db.query.udf.datastructure.tv.ElasticSerializableTVList;
+import org.apache.iotdb.db.query.udf.datastructure.row.ElasticSerializableRowRecordList;
 
 import java.io.IOException;
 
-public class ElasticSerializableTVListBackedSingleColumnWindowIterator implements RowIterator {
+public class ElasticSerializableRowRecordListBackedMultiColumnWindowIterator
+    implements RowIterator {
 
+  private final ElasticSerializableRowRecordList rowRecordList;
   private final int beginIndex;
   private final int size;
-  private final ElasticSerializableTVListBackedSingleColumnRow row;
+
+  private final ElasticSerializableRowRecordListBackedMultiColumnRow row;
   private int rowIndex;
 
-  // [beginIndex, endIndex)
-  public ElasticSerializableTVListBackedSingleColumnWindowIterator(
-      ElasticSerializableTVList tvList, int beginIndex, int endIndex) {
+  public ElasticSerializableRowRecordListBackedMultiColumnWindowIterator(
+      ElasticSerializableRowRecordList rowRecordList, int beginIndex, int endIndex) {
+    this.rowRecordList = rowRecordList;
     this.beginIndex = beginIndex;
     size = endIndex - beginIndex;
-    row = new ElasticSerializableTVListBackedSingleColumnRow(tvList, beginIndex);
+
+    row = new ElasticSerializableRowRecordListBackedMultiColumnRow(rowRecordList.getDataTypes());
     rowIndex = -1;
   }
 
@@ -48,7 +52,7 @@ public class ElasticSerializableTVListBackedSingleColumnWindowIterator implement
 
   @Override
   public Row next() throws IOException {
-    return row.seek(++rowIndex + beginIndex);
+    return row.setRowRecord(rowRecordList.getRowRecord(++rowIndex + beginIndex));
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java
index 4157f90..3296d5b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java
@@ -52,10 +52,6 @@ public class ElasticSerializableTVListBackedSingleColumnWindow implements RowWin
 
   @Override
   public Row getRow(int rowIndex) {
-    if (rowIndex < beginIndex || endIndex <= rowIndex) {
-      throw new ArrayIndexOutOfBoundsException(
-          String.format("Array index(%d) out of range [%d, %d).", rowIndex, beginIndex, endIndex));
-    }
     return row.seek(beginIndex + rowIndex);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindowIterator.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindowIterator.java
index 5991d50..7dc35d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindowIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindowIterator.java
@@ -29,6 +29,7 @@ public class ElasticSerializableTVListBackedSingleColumnWindowIterator implement
 
   private final int beginIndex;
   private final int size;
+
   private final ElasticSerializableTVListBackedSingleColumnRow row;
   private int rowIndex;
 
@@ -37,6 +38,7 @@ public class ElasticSerializableTVListBackedSingleColumnWindowIterator implement
       ElasticSerializableTVList tvList, int beginIndex, int endIndex) {
     this.beginIndex = beginIndex;
     size = endIndex - beginIndex;
+
     row = new ElasticSerializableTVListBackedSingleColumnRow(tvList, beginIndex);
     rowIndex = -1;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerCacheUtils.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerCacheUtils.java
index 5801eef..d098699 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerCacheUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerCacheUtils.java
@@ -20,7 +20,9 @@
 package org.apache.iotdb.db.query.udf.core.layer;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.dataset.UDFInputDataSet;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.datastructure.row.ElasticSerializableRowRecordList;
 import org.apache.iotdb.db.query.udf.datastructure.tv.ElasticSerializableTVList;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
@@ -78,4 +80,25 @@ public class LayerCacheUtils {
 
     return true;
   }
+
+  /** @return number of actually collected, which may be less than or equals to rowsNumber */
+  public static int cacheRows(
+      UDFInputDataSet source, ElasticSerializableRowRecordList target, int rowsNumber)
+      throws QueryProcessException, IOException {
+    int count = 0;
+    while (count < rowsNumber && cacheRow(source, target)) {
+      ++count;
+    }
+    return count;
+  }
+
+  public static boolean cacheRow(UDFInputDataSet source, ElasticSerializableRowRecordList target)
+      throws IOException, QueryProcessException {
+    if (source.hasNextRowInObjects()) {
+      target.put(source.nextRowInObjects());
+      return true;
+    } else {
+      return false;
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnIntermediateLayer.java
new file mode 100644
index 0000000..1766ff4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnIntermediateLayer.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.layer;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.dataset.UDFInputDataSet;
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.access.RowWindow;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.core.access.ElasticSerializableRowRecordListBackedMultiColumnRow;
+import org.apache.iotdb.db.query.udf.core.access.ElasticSerializableRowRecordListBackedMultiColumnWindow;
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader;
+import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader;
+import org.apache.iotdb.db.query.udf.datastructure.row.ElasticSerializableRowRecordList;
+import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+public class MultiInputColumnIntermediateLayer extends IntermediateLayer
+    implements UDFInputDataSet {
+
+  private final LayerPointReader[] layerPointReaders;
+  private final TSDataType[] dataTypes;
+  private final TimeSelector timeHeap;
+
+  public MultiInputColumnIntermediateLayer(
+      long queryId, float memoryBudgetInMB, List<LayerPointReader> parentLayerPointReaders)
+      throws QueryProcessException, IOException {
+    super(queryId, memoryBudgetInMB);
+
+    layerPointReaders = parentLayerPointReaders.toArray(new LayerPointReader[0]);
+
+    dataTypes = new TSDataType[layerPointReaders.length];
+    for (int i = 0; i < layerPointReaders.length; ++i) {
+      dataTypes[i] = layerPointReaders[i].getDataType();
+    }
+
+    timeHeap = new TimeSelector(layerPointReaders.length << 1, true);
+    for (LayerPointReader reader : layerPointReaders) {
+      if (reader.next()) {
+        timeHeap.add(reader.currentTime());
+      }
+    }
+  }
+
+  @Override
+  public List<TSDataType> getDataTypes() {
+    return Arrays.asList(dataTypes);
+  }
+
+  @Override
+  public boolean hasNextRowInObjects() {
+    return !timeHeap.isEmpty();
+  }
+
+  @Override
+  public Object[] nextRowInObjects() throws IOException {
+    long minTime = timeHeap.pollFirst();
+
+    int rowLength = layerPointReaders.length;
+    Object[] row = new Object[rowLength + 1];
+    row[rowLength] = minTime;
+
+    try {
+      for (int i = 0; i < rowLength; ++i) {
+        LayerPointReader reader = layerPointReaders[i];
+        if (!reader.next() || reader.currentTime() != minTime) {
+          continue;
+        }
+
+        switch (reader.getDataType()) {
+          case INT32:
+            row[i] = reader.currentInt();
+            break;
+          case INT64:
+            row[i] = reader.currentLong();
+            break;
+          case FLOAT:
+            row[i] = reader.currentFloat();
+            break;
+          case DOUBLE:
+            row[i] = reader.currentDouble();
+            break;
+          case BOOLEAN:
+            row[i] = reader.currentBoolean();
+            break;
+          case TEXT:
+            row[i] = reader.currentBinary();
+            break;
+          default:
+            throw new UnSupportedDataTypeException("Unsupported data type.");
+        }
+        reader.readyForNext();
+
+        if (reader.next()) {
+          timeHeap.add(reader.currentTime());
+        }
+      }
+    } catch (QueryProcessException e) {
+      throw new IOException(e.getMessage());
+    }
+
+    return row;
+  }
+
+  @Override
+  public LayerPointReader constructPointReader() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public LayerRowReader constructRowReader() {
+
+    return new LayerRowReader() {
+
+      private final ElasticSerializableRowRecordListBackedMultiColumnRow row =
+          new ElasticSerializableRowRecordListBackedMultiColumnRow(dataTypes);
+
+      private boolean hasCached = false;
+
+      @Override
+      public boolean next() throws IOException {
+        if (hasCached) {
+          return true;
+        }
+
+        if (!hasNextRowInObjects()) {
+          return false;
+        }
+
+        row.setRowRecord(nextRowInObjects());
+        hasCached = true;
+        return true;
+      }
+
+      @Override
+      public void readyForNext() {
+        hasCached = false;
+      }
+
+      @Override
+      public TSDataType[] getDataTypes() {
+        return dataTypes;
+      }
+
+      @Override
+      public long currentTime() {
+        return row.getTime();
+      }
+
+      @Override
+      public Row currentRow() {
+        return row;
+      }
+    };
+  }
+
+  @Override
+  protected LayerRowWindowReader constructRowSlidingSizeWindowReader(
+      SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB)
+      throws QueryProcessException {
+
+    final UDFInputDataSet udfInputDataSet = this;
+
+    return new LayerRowWindowReader() {
+
+      private final int windowSize = strategy.getWindowSize();
+      private final int slidingStep = strategy.getSlidingStep();
+
+      private final ElasticSerializableRowRecordList rowRecordList =
+          new ElasticSerializableRowRecordList(dataTypes, queryId, memoryBudgetInMB, 2);
+      private final ElasticSerializableRowRecordListBackedMultiColumnWindow window =
+          new ElasticSerializableRowRecordListBackedMultiColumnWindow(rowRecordList);
+
+      private boolean hasCached = false;
+      private int beginIndex = -slidingStep;
+
+      @Override
+      public boolean next() throws IOException, QueryProcessException {
+        if (hasCached) {
+          return true;
+        }
+
+        beginIndex += slidingStep;
+        int endIndex = beginIndex + windowSize;
+
+        int rowsToBeCollected = endIndex - rowRecordList.size();
+        if (0 < rowsToBeCollected) {
+          hasCached =
+              LayerCacheUtils.cacheRows(udfInputDataSet, rowRecordList, rowsToBeCollected) != 0;
+          window.seek(beginIndex, rowRecordList.size());
+        } else {
+          hasCached = true;
+          window.seek(beginIndex, endIndex);
+        }
+
+        return hasCached;
+      }
+
+      @Override
+      public void readyForNext() {
+        hasCached = false;
+
+        rowRecordList.setEvictionUpperBound(beginIndex + 1);
+      }
+
+      @Override
+      public TSDataType[] getDataTypes() {
+        return dataTypes;
+      }
+
+      @Override
+      public RowWindow currentWindow() {
+        return window;
+      }
+    };
+  }
+
+  @Override
+  protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
+      SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
+      throws QueryProcessException, IOException {
+
+    final long timeInterval = strategy.getTimeInterval();
+    final long slidingStep = strategy.getSlidingStep();
+    final long displayWindowEnd = strategy.getDisplayWindowEnd();
+
+    final UDFInputDataSet udfInputDataSet = this;
+    final ElasticSerializableRowRecordList rowRecordList =
+        new ElasticSerializableRowRecordList(dataTypes, queryId, memoryBudgetInMB, 2);
+    final ElasticSerializableRowRecordListBackedMultiColumnWindow window =
+        new ElasticSerializableRowRecordListBackedMultiColumnWindow(rowRecordList);
+
+    long nextWindowTimeBeginGivenByStrategy = strategy.getDisplayWindowBegin();
+    if (rowRecordList.size() == 0
+        && LayerCacheUtils.cacheRow(udfInputDataSet, rowRecordList)
+        && nextWindowTimeBeginGivenByStrategy == Long.MIN_VALUE) {
+      // display window begin should be set to the same as the min timestamp of the query result
+      // set
+      nextWindowTimeBeginGivenByStrategy = rowRecordList.getTime(0);
+    }
+    long finalNextWindowTimeBeginGivenByStrategy = nextWindowTimeBeginGivenByStrategy;
+
+    final boolean hasAtLeastOneRow = rowRecordList.size() != 0;
+
+    return new LayerRowWindowReader() {
+
+      private long nextWindowTimeBegin = finalNextWindowTimeBeginGivenByStrategy;
+      private int nextIndexBegin = 0;
+
+      @Override
+      public boolean next() throws IOException, QueryProcessException {
+        if (displayWindowEnd <= nextWindowTimeBegin) {
+          return false;
+        }
+        if (!hasAtLeastOneRow || 0 < rowRecordList.size()) {
+          return true;
+        }
+
+        long nextWindowTimeEnd = Math.min(nextWindowTimeBegin + timeInterval, displayWindowEnd);
+        int oldTVListSize = rowRecordList.size();
+        while (rowRecordList.getTime(rowRecordList.size() - 1) < nextWindowTimeEnd) {
+          if (!LayerCacheUtils.cacheRow(udfInputDataSet, rowRecordList)) {
+            if (displayWindowEnd == Long.MAX_VALUE
+                // display window end == the max timestamp of the query result set
+                && oldTVListSize == rowRecordList.size()) {
+              return false;
+            } else {
+              break;
+            }
+          }
+        }
+
+        for (int i = nextIndexBegin; i < rowRecordList.size(); ++i) {
+          if (nextWindowTimeBegin <= rowRecordList.getTime(i)) {
+            nextIndexBegin = i;
+            break;
+          }
+          if (i == rowRecordList.size() - 1) {
+            nextIndexBegin = rowRecordList.size();
+          }
+        }
+
+        int nextIndexEnd = rowRecordList.size();
+        for (int i = nextIndexBegin; i < rowRecordList.size(); ++i) {
+          if (nextWindowTimeEnd <= rowRecordList.getTime(i)) {
+            nextIndexEnd = i;
+            break;
+          }
+        }
+        window.seek(nextIndexBegin, nextIndexEnd);
+
+        return true;
+      }
+
+      @Override
+      public void readyForNext() {
+        nextWindowTimeBegin += slidingStep;
+
+        rowRecordList.setEvictionUpperBound(nextIndexBegin + 1);
+      }
+
+      @Override
+      public TSDataType[] getDataTypes() {
+        return dataTypes;
+      }
+
+      @Override
+      public RowWindow currentWindow() {
+        return window;
+      }
+    };
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnMultiReferenceIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnMultiReferenceIntermediateLayer.java
deleted file mode 100644
index 861fd7d..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnMultiReferenceIntermediateLayer.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.query.udf.core.layer;
-
-import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
-import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
-import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
-import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader;
-import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader;
-
-import java.util.List;
-
-public class MultiInputColumnMultiReferenceIntermediateLayer extends IntermediateLayer {
-
-  public MultiInputColumnMultiReferenceIntermediateLayer(
-      long queryId, float memoryBudgetInMB, List<LayerPointReader> parentLayerPointReaders) {
-    super(queryId, memoryBudgetInMB);
-  }
-
-  @Override
-  public LayerPointReader constructPointReader() {
-    return null;
-  }
-
-  @Override
-  public LayerRowReader constructRowReader() {
-    return null;
-  }
-
-  @Override
-  protected LayerRowWindowReader constructRowSlidingSizeWindowReader(
-      SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB) {
-    return null;
-  }
-
-  @Override
-  protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
-      SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
-    return null;
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnSingleReferenceIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnSingleReferenceIntermediateLayer.java
deleted file mode 100644
index 9ef769d..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnSingleReferenceIntermediateLayer.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.query.udf.core.layer;
-
-import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
-import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
-import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
-import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader;
-import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader;
-
-import java.util.List;
-
-public class MultiInputColumnSingleReferenceIntermediateLayer extends IntermediateLayer {
-
-  public MultiInputColumnSingleReferenceIntermediateLayer(
-      long queryId, float memoryBudgetInMB, List<LayerPointReader> parentLayerPointReaders) {
-    super(queryId, memoryBudgetInMB);
-  }
-
-  @Override
-  public LayerPointReader constructPointReader() {
-    return null;
-  }
-
-  @Override
-  public LayerRowReader constructRowReader() {
-    return null;
-  }
-
-  @Override
-  protected LayerRowWindowReader constructRowSlidingSizeWindowReader(
-      SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB) {
-    return null;
-  }
-
-  @Override
-  protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
-      SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
-    return null;
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/datastructure/row/ElasticSerializableRowRecordList.java b/server/src/main/java/org/apache/iotdb/db/query/udf/datastructure/row/ElasticSerializableRowRecordList.java
index 910c842..7171ca1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/datastructure/row/ElasticSerializableRowRecordList.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/datastructure/row/ElasticSerializableRowRecordList.java
@@ -116,6 +116,10 @@ public class ElasticSerializableRowRecordList {
     return size;
   }
 
+  public TSDataType[] getDataTypes() {
+    return dataTypes;
+  }
+
   public long getTime(int index) throws IOException {
     return cache
         .get(index / internalRowRecordListCapacity)
@@ -138,7 +142,8 @@ public class ElasticSerializableRowRecordList {
           (long) indexListOfTextFields.length * byteArrayLengthForMemoryControl;
 
       if (rowRecord == null) {
-        totalByteArrayLength += indexListOfTextFields.length * byteArrayLengthForMemoryControl;
+        totalByteArrayLength +=
+            (long) indexListOfTextFields.length * byteArrayLengthForMemoryControl;
       } else {
         for (int indexListOfTextField : indexListOfTextFields) {
           Binary binary = (Binary) rowRecord[indexListOfTextField];
@@ -163,7 +168,7 @@ public class ElasticSerializableRowRecordList {
     }
 
     int newByteArrayLengthForMemoryControl = byteArrayLengthForMemoryControl;
-    while (newByteArrayLengthForMemoryControl * size < totalByteArrayLength) {
+    while ((long) newByteArrayLengthForMemoryControl * size < totalByteArrayLength) {
       newByteArrayLengthForMemoryControl *= 2;
     }
     int newInternalTVListCapacity =