You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/14 07:05:13 UTC

[iotdb] branch ml/windowSet updated (c8b333f1f8 -> cf1cffa7f9)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a change to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git


    from c8b333f1f8 update interface define
     new 38cc84b784 change interface name
     new 21df9f4ea8 add WindowConcatNode
     new 64b0342513 add WindowConcatOperator
     new 9c7d6c7bbd add SampleWindowSliceIterator
     new cf1cffa7f9 modify WindowSplitOperator

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../main/java/org/apache/iotdb/SessionExample.java |  7 +-
 .../iotdb/session/it/IoTDBFetchWindowSetIT.java    |  2 +-
 .../timerangeiterator/SampleWindowIterator.java    |  6 +-
 .../SampleWindowSliceIterator.java}                | 86 ++++++++++++----------
 .../TimeRangeIteratorFactory.java                  | 10 ++-
 ...fsetOperator.java => WindowConcatOperator.java} | 61 +++++++--------
 .../operator/process/WindowSplitOperator.java      | 27 +++++--
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  | 16 ++--
 .../iotdb/db/mpp/plan/constant/StatementType.java  |  2 +-
 .../db/mpp/plan/parser/StatementGenerator.java     |  8 +-
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    | 16 ++++
 .../db/mpp/plan/planner/LogicalPlanVisitor.java    | 13 ++--
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 47 +++++++++++-
 .../planner/distribution/DistributionPlanner.java  |  4 +-
 .../SimpleFragmentParallelPlanner.java             |  4 +-
 .../mpp/plan/planner/plan/node/PlanNodeType.java   |  9 ++-
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java |  5 ++
 ...{WindowSplitNode.java => WindowConcatNode.java} | 22 +++---
 .../db/mpp/plan/statement/StatementVisitor.java    |  6 +-
 ...atement.java => FetchWindowBatchStatement.java} | 15 ++--
 .../service/thrift/impl/ClientRPCServiceImpl.java  | 27 +++----
 .../db/service/thrift/impl/TSServiceImpl.java      |  6 +-
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   |  2 +-
 .../db/mpp/aggregation/TimeRangeIteratorTest.java  | 71 +++++++++++++++++-
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   | 10 +--
 .../java/org/apache/iotdb/session/ISession.java    |  2 +-
 .../java/org/apache/iotdb/session/Session.java     |  4 +-
 .../apache/iotdb/session/SessionConnection.java    | 24 +++---
 thrift/src/main/thrift/client.thrift               |  8 +-
 29 files changed, 340 insertions(+), 180 deletions(-)
 copy server/src/main/java/org/apache/iotdb/db/{utils/timerangeiterator/PreAggrWindowWithNaturalMonthIterator.java => mpp/aggregation/timerangeiterator/SampleWindowSliceIterator.java} (64%)
 copy server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/{OffsetOperator.java => WindowConcatOperator.java} (57%)
 copy server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/{WindowSplitNode.java => WindowConcatNode.java} (86%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/{FetchWindowSetStatement.java => FetchWindowBatchStatement.java} (85%)


[iotdb] 02/05: add WindowConcatNode

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 21df9f4ea8a1d2cdecb936e5fc9ad83c0ca28556
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Nov 14 10:21:25 2022 +0800

    add WindowConcatNode
---
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  16 +--
 .../iotdb/db/mpp/plan/constant/StatementType.java  |   2 +-
 .../db/mpp/plan/parser/StatementGenerator.java     |   4 +-
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    |  16 +++
 .../db/mpp/plan/planner/LogicalPlanVisitor.java    |  13 +-
 .../planner/distribution/DistributionPlanner.java  |   4 +-
 .../SimpleFragmentParallelPlanner.java             |   4 +-
 .../mpp/plan/planner/plan/node/PlanNodeType.java   |   9 +-
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java |   5 +
 .../plan/node/process/WindowConcatNode.java        | 140 +++++++++++++++++++++
 .../db/mpp/plan/statement/StatementVisitor.java    |   6 +-
 ...atement.java => FetchWindowBatchStatement.java} |  15 +--
 12 files changed, 200 insertions(+), 34 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index ad46dbdfb1..76beb5b7ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -68,7 +68,7 @@ import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
 import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
 import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
 import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
-import org.apache.iotdb.db.mpp.plan.statement.crud.FetchWindowSetStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.FetchWindowBatchStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -1208,17 +1208,17 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
   }
 
   @Override
