You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2020/07/14 09:42:15 UTC
[incubator-iotdb] 03/08: jira 768
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch jira-768
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 991515bddf1c03272185bb6a590ceecf1a0dcca3
Author: HTHou <hh...@outlook.com>
AuthorDate: Thu Jun 18 15:41:48 2020 +0800
jira 768
---
.../db/query/executor/AggregationExecutor.java | 30 +++++++++----
.../org/apache/iotdb/db/service/TSServiceImpl.java | 4 +-
.../org/apache/iotdb/db/utils/FilePathUtils.java | 52 +++++++++++++++-------
3 files changed, 59 insertions(+), 27 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index aee2237..94d117a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+import org.apache.iotdb.tsfile.utils.Pair;
import java.io.IOException;
import java.util.*;
@@ -319,27 +320,40 @@ public class AggregationExecutor {
*
* @param aggregateResultList aggregate result list
*/
- private QueryDataSet constructDataSet(List<AggregateResult> aggregateResultList, RawDataQueryPlan plan) {
+ private QueryDataSet constructDataSet(List<AggregateResult> aggregateResultList, AggregationPlan plan) {
RowRecord record = new RowRecord(0);
for (AggregateResult resultData : aggregateResultList) {
TSDataType dataType = resultData.getResultDataType();
record.addField(resultData.getResult(), dataType);
}
+ String aggregation = plan.getAggregations().get(0);
SingleDataSet dataSet = null;
- if (((AggregationPlan)plan).getLevel() >= 0) {
+ if (plan.getLevel() >= 0) {
// current only support count operation
Map<Integer, String> pathIndex = new HashMap<>();
- Map<String, TSDataType> finalPaths = FilePathUtils.getPathByLevel((AggregationPlan) plan, pathIndex);
-
- RowRecord curRecord = FilePathUtils.mergeRecordByPath(record, finalPaths, pathIndex);
-
List<Path> paths = new ArrayList<>();
List<TSDataType> dataTypes = new ArrayList<>();
- for (int i = 0; i < finalPaths.size(); i++) {
- dataTypes.add(TSDataType.DOUBLE);
+ Map<String, Long> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex);
+ RowRecord curRecord = null;
+ switch (aggregation) {
+ case "count":
+ curRecord = FilePathUtils.mergeRecordByPath(record, finalPaths, pathIndex);
+ for (int i = 0; i < finalPaths.size(); i++) {
+ dataTypes.add(TSDataType.INT64);
+ }
+ break;
+ case "avg":
+ curRecord = FilePathUtils.avgRecordByPath(record, finalPaths, pathIndex);
+ for (int i = 0; i < finalPaths.size(); i++) {
+ dataTypes.add(TSDataType.FLOAT);
+ }
+ break;
}
+
+
+
dataSet = new SingleDataSet(paths, dataTypes);
dataSet.setRecord(curRecord);
} else {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 3b51b3c..8c852fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -690,8 +690,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
// because the query dataset and query id is different although the header of last query is same.
return StaticResps.LAST_RESP.deepCopy();
} else if (plan instanceof AggregationPlan && ((AggregationPlan)plan).getLevel() >= 0) {
- Map<String, TSDataType> finalPaths = FilePathUtils.getPathByLevel((AggregationPlan) plan, null);
- for (Map.Entry<String, TSDataType> entry : finalPaths.entrySet()) {
+ Map<String, Long> finalPaths = FilePathUtils.getPathByLevel((AggregationPlan) plan, null);
+ for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
respColumns.add(((AggregationPlan) plan).getAggregations().get(0) + "(" + entry.getKey() + ")");
columnsTypes.add(entry.getValue().toString());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
index 84b11e2..9a7310a 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
@@ -64,13 +64,12 @@ public class FilePathUtils {
* @param pathIndex
* @return
*/
- public static Map<String, TSDataType> getPathByLevel(AggregationPlan plan, Map<Integer, String> pathIndex) {
+ public static Map<String, Long> getPathByLevel(AggregationPlan plan, Map<Integer, String> pathIndex) {
// pathGroupByLevel -> count
- Map<String, TSDataType> finalPaths = new TreeMap<>();
+ Map<String, Long> finalPaths = new TreeMap<>();
List<Path> rawPaths = plan.getPaths();
int level = plan.getLevel();
- String aggregation = plan.getAggregations().get(0);
int i = 0;
for (Path value : rawPaths) {
String[] tmpPath = MetaUtils.getNodeNames(value.getFullPath());
@@ -89,15 +88,7 @@ public class FilePathUtils {
}
key = path.toString();
}
- switch (aggregation) {
- case "sum" :
- finalPaths.putIfAbsent(key, TSDataType.INT64);
- break;
- case "avg" :
- finalPaths.putIfAbsent(key, TSDataType.INT64);
- break;
- }
- finalPaths.putIfAbsent(key, (float) 0);
+ finalPaths.putIfAbsent(key, 0L);
if (pathIndex != null) {
pathIndex.put(i++, key);
}
@@ -118,15 +109,42 @@ public class FilePathUtils {
* @return
*/
public static RowRecord mergeRecordByPath(RowRecord newRecord,
- Map<String, Float> finalPaths,
+ Map<String, Long> finalPaths,
Map<Integer, String> pathIndex) {
if (newRecord.getFields().size() < finalPaths.size()) {
return null;
}
// reset final paths
- for (Map.Entry<String, Float> entry : finalPaths.entrySet()) {
- entry.setValue((float) 0);
+ for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
+ entry.setValue(0L);
+ }
+
+ RowRecord tmpRecord = new RowRecord(newRecord.getTimestamp());
+
+ for (int i = 0; i < newRecord.getFields().size(); i++) {
+ if (newRecord.getFields().get(i) != null) {
+ finalPaths.put(pathIndex.get(i),
+ finalPaths.get(pathIndex.get(i)) + newRecord.getFields().get(i).getLongV());
+ }
+ }
+
+ for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
+ tmpRecord.addField(Field.getField(entry.getValue(), TSDataType.INT64));
+ }
+
+ return tmpRecord;
+ }
+
+ public static RowRecord avgRecordByPath(RowRecord newRecord, Map<String, Long> finalPaths,
+ Map<Integer, String> pathIndex) {
+ if (newRecord.getFields().size() < finalPaths.size()) {
+ return null;
+ }
+
+ // reset final paths
+ for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
+ entry.setValue(0L);
}
RowRecord tmpRecord = new RowRecord(newRecord.getTimestamp());
@@ -138,8 +156,8 @@ public class FilePathUtils {
}
}
- for (Map.Entry<String, Float> entry : finalPaths.entrySet()) {
- tmpRecord.addField(Field.getField(entry.getValue(), TSDataType.FLOAT));
+ for (Map.Entry<String, Long> entry : finalPaths.entrySet()) {
+ tmpRecord.addField(Field.getField(entry.getValue(), TSDataType.INT64));
}
return tmpRecord;