You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/20 13:51:22 UTC
[iotdb] branch ml/windowSet updated: fix bug
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ml/windowSet by this push:
new 0ffeb6c953 fix bug
0ffeb6c953 is described below
commit 0ffeb6c953fb9add30e1e63da6eced8bf6fd5238
Author: liuminghui233 <54...@qq.com>
AuthorDate: Sun Nov 20 21:47:36 2022 +0800
fix bug
---
client-py/SessionExample.py | 12 ++++++++---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../process/window/WindowConcatOperator.java | 3 ++-
.../operator/process/window/WindowSliceQueue.java | 4 ++++
.../process/window/WindowSplitOperator.java | 25 ++++++++++++++++++----
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 4 +++-
.../db/mpp/plan/planner/LogicalPlanVisitor.java | 3 ++-
7 files changed, 42 insertions(+), 11 deletions(-)
diff --git a/client-py/SessionExample.py b/client-py/SessionExample.py
index b202bf2278..c7923b8e03 100644
--- a/client-py/SessionExample.py
+++ b/client-py/SessionExample.py
@@ -39,7 +39,13 @@ session.open(False)
ts_path_list = [
"root.eg.exchange.s0",
- "root.eg.exchange.s1"
+ "root.eg.exchange.s1",
+ "root.eg.exchange.s2",
+ "root.eg.exchange.s3",
+ "root.eg.exchange.s4",
+ "root.eg.exchange.s5",
+ "root.eg.exchange.s6",
+ "root.eg.exchange.s7"
]
fetch_args = {
@@ -47,8 +53,8 @@ fetch_args = {
"end_time": 1286640000000,
"interval": 86400000 * 96,
"sliding_step": 86400000,
- "indexes": [666, 555, 222]
- # "indexes": random.sample([i for i in range(0, 7501)], 100)
+ # "indexes": [222, 555, 666]
+ "indexes": random.sample([i for i in range(0, 7492)], 100)
}
result = session.fetch_window_batch(ts_path_list, None, fetch_args)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 67e17d1a80..39fa827014 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -547,7 +547,7 @@ public class IoTDBConfig {
private long cacheFileReaderClearPeriod = 100000;
/** the max executing time of query in ms. Unit: millisecond */
- private long queryTimeoutThreshold = 60000;
+ private long queryTimeoutThreshold = 600000000;
/** the max time to live of a session in ms. Unit: millisecond */
private int sessionTimeoutThreshold = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowConcatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowConcatOperator.java
index cda4a23a0f..febb615db8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowConcatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowConcatOperator.java
@@ -91,7 +91,8 @@ public class WindowConcatOperator implements ProcessOperator {
@Override
public boolean hasNext() {
- return curTimeRange != null || sampleTimeRangeIterator.hasNextTimeRange();
+ return (!windowSliceQueue.isEmpty() || child.hasNext())
+ && (curTimeRange != null || sampleTimeRangeIterator.hasNextTimeRange());
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSliceQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSliceQueue.java
index 59531d2ced..52f4ebd895 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSliceQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSliceQueue.java
@@ -77,4 +77,8 @@ public class WindowSliceQueue {
}
return windowBuilder.build();
}
+
+ public boolean isEmpty() {
+ return deque.isEmpty();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSplitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSplitOperator.java
index 039822429d..052976f334 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSplitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSplitOperator.java
@@ -32,11 +32,15 @@ import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
public class WindowSplitOperator implements ProcessOperator {
+ private static final Logger LOGGER = LoggerFactory.getLogger(WindowSplitOperator.class);
+
protected final OperatorContext operatorContext;
protected final Operator child;
@@ -96,7 +100,13 @@ public class WindowSplitOperator implements ProcessOperator {
}
}
- if (!fetchData()) {
+ LOGGER.info(
+ "curTimeRange: {}, curTimeRangeSlice: {}, sliceLen: {}",
+ curTimeRange,
+ curTimeRangeSlice,
+ (curTimeRangeSlice.getMax() - curTimeRangeSlice.getMin()) / 86400000);
+
+ if (!fetchData() && child.hasNext()) {
return null;
} else {
curTimeRangeSlice = null;
@@ -120,12 +130,17 @@ public class WindowSplitOperator implements ProcessOperator {
}
private boolean consumeInput() {
- if (inputTsBlock == null) {
+ if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+ return false;
+ }
+
+ if (inputTsBlock.getEndTime() < curTimeRangeSlice.getMin()) {
+ inputTsBlock = null;
return false;
}
inputTsBlock = TsBlockUtil.skipPointsOutOfTimeRange(inputTsBlock, curTimeRangeSlice, true);
- if (inputTsBlock == null) {
+ if (inputTsBlock == null || inputTsBlock.isEmpty()) {
return false;
}
@@ -138,6 +153,7 @@ public class WindowSplitOperator implements ProcessOperator {
return true;
}
}
+ inputTsBlock = null;
return false;
}
@@ -153,7 +169,8 @@ public class WindowSplitOperator implements ProcessOperator {
@Override
public boolean hasNext() {
- return curTimeRangeSlice != null || sampleTimeRangeSliceIterator.hasNextTimeRange();
+ return (inputTsBlock != null || child.hasNext())
+ && (curTimeRangeSlice != null || sampleTimeRangeSliceIterator.hasNextTimeRange());
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index c44ea90041..cf5078f7c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -1314,7 +1314,9 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
long minTime =
groupByTimeParameter.getStartTime() + minIndex * groupByTimeParameter.getSlidingStep();
long maxTime =
- groupByTimeParameter.getStartTime() + maxIndex * groupByTimeParameter.getSlidingStep();
+ groupByTimeParameter.getStartTime()
+ + maxIndex * groupByTimeParameter.getSlidingStep()
+ + groupByTimeParameter.getInterval();
analysis.setGlobalTimeFilter(
FilterFactory.and(TimeFilter.gtEq(minTime), TimeFilter.ltEq(maxTime)));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index 14496b69cf..52527b0b7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -300,7 +300,8 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
.sorted()
.collect(Collectors.toList());
planBuilder
- .planRawDataSource(analysis.getSourceExpressions(), Ordering.ASC, null)
+ .planRawDataSource(
+ analysis.getSourceExpressions(), Ordering.ASC, analysis.getGlobalTimeFilter())
.planTransform(
analysis.getSourceTransformExpressions(), true, ZoneId.systemDefault(), Ordering.ASC)
.planWindowSplit(fetchWindowBatchStatement.getGroupByTimeParameter(), sortedSamplingIndexes)