You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2020/07/14 09:42:16 UTC
[incubator-iotdb] 04/08: sum
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch jira-768
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 00f33e3e6fd73400d1fef9983ad8546281a70c30
Author: HTHou <hh...@outlook.com>
AuthorDate: Fri Jun 19 14:19:51 2020 +0800
sum
---
.../query/dataset/groupby/GroupByTimeDataSet.java | 2 +-
.../db/query/executor/AggregationExecutor.java | 22 ++++++----
.../org/apache/iotdb/db/service/TSServiceImpl.java | 2 +-
.../org/apache/iotdb/db/utils/FilePathUtils.java | 51 +++++++++++++++++++---
4 files changed, 62 insertions(+), 15 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
index 224c04c..4fe932d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
@@ -58,7 +58,7 @@ public class GroupByTimeDataSet extends QueryDataSet {
}
Map<Integer, String> pathIndex = new HashMap<>();
- Map<String, Float> finalPaths = FilePathUtils.getPathByLevel(plan.getPaths(), plan.getLevel(), pathIndex);
+ Map<String, Long> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex);
// get all records from GroupByDataSet, then we merge every record
if (logger.isDebugEnabled()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 94d117a..8cb4ff5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -48,7 +48,6 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
-import org.apache.iotdb.tsfile.utils.Pair;
import java.io.IOException;
import java.util.*;
@@ -275,7 +274,7 @@ public class AggregationExecutor {
aggregateResults.add(result);
}
aggregateWithValueFilter(aggregateResults, timestampGenerator, readersOfSelectedSeries);
- return constructDataSet(aggregateResults, queryPlan);
+ return constructDataSet(aggregateResults, (AggregationPlan) queryPlan);
}
protected TimeGenerator getTimeGenerator(QueryContext context, RawDataQueryPlan queryPlan) throws StorageEngineException {
@@ -334,26 +333,33 @@ public class AggregationExecutor {
Map<Integer, String> pathIndex = new HashMap<>();
List<Path> paths = new ArrayList<>();
List<TSDataType> dataTypes = new ArrayList<>();
- Map<String, Long> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex);
RowRecord curRecord = null;
switch (aggregation) {
case "count":
+ Map<String, Long> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex);
curRecord = FilePathUtils.mergeRecordByPath(record, finalPaths, pathIndex);
for (int i = 0; i < finalPaths.size(); i++) {
dataTypes.add(TSDataType.INT64);
}
break;
+ case "sum":
+ Map<String, Long> finalPathsSum = FilePathUtils.getPathByLevel(plan, pathIndex);
+ curRecord = FilePathUtils.mergeRecordByPath(record, finalPathsSum, pathIndex);
+ for (int i = 0; i < finalPathsSum.size(); i++) {
+ dataTypes.add(TSDataType.INT64);
+ }
+ break;
case "avg":
- curRecord = FilePathUtils.avgRecordByPath(record, finalPaths, pathIndex);
- for (int i = 0; i < finalPaths.size(); i++) {
+ Map<String, Float> finalPathsAvg = FilePathUtils.getPathByLevelAvg(plan, pathIndex);
+ curRecord = FilePathUtils.avgRecordByPath(record, finalPathsAvg, pathIndex);
+ for (int i = 0; i < finalPathsAvg.size(); i++) {
dataTypes.add(TSDataType.FLOAT);
}
break;
+ default:
+ break;
}
-
-
-
dataSet = new SingleDataSet(paths, dataTypes);
dataSet.setRecord(curRecord);
} else {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 8c852fc..df378d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -693,7 +693,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
Map<String, Long> finalPaths = FilePathUtils.getPathByLevel((AggregationPlan) plan, null);
for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
respColumns.add(((AggregationPlan) plan).getAggregations().get(0) + "(" + entry.getKey() + ")");
- columnsTypes.add(entry.getValue().toString());
+ columnsTypes.add(TSDataType.INT64.toString());
}
} else {
getWideQueryHeaders(plan, respColumns, columnsTypes);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
index 9a7310a..4270df8 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
@@ -98,6 +98,47 @@ public class FilePathUtils {
}
/**
+ * Groups the raw paths of an aggregation plan by level, e.g. root.sg1.d2.s0, root.sg1.d1.s1
+ * with level=1 yield [root.sg1, 0F]; pathIndex becomes [[0, root.sg1], [1, root.sg1]].
+ * A path split into no more than `level` node names keeps its full path as its key.
+ * @param plan supplies the raw paths (getPaths()) and the group-by level (getLevel())
+ * @param pathIndex if non-null, receives (position of raw path -> grouped path key)
+ * @return TreeMap (sorted by key) from each grouped path key to an initial value of 0F
+ */
+ public static Map<String, Float> getPathByLevelAvg(AggregationPlan plan, Map<Integer, String> pathIndex) {
+ // grouped path key -> Float placeholder, initialised to 0F via putIfAbsent below
+ Map<String, Float> finalPaths = new TreeMap<>();
+
+ List<Path> rawPaths = plan.getPaths();
+ int level = plan.getLevel();
+ int i = 0;
+ for (Path value : rawPaths) {
+ String[] tmpPath = MetaUtils.getNodeNames(value.getFullPath());
+
+ String key;
+ if (tmpPath.length <= level) {
+ key = value.getFullPath();
+ } else {
+ StringBuilder path = new StringBuilder();
+ for (int k = 0; k <= level; k++) {
+ if (k == 0) {
+ path.append(tmpPath[k]);
+ } else {
+ path.append(".").append(tmpPath[k]);
+ }
+ }
+ key = path.toString();
+ }
+ finalPaths.putIfAbsent(key, 0F);
+ if (pathIndex != null) {
+ pathIndex.put(i++, key);
+ }
+ }
+
+ return finalPaths;
+ }
+
+ /**
* merge the raw record by level, for example
* raw record [timestamp, root.sg1.d1.s0, root.sg1.d1.s1, root.sg1.d2.s2], level=1
* and newRecord data is [100, 1, 1, 1]
@@ -136,15 +177,15 @@ public class FilePathUtils {
return tmpRecord;
}
- public static RowRecord avgRecordByPath(RowRecord newRecord, Map<String, Long> finalPaths,
+ public static RowRecord avgRecordByPath(RowRecord newRecord, Map<String, Float> finalPaths,
Map<Integer, String> pathIndex) {
if (newRecord.getFields().size() < finalPaths.size()) {
return null;
}
// reset final paths
- for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
- entry.setValue(0L);
+ for (Map.Entry<String, Float> entry : finalPaths.entrySet()) {
+ entry.setValue(0F);
}
RowRecord tmpRecord = new RowRecord(newRecord.getTimestamp());
@@ -156,8 +197,8 @@ public class FilePathUtils {
}
}
- for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
- tmpRecord.addField(Field.getField(entry.getValue(), TSDataType.INT64));
+ for (Map.Entry<String, Float> entry : finalPaths.entrySet()) {
+ tmpRecord.addField(Field.getField(entry.getValue(), TSDataType.FLOAT));
}
return tmpRecord;