You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/14 08:42:10 UTC

[iotdb] branch ml/windowSet updated (cf1cffa7f9 -> edc3bb43b7)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a change to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git


    from cf1cffa7f9 modify WindowSplitOperator
     new 9f03e74dae implement WindowConcatOperator
     new edc3bb43b7 add V2 interface

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../main/java/org/apache/iotdb/SessionExample.java |   4 +-
 .../process/{ => window}/WindowConcatOperator.java |  44 ++++-
 .../operator/process/window/WindowSliceQueue.java  |  80 ++++++++
 .../process/{ => window}/WindowSplitOperator.java  |   3 +-
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |   4 +-
 .../service/thrift/impl/ClientRPCServiceImpl.java  |  59 +++++-
 .../db/service/thrift/impl/TSServiceImpl.java      |   5 +
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   | 203 +++++++++++++++++++++
 .../apache/iotdb/session/SessionConnection.java    |   2 +-
 thrift/src/main/thrift/client.thrift               |   5 +-
 10 files changed, 391 insertions(+), 18 deletions(-)
 rename server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/{ => window}/WindowConcatOperator.java (64%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSliceQueue.java
 rename server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/{ => window}/WindowSplitOperator.java (97%)


[iotdb] 02/02: add V2 interface

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit edc3bb43b7d4613fd31e067e5abe7233885dd11d
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Nov 14 16:40:46 2022 +0800

    add V2 interface
---
 .../service/thrift/impl/ClientRPCServiceImpl.java  |  59 +++++-
 .../db/service/thrift/impl/TSServiceImpl.java      |   5 +
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   | 203 +++++++++++++++++++++
 .../apache/iotdb/session/SessionConnection.java    |   2 +-
 thrift/src/main/thrift/client.thrift               |   5 +-
 5 files changed, 270 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 61363a1d6c..a7796ac5d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -457,8 +457,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
 
       try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
         TSFetchWindowBatchResp resp =
-            createTSFetchWindowBatchResp(queryExecution.getDatasetHeader());
-        resp.setWindowBatch(QueryDataSetUtils.convertTsBlocksToWindowBatch(queryExecution));
+                createTSFetchWindowBatchResp(queryExecution.getDatasetHeader());
+        resp.setWindowBatchDataSetList(QueryDataSetUtils.convertTsBlocksToWindowBatchDataSetList(queryExecution));
         return resp;
       }
     } catch (Exception e) {
@@ -474,6 +474,61 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
     }
   }
 
+  @Override
+  public TSFetchWindowBatchResp fetchWindowBatchV2(TSFetchWindowBatchReq req) throws TException {
+    if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
+      return RpcUtils.getTSFetchWindowBatchResp(getNotLoggedInStatus());
+    }
+    long startTime = System.currentTimeMillis();
+    try {
+      Statement s = StatementGenerator.createStatement(req);
+
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(s, SESSION_MANAGER.getCurrSession());
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return RpcUtils.getTSFetchWindowBatchResp(status);
+      }
+
+      QUERY_FREQUENCY_RECORDER.incrementAndGet();
+      AUDIT_LOGGER.debug("Session {} execute fetch window set: {}", req.sessionId, req);
+      long queryId =
+              SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId);
+      // create and cache dataset
+      ExecutionResult result =
+              COORDINATOR.execute(
+                      s,
+                      queryId,
+                      SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
+                      "",
+                      PARTITION_FETCHER,
+                      SCHEMA_FETCHER,
+                      config.getQueryTimeoutThreshold());
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new RuntimeException("error code: " + result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+      try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
+        TSFetchWindowBatchResp resp =
+                createTSFetchWindowBatchResp(queryExecution.getDatasetHeader());
+        resp.setWindowBatch(QueryDataSetUtils.convertTsBlocksToWindowBatch(queryExecution));
+        return resp;
+      }
+    } catch (Exception e) {
+      // TODO call the coordinator to release query resource
+      return RpcUtils.getTSFetchWindowBatchResp(
+              onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_RAW_DATA_QUERY));
+    } finally {
+      addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+      long costTime = System.currentTimeMillis() - startTime;
+      if (costTime >= CONFIG.getSlowQueryThreshold()) {
+        SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, req);
+      }
+    }
+  }
+
   @Override
   public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
     IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 7470898fac..2ce613c3cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -283,6 +283,11 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
     return null;
   }
 
