You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/09/16 09:07:41 UTC
[iotdb] 01/05: implement groupVectorSeries()
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch aggrVector2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d8c97bde3e78fdc44d709f8c7764e5732201d8d1
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Sep 16 12:46:24 2021 +0800
implement groupVectorSeries()
---
.../db/query/executor/AggregationExecutor.java | 25 ++++++++++++++++++++++
1 file changed, 25 insertions(+)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index c01f3c3..bc970ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.VectorPartialPath;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
@@ -102,7 +103,11 @@ public class AggregationExecutor {
// TODO use multi-thread
Map<PartialPath, List<Integer>> pathToAggrIndexesMap =
groupAggregationsBySeries(selectedSeries);
+ // Attention: this method will REMOVE vector path from pathToAggrIndexesMap
+ Map<PartialPath, Map<String, List<Integer>>> vectorPathIndexesMap =
+ groupVectorSeries(pathToAggrIndexesMap);
AggregateResult[] aggregateResultList = new AggregateResult[selectedSeries.size()];
+
// TODO-Cluster: group the paths by storage group to reduce communications
List<StorageGroupProcessor> list =
StorageEngine.getInstance().mergeLock(new ArrayList<>(pathToAggrIndexesMap.keySet()));
@@ -511,4 +516,24 @@ public class AggregationExecutor {
}
return pathToAggrIndexesMap;
}
+
+  /**
+   * Moves each VectorPartialPath entry out of pathToAggrIndexesMap (it is REMOVED there) into the
+   * result, under getSubSensor(0); a key snapshot is iterated so removal cannot break the loop.
+   *
+   * @return e.g. vector[s1, s2], Map{s1 -> 1, s2 -> 2}
+   */
+  private Map<PartialPath, Map<String, List<Integer>>> groupVectorSeries(
+      Map<PartialPath, List<Integer>> pathToAggrIndexesMap) {
+    Map<PartialPath, Map<String, List<Integer>>> result = new HashMap<>();
+    for (PartialPath seriesPath : new ArrayList<>(pathToAggrIndexesMap.keySet())) {
+      if (seriesPath instanceof VectorPartialPath) {
+        List<Integer> indexes = pathToAggrIndexesMap.remove(seriesPath);
+        result
+            .computeIfAbsent(seriesPath, key -> new HashMap<>())
+            .put(((VectorPartialPath) seriesPath).getSubSensor(0), indexes);
+      }
+    }
+    return result;
+  }
}