You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/02/28 09:30:31 UTC

[incubator-iotdb] 01/01: parallel aggregation query

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch parallel_aggregation
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 3fb340e1408cd6371426f3d58b1d2d4d74144b79
Author: qiaojialin <64...@qq.com>
AuthorDate: Fri Feb 28 17:30:07 2020 +0800

    parallel aggregation query
---
 .../db/query/executor/AggregationExecutor.java     | 237 ++++++++++++---------
 .../db/query/reader/series/IAggregateReader.java   |   3 +
 .../query/reader/series/SeriesAggregateReader.java |   5 +
 .../iotdb/db/query/reader/series/SeriesReader.java |   5 +
 .../iotdb/db/integration/IoTDBAggregationIT.java   |  10 +-
 .../integration/IoTDBAggregationSmallDataIT.java   |   2 +-
 6 files changed, 161 insertions(+), 101 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index c39a392..a5f56c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -25,6 +25,9 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.StorageEngineException;
@@ -36,6 +39,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.dataset.SingleDataSet;
 import org.apache.iotdb.db.query.factory.AggregateResultFactory;
+import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.series.IAggregateReader;
 import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
@@ -50,6 +54,9 @@ import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class AggregationExecutor {
 
@@ -58,6 +65,8 @@ public class AggregationExecutor {
   private List<String> aggregations;
   private IExpression expression;
 
+  private static final QueryTaskPoolManager pool = QueryTaskPoolManager.getInstance();
+
   /**
    * aggregation batch calculation size.
    **/
@@ -72,131 +81,169 @@ public class AggregationExecutor {
   }
 
   /**
-   * execute aggregate function with only time filter or no filter.
-   *
-   * @param context query context
+   * Aggregate one series
    */
-  public QueryDataSet executeWithoutValueFilter(QueryContext context)
-      throws StorageEngineException, IOException, QueryProcessException {
+  class AggregationTask implements Callable<Pair<Path, List<AggregateResult>>> {
 
-    Filter timeFilter = null;
-    if (expression != null) {
-      timeFilter = ((GlobalTimeExpression) expression).getFilter();
+    // path to aggregation result indexes
+    private Map.Entry<Path, List<Integer>> pathToAggrIndexes;
+    private IAggregateReader reader;
+
+    public AggregationTask(IAggregateReader reader, Map.Entry<Path, List<Integer>> pathToAggrIndexes) {
+      this.reader = reader;
+      this.pathToAggrIndexes = pathToAggrIndexes;
     }
 
-    // TODO use multi-thread
-    Map<Path, List<Integer>> pathToAggrIndexesMap = groupAggregationsBySeries(selectedSeries);
-    AggregateResult[] aggregateResultList = new AggregateResult[selectedSeries.size()];
-    for (Map.Entry<Path, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
-      List<AggregateResult> aggregateResults = aggregateOneSeries(entry, timeFilter, context);
-      int index = 0;
-      for (int i : entry.getValue()) {
-        aggregateResultList[i] = aggregateResults.get(index);
-        index++;
-      }
+    @Override
+    public Pair<Path, List<AggregateResult>> call() throws QueryProcessException, IOException {
+      return aggregateOneSeries();
     }
 
-    return constructDataSet(Arrays.asList(aggregateResultList));
-  }
+    /**
+     * get aggregation result for one series
+     *
+     * @return AggregateResult list
+     */
+    private Pair<Path, List<AggregateResult>> aggregateOneSeries()
+        throws IOException, QueryProcessException {
+      List<AggregateResult> aggregateResultList = new ArrayList<>();
+      List<Boolean> isCalculatedList = new ArrayList<>();
+      Path seriesPath = pathToAggrIndexes.getKey();
 
-  /**
-   * get aggregation result for one series
-   *
-   * @param pathToAggrIndexes entry of path to aggregation indexes map
-   * @param timeFilter time filter
-   * @param context query context
-   * @return AggregateResult list
-   */
-  private List<AggregateResult> aggregateOneSeries(
-      Map.Entry<Path, List<Integer>> pathToAggrIndexes,
-      Filter timeFilter, QueryContext context)
-      throws IOException, QueryProcessException, StorageEngineException {
-    List<AggregateResult> aggregateResultList = new ArrayList<>();
-    List<Boolean> isCalculatedList = new ArrayList<>();
-    Path seriesPath = pathToAggrIndexes.getKey();
-    TSDataType tsDataType = dataTypes.get(pathToAggrIndexes.getValue().get(0));
-
-    // construct series reader without value filter
-    QueryDataSource queryDataSource = QueryResourceManager.getInstance()
-        .getQueryDataSource(seriesPath, context, timeFilter);
-    // update filter by TTL
-    timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
-
-    IAggregateReader seriesReader = new SeriesAggregateReader(pathToAggrIndexes.getKey(),
-        tsDataType, context, queryDataSource, timeFilter, null, null);
-
-    for (int i : pathToAggrIndexes.getValue()) {
-      // construct AggregateResult
-      AggregateResult aggregateResult = AggregateResultFactory
-          .getAggrResultByName(aggregations.get(i), tsDataType);
-      aggregateResultList.add(aggregateResult);
-      isCalculatedList.add(false);
-    }
-    int remainingToCalculate = pathToAggrIndexes.getValue().size();
-
-    while (seriesReader.hasNextChunk()) {
-      // cal by chunk statistics
-      if (seriesReader.canUseCurrentChunkStatistics()) {
-        Statistics chunkStatistics = seriesReader.currentChunkStatistics();
-        for (int i = 0; i < aggregateResultList.size(); i++) {
-          if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
-            AggregateResult aggregateResult = aggregateResultList.get(i);
-            aggregateResult.updateResultFromStatistics(chunkStatistics);
-            if (aggregateResult.isCalculatedAggregationResult()) {
-              isCalculatedList.set(i, true);
-              remainingToCalculate--;
-              if (remainingToCalculate == 0) {
-                return aggregateResultList;
-              }
-            }
-          }
-        }
-        seriesReader.skipCurrentChunk();
-        continue;
+      for (int i : pathToAggrIndexes.getValue()) {
+        // construct AggregateResult
+        AggregateResult aggregateResult = AggregateResultFactory
+            .getAggrResultByName(aggregations.get(i), reader.getSeriesDataType());
+        aggregateResultList.add(aggregateResult);
+        isCalculatedList.add(false);
       }
-      while (seriesReader.hasNextPage()) {
-        //cal by page statistics
-        if (seriesReader.canUseCurrentPageStatistics()) {
-          Statistics pageStatistic = seriesReader.currentPageStatistics();
+      int remainingToCalculate = pathToAggrIndexes.getValue().size();
+
+      while (reader.hasNextChunk()) {
+        // cal by chunk statistics
+        if (reader.canUseCurrentChunkStatistics()) {
+          Statistics chunkStatistics = reader.currentChunkStatistics();
           for (int i = 0; i < aggregateResultList.size(); i++) {
             if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
               AggregateResult aggregateResult = aggregateResultList.get(i);
-              aggregateResult.updateResultFromStatistics(pageStatistic);
+              aggregateResult.updateResultFromStatistics(chunkStatistics);
               if (aggregateResult.isCalculatedAggregationResult()) {
                 isCalculatedList.set(i, true);
                 remainingToCalculate--;
                 if (remainingToCalculate == 0) {
-                  return aggregateResultList;
+                  return new Pair<>(seriesPath, aggregateResultList);
                 }
               }
             }
           }
-          seriesReader.skipCurrentPage();
+          reader.skipCurrentChunk();
           continue;
         }
-        // cal by page data
-        while (seriesReader.hasNextOverlappedPage()) {
-          BatchData nextOverlappedPageData = seriesReader.nextOverlappedPage();
-          for (int i = 0; i < aggregateResultList.size(); i++) {
-            if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
-              AggregateResult aggregateResult = aggregateResultList.get(i);
-              aggregateResult.updateResultFromPageData(nextOverlappedPageData);
-              nextOverlappedPageData.resetBatchData();
-              if (aggregateResult.isCalculatedAggregationResult()) {
-                isCalculatedList.set(i, true);
-                remainingToCalculate--;
-                if (remainingToCalculate == 0) {
-                  return aggregateResultList;
+        while (reader.hasNextPage()) {
+          //cal by page statistics
+          if (reader.canUseCurrentPageStatistics()) {
+            Statistics pageStatistic = reader.currentPageStatistics();
+            for (int i = 0; i < aggregateResultList.size(); i++) {
+              if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
+                AggregateResult aggregateResult = aggregateResultList.get(i);
+                aggregateResult.updateResultFromStatistics(pageStatistic);
+                if (aggregateResult.isCalculatedAggregationResult()) {
+                  isCalculatedList.set(i, true);
+                  remainingToCalculate--;
+                  if (remainingToCalculate == 0) {
+                    return new Pair<>(seriesPath, aggregateResultList);
+                  }
+                }
+              }
+            }
+            reader.skipCurrentPage();
+            continue;
+          }
+          // cal by page data
+          while (reader.hasNextOverlappedPage()) {
+            BatchData nextOverlappedPageData = reader.nextOverlappedPage();
+            for (int i = 0; i < aggregateResultList.size(); i++) {
+              if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
+                AggregateResult aggregateResult = aggregateResultList.get(i);
+                aggregateResult.updateResultFromPageData(nextOverlappedPageData);
+                nextOverlappedPageData.resetBatchData();
+                if (aggregateResult.isCalculatedAggregationResult()) {
+                  isCalculatedList.set(i, true);
+                  remainingToCalculate--;
+                  if (remainingToCalculate == 0) {
+                    return new Pair<>(seriesPath, aggregateResultList);
+                  }
                 }
               }
             }
           }
         }
       }
+      return new Pair<>(seriesPath, aggregateResultList);
+    }
+  }
+
+  /**
+   * execute aggregate function with only time filter or no filter.
+   *
+   * @param context query context
+   */
+  public QueryDataSet executeWithoutValueFilter(QueryContext context)
+      throws StorageEngineException, QueryProcessException {
+
+    Filter timeFilter = null;
+    if (expression != null) {
+      timeFilter = ((GlobalTimeExpression) expression).getFilter();
+    }
+
+    AggregateResult[] finalAggregateResults = new AggregateResult[selectedSeries.size()];
+    Map<Path, List<Integer>> pathToAggrIndexesMap = groupAggregationsBySeries(selectedSeries);
+
+    /*
+     * submit AggregationTask for each series
+     */
+    List<Future<Pair<Path, List<AggregateResult>>>> futureList = new ArrayList<>();
+    for (Map.Entry<Path, List<Integer>> pathToAggrIndex : pathToAggrIndexesMap.entrySet()) {
+
+      Path seriesPath = pathToAggrIndex.getKey();
+      // construct series reader without value filter
+      QueryDataSource queryDataSource = QueryResourceManager.getInstance()
+          .getQueryDataSource(seriesPath, context, timeFilter);
+      // update filter by TTL
+      timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+
+      TSDataType tsDataType = dataTypes.get(pathToAggrIndex.getValue().get(0));
+
+      IAggregateReader seriesReader = new SeriesAggregateReader(pathToAggrIndex.getKey(),
+          tsDataType, context, queryDataSource, timeFilter, null, null);
+
+      Future<Pair<Path, List<AggregateResult>>> future = pool
+          .submit(new AggregationTask(seriesReader, pathToAggrIndex));
+      futureList.add(future);
+    }
+
+    /*
+     * get the AggregateResults of each series and put them into finalAggregateResults
+     */
+    for (Future<Pair<Path, List<AggregateResult>>> future: futureList) {
+      try {
+        Pair<Path, List<AggregateResult>> currentSeriesResults = future.get();
+        // indexes of the current series' results in finalAggregateResults
+        List<Integer> resultIndexList = pathToAggrIndexesMap.get(currentSeriesResults.left);
+        List<AggregateResult> resultList = currentSeriesResults.right;
+
+        // put the current series' results into finalAggregateResults
+        for (int i = 0; i < resultIndexList.size(); i++) {
+          finalAggregateResults[resultIndexList.get(i)] = resultList.get(i);
+        }
+      } catch (Exception e) {
+        throw new QueryProcessException(e.getMessage());
+      }
     }
-    return aggregateResultList;
+    return constructDataSet(Arrays.asList(finalAggregateResults));
   }
 
+
   /**
    * execute aggregate function with value filter.
    *
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java
index bd22f3e..58e6894 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.query.reader.series;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 
@@ -50,4 +51,6 @@ public interface IAggregateReader {
   boolean hasNextOverlappedPage() throws IOException;
 
   BatchData nextOverlappedPage() throws IOException;
+
+  TSDataType getSeriesDataType();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java
index d711f02..7081c6e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java
@@ -93,6 +93,11 @@ public class SeriesAggregateReader implements IAggregateReader {
     return seriesReader.nextOverlappedPage();
   }
 
+  @Override
+  public TSDataType getSeriesDataType() {
+    return seriesReader.getSeriesDataType();
+  }
+
 
   private boolean containedByTimeFilter(Statistics statistics) {
     Filter timeFilter = seriesReader.getTimeFilter();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index 8b1f795..c08fcdc 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -53,6 +53,7 @@ import java.util.*;
 public class SeriesReader {
 
   private final Path seriesPath;
+
   private final TSDataType dataType;
   private final QueryContext context;
   private final Filter timeFilter;
@@ -456,6 +457,10 @@ public class SeriesReader {
     return timeFilter;
   }
 
+  public TSDataType getSeriesDataType() {
+    return dataType;
+  }
+
   private class VersionPair<T> {
 
     protected long version;
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java
index da6d3bd..4d1d237 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java
@@ -463,7 +463,7 @@ public class IoTDBAggregationIT {
   }
 
   @Test
-  public void avgSumErrorTest() throws SQLException {
+  public void avgSumErrorTest() {
     try (Connection connection = DriverManager.
         getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
         Statement statement = connection.createStatement()) {
@@ -474,7 +474,7 @@ public class IoTDBAggregationIT {
         resultSet.next();
         fail();
       } catch (Exception e) {
-        Assert.assertEquals("500: Unsupported data type in aggregation AVG : TEXT", e.getMessage());
+        Assert.assertTrue(e.getMessage().contains("Unsupported data type in aggregation AVG : TEXT"));
       }
       try {
         statement.execute("SELECT sum(s3)" +
@@ -483,7 +483,7 @@ public class IoTDBAggregationIT {
         resultSet.next();
         fail();
       } catch (Exception e) {
-        Assert.assertEquals("500: Unsupported data type in aggregation SUM : TEXT", e.getMessage());
+        Assert.assertTrue(e.getMessage().contains("Unsupported data type in aggregation SUM : TEXT"));
       }
       try {
         statement.execute("SELECT avg(s4)" +
@@ -492,7 +492,7 @@ public class IoTDBAggregationIT {
         resultSet.next();
         fail();
       } catch (Exception e) {
-        Assert.assertEquals("500: Unsupported data type in aggregation AVG : BOOLEAN", e.getMessage());
+        Assert.assertTrue(e.getMessage().contains("Unsupported data type in aggregation AVG : BOOLEAN"));
       }
       try {
         statement.execute("SELECT sum(s4)" +
@@ -501,7 +501,7 @@ public class IoTDBAggregationIT {
         resultSet.next();
         fail();
       } catch (Exception e) {
-        Assert.assertEquals("500: Unsupported data type in aggregation SUM : BOOLEAN", e.getMessage());
+        Assert.assertTrue(e.getMessage().contains("Unsupported data type in aggregation SUM : BOOLEAN"));
       }
     } catch (Exception e) {
       e.printStackTrace();
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationSmallDataIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationSmallDataIT.java
index 05fbcb1..59fef21 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationSmallDataIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationSmallDataIT.java
@@ -205,7 +205,7 @@ public class IoTDBAggregationSmallDataIT {
             "SELECT max_value(d0.s0),max_value(d1.s1),max_value(d0.s3) FROM root.vehicle");
         fail();
       } catch (IoTDBSQLException e) {
-        Assert.assertEquals("500: Binary statistics does not support: max", e.getMessage());
+        Assert.assertTrue(e.getMessage().contains("Binary statistics does not support: max"));
       }
 
       hasResultSet = statement.execute(