-  public Analysis visitFetchWindowSet(
-      FetchWindowSetStatement fetchWindowSetStatement, MPPQueryContext context) {
+  public Analysis visitFetchWindowBatch(
+      FetchWindowBatchStatement fetchWindowBatchStatement, MPPQueryContext context) {
     Analysis analysis = new Analysis();
-    analysis.setStatement(fetchWindowSetStatement);
+    analysis.setStatement(fetchWindowBatchStatement);
 
     // check for semantic errors
-    fetchWindowSetStatement.semanticCheck();
+    fetchWindowBatchStatement.semanticCheck();
 
     // concat path and construct path pattern tree
     PathPatternTree patternTree = new PathPatternTree();
-    for (PartialPath path : fetchWindowSetStatement.getQueryPaths()) {
+    for (PartialPath path : fetchWindowBatchStatement.getQueryPaths()) {
       patternTree.appendFullPath(path);
     }
 
@@ -1237,8 +1237,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     analysis.setSourceExpressions(sourceExpressions);
 
     // set transform
-    if (fetchWindowSetStatement.getFunctionName() != null) {
-      String functionName = fetchWindowSetStatement.getFunctionName();
+    if (fetchWindowBatchStatement.getFunctionName() != null) {
+      String functionName = fetchWindowBatchStatement.getFunctionName();
       Set<Expression> sourceTransformExpressions =
           sourceExpressions.stream()
               .map(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/constant/StatementType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/constant/StatementType.java
index bb6f9d9d64..ee2eb65edc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/constant/StatementType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/constant/StatementType.java
@@ -153,5 +153,5 @@ public enum StatementType {
 
   DEACTIVATE_TEMPLATE,
 
-  FETCH_WINDOW_SET
+  FETCH_WINDOW_BATCH
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index d0b5f5167b..3825bb2af2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -37,7 +37,7 @@ import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn;
 import org.apache.iotdb.db.mpp.plan.statement.component.SelectComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
 import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
-import org.apache.iotdb.db.mpp.plan.statement.crud.FetchWindowSetStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.FetchWindowBatchStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -181,7 +181,7 @@ public class StatementGenerator {
 
   public static Statement createStatement(TSFetchWindowBatchReq fetchWindowSetReq)
       throws IllegalPathException {
-    FetchWindowSetStatement statement = new FetchWindowSetStatement();
+    FetchWindowBatchStatement statement = new FetchWindowBatchStatement();
 
     // set queryPaths
     List<PartialPath> queryPaths = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index a212cb6326..9d362fa306 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -64,6 +64,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.WindowConcatNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.WindowSplitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
@@ -900,6 +901,21 @@ public class LogicalPlanBuilder {
     return this;
   }
 
+  public LogicalPlanBuilder planWindowConcat(
+      GroupByTimeParameter groupByTimeParameter, List<Integer> samplingIndexes) {
+    if (!groupByTimeParameter.hasOverlap()) {
+      return this;
+    }
+
+    this.root =
+        new WindowConcatNode(
+            context.getQueryId().genPlanNodeId(),
+            this.getRoot(),
+            groupByTimeParameter,
+            samplingIndexes);
+    return this;
+  }
+
   /** Meta Query* */
   public LogicalPlanBuilder planTimeSeriesSchemaSource(
       PartialPath pathPattern,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index ef1d9b8502..3d3c02b2e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -43,7 +43,7 @@ import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
 import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
-import org.apache.iotdb.db.mpp.plan.statement.crud.FetchWindowSetStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.FetchWindowBatchStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -291,16 +291,19 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
   }
 
   @Override
-  public PlanNode visitFetchWindowSet(
-      FetchWindowSetStatement fetchWindowSetStatement, MPPQueryContext context) {
+  public PlanNode visitFetchWindowBatch(
+      FetchWindowBatchStatement fetchWindowBatchStatement, MPPQueryContext context) {
     LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
     planBuilder
         .planRawDataSource(analysis.getSourceExpressions(), Ordering.ASC, null)
         .planTransform(
             analysis.getSourceTransformExpressions(), true, ZoneId.systemDefault(), Ordering.ASC)
         .planWindowSplit(
-            fetchWindowSetStatement.getGroupByTimeParameter(),
-            fetchWindowSetStatement.getSamplingIndexes());
+            fetchWindowBatchStatement.getGroupByTimeParameter(),
+            fetchWindowBatchStatement.getSamplingIndexes())
+        .planWindowConcat(
+            fetchWindowBatchStatement.getGroupByTimeParameter(),
+            fetchWindowBatchStatement.getSamplingIndexes());
 
     return planBuilder.getRoot();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
index 8a6267353b..6307310f17 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
@@ -32,7 +32,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
-import org.apache.iotdb.db.mpp.plan.statement.crud.FetchWindowSetStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.FetchWindowBatchStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
 
 import java.util.List;
@@ -69,7 +69,7 @@ public class DistributionPlanner {
     PlanNode rootAfterRewrite = rewriteSource();
     PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
     if (analysis.getStatement() instanceof QueryStatement
-        || analysis.getStatement() instanceof FetchWindowSetStatement) {
+        || analysis.getStatement() instanceof FetchWindowBatchStatement) {
       analysis
           .getRespDatasetHeader()
           .setColumnToTsBlockIndexMap(rootWithExchange.getOutputColumnNames());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 03341cf2be..3b79ad4748 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -34,7 +34,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
-import org.apache.iotdb.db.mpp.plan.statement.crud.FetchWindowSetStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.FetchWindowBatchStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
@@ -116,7 +116,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
     fragmentInstance.setHostDataNode(selectTargetDataNode(regionReplicaSet));
 
     if (analysis.getStatement() instanceof QueryStatement
-        || analysis.getStatement() instanceof FetchWindowSetStatement) {
+        || analysis.getStatement() instanceof FetchWindowBatchStatement) {
       fragmentInstance.getFragment().generateTypeProvider(queryContext.getTypeProvider());
     }
     instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index 800459849b..7588c8f4e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -64,6 +64,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggre
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.WindowConcatNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.WindowSplitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
@@ -151,7 +153,8 @@ public enum PlanNodeType {
   DEACTIVATE_TEMPLATE_NODE((short) 61),
   INTO((short) 62),
   DEVICE_VIEW_INTO((short) 63),
-  WINDOW_SPLIT((short) 64);
+  WINDOW_SPLIT((short) 64),
+  WINDOW_CONCAT((short) 65);
 
   public static final int BYTES = Short.BYTES;
 
@@ -328,6 +331,10 @@ public enum PlanNodeType {
         return IntoNode.deserialize(buffer);
       case 63:
         return DeviceViewIntoNode.deserialize(buffer);
+      case 64:
+        return WindowSplitNode.deserialize(buffer);
+      case 65:
+        return WindowConcatNode.deserialize(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 6a064cd90d..7d3fa1dc19 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -62,6 +62,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggre
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.WindowConcatNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.WindowSplitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
@@ -331,4 +332,8 @@ public abstract class PlanVisitor<R, C> {
   public R visitWindowSplit(WindowSplitNode node, C context) {
     return visitPlan(node, context);
   }
+
+  public R visitWindowConcat(WindowConcatNode node, C context) {
+    return visitPlan(node, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/WindowConcatNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/WindowConcatNode.java
new file mode 100644
index 0000000000..4cf16d88d0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/WindowConcatNode.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.process;
+
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class WindowConcatNode extends SingleChildProcessNode {
+
+  private final GroupByTimeParameter groupByTimeParameter;
+  private final List<Integer> samplingIndexes;
+
+  public WindowConcatNode(
+      PlanNodeId id,
+      PlanNode child,
+      GroupByTimeParameter groupByTimeParameter,
+      List<Integer> samplingIndexes) {
+    super(id, child);
+    this.groupByTimeParameter = groupByTimeParameter;
+    this.samplingIndexes = samplingIndexes;
+  }
+
+  public WindowConcatNode(
+      PlanNodeId id, GroupByTimeParameter groupByTimeParameter, List<Integer> samplingIndexes) {
+    super(id);
+    this.groupByTimeParameter = groupByTimeParameter;
+    this.samplingIndexes = samplingIndexes;
+  }
+
+  public GroupByTimeParameter getGroupByTimeParameter() {
+    return groupByTimeParameter;
+  }
+
+  public List<Integer> getSamplingIndexes() {
+    return samplingIndexes;
+  }
+
+  @Override
+  public PlanNode clone() {
+    return new WindowConcatNode(getPlanNodeId(), groupByTimeParameter, samplingIndexes);
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return child.getOutputColumnNames();
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.WINDOW_CONCAT.serialize(byteBuffer);
+    groupByTimeParameter.serialize(byteBuffer);
+    ReadWriteIOUtils.write(samplingIndexes.size(), byteBuffer);
+    for (Integer index : samplingIndexes) {
+      ReadWriteIOUtils.write(index, byteBuffer);
+    }
+  }
+
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    PlanNodeType.WINDOW_CONCAT.serialize(stream);
+    groupByTimeParameter.serialize(stream);
+    ReadWriteIOUtils.write(samplingIndexes.size(), stream);
+    for (Integer index : samplingIndexes) {
+      ReadWriteIOUtils.write(index, stream);
+    }
+  }
+
+  public static WindowConcatNode deserialize(ByteBuffer byteBuffer) {
+    GroupByTimeParameter groupByTimeParameter = GroupByTimeParameter.deserialize(byteBuffer);
+
+    int listSize = ReadWriteIOUtils.readInt(byteBuffer);
+    List<Integer> samplingIndexes = new ArrayList<>(listSize);
+    while (listSize > 0) {
+      samplingIndexes.add(ReadWriteIOUtils.readInt(byteBuffer));
+      listSize--;
+    }
+
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new WindowConcatNode(planNodeId, groupByTimeParameter, samplingIndexes);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    WindowConcatNode that = (WindowConcatNode) o;
+    return groupByTimeParameter.equals(that.groupByTimeParameter)
+        && samplingIndexes.equals(that.samplingIndexes);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(super.hashCode(), groupByTimeParameter, samplingIndexes);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("WindowConcatNode-%s", getPlanNodeId());
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitWindowConcat(this, context);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index be03262f80..7d83e3fc01 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.mpp.plan.statement;
 
 import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
-import org.apache.iotdb.db.mpp.plan.statement.crud.FetchWindowSetStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.FetchWindowBatchStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -208,8 +208,8 @@ public abstract class StatementVisitor<R, C> {
     return visitStatement(queryStatement, context);
   }
 
-  public R visitFetchWindowSet(FetchWindowSetStatement fetchWindowSetStatement, C context) {
-    return visitStatement(fetchWindowSetStatement, context);
+  public R visitFetchWindowBatch(FetchWindowBatchStatement fetchWindowBatchStatement, C context) {
+    return visitStatement(fetchWindowBatchStatement, context);
   }
 
   // Insert Statement
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FetchWindowSetStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FetchWindowBatchStatement.java
similarity index 85%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FetchWindowSetStatement.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FetchWindowBatchStatement.java
index fa1b815cda..756c5cc86d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FetchWindowSetStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FetchWindowBatchStatement.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.mpp.plan.statement.crud;
 
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.mpp.plan.constant.StatementType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
@@ -28,16 +27,16 @@ import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
 
 import java.util.List;
 
-public class FetchWindowSetStatement extends Statement {
+public class FetchWindowBatchStatement extends Statement {
 
   private List<PartialPath> queryPaths;
   private String functionName;
   private GroupByTimeParameter groupByTimeParameter;
   private List<Integer> samplingIndexes;
 
-  public FetchWindowSetStatement() {
+  public FetchWindowBatchStatement() {
     super();
-    statementType = StatementType.FETCH_WINDOW_SET;
+    statementType = StatementType.FETCH_WINDOW_BATCH;
   }
 
   public List<PartialPath> getQueryPaths() {
@@ -79,12 +78,8 @@ public class FetchWindowSetStatement extends Statement {
 
   @Override
   public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
-    return visitor.visitFetchWindowSet(this, context);
+    return visitor.visitFetchWindowBatch(this, context);
   }
 
-  public void semanticCheck() {
-    if (groupByTimeParameter.hasOverlap()) {
-      throw new SemanticException("");
-    }
-  }
+  public void semanticCheck() {}
 }


[iotdb] 03/05: add WindowConcatOperator

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 64b03425133cc6b758e93762b92994818638d7fe
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Nov 14 10:27:00 2022 +0800

    add WindowConcatOperator
---
 .../TimeRangeIteratorFactory.java                  |  3 +-
 ...plitOperator.java => WindowConcatOperator.java} | 82 ++--------------------
 .../operator/process/WindowSplitOperator.java      | 12 ++--
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 32 ++++++++-
 4 files changed, 45 insertions(+), 84 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java
index cd0ac8a08e..552cca4bb2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java
@@ -80,7 +80,8 @@ public class TimeRangeIteratorFactory {
       long endTime,
       long interval,
       long slidingStep,
-      List<Integer> samplingIndexes) {
+      List<Integer> samplingIndexes,
+      boolean outputPartialTimeWindow) {
     return new SampleWindowIterator(startTime, endTime, interval, slidingStep, samplingIndexes);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java
similarity index 50%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java
index d8d4ebc1f7..1f40016f9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java
@@ -26,15 +26,10 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockUtil;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
-
-import com.google.common.util.concurrent.ListenableFuture;
 
 import java.util.List;
 
-public class WindowSplitOperator implements ProcessOperator {
+public class WindowConcatOperator implements ProcessOperator {
 
   protected final OperatorContext operatorContext;
 
@@ -47,7 +42,7 @@ public class WindowSplitOperator implements ProcessOperator {
 
   private final TsBlockBuilder resultTsBlockBuilder;
 
-  public WindowSplitOperator(
+  public WindowConcatOperator(
       OperatorContext operatorContext,
       Operator child,
       ITimeRangeIterator sampleTimeRangeIterator,
@@ -60,87 +55,22 @@ public class WindowSplitOperator implements ProcessOperator {
 
   @Override
   public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  @Override
-  public ListenableFuture<?> isBlocked() {
-    return child.isBlocked();
+    return null;
   }
 
   @Override
   public TsBlock next() {
-    // reset operator state
-    canCallNext = true;
-
-    if (curTimeRange == null && sampleTimeRangeIterator.hasNextTimeRange()) {
-      // move to next time window
-      curTimeRange = sampleTimeRangeIterator.nextTimeRange();
-    }
-
-    if (!fetchData()) {
-      return null;
-    } else {
-      curTimeRange = null;
-      TsBlock resultTsBlock = resultTsBlockBuilder.build();
-      resultTsBlockBuilder.reset();
-      return resultTsBlock;
-    }
-  }
-
-  private boolean fetchData() {
-    while (!consumeInput()) {
-      // NOTE: child.next() can only be invoked once
-      if (child.hasNext() && canCallNext) {
-        inputTsBlock = child.next();
-        canCallNext = false;
-      } else {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  private boolean consumeInput() {
-    if (inputTsBlock == null) {
-      return false;
-    }
-
-    inputTsBlock = TsBlockUtil.skipPointsOutOfTimeRange(inputTsBlock, curTimeRange, true);
-    if (inputTsBlock == null) {
-      return false;
-    }
-
-    for (int readIndex = 0; readIndex < inputTsBlock.getPositionCount(); readIndex++) {
-      long time = inputTsBlock.getTimeByIndex(readIndex);
-      if (curTimeRange.contains(time)) {
-        writeData(readIndex);
-      } else {
-        inputTsBlock = inputTsBlock.subTsBlock(readIndex);
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private void writeData(int readIndex) {
-    TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
-    timeColumnBuilder.writeLong(inputTsBlock.getTimeByIndex(readIndex));
-    ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
-    for (int columnIndex = 0; columnIndex < columnBuilders.length; columnIndex++) {
-      columnBuilders[columnIndex].write(inputTsBlock.getColumn(columnIndex), readIndex);
-    }
-    resultTsBlockBuilder.declarePosition();
+    return null;
   }
 
   @Override
   public boolean hasNext() {
-    return curTimeRange != null || sampleTimeRangeIterator.hasNextTimeRange();
+    return false;
   }
 
   @Override
   public boolean isFinished() {
-    return !this.hasNext();
+    return false;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
index d8d4ebc1f7..f2ee804f75 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
@@ -42,7 +42,7 @@ public class WindowSplitOperator implements ProcessOperator {
   protected TsBlock inputTsBlock;
   protected boolean canCallNext;
 
-  private final ITimeRangeIterator sampleTimeRangeIterator;
+  private final ITimeRangeIterator sampleTimeRangeSliceIterator;
   private TimeRange curTimeRange;
 
   private final TsBlockBuilder resultTsBlockBuilder;
@@ -50,11 +50,11 @@ public class WindowSplitOperator implements ProcessOperator {
   public WindowSplitOperator(
       OperatorContext operatorContext,
       Operator child,
-      ITimeRangeIterator sampleTimeRangeIterator,
+      ITimeRangeIterator sampleTimeRangeSliceIterator,
       List<TSDataType> outputDataTypes) {
     this.operatorContext = operatorContext;
     this.child = child;
-    this.sampleTimeRangeIterator = sampleTimeRangeIterator;
+    this.sampleTimeRangeSliceIterator = sampleTimeRangeSliceIterator;
     this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
   }
 
@@ -73,9 +73,9 @@ public class WindowSplitOperator implements ProcessOperator {
     // reset operator state
     canCallNext = true;
 
-    if (curTimeRange == null && sampleTimeRangeIterator.hasNextTimeRange()) {
+    if (curTimeRange == null && sampleTimeRangeSliceIterator.hasNextTimeRange()) {
       // move to next time window
-      curTimeRange = sampleTimeRangeIterator.nextTimeRange();
+      curTimeRange = sampleTimeRangeSliceIterator.nextTimeRange();
     }
 
     if (!fetchData()) {
@@ -135,7 +135,7 @@ public class WindowSplitOperator implements ProcessOperator {
 
   @Override
   public boolean hasNext() {
-    return curTimeRange != null || sampleTimeRangeIterator.hasNextTimeRange();
+    return curTimeRange != null || sampleTimeRangeSliceIterator.hasNextTimeRange();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 4c69cd5720..888c3f94a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -54,6 +54,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOper
 import org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TagAggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.WindowConcatOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.WindowSplitOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
@@ -149,6 +150,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggre
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.WindowConcatNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.WindowSplitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
@@ -1596,7 +1598,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
             groupByTimeParameter.getEndTime(),
             groupByTimeParameter.getInterval(),
             groupByTimeParameter.getSlidingStep(),
-            node.getSamplingIndexes());
+            node.getSamplingIndexes(),
+            true);
 
     List<TSDataType> outputDataTypes = getOutputColumnTypes(node, context.getTypeProvider());
 
@@ -1604,6 +1607,33 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     return new WindowSplitOperator(operatorContext, child, timeRangeIterator, outputDataTypes);
   }
 
+  @Override
+  public Operator visitWindowConcat(WindowConcatNode node, LocalExecutionPlanContext context) {
+    Operator child = node.getChild().accept(this, context);
+    OperatorContext operatorContext =
+        context
+            .getInstanceContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                WindowConcatOperator.class.getSimpleName());
+
+    GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+    ITimeRangeIterator timeRangeIterator =
+        TimeRangeIteratorFactory.getSampleTimeRangeIterator(
+            groupByTimeParameter.getStartTime(),
+            groupByTimeParameter.getEndTime(),
+            groupByTimeParameter.getInterval(),
+            groupByTimeParameter.getSlidingStep(),
+            node.getSamplingIndexes(),
+            false);
+
+    List<TSDataType> outputDataTypes = getOutputColumnTypes(node, context.getTypeProvider());
+
+    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+    return new WindowConcatOperator(operatorContext, child, timeRangeIterator, outputDataTypes);
+  }
+
   @Override
   public Operator visitSchemaFetchMerge(
       SchemaFetchMergeNode node, LocalExecutionPlanContext context) {


[iotdb] 05/05: modify WindowSplitOperator

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit cf1cffa7f9bad2b738a27d902a07e4a4b32558d3
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Nov 14 15:04:54 2022 +0800

    modify WindowSplitOperator
---
 .../main/java/org/apache/iotdb/SessionExample.java |  4 +--
 .../operator/process/WindowConcatOperator.java     | 14 ++++-----
 .../operator/process/WindowSplitOperator.java      | 33 ++++++++++++++++------
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 17 +++++++++--
 4 files changed, 49 insertions(+), 19 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 8760400c91..9baf3a8ad2 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -73,9 +73,9 @@ public class SessionExample {
     session.setFetchSize(10000);
 
     List<String> queryPaths = Arrays.asList("root.sg1.d1.s1", "root.sg1.d2.s1");
-    List<Integer> indexes = Arrays.asList(1, 3, 5, 7);
+    List<Integer> indexes = Arrays.asList(0, 1, 2, 3);
     List<SessionDataSet> windowBatch =
-        session.fetchWindowBatch(queryPaths, null, 1, 40, 2, 2, indexes);
+        session.fetchWindowBatch(queryPaths, null, 0, 32, 4, 3, indexes);
     for (SessionDataSet window : windowBatch) {
       System.out.println(window.getColumnNames());
       while (window.hasNext()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java
index 1f40016f9a..269b579feb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java
@@ -55,36 +55,36 @@ public class WindowConcatOperator implements ProcessOperator {
 
   @Override
   public OperatorContext getOperatorContext() {
-    return null;
+    return operatorContext;
   }
 
   @Override
   public TsBlock next() {
-    return null;
+    return child.next();
   }
 
   @Override
   public boolean hasNext() {
-    return false;
+    return child.hasNext();
   }
 
   @Override
   public boolean isFinished() {
-    return false;
+    return child.isFinished();
   }
 
   @Override
   public long calculateMaxPeekMemory() {
-    return 0;
+    return child.calculateMaxPeekMemory();
   }
 
   @Override
   public long calculateMaxReturnSize() {
-    return 0;
+    return child.calculateMaxReturnSize();
   }
 
   @Override
   public long calculateRetainedSizeAfterCallingNext() {
-    return 0;
+    return child.calculateRetainedSizeAfterCallingNext();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
index f2ee804f75..6b9544b1a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
@@ -42,18 +42,23 @@ public class WindowSplitOperator implements ProcessOperator {
   protected TsBlock inputTsBlock;
   protected boolean canCallNext;
 
-  private final ITimeRangeIterator sampleTimeRangeSliceIterator;
+  private final ITimeRangeIterator sampleTimeRangeIterator;
   private TimeRange curTimeRange;
 
+  private final ITimeRangeIterator sampleTimeRangeSliceIterator;
+  private TimeRange curTimeRangeSlice;
+
   private final TsBlockBuilder resultTsBlockBuilder;
 
   public WindowSplitOperator(
       OperatorContext operatorContext,
       Operator child,
+      ITimeRangeIterator sampleTimeRangeIterator,
       ITimeRangeIterator sampleTimeRangeSliceIterator,
       List<TSDataType> outputDataTypes) {
     this.operatorContext = operatorContext;
     this.child = child;
+    this.sampleTimeRangeIterator = sampleTimeRangeIterator;
     this.sampleTimeRangeSliceIterator = sampleTimeRangeSliceIterator;
     this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
   }
@@ -73,15 +78,27 @@ public class WindowSplitOperator implements ProcessOperator {
     // reset operator state
     canCallNext = true;
 
-    if (curTimeRange == null && sampleTimeRangeSliceIterator.hasNextTimeRange()) {
-      // move to next time window
-      curTimeRange = sampleTimeRangeSliceIterator.nextTimeRange();
+    if (curTimeRange == null && sampleTimeRangeIterator.hasNextTimeRange()) {
+      curTimeRange = sampleTimeRangeIterator.nextTimeRange();
+    }
+
+    while (curTimeRangeSlice == null && sampleTimeRangeSliceIterator.hasNextTimeRange()) {
+      curTimeRangeSlice = sampleTimeRangeSliceIterator.nextTimeRange();
+      if (curTimeRangeSlice.getMin() > curTimeRange.getMax()) {
+        if (sampleTimeRangeIterator.hasNextTimeRange()) {
+          curTimeRange = sampleTimeRangeIterator.nextTimeRange();
+        }
+        if (curTimeRangeSlice.getMin() > curTimeRange.getMax()
+            || curTimeRangeSlice.getMax() < curTimeRange.getMin()) {
+          curTimeRangeSlice = null;
+        }
+      }
     }
 
     if (!fetchData()) {
       return null;
     } else {
-      curTimeRange = null;
+      curTimeRangeSlice = null;
       TsBlock resultTsBlock = resultTsBlockBuilder.build();
       resultTsBlockBuilder.reset();
       return resultTsBlock;
@@ -106,14 +123,14 @@ public class WindowSplitOperator implements ProcessOperator {
       return false;
     }
 
-    inputTsBlock = TsBlockUtil.skipPointsOutOfTimeRange(inputTsBlock, curTimeRange, true);
+    inputTsBlock = TsBlockUtil.skipPointsOutOfTimeRange(inputTsBlock, curTimeRangeSlice, true);
     if (inputTsBlock == null) {
       return false;
     }
 
     for (int readIndex = 0; readIndex < inputTsBlock.getPositionCount(); readIndex++) {
       long time = inputTsBlock.getTimeByIndex(readIndex);
-      if (curTimeRange.contains(time)) {
+      if (curTimeRangeSlice.contains(time)) {
         writeData(readIndex);
       } else {
         inputTsBlock = inputTsBlock.subTsBlock(readIndex);
@@ -135,7 +152,7 @@ public class WindowSplitOperator implements ProcessOperator {
 
   @Override
   public boolean hasNext() {
-    return curTimeRange != null || sampleTimeRangeSliceIterator.hasNextTimeRange();
+    return curTimeRangeSlice != null || sampleTimeRangeSliceIterator.hasNextTimeRange();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 888c3f94a7..d839451544 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -1592,7 +1592,15 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                 WindowSplitOperator.class.getSimpleName());
 
     GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
-    ITimeRangeIterator timeRangeIterator =
+    ITimeRangeIterator sampleTimeRangeIterator =
+        TimeRangeIteratorFactory.getSampleTimeRangeIterator(
+            groupByTimeParameter.getStartTime(),
+            groupByTimeParameter.getEndTime(),
+            groupByTimeParameter.getInterval(),
+            groupByTimeParameter.getSlidingStep(),
+            node.getSamplingIndexes(),
+            false);
+    ITimeRangeIterator sampleTimeRangeSliceIterator =
         TimeRangeIteratorFactory.getSampleTimeRangeIterator(
             groupByTimeParameter.getStartTime(),
             groupByTimeParameter.getEndTime(),
@@ -1604,7 +1612,12 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     List<TSDataType> outputDataTypes = getOutputColumnTypes(node, context.getTypeProvider());
 
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
-    return new WindowSplitOperator(operatorContext, child, timeRangeIterator, outputDataTypes);
+    return new WindowSplitOperator(
+        operatorContext,
+        child,
+        sampleTimeRangeIterator,
+        sampleTimeRangeSliceIterator,
+        outputDataTypes);
   }
 
   @Override


[iotdb] 04/05: add SampleWindowSliceIterator

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9c7d6c7bbddb9a3169017e65978fbc7194cd4773
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Nov 14 14:00:29 2022 +0800

    add SampleWindowSliceIterator
---
 .../timerangeiterator/SampleWindowIterator.java    |   6 +-
 .../SampleWindowSliceIterator.java                 | 129 +++++++++++++++++++++
 .../TimeRangeIteratorFactory.java                  |   7 +-
 .../db/mpp/aggregation/TimeRangeIteratorTest.java  |  71 +++++++++++-
 4 files changed, 207 insertions(+), 6 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowIterator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowIterator.java
index 0ba0dd06cf..f95e3a287f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowIterator.java
@@ -59,7 +59,7 @@ public class SampleWindowIterator implements ITimeRangeIterator {
   public TimeRange nextTimeRange() {
     while (allTimeRangeIterator.hasNextTimeRange()) {
       TimeRange timeRange = allTimeRangeIterator.nextTimeRange();
-      if (timeRangeIndex + 1 == samplingIndexes.get(sampleIndex)) {
+      if (timeRangeIndex == samplingIndexes.get(sampleIndex)) {
         curTimeRange = timeRange;
         timeRangeIndex++;
         sampleIndex++;
@@ -72,7 +72,7 @@ public class SampleWindowIterator implements ITimeRangeIterator {
 
   @Override
   public boolean isAscending() {
-    throw new UnsupportedOperationException();
+    return true;
   }
 
   @Override
@@ -82,6 +82,6 @@ public class SampleWindowIterator implements ITimeRangeIterator {
 
   @Override
   public long getTotalIntervalNum() {
-    throw new UnsupportedOperationException();
+    return samplingIndexes.size();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowSliceIterator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowSliceIterator.java
new file mode 100644
index 0000000000..4c01f63fac
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowSliceIterator.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation.timerangeiterator;
+
+import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+
+import java.util.List;
+
+public class SampleWindowSliceIterator implements ITimeRangeIterator {
+
+  private static final int HEAP_MAX_SIZE = 100;
+  private final TimeSelector timeBoundaryHeap;
+
+  private final SampleWindowIterator sampleWindowIterator;
+
+  private long curStartTimeForIterator;
+  private long lastEndTime;
+  private TimeRange curTimeRange;
+  private boolean hasCachedTimeRange;
+
+  public SampleWindowSliceIterator(
+      long startTime,
+      long endTime,
+      long interval,
+      long slidingStep,
+      List<Integer> samplingIndexes) {
+    this.timeBoundaryHeap = new TimeSelector(HEAP_MAX_SIZE, true);
+    this.sampleWindowIterator =
+        new SampleWindowIterator(startTime, endTime, interval, slidingStep, samplingIndexes);
+    initHeap();
+  }
+
+  private void initHeap() {
+    TimeRange firstTimeRange = sampleWindowIterator.nextTimeRange();
+    timeBoundaryHeap.add(firstTimeRange.getMin());
+    timeBoundaryHeap.add(firstTimeRange.getMax() + 1);
+    curStartTimeForIterator = firstTimeRange.getMin();
+    tryToExpandHeap();
+  }
+
+  private void tryToExpandHeap() {
+    TimeRange timeRangeToExpand;
+    while (sampleWindowIterator.hasNextTimeRange() && timeBoundaryHeap.size() < HEAP_MAX_SIZE) {
+      timeRangeToExpand = sampleWindowIterator.nextTimeRange();
+      timeBoundaryHeap.add(timeRangeToExpand.getMin());
+      timeBoundaryHeap.add(timeRangeToExpand.getMax() + 1);
+      curStartTimeForIterator = timeRangeToExpand.getMin();
+    }
+  }
+
+  @Override
+  public TimeRange getFirstTimeRange() {
+    long retStartTime = timeBoundaryHeap.pollFirst();
+    lastEndTime = timeBoundaryHeap.first();
+    return new TimeRange(retStartTime, lastEndTime);
+  }
+
+  @Override
+  public boolean hasNextTimeRange() {
+    if (hasCachedTimeRange) {
+      return true;
+    }
+    if (curTimeRange == null) {
+      curTimeRange = getFirstTimeRange();
+      hasCachedTimeRange = true;
+      return true;
+    }
+
+    if (lastEndTime >= curStartTimeForIterator) {
+      tryToExpandHeap();
+    }
+    if (timeBoundaryHeap.isEmpty()) {
+      return false;
+    }
+    long retStartTime = timeBoundaryHeap.pollFirst();
+    if (retStartTime >= curStartTimeForIterator) {
+      tryToExpandHeap();
+    }
+    if (timeBoundaryHeap.isEmpty()) {
+      return false;
+    }
+    lastEndTime = timeBoundaryHeap.first();
+    curTimeRange = new TimeRange(retStartTime, lastEndTime);
+    hasCachedTimeRange = true;
+    return true;
+  }
+
+  @Override
+  public TimeRange nextTimeRange() {
+    if (hasCachedTimeRange || hasNextTimeRange()) {
+      hasCachedTimeRange = false;
+      return getFinalTimeRange(curTimeRange, true);
+    }
+    return null;
+  }
+
+  @Override
+  public boolean isAscending() {
+    return true;
+  }
+
+  @Override
+  public long currentOutputTime() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getTotalIntervalNum() {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java
index 552cca4bb2..bfbc69061b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java
@@ -82,6 +82,11 @@ public class TimeRangeIteratorFactory {
       long slidingStep,
       List<Integer> samplingIndexes,
       boolean outputPartialTimeWindow) {
-    return new SampleWindowIterator(startTime, endTime, interval, slidingStep, samplingIndexes);
+    if (outputPartialTimeWindow && interval > slidingStep) {
+      return new SampleWindowSliceIterator(
+          startTime, endTime, interval, slidingStep, samplingIndexes);
+    } else {
+      return new SampleWindowIterator(startTime, endTime, interval, slidingStep, samplingIndexes);
+    }
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/TimeRangeIteratorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/TimeRangeIteratorTest.java
index 1d3f3ccf79..b9bd3b3fe8 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/TimeRangeIteratorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/TimeRangeIteratorTest.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Arrays;
+
 public class TimeRangeIteratorTest {
 
   private static final long MS_TO_MONTH = 30 * 86400_000L;
@@ -294,9 +296,69 @@ public class TimeRangeIteratorTest {
         res4);
   }
 
-  private void checkRes(ITimeRangeIterator timeRangeIterator, String[] res) {
-    Assert.assertEquals(res.length, timeRangeIterator.getTotalIntervalNum());
+  @Test
+  public void testSampleTimeRange() {
+    String[] res1 = {"[ 0 : 3 ]", "[ 3 : 6 ]", "[ 6 : 9 ]", "[ 9 : 12 ]"};
+    String[] res2 = {"[ 3 : 6 ]", "[ 12 : 15 ]", "[ 18 : 21 ]", "[ 27 : 30 ]"};
+    String[] res3 = {"[ 3 : 6 ]", "[ 6 : 9 ]", "[ 18 : 21 ]", "[ 21 : 24 ]"};
+
+    long startTime = 0, endTime = 32, interval = 4, slidingStep = 3;
+
+    checkRes(
+        TimeRangeIteratorFactory.getSampleTimeRangeIterator(
+            startTime, endTime, interval, slidingStep, Arrays.asList(0, 1, 2, 3), false),
+        res1);
+    checkRes(
+        TimeRangeIteratorFactory.getSampleTimeRangeIterator(
+            startTime, endTime, interval, slidingStep, Arrays.asList(1, 4, 6, 9), false),
+        res2);
+    checkRes(
+        TimeRangeIteratorFactory.getSampleTimeRangeIterator(
+            startTime, endTime, interval, slidingStep, Arrays.asList(1, 2, 6, 7), false),
+        res3);
+  }
+
+  @Test
+  public void testSampleTimeRangeSlice() {
+    String[] res1 = {
+      "[ 0 : 2 ]", "[ 3 : 3 ]", "[ 4 : 5 ]", "[ 6 : 6 ]", "[ 7 : 8 ]", "[ 9 : 9 ]", "[ 10 : 12 ]"
+    };
+    String[] res2 = {
+      "[ 3 : 6 ]",
+      "[ 7 : 11 ]", // pad
+      "[ 12 : 15 ]",
+      "[ 16 : 17 ]", // pad
+      "[ 18 : 21 ]",
+      "[ 22 : 26 ]", // pad
+      "[ 27 : 30 ]"
+    };
+    String[] res3 = {
+      "[ 3 : 5 ]",
+      "[ 6 : 6 ]",
+      "[ 7 : 9 ]",
+      "[ 10 : 17 ]", // pad
+      "[ 18 : 20 ]",
+      "[ 21 : 21 ]",
+      "[ 22 : 24 ]"
+    };
+
+    long startTime = 0, endTime = 32, interval = 4, slidingStep = 3;
 
+    checkResWithoutNum(
+        TimeRangeIteratorFactory.getSampleTimeRangeIterator(
+            startTime, endTime, interval, slidingStep, Arrays.asList(0, 1, 2, 3), true),
+        res1);
+    checkResWithoutNum(
+        TimeRangeIteratorFactory.getSampleTimeRangeIterator(
+            startTime, endTime, interval, slidingStep, Arrays.asList(1, 4, 6, 9), true),
+        res2);
+    checkResWithoutNum(
+        TimeRangeIteratorFactory.getSampleTimeRangeIterator(
+            startTime, endTime, interval, slidingStep, Arrays.asList(1, 2, 6, 7), true),
+        res3);
+  }
+
+  private void checkResWithoutNum(ITimeRangeIterator timeRangeIterator, String[] res) {
     boolean isAscending = timeRangeIterator.isAscending();
     int cnt = isAscending ? 0 : res.length - 1;
 
@@ -307,4 +369,9 @@ public class TimeRangeIteratorTest {
       cnt += isAscending ? 1 : -1;
     }
   }
+
+  private void checkRes(ITimeRangeIterator timeRangeIterator, String[] res) {
+    Assert.assertEquals(res.length, timeRangeIterator.getTotalIntervalNum());
+    checkResWithoutNum(timeRangeIterator, res);
+  }
 }


[iotdb] 01/05: change interface name

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 38cc84b784d2e458091d7e0798be3aff82d42fe6
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Nov 14 10:07:34 2022 +0800

    change interface name
---
 .../main/java/org/apache/iotdb/SessionExample.java |  5 ++--
 .../iotdb/session/it/IoTDBFetchWindowSetIT.java    |  2 +-
 .../db/mpp/plan/parser/StatementGenerator.java     |  4 ++--
 .../service/thrift/impl/ClientRPCServiceImpl.java  | 27 ++++++++++------------
 .../db/service/thrift/impl/TSServiceImpl.java      |  6 ++---
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   |  2 +-
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   | 10 ++++----
 .../java/org/apache/iotdb/session/ISession.java    |  2 +-
 .../java/org/apache/iotdb/session/Session.java     |  4 ++--
 .../apache/iotdb/session/SessionConnection.java    | 24 +++++++++++--------
 thrift/src/main/thrift/client.thrift               |  8 +++----
 11 files changed, 48 insertions(+), 46 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index fbe0c63c8a..8760400c91 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -74,8 +74,9 @@ public class SessionExample {
 
     List<String> queryPaths = Arrays.asList("root.sg1.d1.s1", "root.sg1.d2.s1");
     List<Integer> indexes = Arrays.asList(1, 3, 5, 7);
-    List<SessionDataSet> windowSet = session.fetchWindowSet(queryPaths, null, 1, 40, 2, 2, indexes);
-    for (SessionDataSet window : windowSet) {
+    List<SessionDataSet> windowBatch =
+        session.fetchWindowBatch(queryPaths, null, 1, 40, 2, 2, indexes);
+    for (SessionDataSet window : windowBatch) {
       System.out.println(window.getColumnNames());
       while (window.hasNext()) {
         System.out.println(window.next());
diff --git a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBFetchWindowSetIT.java b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBFetchWindowSetIT.java
index 1e31bd5c83..5c56924b03 100644
--- a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBFetchWindowSetIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBFetchWindowSetIT.java
@@ -172,7 +172,7 @@ public class IoTDBFetchWindowSetIT {
       List<String> queryPaths = Arrays.asList("root.sg1.d1.s1", "root.sg1.d2.s1");
       List<Integer> indexes = Arrays.asList(1, 3, 5, 7);
       List<SessionDataSet> windowSet =
-          session.fetchWindowSet(queryPaths, null, 0, 20, 2, 2, indexes);
+          session.fetchWindowBatch(queryPaths, null, 0, 20, 2, 2, indexes);
       Assert.assertEquals(indexes.size(), windowSet.size());
       for (SessionDataSet window : windowSet) {
         while (window.hasNext()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index dadd1b3466..d0b5f5167b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -70,7 +70,7 @@ import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
 import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetReq;
+import org.apache.iotdb.service.rpc.thrift.TSFetchWindowBatchReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
@@ -179,7 +179,7 @@ public class StatementGenerator {
     return lastQueryStatement;
   }
 
-  public static Statement createStatement(TSFetchWindowSetReq fetchWindowSetReq, ZoneId zoneId)
+  public static Statement createStatement(TSFetchWindowBatchReq fetchWindowSetReq)
       throws IllegalPathException {
     FetchWindowSetStatement statement = new FetchWindowSetStatement();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 1d7d875999..61363a1d6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -88,8 +88,8 @@ import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
 import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
 import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
 import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetResp;
+import org.apache.iotdb.service.rpc.thrift.TSFetchWindowBatchReq;
+import org.apache.iotdb.service.rpc.thrift.TSFetchWindowBatchResp;
 import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
@@ -132,7 +132,6 @@ import static org.apache.iotdb.db.service.basic.ServiceProvider.AUDIT_LOGGER;
 import static org.apache.iotdb.db.service.basic.ServiceProvider.CONFIG;
 import static org.apache.iotdb.db.service.basic.ServiceProvider.CURRENT_RPC_VERSION;
 import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_FREQUENCY_RECORDER;
-import static org.apache.iotdb.db.service.basic.ServiceProvider.SESSION_MANAGER;
 import static org.apache.iotdb.db.service.basic.ServiceProvider.SLOW_SQL_LOGGER;
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
@@ -421,19 +420,18 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
   }
 
   @Override
-  public TSFetchWindowSetResp fetchWindowSet(TSFetchWindowSetReq req) throws TException {
+  public TSFetchWindowBatchResp fetchWindowBatch(TSFetchWindowBatchReq req) throws TException {
     if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
-      return RpcUtils.getTSFetchWindowSetResp(getNotLoggedInStatus());
+      return RpcUtils.getTSFetchWindowBatchResp(getNotLoggedInStatus());
     }
     long startTime = System.currentTimeMillis();
     try {
-      Statement s =
-          StatementGenerator.createStatement(req, SESSION_MANAGER.getCurrSession().getZoneId());
+      Statement s = StatementGenerator.createStatement(req);
 
       // permission check
       TSStatus status = AuthorityChecker.checkAuthority(s, SESSION_MANAGER.getCurrSession());
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        return RpcUtils.getTSFetchWindowSetResp(status);
+        return RpcUtils.getTSFetchWindowBatchResp(status);
       }
 
       QUERY_FREQUENCY_RECORDER.incrementAndGet();
@@ -458,14 +456,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
 
       try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
-        TSFetchWindowSetResp resp =
-            createTSFetchWindowSetResp(queryExecution.getDatasetHeader(), queryId);
-        resp.setQueryResultList(QueryDataSetUtils.convertTsBlocksToWindowSet(queryExecution));
+        TSFetchWindowBatchResp resp =
+            createTSFetchWindowBatchResp(queryExecution.getDatasetHeader());
+        resp.setWindowBatch(QueryDataSetUtils.convertTsBlocksToWindowBatch(queryExecution));
         return resp;
       }
     } catch (Exception e) {
       // TODO call the coordinator to release query resource
-      return RpcUtils.getTSFetchWindowSetResp(
+      return RpcUtils.getTSFetchWindowBatchResp(
           onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_RAW_DATA_QUERY));
     } finally {
       addOperationLatency(Operation.EXECUTE_QUERY, startTime);
@@ -1798,12 +1796,11 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
     return resp;
   }
 
-  private TSFetchWindowSetResp createTSFetchWindowSetResp(DatasetHeader header, long queryId) {
-    TSFetchWindowSetResp resp = RpcUtils.getTSFetchWindowSetResp(TSStatusCode.SUCCESS_STATUS);
+  private TSFetchWindowBatchResp createTSFetchWindowBatchResp(DatasetHeader header) {
+    TSFetchWindowBatchResp resp = RpcUtils.getTSFetchWindowBatchResp(TSStatusCode.SUCCESS_STATUS);
     resp.setColumnNameList(header.getRespColumns());
     resp.setColumnTypeList(header.getRespDataTypeList());
     resp.setColumnNameIndexMap(header.getColumnNameIndexMap());
-    resp.setQueryId(queryId);
     return resp;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 9dd837b131..7470898fac 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -103,8 +103,8 @@ import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
 import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
 import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
 import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetResp;
+import org.apache.iotdb.service.rpc.thrift.TSFetchWindowBatchReq;
+import org.apache.iotdb.service.rpc.thrift.TSFetchWindowBatchResp;
 import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
@@ -279,7 +279,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
   }
 
   @Override
-  public TSFetchWindowSetResp fetchWindowSet(TSFetchWindowSetReq req) throws TException {
+  public TSFetchWindowBatchResp fetchWindowBatch(TSFetchWindowBatchReq req) throws TException {
     return null;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 9ac3cbd404..d053e4b953 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -400,7 +400,7 @@ public class QueryDataSetUtils {
     return res;
   }
 
-  public static List<List<ByteBuffer>> convertTsBlocksToWindowSet(IQueryExecution queryExecution)
+  public static List<List<ByteBuffer>> convertTsBlocksToWindowBatch(IQueryExecution queryExecution)
       throws IoTDBException {
     List<List<ByteBuffer>> windowSet = new ArrayList<>();
 
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index cbedcf75e2..01668c4e24 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxTSStatus;
 import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
 import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetResp;
+import org.apache.iotdb.service.rpc.thrift.TSFetchWindowBatchResp;
 
 import java.lang.reflect.Proxy;
 import java.text.SimpleDateFormat;
@@ -226,16 +226,16 @@ public class RpcUtils {
     return resp;
   }
 
-  public static TSFetchWindowSetResp getTSFetchWindowSetResp(TSStatus status) {
-    TSFetchWindowSetResp resp = new TSFetchWindowSetResp();
+  public static TSFetchWindowBatchResp getTSFetchWindowBatchResp(TSStatus status) {
+    TSFetchWindowBatchResp resp = new TSFetchWindowBatchResp();
     TSStatus tsStatus = new TSStatus(status);
     resp.setStatus(tsStatus);
     return resp;
   }
 
-  public static TSFetchWindowSetResp getTSFetchWindowSetResp(TSStatusCode tsStatusCode) {
+  public static TSFetchWindowBatchResp getTSFetchWindowBatchResp(TSStatusCode tsStatusCode) {
     TSStatus status = getStatus(tsStatusCode);
-    return getTSFetchWindowSetResp(status);
+    return getTSFetchWindowBatchResp(status);
   }
 
   public static final String DEFAULT_TIME_FORMAT = "default";
diff --git a/session/src/main/java/org/apache/iotdb/session/ISession.java b/session/src/main/java/org/apache/iotdb/session/ISession.java
index 1b140e0d9f..4e8b42a864 100644
--- a/session/src/main/java/org/apache/iotdb/session/ISession.java
+++ b/session/src/main/java/org/apache/iotdb/session/ISession.java
@@ -441,7 +441,7 @@ public interface ISession extends AutoCloseable {
 
   TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException;
 
-  List<SessionDataSet> fetchWindowSet(
+  List<SessionDataSet> fetchWindowBatch(
       List<String> queryPaths,
       String functionName,
       long startTime,
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 68e35b367b..5523f9ea73 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -3264,7 +3264,7 @@ public class Session implements ISession {
   }
 
   @Override
-  public List<SessionDataSet> fetchWindowSet(
+  public List<SessionDataSet> fetchWindowBatch(
       List<String> queryPaths,
       String functionName,
       long startTime,
@@ -3273,7 +3273,7 @@ public class Session implements ISession {
       long slidingStep,
       List<Integer> indexes)
       throws StatementExecutionException {
-    return defaultSessionConnection.fetchWindowSet(
+    return defaultSessionConnection.fetchWindowBatch(
         queryPaths, functionName, startTime, endTime, interval, slidingStep, indexes);
   }
 
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 6966b89ae8..1cfa1666c3 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -39,8 +39,8 @@ import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
 import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetResp;
+import org.apache.iotdb.service.rpc.thrift.TSFetchWindowBatchReq;
+import org.apache.iotdb.service.rpc.thrift.TSFetchWindowBatchResp;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
@@ -485,7 +485,7 @@ public class SessionConnection {
         tsExecuteStatementResp.isIgnoreTimeStamp());
   }
 
-  public List<SessionDataSet> fetchWindowSet(
+  public List<SessionDataSet> fetchWindowBatch(
       List<String> queryPaths,
       String functionName,
       long startTime,
@@ -494,31 +494,35 @@ public class SessionConnection {
       long slidingStep,
       List<Integer> indexes)
       throws StatementExecutionException {
-    TSFetchWindowSetReq req =
-        new TSFetchWindowSetReq(
+    TSFetchWindowBatchReq req =
+        new TSFetchWindowBatchReq(
             sessionId,
             statementId,
             queryPaths,
             new TGroupByTimeParameter(startTime, endTime, interval, slidingStep, indexes));
-    TSFetchWindowSetResp resp;
+    if (functionName != null) {
+      req.setFunctionName(functionName);
+    }
+
+    TSFetchWindowBatchResp resp;
     try {
-      resp = client.fetchWindowSet(req);
+      resp = client.fetchWindowBatch(req);
       RpcUtils.verifySuccess(resp.getStatus());
     } catch (TException e) {
       throw new StatementExecutionException("");
     }
 
     List<SessionDataSet> windowSet = new ArrayList<>();
-    for (List<ByteBuffer> queryResult : resp.getQueryResultList()) {
+    for (List<ByteBuffer> window : resp.getWindowBatch()) {
       SessionDataSet sessionDataSet =
           new SessionDataSet(
               resp.columnNameList,
               resp.columnTypeList,
               resp.columnNameIndexMap,
-              resp.queryId,
+              -1,
               statementId,
               sessionId,
-              queryResult);
+              window);
       windowSet.add(sessionDataSet);
     }
     return windowSet;
diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift
index 9474d0645b..6d5f6cfc11 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -421,7 +421,7 @@ struct TGroupByTimeParameter {
   5: required list<i32> indexes
 }
 
-struct TSFetchWindowSetReq {
+struct TSFetchWindowBatchReq {
   1: required i64 sessionId
   2: required i64 statementId
   3: required list<string> queryPaths
@@ -429,12 +429,12 @@ struct TSFetchWindowSetReq {
   5: required TGroupByTimeParameter groupByTimeParameter
 }
 
-struct TSFetchWindowSetResp {
+struct TSFetchWindowBatchResp {
   1: required common.TSStatus status
   2: required list<string> columnNameList
   3: required list<string> columnTypeList
   4: required map<string, i32> columnNameIndexMap
-  5: required list<TSQueryDataSet> windowSet
+  5: required list<list<binary>> windowBatch
 }
 
 // The sender and receiver need to check some info to confirm validity
@@ -585,5 +585,5 @@ service IClientRPCService {
 
   TSConnectionInfoResp fetchAllConnectionsInfo();
 
-  TSFetchWindowSetResp fetchWindowSet(1:TSFetchWindowSetReq req);
+  TSFetchWindowBatchResp fetchWindowBatch(1:TSFetchWindowBatchReq req);
 }
\ No newline at end of file