You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink on the original archive page) is not preserved in plain text.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/03/01 13:50:03 UTC

[incubator-iotdb] branch optimize_series_reader updated: new groupby (#862)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch optimize_series_reader
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/optimize_series_reader by this push:
     new 34476e5  new groupby (#862)
34476e5 is described below

commit 34476e5dab28880cfe659aef01b346f35b0d11b4
Author: Dawei Liu <at...@163.com>
AuthorDate: Sun Mar 1 21:49:54 2020 +0800

    new groupby (#862)
    
    * new multithreaded groupby and GroupByExecutor
---
 .../db/query/aggregation/AggregateResult.java      |   9 +-
 .../db/query/aggregation/impl/AvgAggrResult.java   |   6 +
 .../groupby/GroupByWithoutValueFilterDataSet.java  | 376 +++++++++++----------
 .../reader/series/SeriesRawDataBatchReader.java    |   2 +-
 4 files changed, 221 insertions(+), 172 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
index 4b88bca..5483bd3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
@@ -74,11 +74,12 @@ public abstract class AggregateResult {
    * @param dataInThisPage the data in Page
    */
   public abstract void updateResultFromPageData(BatchData dataInThisPage) throws IOException;
+
   /**
    * Aggregate results cannot be calculated using Statistics directly, using the data in each page
    *
    * @param dataInThisPage the data in Page
-   * @param bound calculate points whose time < bound
+   * @param bound          calculate points whose time < bound
    */
   public abstract void updateResultFromPageData(BatchData dataInThisPage, long bound)
       throws IOException;
@@ -170,6 +171,12 @@ public abstract class AggregateResult {
 
   public void reset() {
     hasResult = false;
+    booleanValue = false;
+    doubleValue = 0;
+    floatValue = 0;
+    intValue = 0;
+    longValue = 0;
+    binaryValue = null;
   }
 
   protected Object getValue() {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
index 2c4fd7a..1e44444 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
@@ -143,4 +143,10 @@ public class AvgAggrResult extends AggregateResult {
     return cnt;
   }
 
+  @Override
+  public void reset() {
+    super.reset();
+    cnt = 0;
+    avg = 0;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index f756816..944b566 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -19,6 +19,14 @@
 
 package org.apache.iotdb.db.query.dataset.groupby;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -27,35 +35,30 @@ import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.factory.AggregateResultFactory;
+import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
 import org.apache.iotdb.db.query.reader.series.IAggregateReader;
 import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.*;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
 
-  /**
-   * Merges same series to one map. For example: Given: paths: s1, s2, s3, s1 and aggregations:
-   * count, sum, count, sum seriesMap: s1 -> 0, 3; s2 -> 1; s3 -> 2
-   */
-  private Map<Path, List<Integer>> pathToAggrIndexesMap;
+  private static final Logger logger = LoggerFactory
+      .getLogger(GroupByWithoutValueFilterDataSet.class);
 
-  /**
-   * Maps path and its aggregate reader
-   */
-  private Map<Path, IAggregateReader> aggregateReaders;
-  private List<BatchData> cachedBatchDataList;
-  private GroupByPlan groupByPlan;
+  private Map<Path, GroupByExecutor> pathExecutors = new HashMap<>();
+  private TimeRange timeRange;
 
   /**
    * constructor.
@@ -64,45 +67,33 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
       throws StorageEngineException {
     super(context, groupByPlan);
 
-    this.pathToAggrIndexesMap = new HashMap<>();
-    this.aggregateReaders = new HashMap<>();
-    this.cachedBatchDataList = new ArrayList<>();
-    for (int i = 0; i < paths.size(); i++) {
-      cachedBatchDataList.add(null);
-    }
     initGroupBy(context, groupByPlan);
   }
 
-  /**
-   * init reader and aggregate function.
-   */
   private void initGroupBy(QueryContext context, GroupByPlan groupByPlan)
       throws StorageEngineException {
     IExpression expression = groupByPlan.getExpression();
-    this.groupByPlan = groupByPlan;
 
     Filter timeFilter = null;
-    // init reader
     if (expression != null) {
       timeFilter = ((GlobalTimeExpression) expression).getFilter();
     }
 
     for (int i = 0; i < paths.size(); i++) {
       Path path = paths.get(i);
-      List<Integer> indexList = pathToAggrIndexesMap
-          .computeIfAbsent(path, key -> new ArrayList<>());
-      indexList.add(i);
-      if (!aggregateReaders.containsKey(path)) {
-
-        QueryDataSource queryDataSource = QueryResourceManager.getInstance()
-            .getQueryDataSource(path, context, timeFilter);
-        // update filter by TTL
-        timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
-
-        IAggregateReader seriesReader = new SeriesAggregateReader(path, dataTypes.get(i), context,
-            queryDataSource, timeFilter, null, null);
-        aggregateReaders.put(path, seriesReader);
-      }
+
+      QueryDataSource queryDataSource = QueryResourceManager.getInstance()
+          .getQueryDataSource(path, context, timeFilter);
+      // update filter by TTL
+      timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+      //init reader
+      pathExecutors.putIfAbsent(path,
+          new GroupByExecutor(path, dataTypes.get(i), context, queryDataSource, timeFilter));
+
+      AggregateResult aggrResult = AggregateResultFactory
+          .getAggrResultByName(groupByPlan.getDeduplicatedAggregations().get(i),
+              dataTypes.get(i));
+      pathExecutors.get(path).addAggregateResult(aggrResult, i);
     }
   }
 
@@ -114,157 +105,202 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
     }
     hasCachedTimeInterval = false;
     RowRecord record = new RowRecord(curStartTime);
-    AggregateResult[] aggregateResultList = new AggregateResult[paths.size()];
-    for (Map.Entry<Path, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
-      List<AggregateResult> aggregateResults;
+    timeRange = new TimeRange(curStartTime, curEndTime - 1);
+
+    final AggregateResult[] fields = new AggregateResult[paths.size()];
+    final List<Future> asyncResult = new ArrayList(pathExecutors.size());
+
+    for (Entry<Path, GroupByExecutor> executorEntry : pathExecutors.entrySet()) {
+      asyncResult.add(QueryTaskPoolManager.getInstance().submit((Callable<?>) () -> {
+        GroupByExecutor executor = executorEntry.getValue();
+
+        executor.resetAggregateResults();
+        List<Pair<AggregateResult, Integer>> aggregations = executor.calcResult();
+        for (int i = 0; i < aggregations.size(); i++) {
+          fields[aggregations.get(i).right] = aggregations.get(i).left;
+        }
+        return null;
+      }));
+    }
+    //waiting for data
+    for (Future future : asyncResult) {
       try {
-        aggregateResults = nextIntervalAggregation(entry);
-      } catch (QueryProcessException e) {
+        future.get();
+      } catch (Exception e) {
+        logger.error("GroupByWithoutValueFilterDataSet execute has error,{}", e);
         throw new IOException(e);
       }
-      int index = 0;
-      for (int i : entry.getValue()) {
-        aggregateResultList[i] = aggregateResults.get(index);
-        index++;
-      }
     }
-    if (aggregateResultList.length == 0) {
-      record.addField(new Field(null));
-    } else {
-      for (AggregateResult res : aggregateResultList) {
-        record.addField(res.getResult(), res.getResultDataType());
+
+    for (AggregateResult res : fields) {
+      if (res == null) {
+        record.addField(new Field(null));
+        continue;
       }
+      record.addField(res.getResult(), res.getResultDataType());
     }
     return record;
   }
 
-  /**
-   * calculate the group by result of one series
-   *
-   * @param pathToAggrIndexes entry of path to aggregation indexes map
-   */
-  private List<AggregateResult> nextIntervalAggregation(Map.Entry<Path,
-      List<Integer>> pathToAggrIndexes) throws IOException, QueryProcessException {
-    List<AggregateResult> aggregateResultList = new ArrayList<>();
-    List<Boolean> isCalculatedList = new ArrayList<>();
-    List<Integer> indexList = pathToAggrIndexes.getValue();
-
-    int remainingToCalculate = indexList.size();
-    TSDataType tsDataType = groupByPlan.getDeduplicatedDataTypes().get(indexList.get(0));
-
-    for (int index : indexList) {
-      AggregateResult result = AggregateResultFactory
-          .getAggrResultByName(groupByPlan.getDeduplicatedAggregations().get(index), tsDataType);
-      aggregateResultList.add(result);
-
-      BatchData lastBatch = cachedBatchDataList.get(index);
-
-      calcBatchData(result, lastBatch);
-      if (isEndCalc(result, lastBatch)) {
-        isCalculatedList.add(true);
-        remainingToCalculate--;
-        if (remainingToCalculate == 0) {
-          return aggregateResultList;
+
+  private class GroupByExecutor {
+
+    private IAggregateReader reader;
+    private BatchData preCachedData;
+    //<aggFunction - indexForRecord> of path
+    private List<Pair<AggregateResult, Integer>> results = new ArrayList<>();
+
+    public GroupByExecutor(Path path, TSDataType dataType, QueryContext context,
+        QueryDataSource dataSource, Filter timeFilter) {
+      this.reader = new SeriesAggregateReader(path, dataType, context,
+          dataSource, timeFilter, null, null);
+      this.preCachedData = null;
+    }
+
+    public IAggregateReader getReader() {
+      return reader;
+    }
+
+    public void addAggregateResult(AggregateResult aggrResult, int index) {
+      results.add(new Pair<>(aggrResult, index));
+    }
+
+    public boolean isEndCalc() {
+      for (Pair<AggregateResult, Integer> result : results) {
+        if (result.left.isCalculatedAggregationResult() == false) {
+          return false;
         }
-      } else {
-        isCalculatedList.add(false);
       }
+      return true;
     }
-    TimeRange timeRange = new TimeRange(curStartTime, curEndTime - 1);
-    IAggregateReader reader = aggregateReaders.get(pathToAggrIndexes.getKey());
-
-    while (reader.hasNextChunk()) {
-      // cal by chunk statistics
-      Statistics chunkStatistics = reader.currentChunkStatistics();
-      if (chunkStatistics.getStartTime() >= curEndTime) {
-        return aggregateResultList;
+
+    public boolean calcFromCacheData() throws IOException {
+      calcFromBatch(preCachedData);
+      // The result is calculated from the cache
+      if ((preCachedData != null && preCachedData.getMaxTimestamp() >= curEndTime) || isEndCalc()) {
+        return true;
       }
-      if (reader.canUseCurrentChunkStatistics() && timeRange.contains(
-          new TimeRange(chunkStatistics.getStartTime(), chunkStatistics.getEndTime()))) {
-        for (int i = 0; i < aggregateResultList.size(); i++) {
-          if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
-            AggregateResult result = aggregateResultList.get(i);
-            result.updateResultFromStatistics(chunkStatistics);
-            if (result.isCalculatedAggregationResult()) {
-              isCalculatedList.set(i, true);
-              remainingToCalculate--;
-              if (remainingToCalculate == 0) {
-                return aggregateResultList;
-              }
-            }
-          }
-        }
-        reader.skipCurrentChunk();
-        continue;
+      return false;
+    }
+
+    public void calcFromBatch(BatchData batchData) throws IOException {
+      // is error data
+      if (batchData == null
+          || !batchData.hasCurrent()
+          || batchData.getMaxTimestamp() < curStartTime
+          || batchData.currentTime() >= curEndTime) {
+        return;
       }
 
-      while (reader.hasNextPage()) {
-        //cal by page statistics
-        Statistics pageStatistics = reader.currentPageStatistics();
-        if (pageStatistics.getStartTime() >= curEndTime) {
-          return aggregateResultList;
+      for (Pair<AggregateResult, Integer> result : results) {
+        //current agg method has been calculated
+        if (result.left.isCalculatedAggregationResult()) {
+          continue;
         }
-        if (reader.canUseCurrentPageStatistics() && timeRange.contains(
-            new TimeRange(pageStatistics.getStartTime(), pageStatistics.getEndTime()))) {
-          for (int i = 0; i < aggregateResultList.size(); i++) {
-            if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
-              AggregateResult result = aggregateResultList.get(i);
-              result.updateResultFromStatistics(pageStatistics);
-              if (result.isCalculatedAggregationResult()) {
-                isCalculatedList.set(i, true);
-                remainingToCalculate--;
-                if (remainingToCalculate == 0) {
-                  return aggregateResultList;
-                }
-              }
-            }
-          }
-          reader.skipCurrentPage();
+        //lazy reset batch data for calculation
+        batchData.resetBatchData();
+        //skip points that cannot be calculated
+        while (batchData.currentTime() < curStartTime && batchData.hasCurrent()) {
+          batchData.next();
+        }
+        if (batchData.hasCurrent()) {
+          result.left.updateResultFromPageData(batchData, curEndTime);
+        }
+      }
+      //can calc for next interval
+      if (batchData.getMaxTimestamp() >= curEndTime) {
+        preCachedData = batchData;
+      }
+    }
+
+    public void calcFromStatistics(Statistics pageStatistics)
+        throws QueryProcessException {
+      for (Pair<AggregateResult, Integer> result : results) {
+        //cacl is compile
+        if (result.left.isCalculatedAggregationResult()) {
           continue;
-        } else {
-          BatchData batchData = reader.nextPage();
-          for (int i = 0; i < aggregateResultList.size(); i++) {
-            if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
-              AggregateResult result = aggregateResultList.get(i);
-              calcBatchData(result, batchData);
-              int idx = pathToAggrIndexes.getValue().get(i);
-              if (batchData.hasCurrent()) {
-                cachedBatchDataList.set(idx, batchData);
-              }
-              if (isEndCalc(result, null)) {
-                isCalculatedList.set(i, true);
-                remainingToCalculate--;
-                if (remainingToCalculate == 0) {
-                  break;
-                }
-              }
-            }
-          }
         }
+        result.left.updateResultFromStatistics(pageStatistics);
       }
     }
-    return aggregateResultList;
-  }
 
-  private boolean isEndCalc(AggregateResult function, BatchData lastBatch) {
-    return (lastBatch != null && lastBatch.hasCurrent() && lastBatch.currentTime() >= curEndTime)
-        || function.isCalculatedAggregationResult();
-  }
+    private List<Pair<AggregateResult, Integer>> calcResult()
+        throws IOException, QueryProcessException {
+      if (calcFromCacheData()) {
+        return results;
+      }
 
-  /**
-   * this batchData >= curEndTime
-   */
-  private void calcBatchData(AggregateResult result, BatchData batchData) throws IOException {
-    if (batchData == null || !batchData.hasCurrent()) {
-      return;
+      //read page data firstly
+      if (readAndCalcFromPage()) {
+        return results;
+      }
+
+      //read chunk finally
+      while (reader.hasNextChunk()) {
+        Statistics chunkStatistics = reader.currentChunkStatistics();
+        if (chunkStatistics.getStartTime() >= curEndTime) {
+          return results;
+        }
+        //calc from chunkMetaData
+        if (reader.canUseCurrentChunkStatistics() && timeRange.contains(
+            new TimeRange(chunkStatistics.getStartTime(), chunkStatistics.getEndTime()))) {
+          calcFromStatistics(chunkStatistics);
+          reader.skipCurrentChunk();
+          continue;
+        }
+        if (readAndCalcFromPage()) {
+          return results;
+        }
+      }
+      return results;
     }
-    while (batchData.hasCurrent() && batchData.currentTime() < curStartTime) {
-      batchData.next();
+
+    // clear all results
+    public void resetAggregateResults() {
+      for (Pair<AggregateResult, Integer> result : results) {
+        result.left.reset();
+      }
     }
-    if (batchData.hasCurrent()) {
-      result.updateResultFromPageData(batchData, curEndTime);
-      // reset batch data for next calculation
-      batchData.resetBatchData();
+
+
+    private boolean readAndCalcFromPage() throws IOException, QueryProcessException {
+      while (reader.hasNextPage()) {
+        Statistics pageStatistics = reader.currentPageStatistics();
+        //must be non overlapped page
+        if (pageStatistics != null) {
+          //current page max than time range
+          if (pageStatistics.getStartTime() >= curEndTime) {
+            return true;
+          }
+          //can use pageHeader
+          if (reader.canUseCurrentPageStatistics() && timeRange.contains(
+              new TimeRange(pageStatistics.getStartTime(), pageStatistics.getEndTime()))) {
+            calcFromStatistics(pageStatistics);
+            reader.skipCurrentPage();
+            if (isEndCalc()) {
+              return true;
+            }
+            continue;
+          }
+        }
+        // calc from page data
+        BatchData batchData = reader.nextPage();
+        if (batchData == null || !batchData.hasCurrent()) {
+          continue;
+        }
+        // stop calc and cached current batchData
+        if (batchData.currentTime() >= curEndTime) {
+          preCachedData = batchData;
+          return true;
+        }
+
+        calcFromBatch(batchData);
+        if (isEndCalc() || batchData.currentTime() >= curEndTime) {
+          return true;
+        }
+      }
+      return false;
     }
   }
+
 }
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
index 19f3810..9e5e654 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
@@ -71,7 +71,7 @@ public class SeriesRawDataBatchReader implements ManagedSeriesReader {
     }
 
     /*
-     * consume page data secondly
+     * consume page data firstly
      */
     if (readPageData()) {
       return hasCachedBatchData = true;