You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/20 13:51:22 UTC

[iotdb] branch ml/windowSet updated: fix bug

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ml/windowSet by this push:
     new 0ffeb6c953 fix bug
0ffeb6c953 is described below

commit 0ffeb6c953fb9add30e1e63da6eced8bf6fd5238
Author: liuminghui233 <54...@qq.com>
AuthorDate: Sun Nov 20 21:47:36 2022 +0800

    fix bug
---
 client-py/SessionExample.py                        | 12 ++++++++---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  2 +-
 .../process/window/WindowConcatOperator.java       |  3 ++-
 .../operator/process/window/WindowSliceQueue.java  |  4 ++++
 .../process/window/WindowSplitOperator.java        | 25 ++++++++++++++++++----
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  4 +++-
 .../db/mpp/plan/planner/LogicalPlanVisitor.java    |  3 ++-
 7 files changed, 42 insertions(+), 11 deletions(-)

diff --git a/client-py/SessionExample.py b/client-py/SessionExample.py
index b202bf2278..c7923b8e03 100644
--- a/client-py/SessionExample.py
+++ b/client-py/SessionExample.py
@@ -39,7 +39,13 @@ session.open(False)
 
 ts_path_list = [
     "root.eg.exchange.s0",
-    "root.eg.exchange.s1"
+    "root.eg.exchange.s1",
+    "root.eg.exchange.s2",
+    "root.eg.exchange.s3",
+    "root.eg.exchange.s4",
+    "root.eg.exchange.s5",
+    "root.eg.exchange.s6",
+    "root.eg.exchange.s7"
 ]
 
 fetch_args = {
@@ -47,8 +53,8 @@ fetch_args = {
     "end_time": 1286640000000,
     "interval": 86400000 * 96,
     "sliding_step": 86400000,
-    "indexes": [666, 555, 222]
-    # "indexes": random.sample([i for i in range(0, 7501)], 100)
+    # "indexes": [222, 555, 666]
+    "indexes": random.sample([i for i in range(0, 7492)], 100)
 }
 
 result = session.fetch_window_batch(ts_path_list, None, fetch_args)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 67e17d1a80..39fa827014 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -547,7 +547,7 @@ public class IoTDBConfig {
   private long cacheFileReaderClearPeriod = 100000;
 
   /** the max executing time of query in ms. Unit: millisecond */
-  private long queryTimeoutThreshold = 60000;
+  private long queryTimeoutThreshold = 600000000;
 
   /** the max time to live of a session in ms. Unit: millisecond */
   private int sessionTimeoutThreshold = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowConcatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowConcatOperator.java
index cda4a23a0f..febb615db8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowConcatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowConcatOperator.java
@@ -91,7 +91,8 @@ public class WindowConcatOperator implements ProcessOperator {
 
   @Override
   public boolean hasNext() {
-    return curTimeRange != null || sampleTimeRangeIterator.hasNextTimeRange();
+    return (!windowSliceQueue.isEmpty() || child.hasNext())
+        && (curTimeRange != null || sampleTimeRangeIterator.hasNextTimeRange());
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSliceQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSliceQueue.java
index 59531d2ced..52f4ebd895 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSliceQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSliceQueue.java
@@ -77,4 +77,8 @@ public class WindowSliceQueue {
     }
     return windowBuilder.build();
   }
+
+  public boolean isEmpty() {
+    return deque.isEmpty();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSplitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSplitOperator.java
index 039822429d..052976f334 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSplitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSplitOperator.java
@@ -32,11 +32,15 @@ import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 
 public class WindowSplitOperator implements ProcessOperator {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(WindowSplitOperator.class);
+
   protected final OperatorContext operatorContext;
 
   protected final Operator child;
@@ -96,7 +100,13 @@ public class WindowSplitOperator implements ProcessOperator {
       }
     }
 
-    if (!fetchData()) {
+    LOGGER.info(
+        "curTimeRange: {}, curTimeRangeSlice: {}, sliceLen: {}",
+        curTimeRange,
+        curTimeRangeSlice,
+        (curTimeRangeSlice.getMax() - curTimeRangeSlice.getMin()) / 86400000);
+
+    if (!fetchData() && child.hasNext()) {
       return null;
     } else {
       curTimeRangeSlice = null;
@@ -120,12 +130,17 @@ public class WindowSplitOperator implements ProcessOperator {
   }
 
   private boolean consumeInput() {
-    if (inputTsBlock == null) {
+    if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+      return false;
+    }
+
+    if (inputTsBlock.getEndTime() < curTimeRangeSlice.getMin()) {
+      inputTsBlock = null;
       return false;
     }
 
     inputTsBlock = TsBlockUtil.skipPointsOutOfTimeRange(inputTsBlock, curTimeRangeSlice, true);
-    if (inputTsBlock == null) {
+    if (inputTsBlock == null || inputTsBlock.isEmpty()) {
       return false;
     }
 
@@ -138,6 +153,7 @@ public class WindowSplitOperator implements ProcessOperator {
         return true;
       }
     }
+    inputTsBlock = null;
     return false;
   }
 
@@ -153,7 +169,8 @@ public class WindowSplitOperator implements ProcessOperator {
 
   @Override
   public boolean hasNext() {
-    return curTimeRangeSlice != null || sampleTimeRangeSliceIterator.hasNextTimeRange();
+    return (inputTsBlock != null || child.hasNext())
+        && (curTimeRangeSlice != null || sampleTimeRangeSliceIterator.hasNextTimeRange());
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index c44ea90041..cf5078f7c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -1314,7 +1314,9 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     long minTime =
         groupByTimeParameter.getStartTime() + minIndex * groupByTimeParameter.getSlidingStep();
     long maxTime =
-        groupByTimeParameter.getStartTime() + maxIndex * groupByTimeParameter.getSlidingStep();
+        groupByTimeParameter.getStartTime()
+            + maxIndex * groupByTimeParameter.getSlidingStep()
+            + groupByTimeParameter.getInterval();
     analysis.setGlobalTimeFilter(
         FilterFactory.and(TimeFilter.gtEq(minTime), TimeFilter.ltEq(maxTime)));
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index 14496b69cf..52527b0b7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -300,7 +300,8 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
             .sorted()
             .collect(Collectors.toList());
     planBuilder
-        .planRawDataSource(analysis.getSourceExpressions(), Ordering.ASC, null)
+        .planRawDataSource(
+            analysis.getSourceExpressions(), Ordering.ASC, analysis.getGlobalTimeFilter())
         .planTransform(
             analysis.getSourceTransformExpressions(), true, ZoneId.systemDefault(), Ordering.ASC)
         .planWindowSplit(fetchWindowBatchStatement.getGroupByTimeParameter(), sortedSamplingIndexes)