+  @Override
+  public TSFetchWindowBatchResp fetchWindowBatchV2(TSFetchWindowBatchReq req) throws TException {
+    return null;
+  }
+
   @Override
   public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {
     IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index d053e4b953..17dd769f5a 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -428,6 +428,209 @@ public class QueryDataSetUtils {
     return windowSet;
   }
 
+  public static List<TSQueryDataSet> convertTsBlocksToWindowBatchDataSetList(IQueryExecution queryExecution)
+          throws IoTDBException, IOException {
+    List<TSQueryDataSet> windowSet = new ArrayList<>();
+
+    int columnNum = queryExecution.getOutputValueColumnCount();
+    // one time column and each value column has an actual value buffer and a bitmap value to
+    // indicate whether it is a null
+    int columnNumWithTime = columnNum * 2 + 1;
+
+    while (true) {
+      Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
+      if (!optionalTsBlock.isPresent()) {
+        break;
+      }
+      TsBlock tsBlock = optionalTsBlock.get();
+      if (tsBlock.isEmpty()) {
+        continue;
+      }
+
+      TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
+
+      DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime];
+      ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime];
+      for (int i = 0; i < columnNumWithTime; i++) {
+        byteArrayOutputStreams[i] = new ByteArrayOutputStream();
+        dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
+      }
+
+      int rowCount = 0;
+      int[] valueOccupation = new int[columnNum];
+
+      // used to record a bitmap for every 8 points
+      int[] bitmaps = new int[columnNum];
+
+      int currentCount = tsBlock.getPositionCount();
+      // serialize time column
+      for (int i = 0; i < currentCount; i++) {
+        // use columnOutput to write byte array
+        dataOutputStreams[0].writeLong(tsBlock.getTimeByIndex(i));
+      }
+
+      // serialize each value column and its bitmap
+      for (int k = 0; k < columnNum; k++) {
+        // get DataOutputStream for current value column and its bitmap
+        DataOutputStream dataOutputStream = dataOutputStreams[2 * k + 1];
+        DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)];
+
+        Column column = tsBlock.getColumn(k);
+        TSDataType type = column.getDataType();
+        switch (type) {
+          case INT32:
+            for (int i = 0; i < currentCount; i++) {
+              rowCount++;
+              if (column.isNull(i)) {
+                bitmaps[k] = bitmaps[k] << 1;
+              } else {
+                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+                dataOutputStream.writeInt(column.getInt(i));
+                valueOccupation[k] += 4;
+              }
+              if (rowCount != 0 && rowCount % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmaps[k]);
+                // we should clear the bitmap every 8 points
+                bitmaps[k] = 0;
+              }
+            }
+            break;
+          case INT64:
+            for (int i = 0; i < currentCount; i++) {
+              rowCount++;
+              if (column.isNull(i)) {
+                bitmaps[k] = bitmaps[k] << 1;
+              } else {
+                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+                dataOutputStream.writeLong(column.getLong(i));
+                valueOccupation[k] += 8;
+              }
+              if (rowCount != 0 && rowCount % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmaps[k]);
+                // we should clear the bitmap every 8 points
+                bitmaps[k] = 0;
+              }
+            }
+            break;
+          case FLOAT:
+            for (int i = 0; i < currentCount; i++) {
+              rowCount++;
+              if (column.isNull(i)) {
+                bitmaps[k] = bitmaps[k] << 1;
+              } else {
+                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+                dataOutputStream.writeFloat(column.getFloat(i));
+                valueOccupation[k] += 4;
+              }
+              if (rowCount != 0 && rowCount % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmaps[k]);
+                // we should clear the bitmap every 8 points
+                bitmaps[k] = 0;
+              }
+            }
+            break;
+          case DOUBLE:
+            for (int i = 0; i < currentCount; i++) {
+              rowCount++;
+              if (column.isNull(i)) {
+                bitmaps[k] = bitmaps[k] << 1;
+              } else {
+                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+                dataOutputStream.writeDouble(column.getDouble(i));
+                valueOccupation[k] += 8;
+              }
+              if (rowCount != 0 && rowCount % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmaps[k]);
+                // we should clear the bitmap every 8 points
+                bitmaps[k] = 0;
+              }
+            }
+            break;
+          case BOOLEAN:
+            for (int i = 0; i < currentCount; i++) {
+              rowCount++;
+              if (column.isNull(i)) {
+                bitmaps[k] = bitmaps[k] << 1;
+              } else {
+                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+                dataOutputStream.writeBoolean(column.getBoolean(i));
+                valueOccupation[k] += 1;
+              }
+              if (rowCount != 0 && rowCount % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmaps[k]);
+                // we should clear the bitmap every 8 points
+                bitmaps[k] = 0;
+              }
+            }
+            break;
+          case TEXT:
+            for (int i = 0; i < currentCount; i++) {
+              rowCount++;
+              if (column.isNull(i)) {
+                bitmaps[k] = bitmaps[k] << 1;
+              } else {
+                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+                Binary binary = column.getBinary(i);
+                dataOutputStream.writeInt(binary.getLength());
+                dataOutputStream.write(binary.getValues());
+                valueOccupation[k] = valueOccupation[k] + 4 + binary.getLength();
+              }
+              if (rowCount != 0 && rowCount % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmaps[k]);
+                // we should clear the bitmap every 8 points
+                bitmaps[k] = 0;
+              }
+            }
+            break;
+          default:
+            throw new UnSupportedDataTypeException(
+                    String.format("Data type %s is not supported.", type));
+        }
+        if (k != columnNum - 1) {
+          rowCount -= currentCount;
+        }
+      }
+
+      // feed the remaining bitmap
+      int remaining = rowCount % 8;
+      for (int k = 0; k < columnNum; k++) {
+        if (remaining != 0) {
+          DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)];
+          dataBitmapOutputStream.writeByte(bitmaps[k] << (8 - remaining));
+        }
+      }
+
+      // calculate the time buffer size
+      int timeOccupation = rowCount * 8;
+      ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation);
+      timeBuffer.put(byteArrayOutputStreams[0].toByteArray());
+      timeBuffer.flip();
+      tsQueryDataSet.setTime(timeBuffer);
+
+      // calculate the bitmap buffer size
+      int bitmapOccupation = (rowCount + 7) / 8;
+
+      List<ByteBuffer> bitmapList = new LinkedList<>();
+      List<ByteBuffer> valueList = new LinkedList<>();
+      for (int i = 1; i < byteArrayOutputStreams.length; i += 2) {
+        ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) / 2]);
+        valueBuffer.put(byteArrayOutputStreams[i].toByteArray());
+        valueBuffer.flip();
+        valueList.add(valueBuffer);
+
+        ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation);
+        bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray());
+        bitmapBuffer.flip();
+        bitmapList.add(bitmapBuffer);
+      }
+      tsQueryDataSet.setBitmapList(bitmapList);
+      tsQueryDataSet.setValueList(valueList);
+
+      windowSet.add(tsQueryDataSet);
+    }
+    return windowSet;
+  }
+
   public static long[] readTimesFromBuffer(ByteBuffer buffer, int size) {
     long[] times = new long[size];
     for (int i = 0; i < size; i++) {
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 1cfa1666c3..f0a11bcecc 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -506,7 +506,7 @@ public class SessionConnection {
 
     TSFetchWindowBatchResp resp;
     try {
-      resp = client.fetchWindowBatch(req);
+      resp = client.fetchWindowBatchV2(req);
       RpcUtils.verifySuccess(resp.getStatus());
     } catch (TException e) {
       throw new StatementExecutionException("");
diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift
index 6d5f6cfc11..f97fa4012e 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -434,7 +434,8 @@ struct TSFetchWindowBatchResp {
   2: required list<string> columnNameList
   3: required list<string> columnTypeList
   4: required map<string, i32> columnNameIndexMap
-  5: required list<list<binary>> windowBatch
+  5: optional list<list<binary>> windowBatch
+  6: optional list<TSQueryDataSet> windowBatchDataSetList
 }
 
 // The sender and receiver need to check some info to confirm validity
@@ -586,4 +587,6 @@ service IClientRPCService {
   TSConnectionInfoResp fetchAllConnectionsInfo();
 
   TSFetchWindowBatchResp fetchWindowBatch(1:TSFetchWindowBatchReq req);
+
+  TSFetchWindowBatchResp fetchWindowBatchV2(1:TSFetchWindowBatchReq req);
 }
\ No newline at end of file


[iotdb] 01/02: implement WindowConcatOperator

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9f03e74dae5a75e75cf5e4ad988d65ee1126cb8e
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Nov 14 16:21:20 2022 +0800

    implement WindowConcatOperator
---
 .../main/java/org/apache/iotdb/SessionExample.java |  4 +-
 .../process/{ => window}/WindowConcatOperator.java | 44 +++++++++---
 .../operator/process/window/WindowSliceQueue.java  | 80 ++++++++++++++++++++++
 .../process/{ => window}/WindowSplitOperator.java  |  3 +-
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  4 +-
 5 files changed, 121 insertions(+), 14 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 9baf3a8ad2..a488d292a6 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -72,8 +72,8 @@ public class SessionExample {
     // set session fetchSize
     session.setFetchSize(10000);
 
-    List<String> queryPaths = Arrays.asList("root.sg1.d1.s1", "root.sg1.d2.s1");
-    List<Integer> indexes = Arrays.asList(0, 1, 2, 3);
+    List<String> queryPaths = Arrays.asList("root.sg1.d1.s1", "root.sg1.d1.s2");
+    List<Integer> indexes = Arrays.asList(1, 2, 6, 7);
     List<SessionDataSet> windowBatch =
         session.fetchWindowBatch(queryPaths, null, 0, 32, 4, 3, indexes);
     for (SessionDataSet window : windowBatch) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowConcatOperator.java
similarity index 64%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowConcatOperator.java
index 269b579feb..4dc9f14f2d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowConcatOperator.java
@@ -17,15 +17,15 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.execution.operator.process;
+package org.apache.iotdb.db.mpp.execution.operator.process.window;
 
 import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 
 import java.util.List;
 
@@ -34,13 +34,11 @@ public class WindowConcatOperator implements ProcessOperator {
   protected final OperatorContext operatorContext;
 
   protected final Operator child;
-  protected TsBlock inputTsBlock;
-  protected boolean canCallNext;
 
   private final ITimeRangeIterator sampleTimeRangeIterator;
   private TimeRange curTimeRange;
 
-  private final TsBlockBuilder resultTsBlockBuilder;
+  private final WindowSliceQueue windowSliceQueue;
 
   public WindowConcatOperator(
       OperatorContext operatorContext,
@@ -50,7 +48,7 @@ public class WindowConcatOperator implements ProcessOperator {
     this.operatorContext = operatorContext;
     this.child = child;
     this.sampleTimeRangeIterator = sampleTimeRangeIterator;
-    this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
+    this.windowSliceQueue = new WindowSliceQueue(outputDataTypes);
   }
 
   @Override
@@ -60,17 +58,45 @@ public class WindowConcatOperator implements ProcessOperator {
 
   @Override
   public TsBlock next() {
-    return child.next();
+    if (!child.hasNext()) {
+      curTimeRange = null;
+      return windowSliceQueue.outputWindow();
+    }
+
+    TsBlock inputTsBlock = child.next();
+    if (inputTsBlock == null) {
+      return null;
+    }
+
+    if (curTimeRange == null && sampleTimeRangeIterator.hasNextTimeRange()) {
+      curTimeRange = sampleTimeRangeIterator.nextTimeRange();
+      windowSliceQueue.updateTimeRange(curTimeRange);
+    }
+
+    if (inputTsBlock.getStartTime() > curTimeRange.getMax()) {
+      TsBlock outputWindow = windowSliceQueue.outputWindow();
+      if (sampleTimeRangeIterator.hasNextTimeRange()) {
+        curTimeRange = sampleTimeRangeIterator.nextTimeRange();
+        windowSliceQueue.updateTimeRange(curTimeRange);
+      } else {
+        curTimeRange = null;
+      }
+      windowSliceQueue.processTsBlock(inputTsBlock);
+      return outputWindow;
+    } else {
+      windowSliceQueue.processTsBlock(inputTsBlock);
+      return null;
+    }
   }
 
   @Override
   public boolean hasNext() {
-    return child.hasNext();
+    return curTimeRange != null || sampleTimeRangeIterator.hasNextTimeRange();
   }
 
   @Override
   public boolean isFinished() {
-    return child.isFinished();
+    return !this.hasNext();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSliceQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSliceQueue.java
new file mode 100644
index 0000000000..59531d2ced
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSliceQueue.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.window;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+
+public class WindowSliceQueue {
+
+  // cached window slice
+  private final Deque<TsBlock> deque = new LinkedList<>();
+
+  private TimeRange curTimeRange;
+
+  private final TsBlockBuilder windowBuilder;
+
+  public WindowSliceQueue(List<TSDataType> dataTypeList) {
+    this.windowBuilder = new TsBlockBuilder(dataTypeList);
+  }
+
+  public void processTsBlock(TsBlock tsBlock) {
+    deque.addLast(tsBlock);
+  }
+
+  public void updateTimeRange(TimeRange curTimeRange) {
+    this.curTimeRange = curTimeRange;
+    evictingExpiredSlice();
+  }
+
+  public void evictingExpiredSlice() {
+    while (!deque.isEmpty() && !curTimeRange.contains(deque.getFirst().getStartTime())) {
+      deque.removeFirst();
+    }
+  }
+
+  public TsBlock outputWindow() {
+    windowBuilder.reset();
+
+    TimeColumnBuilder timeColumnBuilder = windowBuilder.getTimeColumnBuilder();
+    ColumnBuilder[] columnBuilders = windowBuilder.getValueColumnBuilders();
+    int valueColumnCount = columnBuilders.length;
+
+    for (TsBlock windowSlice : deque) {
+      int positionCount = windowSlice.getPositionCount();
+      for (int index = 0; index < positionCount; index++) {
+        timeColumnBuilder.write(windowSlice.getTimeColumn(), index);
+        for (int columnIndex = 0; columnIndex < valueColumnCount; columnIndex++) {
+          columnBuilders[columnIndex].write(windowSlice.getColumn(columnIndex), index);
+        }
+      }
+      windowBuilder.declarePositions(positionCount);
+    }
+    return windowBuilder.build();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSplitOperator.java
similarity index 97%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSplitOperator.java
index 6b9544b1a1..039822429d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSplitOperator.java
@@ -17,11 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.execution.operator.process;
+package org.apache.iotdb.db.mpp.execution.operator.process.window;
 
 import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index d839451544..eb50d6ab37 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -54,8 +54,6 @@ import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOper
 import org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TagAggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.WindowConcatOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.WindowSplitOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BinaryConstantFill;
@@ -91,6 +89,8 @@ import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryOperator
 import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQuerySortOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryUtil;
 import org.apache.iotdb.db.mpp.execution.operator.process.last.UpdateLastCacheOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.window.WindowConcatOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.window.WindowSplitOperator;
 import org.apache.iotdb.db.mpp.execution.operator.schema.CountMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.schema.DevicesCountOperator;
 import org.apache.iotdb.db.mpp.execution.operator.schema.DevicesSchemaScanOperator;