You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2020/07/14 09:42:17 UTC

[incubator-iotdb] 05/08: support sum

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch jira-768
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 8c3b9af22bc9ed68c12a77bb4757effece2ce456
Author: HTHou <hh...@outlook.com>
AuthorDate: Mon Jun 22 15:36:38 2020 +0800

    support sum
---
 .../query/dataset/groupby/GroupByTimeDataSet.java  |  3 +-
 .../db/query/executor/AggregationExecutor.java     |  6 +--
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  3 +-
 .../org/apache/iotdb/db/utils/FilePathUtils.java   | 58 +++++++++++++++++++---
 4 files changed, 57 insertions(+), 13 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
index 4fe932d..e2005ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
@@ -65,7 +65,8 @@ public class GroupByTimeDataSet extends QueryDataSet {
       logger.debug("only group by level, paths:" + groupByTimePlan.getPaths());
     }
     while (dataSet != null && dataSet.hasNextWithoutConstraint()) {
-      RowRecord curRecord = FilePathUtils.mergeRecordByPath(dataSet.nextWithoutConstraint(), finalPaths, pathIndex);
+      RowRecord curRecord = FilePathUtils
+          .mergeRecordByPath(dataSet.nextWithoutConstraint(), finalPaths, pathIndex, dataTypes.get(0));
       if (curRecord != null) {
         records.add(curRecord);
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 8cb4ff5..5895280 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -337,16 +337,16 @@ public class AggregationExecutor {
       switch (aggregation) {
         case "count":
           Map<String, Long> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex);
-          curRecord = FilePathUtils.mergeRecordByPath(record, finalPaths, pathIndex);
+          curRecord = FilePathUtils.mergeRecordByPath(record, finalPaths, pathIndex, TSDataType.INT64);
           for (int i = 0; i < finalPaths.size(); i++) {
             dataTypes.add(TSDataType.INT64);
           }
           break;
         case "sum":
           Map<String, Long> finalPathsSum = FilePathUtils.getPathByLevel(plan, pathIndex);
-          curRecord = FilePathUtils.mergeRecordByPath(record, finalPathsSum, pathIndex);
+          curRecord = FilePathUtils.mergeRecordByPath(record, finalPathsSum, pathIndex, TSDataType.DOUBLE);
           for (int i = 0; i < finalPathsSum.size(); i++) {
-            dataTypes.add(TSDataType.INT64);
+            dataTypes.add(TSDataType.DOUBLE);
           }
           break;
         case "avg":
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index df378d6..5be346d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -691,9 +691,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       return StaticResps.LAST_RESP.deepCopy();
     } else if (plan instanceof AggregationPlan && ((AggregationPlan)plan).getLevel() >= 0) {
       Map<String, Long> finalPaths = FilePathUtils.getPathByLevel((AggregationPlan) plan, null);
+      TSDataType type = FilePathUtils.getTSDataType((AggregationPlan) plan);
       for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
         respColumns.add(((AggregationPlan) plan).getAggregations().get(0) + "(" + entry.getKey() + ")");
-        columnsTypes.add(TSDataType.INT64.toString());
+        columnsTypes.add(type.toString());
       }
     } else {
       getWideQueryHeaders(plan, respColumns, columnsTypes);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
index 4270df8..2057300 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.utils;
 
 import java.io.File;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -151,32 +152,61 @@ public class FilePathUtils {
    */
   public static RowRecord mergeRecordByPath(RowRecord newRecord,
                                       Map<String, Long> finalPaths,
-                                      Map<Integer, String> pathIndex) {
+                                      Map<Integer, String> pathIndex,
+                                      TSDataType type) {
     if (newRecord.getFields().size() < finalPaths.size()) {
       return null;
     }
 
+    Map<String, Object> finalPathMap = new HashMap<>(); // per-path running totals, boxed according to type
     // reset final paths
-    for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
-      entry.setValue(0L);
-    }
+    initFinalPathMap(finalPathMap, finalPaths, type);
 
     RowRecord tmpRecord = new RowRecord(newRecord.getTimestamp());
 
     for (int i = 0; i < newRecord.getFields().size(); i++) {
       if (newRecord.getFields().get(i) != null) {
-        finalPaths.put(pathIndex.get(i),
-          finalPaths.get(pathIndex.get(i)) + newRecord.getFields().get(i).getLongV());
+        finalPathMap.put(pathIndex.get(i), getValue(type, newRecord.getFields().get(i), finalPathMap.get(pathIndex.get(i))));
       }
     }
 
-    for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
-      tmpRecord.addField(Field.getField(entry.getValue(), TSDataType.INT64));
+    for (String path : finalPaths.keySet()) { // follow finalPaths' order; HashMap iteration order is unspecified
+      tmpRecord.addField(Field.getField(finalPathMap.get(path), type));
     }
 
     return tmpRecord;
   }
 
+  /**
+   * Seeds finalPathMap with a zero accumulator for each key of finalPaths (values ignored):
+   * a boxed 0D when type is DOUBLE, otherwise (INT64 and any other type) a boxed 0L.
+   */
+  private static void initFinalPathMap(Map<String, Object> finalPathMap, Map<String, Long> finalPaths, TSDataType type) {
+    Object zero;
+    // choose the zero once, then apply it to every path
+    switch (type) {
+      case DOUBLE :
+        zero = 0D;
+        break;
+      default :
+        zero = 0L;
+    }
+    for (String path : finalPaths.keySet()) {
+      finalPathMap.put(path, zero);
+    }
+  }
+
+  /** Adds field's value to the running total: double math for DOUBLE, long math otherwise. */
+  private static Object getValue(TSDataType type, Field field, Object before) {
+    switch (type) {
+      case DOUBLE :
+        return (Double) before + field.getDoubleV();
+      default :
+        // INT64 and every other type are accumulated as longs
+        return (Long) before + field.getLongV();
+    }
+  }
+
   public static RowRecord avgRecordByPath(RowRecord newRecord, Map<String, Float> finalPaths,
       Map<Integer, String> pathIndex) {
     if (newRecord.getFields().size() < finalPaths.size()) {
@@ -204,4 +234,16 @@ public class FilePathUtils {
     return tmpRecord;
   }
 
+  /**
+   * Returns the result type of the plan's first aggregation: DOUBLE for "sum", else INT64.
+   */
+  public static TSDataType getTSDataType(AggregationPlan plan) {
+    switch (plan.getAggregations().get(0)) {
+      case "sum" :
+        return TSDataType.DOUBLE;
+      default :
+        return TSDataType.INT64;
+    }
+  }
+
 }