You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2020/07/14 09:42:17 UTC
[incubator-iotdb] 05/08: support sum
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch jira-768
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 8c3b9af22bc9ed68c12a77bb4757effece2ce456
Author: HTHou <hh...@outlook.com>
AuthorDate: Mon Jun 22 15:36:38 2020 +0800
support sum
---
.../query/dataset/groupby/GroupByTimeDataSet.java | 3 +-
.../db/query/executor/AggregationExecutor.java | 6 +--
.../org/apache/iotdb/db/service/TSServiceImpl.java | 3 +-
.../org/apache/iotdb/db/utils/FilePathUtils.java | 58 +++++++++++++++++++---
4 files changed, 57 insertions(+), 13 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
index 4fe932d..e2005ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
@@ -65,7 +65,8 @@ public class GroupByTimeDataSet extends QueryDataSet {
logger.debug("only group by level, paths:" + groupByTimePlan.getPaths());
}
while (dataSet != null && dataSet.hasNextWithoutConstraint()) {
- RowRecord curRecord = FilePathUtils.mergeRecordByPath(dataSet.nextWithoutConstraint(), finalPaths, pathIndex);
+ RowRecord curRecord = FilePathUtils
+ .mergeRecordByPath(dataSet.nextWithoutConstraint(), finalPaths, pathIndex, dataTypes.get(0));
if (curRecord != null) {
records.add(curRecord);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 8cb4ff5..5895280 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -337,16 +337,16 @@ public class AggregationExecutor {
switch (aggregation) {
case "count":
Map<String, Long> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex);
- curRecord = FilePathUtils.mergeRecordByPath(record, finalPaths, pathIndex);
+ curRecord = FilePathUtils.mergeRecordByPath(record, finalPaths, pathIndex, TSDataType.INT64);
for (int i = 0; i < finalPaths.size(); i++) {
dataTypes.add(TSDataType.INT64);
}
break;
case "sum":
Map<String, Long> finalPathsSum = FilePathUtils.getPathByLevel(plan, pathIndex);
- curRecord = FilePathUtils.mergeRecordByPath(record, finalPathsSum, pathIndex);
+ curRecord = FilePathUtils.mergeRecordByPath(record, finalPathsSum, pathIndex, TSDataType.DOUBLE);
for (int i = 0; i < finalPathsSum.size(); i++) {
- dataTypes.add(TSDataType.INT64);
+ dataTypes.add(TSDataType.DOUBLE);
}
break;
case "avg":
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index df378d6..5be346d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -691,9 +691,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return StaticResps.LAST_RESP.deepCopy();
} else if (plan instanceof AggregationPlan && ((AggregationPlan)plan).getLevel() >= 0) {
Map<String, Long> finalPaths = FilePathUtils.getPathByLevel((AggregationPlan) plan, null);
+ TSDataType type = FilePathUtils.getTSDataType((AggregationPlan) plan);
for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
respColumns.add(((AggregationPlan) plan).getAggregations().get(0) + "(" + entry.getKey() + ")");
- columnsTypes.add(TSDataType.INT64.toString());
+ columnsTypes.add(type.toString());
}
} else {
getWideQueryHeaders(plan, respColumns, columnsTypes);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
index 4270df8..2057300 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.utils;
import java.io.File;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -151,32 +152,61 @@ public class FilePathUtils {
*/
public static RowRecord mergeRecordByPath(RowRecord newRecord,
Map<String, Long> finalPaths,
- Map<Integer, String> pathIndex) {
+ Map<Integer, String> pathIndex,
+ TSDataType type) {
if (newRecord.getFields().size() < finalPaths.size()) {
return null;
}
+ Map<String, Object> finalPathMap = new HashMap<>();
// reset final paths
- for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
- entry.setValue(0L);
- }
+ initFinalPathMap(finalPathMap, finalPaths, type);
RowRecord tmpRecord = new RowRecord(newRecord.getTimestamp());
for (int i = 0; i < newRecord.getFields().size(); i++) {
if (newRecord.getFields().get(i) != null) {
- finalPaths.put(pathIndex.get(i),
- finalPaths.get(pathIndex.get(i)) + newRecord.getFields().get(i).getLongV());
+ finalPathMap.put(pathIndex.get(i), getValue(type, newRecord.getFields().get(i), finalPathMap.get(pathIndex.get(i))));
}
}
- for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
- tmpRecord.addField(Field.getField(entry.getValue(), TSDataType.INT64));
+ for (Map.Entry<String, Object> entry : finalPathMap.entrySet()) {
+ tmpRecord.addField(Field.getField(entry.getValue(), type));
}
return tmpRecord;
}
+ private static void initFinalPathMap(Map<String, Object> finalPathMap, Map<String, Long> finalPaths, TSDataType type) {
+ switch (type) {
+ case INT64 :
+ for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
+ finalPathMap.put(entry.getKey(), 0L);
+ }
+ break;
+ case DOUBLE :
+ for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
+ finalPathMap.put(entry.getKey(), 0D);
+ }
+ break;
+ default :
+ for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
+ finalPathMap.put(entry.getKey(), 0L);
+ }
+ }
+ }
+
+ private static Object getValue(TSDataType type, Field field, Object before) {
+ switch (type) {
+ case INT64 :
+ return ((Long) before) + field.getLongV();
+ case DOUBLE :
+ return ((Double) before) + field.getDoubleV();
+ default :
+ return ((Long) before) + field.getLongV();
+ }
+ }
+
public static RowRecord avgRecordByPath(RowRecord newRecord, Map<String, Float> finalPaths,
Map<Integer, String> pathIndex) {
if (newRecord.getFields().size() < finalPaths.size()) {
@@ -204,4 +234,16 @@ public class FilePathUtils {
return tmpRecord;
}
+ public static TSDataType getTSDataType(AggregationPlan plan) {
+ String aggregation = plan.getAggregations().get(0);
+ switch (aggregation) {
+ case "count" :
+ return TSDataType.INT64;
+ case "sum" :
+ return TSDataType.DOUBLE;
+ default :
+ return TSDataType.INT64;
+ }
+ }
+
}