You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2020/07/14 09:42:19 UTC
[incubator-iotdb] 07/08: support max min value and max min time
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch jira-768
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 0428bd10df8101d3e2b545fc69938fc60afbc5dd
Author: HTHou <hh...@outlook.com>
AuthorDate: Tue Jul 14 17:39:03 2020 +0800
support max min value and max min time
---
.../query/dataset/groupby/GroupByTimeDataSet.java | 2 +-
.../db/query/executor/AggregationExecutor.java | 55 +++++++--
.../org/apache/iotdb/db/service/TSServiceImpl.java | 11 +-
.../org/apache/iotdb/db/utils/FilePathUtils.java | 125 ++++++++++++++++-----
4 files changed, 150 insertions(+), 43 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
index e2005ce..caf50da 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
@@ -58,7 +58,7 @@ public class GroupByTimeDataSet extends QueryDataSet {
}
Map<Integer, String> pathIndex = new HashMap<>();
- Map<String, Long> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex);
+ Set<String> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex);
// get all records from GroupByDataSet, then we merge every record
if (logger.isDebugEnabled()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 5895280..922621a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -318,42 +318,79 @@ public class AggregationExecutor {
* using aggregate result data list construct QueryDataSet.
*
* @param aggregateResultList aggregate result list
+ * @throws QueryProcessException
*/
- private QueryDataSet constructDataSet(List<AggregateResult> aggregateResultList, AggregationPlan plan) {
+ private QueryDataSet constructDataSet(List<AggregateResult> aggregateResultList, AggregationPlan plan)
+ throws QueryProcessException {
RowRecord record = new RowRecord(0);
for (AggregateResult resultData : aggregateResultList) {
TSDataType dataType = resultData.getResultDataType();
record.addField(resultData.getResult(), dataType);
}
-
- String aggregation = plan.getAggregations().get(0);
SingleDataSet dataSet = null;
if (plan.getLevel() >= 0) {
- // current only support count operation
+ if (plan.getAggregations().size() > 1) {
+ //throw new QueryProcessException("Group by level doesn't support multiple aggregations");
+ }
+ // TODO: Check data type here
+
+ String aggregation = plan.getAggregations().get(0);
+
Map<Integer, String> pathIndex = new HashMap<>();
List<Path> paths = new ArrayList<>();
List<TSDataType> dataTypes = new ArrayList<>();
RowRecord curRecord = null;
+ Set<String> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex);
switch (aggregation) {
case "count":
- Map<String, Long> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex);
curRecord = FilePathUtils.mergeRecordByPath(record, finalPaths, pathIndex, TSDataType.INT64);
for (int i = 0; i < finalPaths.size(); i++) {
dataTypes.add(TSDataType.INT64);
}
break;
case "sum":
- Map<String, Long> finalPathsSum = FilePathUtils.getPathByLevel(plan, pathIndex);
- curRecord = FilePathUtils.mergeRecordByPath(record, finalPathsSum, pathIndex, TSDataType.DOUBLE);
- for (int i = 0; i < finalPathsSum.size(); i++) {
+ // Check datatype
+ curRecord = FilePathUtils.mergeRecordByPath(record, finalPaths, pathIndex, TSDataType.DOUBLE);
+ for (int i = 0; i < finalPaths.size(); i++) {
dataTypes.add(TSDataType.DOUBLE);
}
break;
case "avg":
+ // Check datatype
Map<String, Float> finalPathsAvg = FilePathUtils.getPathByLevelAvg(plan, pathIndex);
curRecord = FilePathUtils.avgRecordByPath(record, finalPathsAvg, pathIndex);
for (int i = 0; i < finalPathsAvg.size(); i++) {
- dataTypes.add(TSDataType.FLOAT);
+ dataTypes.add(TSDataType.DOUBLE);
+ }
+ break;
+ case "max_time":
+ curRecord = FilePathUtils.mergeMaxOrMinByPath(record, TSDataType.INT64, finalPaths,
+ pathIndex, true);
+ for (int i = 0; i < finalPaths.size(); i++) {
+ dataTypes.add(TSDataType.INT64);
+ }
+ break;
+ case "min_time":
+ curRecord = FilePathUtils.mergeMaxOrMinByPath(record, TSDataType.INT64, finalPaths,
+ pathIndex, false);
+ for (int i = 0; i < finalPaths.size(); i++) {
+ dataTypes.add(TSDataType.INT64);
+ }
+ break;
+ case "max_value":
+ // Check datatype
+ curRecord = FilePathUtils.mergeMaxOrMinByPath(record, plan.getDeduplicatedDataTypes().get(0),
+ finalPaths, pathIndex, true);
+ for (int i = 0; i < finalPaths.size(); i++) {
+ dataTypes.add(plan.getDeduplicatedDataTypes().get(0));
+ }
+ break;
+ case "min_value":
+ // Check datatype
+ curRecord = FilePathUtils.mergeMaxOrMinByPath(record, plan.getDeduplicatedDataTypes().get(0),
+ finalPaths, pathIndex, false);
+ for (int i = 0; i < finalPaths.size(); i++) {
+ dataTypes.add(plan.getDeduplicatedDataTypes().get(0));
}
break;
default:
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index ad464d4..a50d42f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -738,12 +738,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
// Last Query should return a different response instead of the static one
// because the query dataset and query id are different, although the header of the last query is the same.
return StaticResps.LAST_RESP.deepCopy();
- } else if (plan instanceof AggregationPlan && ((AggregationPlan) plan).getLevel() >= 0) {
- Map<String, Long> finalPaths = FilePathUtils
- .getPathByLevel(((AggregationPlan) plan).getDeduplicatedPaths(),
- ((AggregationPlan) plan).getLevel(), null);
- for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
- respColumns.add(((AggregationPlan) plan).getAggregations().get(0) + "(" + entry.getKey() + ")");
+ } else if (plan instanceof AggregationPlan && ((AggregationPlan)plan).getLevel() >= 0) {
+ Set<String> finalPaths = FilePathUtils.getPathByLevel((AggregationPlan) plan, null);
+ TSDataType type = FilePathUtils.getTSDataType((AggregationPlan) plan);
+ for (String path : finalPaths) {
+ respColumns.add(((AggregationPlan) plan).getAggregations().get(0) + "(" + path + ")");
columnsTypes.add(type.toString());
}
} else {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
index 2057300..ecb41b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
@@ -19,10 +19,11 @@
package org.apache.iotdb.db.utils;
import java.io.File;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
+import java.util.TreeSet;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.metadata.MetaUtils;
@@ -65,9 +66,9 @@ public class FilePathUtils {
* @param pathIndex
* @return
*/
- public static Map<String, Long> getPathByLevel(AggregationPlan plan, Map<Integer, String> pathIndex) {
+ public static Set<String> getPathByLevel(AggregationPlan plan, Map<Integer, String> pathIndex) {
// distinct paths grouped by level, kept in sorted order
- Map<String, Long> finalPaths = new TreeMap<>();
+ Set<String> finalPaths = new TreeSet<>();
List<Path> rawPaths = plan.getPaths();
int level = plan.getLevel();
@@ -89,7 +90,7 @@ public class FilePathUtils {
}
key = path.toString();
}
- finalPaths.putIfAbsent(key, 0L);
+ finalPaths.add(key);
if (pathIndex != null) {
pathIndex.put(i++, key);
}
@@ -151,60 +152,121 @@ public class FilePathUtils {
* @return
*/
public static RowRecord mergeRecordByPath(RowRecord newRecord,
- Map<String, Long> finalPaths,
- Map<Integer, String> pathIndex,
- TSDataType type) {
+ Set<String> finalPaths, Map<Integer, String> pathIndex,
+ TSDataType type) {
if (newRecord.getFields().size() < finalPaths.size()) {
return null;
}
- Map<String, Object> finalPathMap = new HashMap<>();
+ Map<String, Object> finalPathMap = new TreeMap<>();
// reset final paths
- initFinalPathMap(finalPathMap, finalPaths, type);
+ initFinalPathMap(finalPathMap, finalPaths, type, 0);
RowRecord tmpRecord = new RowRecord(newRecord.getTimestamp());
for (int i = 0; i < newRecord.getFields().size(); i++) {
if (newRecord.getFields().get(i) != null) {
- finalPathMap.put(pathIndex.get(i), getValue(type, newRecord.getFields().get(i), finalPathMap.get(pathIndex.get(i))));
+ finalPathMap.put(pathIndex.get(i), getSum(type, newRecord.getFields().get(i),
+ finalPathMap.get(pathIndex.get(i))));
}
}
for (Map.Entry<String, Object> entry : finalPathMap.entrySet()) {
tmpRecord.addField(Field.getField(entry.getValue(), type));
}
-
return tmpRecord;
}
- private static void initFinalPathMap(Map<String, Object> finalPathMap, Map<String, Long> finalPaths, TSDataType type) {
+ private static void initFinalPathMap(Map<String, Object> finalPathMap,
+ Set<String> finalPaths, TSDataType type, int initValue) {
+ for (String path : finalPaths) {
+ switch (type) {
+ case INT32 :
+ finalPathMap.put(path, initValue);
+ break;
+ case INT64 :
+ if (initValue == 0) {
+ finalPathMap.put(path, 0L);
+ } else if (initValue > 0) {
+ finalPathMap.put(path, Long.MAX_VALUE);
+ } else {
+ finalPathMap.put(path, Long.MIN_VALUE);
+ }
+ break;
+ case FLOAT :
+ if (initValue == 0) {
+ finalPathMap.put(path, 0F);
+ } else if (initValue > 0) {
+ finalPathMap.put(path, Float.MAX_VALUE);
+ } else {
+ finalPathMap.put(path, Float.MIN_VALUE);
+ }
+ break;
+ case DOUBLE :
+ if (initValue == 0) {
+ finalPathMap.put(path, 0D);
+ } else if (initValue > 0) {
+ finalPathMap.put(path, Double.MAX_VALUE);
+ } else {
+ finalPathMap.put(path, Double.MIN_VALUE);
+ }
+ break;
+ default :
+ finalPathMap.put(path, 0L);
+ }
+ }
+ }
+
+ private static Object getSum(TSDataType type, Field field, Object before) {
switch (type) {
case INT64 :
- for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
- finalPathMap.put(entry.getKey(), 0L);
- }
- break;
+ return ((Long) before) + field.getLongV();
case DOUBLE :
- for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
- finalPathMap.put(entry.getKey(), 0D);
- }
- break;
+ return ((Double) before) + field.getDoubleV();
default :
- for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
- finalPathMap.put(entry.getKey(), 0L);
- }
+ return ((Long) before) + field.getLongV();
}
}
- private static Object getValue(TSDataType type, Field field, Object before) {
+ private static Object getMaxOrMin(TSDataType type, Field field, Object before, boolean isMax) {
switch (type) {
case INT64 :
- return ((Long) before) + field.getLongV();
+ return isMax ? Math.max(((Long) before), field.getLongV())
+ : Math.min(((Long) before), field.getLongV());
+ case INT32 :
+ return isMax ? Math.max(((Integer) before), field.getIntV())
+ : Math.min(((Integer) before), field.getIntV());
case DOUBLE :
- return ((Double) before) + field.getDoubleV();
+ return isMax ? Math.max(((Double) before), field.getDoubleV())
+ : Math.min(((Double) before), field.getDoubleV());
+ case FLOAT :
+ return isMax ? Math.max(((Float) before), field.getFloatV())
+ : Math.min(((Float) before), field.getFloatV());
default :
- return ((Long) before) + field.getLongV();
+ return isMax ? Math.max(((Long) before), field.getLongV())
+ : Math.min(((Long) before), field.getLongV());
+ }
+ }
+
+ public static RowRecord mergeMaxOrMinByPath(RowRecord newRecord, TSDataType type,
+ Set<String> finalPaths, Map<Integer, String> pathIndex, boolean isMax) {
+ if (newRecord.getFields().size() < finalPaths.size()) {
+ return null;
+ }
+ Map<String, Object> finalPathMap = new TreeMap<>();
+ // reset final paths
+ initFinalPathMap(finalPathMap, finalPaths, type, isMax ? Integer.MIN_VALUE : Integer.MAX_VALUE);
+ for (int i = 0; i < newRecord.getFields().size(); i++) {
+ if (newRecord.getFields().get(i) != null) {
+ finalPathMap.put(pathIndex.get(i), getMaxOrMin(type, newRecord.getFields().get(i),
+ finalPathMap.get(pathIndex.get(i)), isMax));
+ }
}
+ RowRecord tmpRecord = new RowRecord(newRecord.getTimestamp());
+ for (Map.Entry<String, Object> entry : finalPathMap.entrySet()) {
+ tmpRecord.addField(Field.getField(entry.getValue(), type));
+ }
+ return tmpRecord;
}
public static RowRecord avgRecordByPath(RowRecord newRecord, Map<String, Float> finalPaths,
@@ -239,8 +301,17 @@ public class FilePathUtils {
switch (aggregation) {
case "count" :
return TSDataType.INT64;
+ case "avg" :
case "sum" :
return TSDataType.DOUBLE;
+ case "first_value" :
+ case "last_value" :
+ case "max_value" :
+ case "min_value" :
+ return plan.getDeduplicatedDataTypes().get(0);
+ case "max_time" :
+ case "min_time" :
+ return TSDataType.INT64;
default :
return TSDataType.INT64;
}