You are viewing a plain text version of this content; the canonical (HTML) version is available at the original mailing-list archive.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/02/28 09:30:30 UTC

[incubator-iotdb] branch parallel_aggregation created (now 3fb340e)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a change to branch parallel_aggregation
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 3fb340e  parallel aggregation query

This branch includes the following new commits:

     new 3fb340e  parallel aggregation query

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: parallel aggregation query

Posted by qi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch parallel_aggregation
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 3fb340e1408cd6371426f3d58b1d2d4d74144b79
Author: qiaojialin <64...@qq.com>
AuthorDate: Fri Feb 28 17:30:07 2020 +0800

    parallel aggregation query
---
 .../db/query/executor/AggregationExecutor.java     | 237 ++++++++++++---------
 .../db/query/reader/series/IAggregateReader.java   |   3 +
 .../query/reader/series/SeriesAggregateReader.java |   5 +
 .../iotdb/db/query/reader/series/SeriesReader.java |   5 +
 .../iotdb/db/integration/IoTDBAggregationIT.java   |  10 +-
 .../integration/IoTDBAggregationSmallDataIT.java   |   2 +-
 6 files changed, 161 insertions(+), 101 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index c39a392..a5f56c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -25,6 +25,9 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.StorageEngineException;
@@ -36,6 +39,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.dataset.SingleDataSet;
 import org.apache.iotdb.db.query.factory.AggregateResultFactory;
+import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.series.IAggregateReader;
 import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
