You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/15 15:21:59 UTC

[iotdb] branch ml/windowSet updated: add time filter

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ml/windowSet by this push:
     new 19e4e770df add time filter
19e4e770df is described below

commit 19e4e770df57fafb5e0938af0985ba01f84d0b29
Author: liuminghui233 <54...@qq.com>
AuthorDate: Tue Nov 15 23:19:48 2022 +0800

    add time filter
---
 client-py/SessionExample.py                        | 20 +++++++-----
 client-py/iotdb/Session.py                         |  1 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  4 +--
 .../process/window/WindowConcatOperator.java       |  2 +-
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  | 37 ++++++++++++++++++++++
 5 files changed, 53 insertions(+), 11 deletions(-)

diff --git a/client-py/SessionExample.py b/client-py/SessionExample.py
index fa65a8d614..b202bf2278 100644
--- a/client-py/SessionExample.py
+++ b/client-py/SessionExample.py
@@ -17,6 +17,8 @@
 #
 
 # Uncomment the following line to use apache-iotdb module installed by pip3
+import random
+
 import numpy as np
 
 from iotdb.Session import Session
@@ -36,19 +38,21 @@ session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC
 session.open(False)
 
 ts_path_list = [
-    "root.sg1.d1.s1",
-    "root.sg1.d1.s2"
+    "root.eg.exchange.s0",
+    "root.eg.exchange.s1"
 ]
 
 fetch_args = {
-    "start_time": 0,
-    "end_time": 32,
-    "interval": 4,
-    "sliding_step": 1,
-    "indexes": [9, 0, 5, 3]
+    "start_time": 631123200000,
+    "end_time": 1286640000000,
+    "interval": 86400000 * 96,
+    "sliding_step": 86400000,
+    "indexes": [666, 555, 222]
+    # "indexes": random.sample([i for i in range(0, 7501)], 100)
 }
 
-print(session.fetch_window_batch(ts_path_list, "sin", fetch_args))
+result = session.fetch_window_batch(ts_path_list, None, fetch_args)
+print(result.shape)
 
 # # set and delete storage groups
 # session.set_storage_group("root.sg_test_01")
diff --git a/client-py/iotdb/Session.py b/client-py/iotdb/Session.py
index 64be51e9ac..3633ccfd85 100644
--- a/client-py/iotdb/Session.py
+++ b/client-py/iotdb/Session.py
@@ -1483,6 +1483,7 @@ class Session(object):
                     )
 
                     window_df = window_session_data_set.to_df(window_session_data_set)
+                    print(window_df)
                     window_batch.append(window_df)
                 return np.array(window_batch)
             else:
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 60bc735da1..67e17d1a80 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -749,10 +749,10 @@ public class IoTDBConfig {
   private boolean enablePartition = true;
 
   /** Time partition interval for storage in milliseconds */
-  private long timePartitionIntervalForStorage = 604_800_000;
+  private long timePartitionIntervalForStorage = 31536000000L;
 
   /** Time partition interval for routing in milliseconds */
-  private long timePartitionIntervalForRouting = 604_800_000;
+  private long timePartitionIntervalForRouting = 31536000000L;
 
   /**
    * Level of TimeIndex, which records the start time and end time of TsFileResource. Currently,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowConcatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowConcatOperator.java
index 4dc9f14f2d..cda4a23a0f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowConcatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowConcatOperator.java
@@ -64,7 +64,7 @@ public class WindowConcatOperator implements ProcessOperator {
     }
 
     TsBlock inputTsBlock = child.next();
-    if (inputTsBlock == null) {
+    if (inputTsBlock == null || inputTsBlock.isEmpty()) {
       return null;
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index a1811979b5..c44ea90041 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -39,6 +39,8 @@ import org.apache.iotdb.db.exception.sql.MeasurementNotExistException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.TimeRangeIteratorFactory;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
@@ -120,6 +122,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
 import org.apache.iotdb.tsfile.read.filter.GroupByMonthFilter;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -1227,6 +1230,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     ISchemaTree schemaTree = schemaFetcher.fetchSchema(patternTree);
     logger.info("[EndFetchSchema]");
 
+    analyzeGlobalTimeFilter(analysis, fetchWindowBatchStatement);
+
     // set source
     List<MeasurementPath> measurementPaths = schemaTree.getAllMeasurement();
     Set<Expression> sourceExpressions =
@@ -1282,6 +1287,38 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     return analysis;
   }
 
+  private void analyzeGlobalTimeFilter(Analysis analysis, FetchWindowBatchStatement statement) {
+    GroupByTimeParameter groupByTimeParameter = statement.getGroupByTimeParameter();
+    ITimeRangeIterator iterator =
+        TimeRangeIteratorFactory.getTimeRangeIterator(
+            groupByTimeParameter.getStartTime(),
+            groupByTimeParameter.getEndTime(),
+            groupByTimeParameter.getInterval(),
+            groupByTimeParameter.getSlidingStep(),
+            true,
+            false,
+            false,
+            true,
+            false);
+    long totalNum = iterator.getTotalIntervalNum();
+
+    long minIndex = Long.MAX_VALUE, maxIndex = -1;
+    for (Integer index : statement.getSamplingIndexes()) {
+      minIndex = Math.min(minIndex, index);
+      maxIndex = Math.max(maxIndex, index);
+      if (index < 0 || index >= totalNum) {
+        throw new SemanticException("index should be bulabula...");
+      }
+    }
+
+    long minTime =
+        groupByTimeParameter.getStartTime() + minIndex * groupByTimeParameter.getSlidingStep();
+    long maxTime =
+        groupByTimeParameter.getStartTime() + maxIndex * groupByTimeParameter.getSlidingStep();
+    analysis.setGlobalTimeFilter(
+        FilterFactory.and(TimeFilter.gtEq(minTime), TimeFilter.ltEq(maxTime)));
+  }
+
   @Override
   public Analysis visitInsert(InsertStatement insertStatement, MPPQueryContext context) {
     context.setQueryType(QueryType.WRITE);