You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2020/07/14 09:42:15 UTC

[incubator-iotdb] 03/08: jira 768

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch jira-768
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 991515bddf1c03272185bb6a590ceecf1a0dcca3
Author: HTHou <hh...@outlook.com>
AuthorDate: Thu Jun 18 15:41:48 2020 +0800

    jira 768
---
 .../db/query/executor/AggregationExecutor.java     | 30 +++++++++----
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  4 +-
 .../org/apache/iotdb/db/utils/FilePathUtils.java   | 52 +++++++++++++++-------
 3 files changed, 59 insertions(+), 27 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index aee2237..94d117a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.io.IOException;
 import java.util.*;
@@ -319,27 +320,40 @@ public class AggregationExecutor {
    *
    * @param aggregateResultList aggregate result list
    */
-  private QueryDataSet constructDataSet(List<AggregateResult> aggregateResultList, RawDataQueryPlan plan) {
+  private QueryDataSet constructDataSet(List<AggregateResult> aggregateResultList, AggregationPlan plan) {
     RowRecord record = new RowRecord(0);
     for (AggregateResult resultData : aggregateResultList) {
       TSDataType dataType = resultData.getResultDataType();
       record.addField(resultData.getResult(), dataType);
     }
 
+    String aggregation = plan.getAggregations().get(0);
     SingleDataSet dataSet = null;
-    if (((AggregationPlan)plan).getLevel() >= 0) {
+    if (plan.getLevel() >= 0) {
       // current only support count operation
       Map<Integer, String> pathIndex = new HashMap<>();
-      Map<String, TSDataType> finalPaths = FilePathUtils.getPathByLevel((AggregationPlan) plan, pathIndex);
-
-      RowRecord curRecord = FilePathUtils.mergeRecordByPath(record, finalPaths, pathIndex);
-
       List<Path> paths = new ArrayList<>();
       List<TSDataType> dataTypes = new ArrayList<>();
-      for (int i = 0; i < finalPaths.size(); i++) {
-        dataTypes.add(TSDataType.DOUBLE);
+      Map<String, Long> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex);
+      RowRecord curRecord = null;
+      switch (aggregation) {
+        case "count":
+          curRecord = FilePathUtils.mergeRecordByPath(record, finalPaths, pathIndex);
+          for (int i = 0; i < finalPaths.size(); i++) {
+            dataTypes.add(TSDataType.INT64);
+          }
+          break;
+        case "avg":
+          curRecord = FilePathUtils.avgRecordByPath(record, finalPaths, pathIndex);
+          for (int i = 0; i < finalPaths.size(); i++) {
+            dataTypes.add(TSDataType.FLOAT);
+          }
+          break;
       }
 
+      
+      
+
       dataSet = new SingleDataSet(paths, dataTypes);
       dataSet.setRecord(curRecord);
     } else {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 3b51b3c..8c852fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -690,8 +690,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       // because the query dataset and query id is different although the header of last query is same.
       return StaticResps.LAST_RESP.deepCopy();
     } else if (plan instanceof AggregationPlan && ((AggregationPlan)plan).getLevel() >= 0) {
-      Map<String, TSDataType> finalPaths = FilePathUtils.getPathByLevel((AggregationPlan) plan, null);
-      for (Map.Entry<String, TSDataType> entry : finalPaths.entrySet()) {
+      Map<String, Long> finalPaths = FilePathUtils.getPathByLevel((AggregationPlan) plan, null);
+      for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
         respColumns.add(((AggregationPlan) plan).getAggregations().get(0) + "(" + entry.getKey() + ")");
         columnsTypes.add(entry.getValue().toString());
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
index 84b11e2..9a7310a 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
@@ -64,13 +64,12 @@ public class FilePathUtils {
    * @param pathIndex
    * @return
    */
-  public static Map<String, TSDataType> getPathByLevel(AggregationPlan plan, Map<Integer, String> pathIndex) {
+  public static Map<String, Long> getPathByLevel(AggregationPlan plan, Map<Integer, String> pathIndex) {
     // pathGroupByLevel -> count
-    Map<String, TSDataType> finalPaths = new TreeMap<>();
+    Map<String, Long> finalPaths = new TreeMap<>();
 
     List<Path> rawPaths = plan.getPaths();
     int level = plan.getLevel();
-    String aggregation = plan.getAggregations().get(0);
     int i = 0;
     for (Path value : rawPaths) {
       String[] tmpPath = MetaUtils.getNodeNames(value.getFullPath());
@@ -89,15 +88,7 @@ public class FilePathUtils {
         }
         key = path.toString();
       }
-      switch (aggregation) {
-        case "sum" :
-          finalPaths.putIfAbsent(key, TSDataType.INT64);
-          break;
-        case "avg" :
-          finalPaths.putIfAbsent(key, TSDataType.INT64);
-          break;
-      }
-      finalPaths.putIfAbsent(key, (float) 0);
+      finalPaths.putIfAbsent(key, 0L);
       if (pathIndex != null) {
         pathIndex.put(i++, key);
       }
@@ -118,15 +109,42 @@ public class FilePathUtils {
    * @return
    */
   public static RowRecord mergeRecordByPath(RowRecord newRecord,
-                                      Map<String, Float> finalPaths,
+                                      Map<String, Long> finalPaths,
                                       Map<Integer, String> pathIndex) {
     if (newRecord.getFields().size() < finalPaths.size()) {
       return null;
     }
 
     // reset final paths
-    for (Map.Entry<String, Float> entry : finalPaths.entrySet()) {
-      entry.setValue((float) 0);
+    for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
+      entry.setValue(0L);
+    }
+
+    RowRecord tmpRecord = new RowRecord(newRecord.getTimestamp());
+
+    for (int i = 0; i < newRecord.getFields().size(); i++) {
+      if (newRecord.getFields().get(i) != null) {
+        finalPaths.put(pathIndex.get(i),
+          finalPaths.get(pathIndex.get(i)) + newRecord.getFields().get(i).getLongV());
+      }
+    }
+
+    for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
+      tmpRecord.addField(Field.getField(entry.getValue(), TSDataType.INT64));
+    }
+
+    return tmpRecord;
+  }
+
+  public static RowRecord avgRecordByPath(RowRecord newRecord, Map<String, Long> finalPaths,
+      Map<Integer, String> pathIndex) {
+    if (newRecord.getFields().size() < finalPaths.size()) {
+      return null;
+    }
+
+    // reset final paths
+    for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
+      entry.setValue(0L);
     }
 
     RowRecord tmpRecord = new RowRecord(newRecord.getTimestamp());
@@ -138,8 +156,8 @@ public class FilePathUtils {
       }
     }
 
-    for (Map.Entry<String, Float> entry : finalPaths.entrySet()) {
-      tmpRecord.addField(Field.getField(entry.getValue(), TSDataType.FLOAT));
+    for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
+      tmpRecord.addField(Field.getField(entry.getValue(), TSDataType.INT64));
     }
 
     return tmpRecord;