@@ -50,6 +54,9 @@ import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class AggregationExecutor {
 
@@ -58,6 +65,8 @@ public class AggregationExecutor {
   private List<String> aggregations;
   private IExpression expression;
 
+  private static final QueryTaskPoolManager pool = QueryTaskPoolManager.getInstance();
+
   /**
    * aggregation batch calculation size.
    **/
@@ -72,131 +81,169 @@ public class AggregationExecutor {
   }
 
   /**
-   * execute aggregate function with only time filter or no filter.
-   *
-   * @param context query context
+   * Aggregate one series
    */
-  public QueryDataSet executeWithoutValueFilter(QueryContext context)
-      throws StorageEngineException, IOException, QueryProcessException {
+  class AggregationTask implements Callable<Pair<Path, List<AggregateResult>>> {
 
-    Filter timeFilter = null;
-    if (expression != null) {
-      timeFilter = ((GlobalTimeExpression) expression).getFilter();
+    // path to aggregation result indexes
+    private Map.Entry<Path, List<Integer>> pathToAggrIndexes;
+    private IAggregateReader reader;
+
+    public AggregationTask(IAggregateReader reader, Map.Entry<Path, List<Integer>> pathToAggrIndexes) {
+      this.reader = reader;
+      this.pathToAggrIndexes = pathToAggrIndexes;
     }
 
-    // TODO use multi-thread
-    Map<Path, List<Integer>> pathToAggrIndexesMap = groupAggregationsBySeries(selectedSeries);
-    AggregateResult[] aggregateResultList = new AggregateResult[selectedSeries.size()];
-    for (Map.Entry<Path, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
-      List<AggregateResult> aggregateResults = aggregateOneSeries(entry, timeFilter, context);
-      int index = 0;
-      for (int i : entry.getValue()) {
-        aggregateResultList[i] = aggregateResults.get(index);
-        index++;
-      }
+    @Override
+    public Pair<Path, List<AggregateResult>> call() throws QueryProcessException, IOException {
+      return aggregateOneSeries();
     }
 
-    return constructDataSet(Arrays.asList(aggregateResultList));
-  }
+    /**
+     * get aggregation result for one series
+     *
+     * @return AggregateResult list
+     */
+    private Pair<Path, List<AggregateResult>> aggregateOneSeries()
+        throws IOException, QueryProcessException {
+      List<AggregateResult> aggregateResultList = new ArrayList<>();
+      List<Boolean> isCalculatedList = new ArrayList<>();
+      Path seriesPath = pathToAggrIndexes.getKey();
 
-  /**
-   * get aggregation result for one series
-   *
-   * @param pathToAggrIndexes entry of path to aggregation indexes map
-   * @param timeFilter time filter
-   * @param context query context
-   * @return AggregateResult list
-   */
-  private List<AggregateResult> aggregateOneSeries(
-      Map.Entry<Path, List<Integer>> pathToAggrIndexes,
-      Filter timeFilter, QueryContext context)
-      throws IOException, QueryProcessException, StorageEngineException {
-    List<AggregateResult> aggregateResultList = new ArrayList<>();
-    List<Boolean> isCalculatedList = new ArrayList<>();
-    Path seriesPath = pathToAggrIndexes.getKey();
-    TSDataType tsDataType = dataTypes.get(pathToAggrIndexes.getValue().get(0));
-
-    // construct series reader without value filter
-    QueryDataSource queryDataSource = QueryResourceManager.getInstance()
-        .getQueryDataSource(seriesPath, context, timeFilter);
-    // update filter by TTL
-    timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
-
-    IAggregateReader seriesReader = new SeriesAggregateReader(pathToAggrIndexes.getKey(),
-        tsDataType, context, queryDataSource, timeFilter, null, null);
-
-    for (int i : pathToAggrIndexes.getValue()) {
-      // construct AggregateResult
-      AggregateResult aggregateResult = AggregateResultFactory
-          .getAggrResultByName(aggregations.get(i), tsDataType);
-      aggregateResultList.add(aggregateResult);
-      isCalculatedList.add(false);
-    }
-    int remainingToCalculate = pathToAggrIndexes.getValue().size();
-
-    while (seriesReader.hasNextChunk()) {
-      // cal by chunk statistics
-      if (seriesReader.canUseCurrentChunkStatistics()) {
-        Statistics chunkStatistics = seriesReader.currentChunkStatistics();
-        for (int i = 0; i < aggregateResultList.size(); i++) {
-          if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
-            AggregateResult aggregateResult = aggregateResultList.get(i);
-            aggregateResult.updateResultFromStatistics(chunkStatistics);
-            if (aggregateResult.isCalculatedAggregationResult()) {
-              isCalculatedList.set(i, true);
-              remainingToCalculate--;
-              if (remainingToCalculate == 0) {
-                return aggregateResultList;
-              }
-            }
-          }
-        }
-        seriesReader.skipCurrentChunk();
-        continue;
+      for (int i : pathToAggrIndexes.getValue()) {
+        // construct AggregateResult
+        AggregateResult aggregateResult = AggregateResultFactory
+            .getAggrResultByName(aggregations.get(i), reader.getSeriesDataType());
+        aggregateResultList.add(aggregateResult);
+        isCalculatedList.add(false);
       }
-      while (seriesReader.hasNextPage()) {
-        //cal by page statistics
-        if (seriesReader.canUseCurrentPageStatistics()) {
-          Statistics pageStatistic = seriesReader.currentPageStatistics();
+      int remainingToCalculate = pathToAggrIndexes.getValue().size();
+
+      while (reader.hasNextChunk()) {
+        // cal by chunk statistics
+        if (reader.canUseCurrentChunkStatistics()) {
+          Statistics chunkStatistics = reader.currentChunkStatistics();
           for (int i = 0; i < aggregateResultList.size(); i++) {
             if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
               AggregateResult aggregateResult = aggregateResultList.get(i);
-              aggregateResult.updateResultFromStatistics(pageStatistic);
+              aggregateResult.updateResultFromStatistics(chunkStatistics);
               if (aggregateResult.isCalculatedAggregationResult()) {
                 isCalculatedList.set(i, true);
                 remainingToCalculate--;
                 if (remainingToCalculate == 0) {
-                  return aggregateResultList;
+                  return new Pair<>(seriesPath, aggregateResultList);
                 }
               }
             }
           }
-          seriesReader.skipCurrentPage();
+          reader.skipCurrentChunk();
           continue;
         }
-        // cal by page data
-        while (seriesReader.hasNextOverlappedPage()) {
-          BatchData nextOverlappedPageData = seriesReader.nextOverlappedPage();
-          for (int i = 0; i < aggregateResultList.size(); i++) {
-            if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
-              AggregateResult aggregateResult = aggregateResultList.get(i);
-              aggregateResult.updateResultFromPageData(nextOverlappedPageData);
-              nextOverlappedPageData.resetBatchData();
-              if (aggregateResult.isCalculatedAggregationResult()) {
-                isCalculatedList.set(i, true);
-                remainingToCalculate--;
-                if (remainingToCalculate == 0) {
-                  return aggregateResultList;
+        while (reader.hasNextPage()) {
+          //cal by page statistics
+          if (reader.canUseCurrentPageStatistics()) {
+            Statistics pageStatistic = reader.currentPageStatistics();
+            for (int i = 0; i < aggregateResultList.size(); i++) {
+              if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
+                AggregateResult aggregateResult = aggregateResultList.get(i);
+                aggregateResult.updateResultFromStatistics(pageStatistic);
+                if (aggregateResult.isCalculatedAggregationResult()) {
+                  isCalculatedList.set(i, true);
+                  remainingToCalculate--;
+                  if (remainingToCalculate == 0) {
+                    return new Pair<>(seriesPath, aggregateResultList);
+                  }
+                }
+              }
+            }
+            reader.skipCurrentPage();
+            continue;
+          }
+          // cal by page data
+          while (reader.hasNextOverlappedPage()) {
+            BatchData nextOverlappedPageData = reader.nextOverlappedPage();
+            for (int i = 0; i < aggregateResultList.size(); i++) {
+              if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
+                AggregateResult aggregateResult = aggregateResultList.get(i);
+                aggregateResult.updateResultFromPageData(nextOverlappedPageData);
+                nextOverlappedPageData.resetBatchData();
+                if (aggregateResult.isCalculatedAggregationResult()) {
+                  isCalculatedList.set(i, true);
+                  remainingToCalculate--;
+                  if (remainingToCalculate == 0) {
+                    return new Pair<>(seriesPath, aggregateResultList);
+                  }
                 }
               }
             }
           }
         }
       }
+      return new Pair<>(seriesPath, aggregateResultList);
+    }
+  }
+
+  /**
+   * execute aggregate function with only time filter or no filter.
+   *
+   * @param context query context
+   */
+  public QueryDataSet executeWithoutValueFilter(QueryContext context)
+      throws StorageEngineException, QueryProcessException {
+
+    Filter timeFilter = null;
+    if (expression != null) {
+      timeFilter = ((GlobalTimeExpression) expression).getFilter();
+    }
+
+    AggregateResult[] finalAggregateResults = new AggregateResult[selectedSeries.size()];
+    Map<Path, List<Integer>> pathToAggrIndexesMap = groupAggregationsBySeries(selectedSeries);
+
+    /*
+     * submit AggregationTask for each series
+     */
+    List<Future<Pair<Path, List<AggregateResult>>>> futureList = new ArrayList<>();
+    for (Map.Entry<Path, List<Integer>> pathToAggrIndex : pathToAggrIndexesMap.entrySet()) {
+
+      Path seriesPath = pathToAggrIndex.getKey();
+      // construct series reader without value filter
+      QueryDataSource queryDataSource = QueryResourceManager.getInstance()
+          .getQueryDataSource(seriesPath, context, timeFilter);
+      // update filter by TTL
+      timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+
+      TSDataType tsDataType = dataTypes.get(pathToAggrIndex.getValue().get(0));
+
+      IAggregateReader seriesReader = new SeriesAggregateReader(pathToAggrIndex.getKey(),
+          tsDataType, context, queryDataSource, timeFilter, null, null);
+
+      Future<Pair<Path, List<AggregateResult>>> future = pool
+          .submit(new AggregationTask(seriesReader, pathToAggrIndex));
+      futureList.add(future);
+    }
+
+    /*
+     * get AggregateResults for each series and put to final finalAggregateResults
+     */
+    for (Future<Pair<Path, List<AggregateResult>>> future: futureList) {
+      try {
+        Pair<Path, List<AggregateResult>> currentSeriesResults = future.get();
+        // final result index of current series
+        List<Integer> resultIndexList = pathToAggrIndexesMap.get(currentSeriesResults.left);
+        List<AggregateResult> resultList = currentSeriesResults.right;
+
+        // put current series results to final finalAggregateResults
+        for (int i = 0; i < resultIndexList.size(); i++) {
+          finalAggregateResults[resultIndexList.get(i)] = resultList.get(i);
+        }
+      } catch (Exception e) {
+        throw new QueryProcessException(e.getMessage());
+      }
     }
-    return aggregateResultList;
+    return constructDataSet(Arrays.asList(finalAggregateResults));
   }
 
+
   /**
    * execute aggregate function with value filter.
    *
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java
index bd22f3e..58e6894 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.query.reader.series;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 
@@ -50,4 +51,6 @@ public interface IAggregateReader {
   boolean hasNextOverlappedPage() throws IOException;
 
   BatchData nextOverlappedPage() throws IOException;
+
+  TSDataType getSeriesDataType();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java
index d711f02..7081c6e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java
@@ -93,6 +93,11 @@ public class SeriesAggregateReader implements IAggregateReader {
     return seriesReader.nextOverlappedPage();
   }
 
+  @Override
+  public TSDataType getSeriesDataType() {
+    return seriesReader.getSeriesDataType();
+  }
+
 
   private boolean containedByTimeFilter(Statistics statistics) {
     Filter timeFilter = seriesReader.getTimeFilter();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index 8b1f795..c08fcdc 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -53,6 +53,7 @@ import java.util.*;
 public class SeriesReader {
 
   private final Path seriesPath;
+
   private final TSDataType dataType;
   private final QueryContext context;
   private final Filter timeFilter;
@@ -456,6 +457,10 @@ public class SeriesReader {
     return timeFilter;
   }
 
+  public TSDataType getSeriesDataType() {
+    return dataType;
+  }
+
   private class VersionPair<T> {
 
     protected long version;
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java
index da6d3bd..4d1d237 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java
@@ -463,7 +463,7 @@ public class IoTDBAggregationIT {
   }
 
   @Test
-  public void avgSumErrorTest() throws SQLException {
+  public void avgSumErrorTest() {
     try (Connection connection = DriverManager.
         getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
         Statement statement = connection.createStatement()) {
@@ -474,7 +474,7 @@ public class IoTDBAggregationIT {
         resultSet.next();
         fail();
       } catch (Exception e) {
-        Assert.assertEquals("500: Unsupported data type in aggregation AVG : TEXT", e.getMessage());
+        Assert.assertTrue(e.getMessage().contains("Unsupported data type in aggregation AVG : TEXT"));
       }
       try {
         statement.execute("SELECT sum(s3)" +
@@ -483,7 +483,7 @@ public class IoTDBAggregationIT {
         resultSet.next();
         fail();
       } catch (Exception e) {
-        Assert.assertEquals("500: Unsupported data type in aggregation SUM : TEXT", e.getMessage());
+        Assert.assertTrue(e.getMessage().contains("Unsupported data type in aggregation SUM : TEXT"));
       }
       try {
         statement.execute("SELECT avg(s4)" +
@@ -492,7 +492,7 @@ public class IoTDBAggregationIT {
         resultSet.next();
         fail();
       } catch (Exception e) {
-        Assert.assertEquals("500: Unsupported data type in aggregation AVG : BOOLEAN", e.getMessage());
+        Assert.assertTrue(e.getMessage().contains("Unsupported data type in aggregation AVG : BOOLEAN"));
       }
       try {
         statement.execute("SELECT sum(s4)" +
@@ -501,7 +501,7 @@ public class IoTDBAggregationIT {
         resultSet.next();
         fail();
       } catch (Exception e) {
-        Assert.assertEquals("500: Unsupported data type in aggregation SUM : BOOLEAN", e.getMessage());
+        Assert.assertTrue(e.getMessage().contains("Unsupported data type in aggregation SUM : BOOLEAN"));
       }
     } catch (Exception e) {
       e.printStackTrace();
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationSmallDataIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationSmallDataIT.java
index 05fbcb1..59fef21 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationSmallDataIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationSmallDataIT.java
@@ -205,7 +205,7 @@ public class IoTDBAggregationSmallDataIT {
             "SELECT max_value(d0.s0),max_value(d1.s1),max_value(d0.s3) FROM root.vehicle");
         fail();
       } catch (IoTDBSQLException e) {
-        Assert.assertEquals("500: Binary statistics does not support: max", e.getMessage());
+        Assert.assertTrue(e.getMessage().contains("Binary statistics does not support: max"));
       }
 
       hasResultSet = statement.execute(