You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2020/07/14 09:42:19 UTC

[incubator-iotdb] 07/08: support max min value and max min time

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch jira-768
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 0428bd10df8101d3e2b545fc69938fc60afbc5dd
Author: HTHou <hh...@outlook.com>
AuthorDate: Tue Jul 14 17:39:03 2020 +0800

    support max min value and max min time
---
 .../query/dataset/groupby/GroupByTimeDataSet.java  |   2 +-
 .../db/query/executor/AggregationExecutor.java     |  55 +++++++--
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  11 +-
 .../org/apache/iotdb/db/utils/FilePathUtils.java   | 125 ++++++++++++++++-----
 4 files changed, 150 insertions(+), 43 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
index e2005ce..caf50da 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
@@ -58,7 +58,7 @@ public class GroupByTimeDataSet extends QueryDataSet {
     }
 
     Map<Integer, String> pathIndex = new HashMap<>();
-    Map<String, Long> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex);
+    Set<String> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex);
 
     // get all records from GroupByDataSet, then we merge every record
     if (logger.isDebugEnabled()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 5895280..922621a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -318,42 +318,79 @@ public class AggregationExecutor {
    * using aggregate result data list construct QueryDataSet.
    *
    * @param aggregateResultList aggregate result list
+   * @throws QueryProcessException 
    */
-  private QueryDataSet constructDataSet(List<AggregateResult> aggregateResultList, AggregationPlan plan) {
+  private QueryDataSet constructDataSet(List<AggregateResult> aggregateResultList, AggregationPlan plan) 
+      throws QueryProcessException {
     RowRecord record = new RowRecord(0);
     for (AggregateResult resultData : aggregateResultList) {
       TSDataType dataType = resultData.getResultDataType();
       record.addField(resultData.getResult(), dataType);
     }
-
-    String aggregation = plan.getAggregations().get(0);
     SingleDataSet dataSet = null;
     if (plan.getLevel() >= 0) {
-      // current only support count operation
+      if (plan.getAggregations().size() > 1) {
+        //throw new QueryProcessException("Group by level doesn't support multiple aggregations");
+      }
+      // TODO: Check data type here
+      
+      String aggregation = plan.getAggregations().get(0);
+      
       Map<Integer, String> pathIndex = new HashMap<>();
       List<Path> paths = new ArrayList<>();
       List<TSDataType> dataTypes = new ArrayList<>();
       RowRecord curRecord = null;
+      Set<String> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex);
       switch (aggregation) {
         case "count":
-          Map<String, Long> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex);
           curRecord = FilePathUtils.mergeRecordByPath(record, finalPaths, pathIndex, TSDataType.INT64);
           for (int i = 0; i < finalPaths.size(); i++) {
             dataTypes.add(TSDataType.INT64);
           }
           break;
         case "sum":
-          Map<String, Long> finalPathsSum = FilePathUtils.getPathByLevel(plan, pathIndex);
-          curRecord = FilePathUtils.mergeRecordByPath(record, finalPathsSum, pathIndex, TSDataType.DOUBLE);
-          for (int i = 0; i < finalPathsSum.size(); i++) {
+          // Check datatype
+          curRecord = FilePathUtils.mergeRecordByPath(record, finalPaths, pathIndex, TSDataType.DOUBLE);
+          for (int i = 0; i < finalPaths.size(); i++) {
             dataTypes.add(TSDataType.DOUBLE);
           }
           break;
         case "avg":
+          // Check datatype
           Map<String, Float> finalPathsAvg = FilePathUtils.getPathByLevelAvg(plan, pathIndex);
           curRecord = FilePathUtils.avgRecordByPath(record, finalPathsAvg, pathIndex);
           for (int i = 0; i < finalPathsAvg.size(); i++) {
-            dataTypes.add(TSDataType.FLOAT);
+            dataTypes.add(TSDataType.DOUBLE);
+          }
+          break;
+        case "max_time":
+          curRecord = FilePathUtils.mergeMaxOrMinByPath(record, TSDataType.INT64, finalPaths, 
+              pathIndex, true);
+          for (int i = 0; i < finalPaths.size(); i++) {
+            dataTypes.add(TSDataType.INT64);
+          }
+          break;
+        case "min_time":
+          curRecord = FilePathUtils.mergeMaxOrMinByPath(record, TSDataType.INT64, finalPaths,
+              pathIndex, false);
+          for (int i = 0; i < finalPaths.size(); i++) {
+            dataTypes.add(TSDataType.INT64);
+          }
+          break;
+        case "max_value":
+          // Check datatype
+          curRecord = FilePathUtils.mergeMaxOrMinByPath(record, plan.getDeduplicatedDataTypes().get(0), 
+              finalPaths, pathIndex, true);
+          for (int i = 0; i < finalPaths.size(); i++) {
+            dataTypes.add(plan.getDeduplicatedDataTypes().get(0));
+          }
+          break;
+        case "min_value":
+          // Check datatype
+          curRecord = FilePathUtils.mergeMaxOrMinByPath(record, plan.getDeduplicatedDataTypes().get(0), 
+              finalPaths, pathIndex, false);
+          for (int i = 0; i < finalPaths.size(); i++) {
+            dataTypes.add(plan.getDeduplicatedDataTypes().get(0));
           }
           break;
         default:
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index ad464d4..a50d42f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -738,12 +738,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       // Last Query should return different respond instead of the static one
       // because the query dataset and query id is different although the header of last query is same.
       return StaticResps.LAST_RESP.deepCopy();
-    } else if (plan instanceof AggregationPlan && ((AggregationPlan) plan).getLevel() >= 0) {
-      Map<String, Long> finalPaths = FilePathUtils
-          .getPathByLevel(((AggregationPlan) plan).getDeduplicatedPaths(),
-              ((AggregationPlan) plan).getLevel(), null);
-      for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
-        respColumns.add(((AggregationPlan) plan).getAggregations().get(0) + "(" + entry.getKey() + ")");
+    } else if (plan instanceof AggregationPlan && ((AggregationPlan)plan).getLevel() >= 0) {
+      Set<String> finalPaths = FilePathUtils.getPathByLevel((AggregationPlan) plan, null);
+      TSDataType type = FilePathUtils.getTSDataType((AggregationPlan) plan);
+      for (String path : finalPaths) {
+        respColumns.add(((AggregationPlan) plan).getAggregations().get(0) + "(" + path + ")");
         columnsTypes.add(type.toString());
       }
     } else {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
index 2057300..ecb41b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
@@ -19,10 +19,11 @@
 package org.apache.iotdb.db.utils;
 
 import java.io.File;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
+import java.util.TreeSet;
 
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.metadata.MetaUtils;
@@ -65,9 +66,9 @@ public class FilePathUtils {
    * @param pathIndex
    * @return
    */
-  public static Map<String, Long> getPathByLevel(AggregationPlan plan, Map<Integer, String> pathIndex) {
+  public static Set<String> getPathByLevel(AggregationPlan plan, Map<Integer, String> pathIndex) {
     // pathGroupByLevel -> count
-    Map<String, Long> finalPaths = new TreeMap<>();
+    Set<String> finalPaths = new TreeSet<>();
 
     List<Path> rawPaths = plan.getPaths();
     int level = plan.getLevel();
@@ -89,7 +90,7 @@ public class FilePathUtils {
         }
         key = path.toString();
       }
-      finalPaths.putIfAbsent(key, 0L);
+      finalPaths.add(key);
       if (pathIndex != null) {
         pathIndex.put(i++, key);
       }
@@ -151,60 +152,121 @@ public class FilePathUtils {
    * @return
    */
   public static RowRecord mergeRecordByPath(RowRecord newRecord,
-                                      Map<String, Long> finalPaths,
-                                      Map<Integer, String> pathIndex,
-                                      TSDataType type) {
+      Set<String> finalPaths, Map<Integer, String> pathIndex,
+      TSDataType type) {
     if (newRecord.getFields().size() < finalPaths.size()) {
       return null;
     }
 
-    Map<String, Object> finalPathMap = new HashMap<>();
+    Map<String, Object> finalPathMap = new TreeMap<>();
     // reset final paths
-    initFinalPathMap(finalPathMap, finalPaths, type);
+    initFinalPathMap(finalPathMap, finalPaths, type, 0);
 
     RowRecord tmpRecord = new RowRecord(newRecord.getTimestamp());
 
     for (int i = 0; i < newRecord.getFields().size(); i++) {
       if (newRecord.getFields().get(i) != null) {
-        finalPathMap.put(pathIndex.get(i), getValue(type, newRecord.getFields().get(i), finalPathMap.get(pathIndex.get(i))));
+        finalPathMap.put(pathIndex.get(i), getSum(type, newRecord.getFields().get(i), 
+            finalPathMap.get(pathIndex.get(i))));
       }
     }
 
     for (Map.Entry<String, Object> entry : finalPathMap.entrySet()) {
       tmpRecord.addField(Field.getField(entry.getValue(), type));
     }
-
     return tmpRecord;
   }
 
-  private static void initFinalPathMap(Map<String, Object> finalPathMap, Map<String, Long> finalPaths, TSDataType type) {
+  private static void initFinalPathMap(Map<String, Object> finalPathMap,
+      Set<String> finalPaths, TSDataType type, int initValue) {
+    for (String path : finalPaths) {
+      switch (type) {
+        case INT32 :
+          finalPathMap.put(path, initValue);
+          break;
+        case INT64 :
+          if (initValue == 0) {
+            finalPathMap.put(path, 0L);
+          } else if (initValue > 0) {
+            finalPathMap.put(path, Long.MAX_VALUE);
+          } else {
+            finalPathMap.put(path, Long.MIN_VALUE);
+          }
+          break;
+        case FLOAT :
+          if (initValue == 0) {
+            finalPathMap.put(path, 0F);
+          } else if (initValue > 0) {
+            finalPathMap.put(path, Float.MAX_VALUE);
+          } else {
+            finalPathMap.put(path, Float.MIN_VALUE);
+          }
+          break;
+        case DOUBLE :
+          if (initValue == 0) {
+            finalPathMap.put(path, 0D);
+          } else if (initValue > 0) {
+            finalPathMap.put(path, Double.MAX_VALUE);
+          } else {
+            finalPathMap.put(path, Double.MIN_VALUE);
+          }
+          break;
+        default :
+          finalPathMap.put(path, 0L);
+      }
+    }
+  }
+
+  private static Object getSum(TSDataType type, Field field, Object before) {
     switch (type) {
       case INT64 :
-        for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
-          finalPathMap.put(entry.getKey(), 0L);
-        }
-        break;
+        return ((Long) before) + field.getLongV(); 
       case DOUBLE :
-        for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
-          finalPathMap.put(entry.getKey(), 0D);
-        }
-        break;
+        return ((Double) before) + field.getDoubleV(); 
       default :
-        for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
-          finalPathMap.put(entry.getKey(), 0L);
-        }
+        return ((Long) before) + field.getLongV(); 
     }
   }
 
-  private static Object getValue(TSDataType type, Field field, Object before) {
+  private static Object getMaxOrMin(TSDataType type, Field field, Object before, boolean isMax) {
     switch (type) {
       case INT64 :
-        return ((Long) before) + field.getLongV(); 
+        return isMax ? Math.max(((Long) before), field.getLongV())
+            : Math.min(((Long) before), field.getLongV());
+      case INT32 :
+        return isMax ? Math.max(((Integer) before), field.getIntV())
+            : Math.min(((Integer) before), field.getIntV());
       case DOUBLE :
-        return ((Double) before) + field.getDoubleV(); 
+        return isMax ? Math.max(((Double) before), field.getDoubleV())
+            : Math.min(((Double) before), field.getDoubleV());
+      case FLOAT :
+        return isMax ? Math.max(((Float) before), field.getFloatV())
+            : Math.min(((Float) before), field.getFloatV());
       default :
-        return ((Long) before) + field.getLongV(); 
+        return isMax ? Math.max(((Long) before), field.getLongV())
+            : Math.min(((Long) before), field.getLongV());
+    }
+  }
+
+  public static RowRecord mergeMaxOrMinByPath(RowRecord newRecord, TSDataType type,
+      Set<String> finalPaths, Map<Integer, String> pathIndex, boolean isMax) {
+    if (newRecord.getFields().size() < finalPaths.size()) {
+      return null;
+    }
+    Map<String, Object> finalPathMap = new TreeMap<>();
+    // reset final paths
+    initFinalPathMap(finalPathMap, finalPaths, type, isMax ? Integer.MIN_VALUE : Integer.MAX_VALUE);
+    for (int i = 0; i < newRecord.getFields().size(); i++) {
+      if (newRecord.getFields().get(i) != null) {
+        finalPathMap.put(pathIndex.get(i), getMaxOrMin(type, newRecord.getFields().get(i), 
+            finalPathMap.get(pathIndex.get(i)), isMax));
+      }
     }
+    RowRecord tmpRecord = new RowRecord(newRecord.getTimestamp());
+    for (Map.Entry<String, Object> entry : finalPathMap.entrySet()) {
+      tmpRecord.addField(Field.getField(entry.getValue(), type));
+    }
+    return tmpRecord;
   }
 
   public static RowRecord avgRecordByPath(RowRecord newRecord, Map<String, Float> finalPaths,
@@ -239,8 +301,17 @@ public class FilePathUtils {
     switch (aggregation) {
       case "count" :
         return TSDataType.INT64;
+      case "avg" :
       case "sum" :
         return TSDataType.DOUBLE;
+      case "first_value" :
+      case "last_value" :
+      case "max_value" :
+      case "min_value" :
+        return plan.getDeduplicatedDataTypes().get(0);
+      case "max_time" :
+      case "min_time" :
+        return TSDataType.INT64;
       default :
         return TSDataType.INT64;
     }