You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/15 15:21:59 UTC
[iotdb] branch ml/windowSet updated: add time filter
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ml/windowSet by this push:
new 19e4e770df add time filter
19e4e770df is described below
commit 19e4e770df57fafb5e0938af0985ba01f84d0b29
Author: liuminghui233 <54...@qq.com>
AuthorDate: Tue Nov 15 23:19:48 2022 +0800
add time filter
---
client-py/SessionExample.py | 20 +++++++-----
client-py/iotdb/Session.py | 1 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 4 +--
.../process/window/WindowConcatOperator.java | 2 +-
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 37 ++++++++++++++++++++++
5 files changed, 53 insertions(+), 11 deletions(-)
diff --git a/client-py/SessionExample.py b/client-py/SessionExample.py
index fa65a8d614..b202bf2278 100644
--- a/client-py/SessionExample.py
+++ b/client-py/SessionExample.py
@@ -17,6 +17,8 @@
#
# Uncomment the following line to use apache-iotdb module installed by pip3
+import random
+
import numpy as np
from iotdb.Session import Session
@@ -36,19 +38,21 @@ session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC
session.open(False)
ts_path_list = [
- "root.sg1.d1.s1",
- "root.sg1.d1.s2"
+ "root.eg.exchange.s0",
+ "root.eg.exchange.s1"
]
fetch_args = {
- "start_time": 0,
- "end_time": 32,
- "interval": 4,
- "sliding_step": 1,
- "indexes": [9, 0, 5, 3]
+ "start_time": 631123200000,
+ "end_time": 1286640000000,
+ "interval": 86400000 * 96,
+ "sliding_step": 86400000,
+ "indexes": [666, 555, 222]
+ # "indexes": random.sample([i for i in range(0, 7501)], 100)
}
-print(session.fetch_window_batch(ts_path_list, "sin", fetch_args))
+result = session.fetch_window_batch(ts_path_list, None, fetch_args)
+print(result.shape)
# # set and delete storage groups
# session.set_storage_group("root.sg_test_01")
diff --git a/client-py/iotdb/Session.py b/client-py/iotdb/Session.py
index 64be51e9ac..3633ccfd85 100644
--- a/client-py/iotdb/Session.py
+++ b/client-py/iotdb/Session.py
@@ -1483,6 +1483,7 @@ class Session(object):
)
window_df = window_session_data_set.to_df(window_session_data_set)
+ print(window_df)
window_batch.append(window_df)
return np.array(window_batch)
else:
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 60bc735da1..67e17d1a80 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -749,10 +749,10 @@ public class IoTDBConfig {
private boolean enablePartition = true;
/** Time partition interval for storage in milliseconds */
- private long timePartitionIntervalForStorage = 604_800_000;
+ private long timePartitionIntervalForStorage = 31536000000L;
/** Time partition interval for routing in milliseconds */
- private long timePartitionIntervalForRouting = 604_800_000;
+ private long timePartitionIntervalForRouting = 31536000000L;
/**
* Level of TimeIndex, which records the start time and end time of TsFileResource. Currently,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowConcatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowConcatOperator.java
index 4dc9f14f2d..cda4a23a0f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowConcatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowConcatOperator.java
@@ -64,7 +64,7 @@ public class WindowConcatOperator implements ProcessOperator {
}
TsBlock inputTsBlock = child.next();
- if (inputTsBlock == null) {
+ if (inputTsBlock == null || inputTsBlock.isEmpty()) {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index a1811979b5..c44ea90041 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -39,6 +39,8 @@ import org.apache.iotdb.db.exception.sql.MeasurementNotExistException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.TimeRangeIteratorFactory;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
@@ -120,6 +122,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
import org.apache.iotdb.tsfile.read.filter.GroupByMonthFilter;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -1227,6 +1230,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
ISchemaTree schemaTree = schemaFetcher.fetchSchema(patternTree);
logger.info("[EndFetchSchema]");
+ analyzeGlobalTimeFilter(analysis, fetchWindowBatchStatement);
+
// set source
List<MeasurementPath> measurementPaths = schemaTree.getAllMeasurement();
Set<Expression> sourceExpressions =
@@ -1282,6 +1287,38 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
return analysis;
}
+ /**
+  * Derives a global time filter for a fetch-window-batch statement from its sampled window
+  * indexes and stores it on the analysis.
+  *
+  * <p>Window {@code i} is taken to start at {@code startTime + i * slidingStep} and to span
+  * {@code interval}. The filter therefore covers the range from the start of the lowest sampled
+  * window up to (but excluding) the end of the highest sampled window, so that every sampled
+  * window is read in full.
+  *
+  * <p>NOTE(review): the exclusive upper bound assumes left-closed/right-open windows, which is
+  * what the fourth boolean passed to the iterator factory appears to request - confirm against
+  * {@code TimeRangeIteratorFactory#getTimeRangeIterator}.
+  *
+  * @param analysis the analysis that receives the global time filter
+  * @param statement the statement providing the group-by-time parameters and sampling indexes
+  * @throws SemanticException if any sampling index is negative or not smaller than the total
+  *     number of windows
+  */
+ private void analyzeGlobalTimeFilter(Analysis analysis, FetchWindowBatchStatement statement) {
+   GroupByTimeParameter groupByTimeParameter = statement.getGroupByTimeParameter();
+   long startTime = groupByTimeParameter.getStartTime();
+   long endTime = groupByTimeParameter.getEndTime();
+   long interval = groupByTimeParameter.getInterval();
+   long slidingStep = groupByTimeParameter.getSlidingStep();
+
+   ITimeRangeIterator iterator =
+       TimeRangeIteratorFactory.getTimeRangeIterator(
+           startTime, endTime, interval, slidingStep, true, false, false, true, false);
+   long totalNum = iterator.getTotalIntervalNum();
+
+   boolean hasIndex = false;
+   long minIndex = Long.MAX_VALUE;
+   long maxIndex = -1;
+   for (Integer index : statement.getSamplingIndexes()) {
+     // Validate before the index takes part in any arithmetic.
+     if (index < 0 || index >= totalNum) {
+       throw new SemanticException(
+           "sampling index " + index + " is out of range [0, " + totalNum + ")");
+     }
+     hasIndex = true;
+     minIndex = Math.min(minIndex, index);
+     maxIndex = Math.max(maxIndex, index);
+   }
+
+   if (!hasIndex) {
+     // Without any index the sentinels above would overflow into a meaningless filter;
+     // fall back to the whole group-by range instead.
+     analysis.setGlobalTimeFilter(
+         FilterFactory.and(TimeFilter.gtEq(startTime), TimeFilter.lt(endTime)));
+     return;
+   }
+
+   long minTime = startTime + minIndex * slidingStep;
+   // The last sampled window starts at startTime + maxIndex * slidingStep and is `interval`
+   // long; stopping at its start would drop almost all of its data.
+   long maxTime = startTime + maxIndex * slidingStep + interval;
+   analysis.setGlobalTimeFilter(
+       FilterFactory.and(TimeFilter.gtEq(minTime), TimeFilter.lt(maxTime)));
+ }
+
@Override
public Analysis visitInsert(InsertStatement insertStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);