You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/07/17 04:24:29 UTC

[incubator-iotdb] 01/01: improve group by

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ImproveGroupBy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit c3786dfa8a44eeb74a5b38de5f6a8cd7d90ce219
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Fri Jul 17 12:23:58 2020 +0800

    improve group by
---
 .../query/dataset/groupby/LocalGroupByExecutor.java   | 19 +++++++++++++++----
 .../apache/iotdb/tsfile/read/common/BatchData.java    | 13 +++++++++++++
 2 files changed, 28 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
index 007843c..3b1d9df 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
@@ -42,12 +42,16 @@ import java.util.Set;
 
 public class LocalGroupByExecutor implements GroupByExecutor {
 
-  private IAggregateReader reader;
+  private final IAggregateReader reader;
   private BatchData preCachedData;
 
   // Aggregate result buffer of this path
-  private List<AggregateResult> results = new ArrayList<>();
-  private TimeRange timeRange;
+  private final List<AggregateResult> results = new ArrayList<>();
+  private final TimeRange timeRange;
+
+  // used for resetting the batch data to the last index
+  private int lastReadCurArrayIndex;
+  private int lastReadCurListIndex;
 
   public LocalGroupByExecutor(Path path, Set<String> allSensors, TSDataType dataType,
       QueryContext context, Filter timeFilter, TsFileFilter fileFilter)
@@ -60,6 +64,8 @@ public class LocalGroupByExecutor implements GroupByExecutor {
         timeFilter, null, fileFilter);
     this.preCachedData = null;
     timeRange = new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE);
+    lastReadCurArrayIndex = 0;
+    lastReadCurListIndex = 0;
   }
 
   @Override
@@ -96,7 +102,7 @@ public class LocalGroupByExecutor implements GroupByExecutor {
         continue;
       }
       // lazy reset batch data for calculation
-      batchData.resetBatchData();
+      batchData.resetBatchData(lastReadCurArrayIndex, lastReadCurListIndex);
       // skip points that cannot be calculated
       while (batchData.hasCurrent() && batchData.currentTime() < curStartTime) {
         batchData.next();
@@ -105,6 +111,8 @@ public class LocalGroupByExecutor implements GroupByExecutor {
         result.updateResultFromPageData(batchData, curEndTime);
       }
     }
+    lastReadCurArrayIndex = batchData.getReadCurArrayIndex();
+    lastReadCurListIndex = batchData.getReadCurListIndex();
     // can calc for next interval
     if (batchData.getMaxTimestamp() >= curEndTime) {
       preCachedData = batchData;
@@ -221,6 +229,9 @@ public class LocalGroupByExecutor implements GroupByExecutor {
         return true;
       }
 
+      // reset the last position to zero
+      lastReadCurArrayIndex = 0;
+      lastReadCurListIndex = 0;
       calcFromBatch(batchData, curStartTime, curEndTime);
 
       // judge whether the calculation finished
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
index a893c1b..b4d6053 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
@@ -580,4 +580,17 @@ public class BatchData implements Serializable {
     this.readCurArrayIndex = 0;
     this.readCurListIndex = 0;
   }
+
+  public void resetBatchData(int readCurArrayIndex, int readCurListIndex) {
+    this.readCurArrayIndex = readCurArrayIndex;
+    this.readCurListIndex = readCurListIndex;
+  }
+
+  public int getReadCurListIndex() {
+    return readCurListIndex;
+  }
+
+  public int getReadCurArrayIndex() {
+    return readCurArrayIndex;
+  }
 }