You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2020/07/14 09:42:16 UTC

[incubator-iotdb] 04/08: sum

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch jira-768
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 00f33e3e6fd73400d1fef9983ad8546281a70c30
Author: HTHou <hh...@outlook.com>
AuthorDate: Fri Jun 19 14:19:51 2020 +0800

    sum
---
 .../query/dataset/groupby/GroupByTimeDataSet.java  |  2 +-
 .../db/query/executor/AggregationExecutor.java     | 22 ++++++----
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  2 +-
 .../org/apache/iotdb/db/utils/FilePathUtils.java   | 51 +++++++++++++++++++---
 4 files changed, 62 insertions(+), 15 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
index 224c04c..4fe932d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
@@ -58,7 +58,7 @@ public class GroupByTimeDataSet extends QueryDataSet {
     }
 
     Map<Integer, String> pathIndex = new HashMap<>();
-    Map<String, Float> finalPaths = FilePathUtils.getPathByLevel(plan.getPaths(), plan.getLevel(), pathIndex);
+    Map<String, Long> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex);
 
     // get all records from GroupByDataSet, then we merge every record
     if (logger.isDebugEnabled()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 94d117a..8cb4ff5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -48,7 +48,6 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
-import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.io.IOException;
 import java.util.*;
@@ -275,7 +274,7 @@ public class AggregationExecutor {
       aggregateResults.add(result);
     }
     aggregateWithValueFilter(aggregateResults, timestampGenerator, readersOfSelectedSeries);
-    return constructDataSet(aggregateResults, queryPlan);
+    return constructDataSet(aggregateResults, (AggregationPlan) queryPlan);
   }
 
   protected TimeGenerator getTimeGenerator(QueryContext context, RawDataQueryPlan queryPlan) throws StorageEngineException {
@@ -334,26 +333,33 @@ public class AggregationExecutor {
       Map<Integer, String> pathIndex = new HashMap<>();
       List<Path> paths = new ArrayList<>();
       List<TSDataType> dataTypes = new ArrayList<>();
-      Map<String, Long> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex);
       RowRecord curRecord = null;
       switch (aggregation) {
         case "count":
+          Map<String, Long> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex);
           curRecord = FilePathUtils.mergeRecordByPath(record, finalPaths, pathIndex);
           for (int i = 0; i < finalPaths.size(); i++) {
             dataTypes.add(TSDataType.INT64);
           }
           break;
+        case "sum":
+          Map<String, Long> finalPathsSum = FilePathUtils.getPathByLevel(plan, pathIndex);
+          curRecord = FilePathUtils.mergeRecordByPath(record, finalPathsSum, pathIndex);
+          for (int i = 0; i < finalPathsSum.size(); i++) {
+            dataTypes.add(TSDataType.INT64);
+          }
+          break;
         case "avg":
-          curRecord = FilePathUtils.avgRecordByPath(record, finalPaths, pathIndex);
-          for (int i = 0; i < finalPaths.size(); i++) {
+          Map<String, Float> finalPathsAvg = FilePathUtils.getPathByLevelAvg(plan, pathIndex);
+          curRecord = FilePathUtils.avgRecordByPath(record, finalPathsAvg, pathIndex);
+          for (int i = 0; i < finalPathsAvg.size(); i++) {
             dataTypes.add(TSDataType.FLOAT);
           }
           break;
+        default:
+          break;
       }
-
       
-      
-
       dataSet = new SingleDataSet(paths, dataTypes);
       dataSet.setRecord(curRecord);
     } else {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 8c852fc..df378d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -693,7 +693,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       Map<String, Long> finalPaths = FilePathUtils.getPathByLevel((AggregationPlan) plan, null);
       for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
         respColumns.add(((AggregationPlan) plan).getAggregations().get(0) + "(" + entry.getKey() + ")");
-        columnsTypes.add(entry.getValue().toString());
+        columnsTypes.add(TSDataType.INT64.toString());
       }
     } else {
       getWideQueryHeaders(plan, respColumns, columnsTypes);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
index 9a7310a..4270df8 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
@@ -98,6 +98,47 @@ public class FilePathUtils {
   }
 
   /**
+   * get paths from group by level, like root.sg1.d2.s0, root.sg1.d1.s1
+   * level=1, return [root.sg1, 0] and pathIndex turns to be [[0, root.sg1], [1, root.sg1]]
+   * @param rawPaths
+   * @param level
+   * @param pathIndex
+   * @return
+   */
+  public static Map<String, Float> getPathByLevelAvg(AggregationPlan plan, Map<Integer, String> pathIndex) {
+    // pathGroupByLevel -> count
+    Map<String, Float> finalPaths = new TreeMap<>();
+
+    List<Path> rawPaths = plan.getPaths();
+    int level = plan.getLevel();
+    int i = 0;
+    for (Path value : rawPaths) {
+      String[] tmpPath = MetaUtils.getNodeNames(value.getFullPath());
+
+      String key;
+      if (tmpPath.length <= level) {
+        key = value.getFullPath();
+      } else {
+        StringBuilder path = new StringBuilder();
+        for (int k = 0; k <= level; k++) {
+          if (k == 0) {
+            path.append(tmpPath[k]);
+          } else {
+            path.append(".").append(tmpPath[k]);
+          }
+        }
+        key = path.toString();
+      }
+      finalPaths.putIfAbsent(key, 0F);
+      if (pathIndex != null) {
+        pathIndex.put(i++, key);
+      }
+    }
+
+    return finalPaths;
+  }
+
+  /**
    * merge the raw record by level, for example
    * raw record [timestamp, root.sg1.d1.s0, root.sg1.d1.s1, root.sg1.d2.s2], level=1
    * and newRecord data is [100, 1, 1, 1]
@@ -136,15 +177,15 @@ public class FilePathUtils {
     return tmpRecord;
   }
 
-  public static RowRecord avgRecordByPath(RowRecord newRecord, Map<String, Long> finalPaths,
+  public static RowRecord avgRecordByPath(RowRecord newRecord, Map<String, Float> finalPaths,
       Map<Integer, String> pathIndex) {
     if (newRecord.getFields().size() < finalPaths.size()) {
       return null;
     }
 
     // reset final paths
-    for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
-      entry.setValue(0L);
+    for (Map.Entry<String, Float> entry : finalPaths.entrySet()) {
+      entry.setValue(0F);
     }
 
     RowRecord tmpRecord = new RowRecord(newRecord.getTimestamp());
@@ -156,8 +197,8 @@ public class FilePathUtils {
       }
     }
 
-    for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
-      tmpRecord.addField(Field.getField(entry.getValue(), TSDataType.INT64));
+    for (Map.Entry<String, Float> entry : finalPaths.entrySet()) {
+      tmpRecord.addField(Field.getField(entry.getValue(), TSDataType.FLOAT));
     }
 
     return tmpRecord;