You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/05/21 01:51:33 UTC
[incubator-iotdb] 06/11: fix some error bugs: add select series
group entity, add query for all nodes features
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 3ee37c3afc234b347c82b9ac984fb487988f8aeb
Author: lta <li...@163.com>
AuthorDate: Mon May 20 15:34:30 2019 +0800
fix some error bugs: add select series group entity, add query for all nodes features
---
.../executor/ClusterAggregateEngineExecutor.java | 48 ++++----
.../executor/ClusterExecutorWithTimeGenerator.java | 35 ++----
.../ClusterExecutorWithoutTimeGenerator.java | 21 +++-
.../query/executor/ClusterFillEngineExecutor.java | 15 ++-
.../query/factory/ClusterSeriesReaderFactory.java | 28 +++--
.../ClusterRpcSingleQueryManager.java | 137 +++++++++------------
...oupEntity.java => FilterSeriesGroupEntity.java} | 4 +-
.../IClusterRpcSingleQueryManager.java | 7 --
...oupEntity.java => SelectSeriesGroupEntity.java} | 55 +++------
.../AbstractClusterPointReader.java | 2 +
.../coordinatornode/ClusterFilterSeriesReader.java | 17 +--
.../coordinatornode/ClusterSelectSeriesReader.java | 25 +---
.../timegenerator/ClusterNodeConstructor.java | 4 +-
.../query/utils/ClusterTimeValuePairUtils.java | 18 +++
.../iotdb/cluster/query/utils/ExpressionUtils.java | 12 +-
.../query/utils/QueryPlanPartitionUtils.java | 65 +++++-----
.../query/manager/ClusterRpcManagerTest.java | 46 ++-----
.../query/utils/QueryPlanPartitionUtilsTest.java | 56 +++++----
18 files changed, 268 insertions(+), 327 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java
index 51113c9..b34afa1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.cluster.query.executor;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -29,9 +30,11 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory;
import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
-import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterGroupEntity;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.SelectSeriesGroupEntity;
import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader;
import org.apache.iotdb.cluster.query.timegenerator.ClusterTimeGenerator;
+import org.apache.iotdb.cluster.utils.QPExecutorUtils;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
@@ -79,18 +82,25 @@ public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor {
public QueryDataSet executeWithoutTimeGenerator(QueryContext context)
throws FileNodeManagerException, IOException, PathErrorException, ProcessorException {
Filter timeFilter = expression != null ? ((GlobalTimeExpression) expression).getFilter() : null;
- Map<Path, ClusterSelectSeriesReader> selectPathReaders = queryManager.getSelectSeriesReaders();
+ Map<String, SelectSeriesGroupEntity> selectSeriesGroupEntityMap = queryManager
+ .getSelectSeriesGroupEntityMap();
List<Path> paths = new ArrayList<>();
List<IPointReader> readers = new ArrayList<>();
List<TSDataType> dataTypes = new ArrayList<>();
+ //Mark filter series reader index group by group id
+ Map<String, Integer> selectSeriesReaderIndex = new HashMap<>();
for (int i = 0; i < selectedSeries.size(); i++) {
Path path = selectedSeries.get(i);
+ String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice());
- if (selectPathReaders.containsKey(path)) {
- ClusterSelectSeriesReader reader = selectPathReaders.get(path);
+ if (selectSeriesGroupEntityMap.containsKey(groupId)) {
+ int index = selectSeriesReaderIndex.getOrDefault(groupId, 0);
+ ClusterSelectSeriesReader reader = selectSeriesGroupEntityMap.get(groupId)
+ .getSelectSeriesReaders().get(index);
readers.add(reader);
dataTypes.add(reader.getDataType());
+ selectSeriesReaderIndex.put(groupId, index + 1);
} else {
paths.add(path);
// construct AggregateFunction
@@ -140,15 +150,19 @@ public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor {
/** add query token for query series which can handle locally **/
List<Path> localQuerySeries = new ArrayList<>(selectedSeries);
- Set<Path> remoteQuerySeries = queryManager.getSelectSeriesReaders().keySet();
+ Set<Path> remoteQuerySeries = new HashSet<>();
+ queryManager.getSelectSeriesGroupEntityMap().values().forEach(
+ selectSeriesGroupEntity -> selectSeriesGroupEntity.getSelectPaths()
+ .forEach(path -> remoteQuerySeries.add(path)));
localQuerySeries.removeAll(remoteQuerySeries);
QueryResourceManager.getInstance()
.beginQueryOfGivenQueryPaths(context.getJobId(), localQuerySeries);
/** add query token for filter series which can handle locally **/
Set<String> deviceIdSet = new HashSet<>();
- for (FilterGroupEntity filterGroupEntity : queryManager.getFilterGroupEntityMap().values()) {
- List<Path> remoteFilterSeries = filterGroupEntity.getFilterPaths();
+ for (FilterSeriesGroupEntity filterSeriesGroupEntity : queryManager
+ .getFilterSeriesGroupEntityMap().values()) {
+ List<Path> remoteFilterSeries = filterSeriesGroupEntity.getFilterPaths();
remoteFilterSeries.forEach(seriesPath -> deviceIdSet.add(seriesPath.getDevice()));
}
QueryResourceManager.getInstance()
@@ -156,32 +170,18 @@ public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor {
ClusterTimeGenerator timestampGenerator;
List<EngineReaderByTimeStamp> readersOfSelectedSeries;
+ // origin data type of select paths
+ List<TSDataType> originDataTypes = new ArrayList<>();
try {
timestampGenerator = new ClusterTimeGenerator(expression, context,
queryManager);
readersOfSelectedSeries = ClusterSeriesReaderFactory
.createReadersByTimestampOfSelectedPaths(selectedSeries, context,
- queryManager);
+ queryManager, originDataTypes);
} catch (IOException ex) {
throw new FileNodeManagerException(ex);
}
- /** Get data type of select paths **/
- List<TSDataType> originDataTypes = new ArrayList<>();
- Map<Path, ClusterSelectSeriesReader> selectSeriesReaders = queryManager
- .getSelectSeriesReaders();
- for (Path path : selectedSeries) {
- try {
- if (selectSeriesReaders.containsKey(path)) {
- originDataTypes.add(selectSeriesReaders.get(path).getDataType());
- } else {
- originDataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
- }
- } catch (PathErrorException e) {
- throw new FileNodeManagerException(e);
- }
- }
-
List<AggregateFunction> aggregateFunctions = new ArrayList<>();
for (int i = 0; i < selectedSeries.size(); i++) {
TSDataType type = originDataTypes.get(i);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterExecutorWithTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterExecutorWithTimeGenerator.java
index fed8c0d..fe2511a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterExecutorWithTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterExecutorWithTimeGenerator.java
@@ -27,7 +27,7 @@ import java.util.Set;
import org.apache.iotdb.cluster.query.dataset.ClusterDataSetWithTimeGenerator;
import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory;
import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
-import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterGroupEntity;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity;
import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader;
import org.apache.iotdb.cluster.query.timegenerator.ClusterTimeGenerator;
import org.apache.iotdb.db.exception.FileNodeManagerException;
@@ -71,15 +71,19 @@ public class ClusterExecutorWithTimeGenerator {
/** add query token for query series which can handle locally **/
List<Path> localQuerySeries = new ArrayList<>(queryExpression.getSelectedSeries());
- Set<Path> remoteQuerySeries = queryManager.getSelectSeriesReaders().keySet();
+ Set<Path> remoteQuerySeries = new HashSet<>();
+ queryManager.getSelectSeriesGroupEntityMap().values().forEach(
+ selectSeriesGroupEntity -> selectSeriesGroupEntity.getSelectPaths()
+ .forEach(path -> remoteQuerySeries.add(path)));
localQuerySeries.removeAll(remoteQuerySeries);
QueryResourceManager.getInstance()
.beginQueryOfGivenQueryPaths(context.getJobId(), localQuerySeries);
/** add query token for filter series which can handle locally **/
Set<String> deviceIdSet = new HashSet<>();
- for (FilterGroupEntity filterGroupEntity : queryManager.getFilterGroupEntityMap().values()) {
- List<Path> remoteFilterSeries = filterGroupEntity.getFilterPaths();
+ for (FilterSeriesGroupEntity filterSeriesGroupEntity : queryManager
+ .getFilterSeriesGroupEntityMap().values()) {
+ List<Path> remoteFilterSeries = filterSeriesGroupEntity.getFilterPaths();
remoteFilterSeries.forEach(seriesPath -> deviceIdSet.add(seriesPath.getDevice()));
}
QueryResourceManager.getInstance()
@@ -88,33 +92,18 @@ public class ClusterExecutorWithTimeGenerator {
ClusterTimeGenerator timestampGenerator;
List<EngineReaderByTimeStamp> readersOfSelectedSeries;
+ /** Get data type of select paths **/
+ List<TSDataType> dataTypes = new ArrayList<>();
try {
timestampGenerator = new ClusterTimeGenerator(queryExpression.getExpression(), context,
queryManager);
readersOfSelectedSeries = ClusterSeriesReaderFactory
.createReadersByTimestampOfSelectedPaths(queryExpression.getSelectedSeries(), context,
- queryManager);
- } catch (IOException ex) {
+ queryManager, dataTypes);
+ } catch (IOException | PathErrorException ex) {
throw new FileNodeManagerException(ex);
}
- /** Get data type of select paths **/
- List<TSDataType> dataTypes = new ArrayList<>();
- Map<Path, ClusterSelectSeriesReader> selectSeriesReaders = queryManager
- .getSelectSeriesReaders();
- for (Path path : queryExpression.getSelectedSeries()) {
- try {
- if (selectSeriesReaders.containsKey(path)) {
- dataTypes.add(selectSeriesReaders.get(path).getDataType());
- } else {
- dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
- }
- } catch (PathErrorException e) {
- throw new FileNodeManagerException(e);
- }
-
- }
-
EngineReaderByTimeStamp[] readersOfSelectedSeriesArray = new EngineReaderByTimeStamp[readersOfSelectedSeries
.size()];
int index = 0;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterExecutorWithoutTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterExecutorWithoutTimeGenerator.java
index 8f42c9f..95e5f1a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterExecutorWithoutTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterExecutorWithoutTimeGenerator.java
@@ -20,11 +20,15 @@ package org.apache.iotdb.cluster.query.executor;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.SelectSeriesGroupEntity;
import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader;
+import org.apache.iotdb.cluster.utils.QPExecutorUtils;
import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator;
@@ -62,7 +66,7 @@ public class ClusterExecutorWithoutTimeGenerator extends AbstractExecutorWithout
* Execute query without filter or with only global time filter.
*/
public QueryDataSet execute(QueryContext context)
- throws FileNodeManagerException {
+ throws FileNodeManagerException, PathErrorException {
Filter timeFilter = null;
if (queryExpression.getExpression() != null) {
@@ -72,15 +76,22 @@ public class ClusterExecutorWithoutTimeGenerator extends AbstractExecutorWithout
List<IPointReader> readersOfSelectedSeries = new ArrayList<>();
List<TSDataType> dataTypes = new ArrayList<>();
- Map<Path, ClusterSelectSeriesReader> selectPathReaders = queryManager.getSelectSeriesReaders();
+ Map<String, SelectSeriesGroupEntity> selectSeriesGroupEntityMap = queryManager
+ .getSelectSeriesGroupEntityMap();
List<Path> paths = new ArrayList<>();
+ //Mark filter series reader index group by group id
+ Map<String, Integer> selectSeriesReaderIndex = new HashMap<>();
for (Path path : queryExpression.getSelectedSeries()) {
- if (selectPathReaders.containsKey(path)) {
- ClusterSelectSeriesReader reader = selectPathReaders.get(path);
+ String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice());
+
+ if (selectSeriesGroupEntityMap.containsKey(groupId)) {
+ int index = selectSeriesReaderIndex.getOrDefault(groupId, 0);
+ ClusterSelectSeriesReader reader = selectSeriesGroupEntityMap.get(groupId)
+ .getSelectSeriesReaders().get(index);
readersOfSelectedSeries.add(reader);
dataTypes.add(reader.getDataType());
-
+ selectSeriesReaderIndex.put(groupId, index + 1);
} else {
IPointReader reader = createSeriesReader(context, path, dataTypes, timeFilter);
readersOfSelectedSeries.add(reader);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterFillEngineExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterFillEngineExecutor.java
index 771637e..608a479 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterFillEngineExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterFillEngineExecutor.java
@@ -20,10 +20,13 @@ package org.apache.iotdb.cluster.query.executor;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.SelectSeriesGroupEntity;
import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader;
+import org.apache.iotdb.cluster.utils.QPExecutorUtils;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
@@ -58,16 +61,22 @@ public class ClusterFillEngineExecutor implements IFillEngineExecutor {
@Override
public QueryDataSet execute(QueryContext context)
throws FileNodeManagerException, PathErrorException, IOException {
- Map<Path, ClusterSelectSeriesReader> selectPathReaders = queryManager.getSelectSeriesReaders();
List<Path> paths = new ArrayList<>();
List<IFill> fillList = new ArrayList<>();
List<TSDataType> dataTypeList = new ArrayList<>();
List<IPointReader> readers = new ArrayList<>();
+ Map<String, SelectSeriesGroupEntity> selectSeriesEntityMap = queryManager.getSelectSeriesGroupEntityMap();
+ //Mark filter series reader index group by group id
+ Map<String, Integer> selectSeriesReaderIndex = new HashMap<>();
for (Path path : selectedSeries) {
- if (selectPathReaders.containsKey(path)) {
- ClusterSelectSeriesReader reader = selectPathReaders.get(path);
+ String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice());
+
+ if (selectSeriesEntityMap.containsKey(groupId)) {
+ int index = selectSeriesReaderIndex.getOrDefault(groupId, 0);
+ ClusterSelectSeriesReader reader = selectSeriesEntityMap.get(groupId).getSelectSeriesReaders().get(index);
readers.add(reader);
dataTypeList.add(reader.getDataType());
+ selectSeriesReaderIndex.put(groupId, index + 1);
} else {
QueryDataSource queryDataSource = QueryResourceManager.getInstance()
.getQueryDataSource(path, context);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/factory/ClusterSeriesReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/factory/ClusterSeriesReaderFactory.java
index d65ed58..a9ee032 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/factory/ClusterSeriesReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/factory/ClusterSeriesReaderFactory.java
@@ -20,18 +20,24 @@ package org.apache.iotdb.cluster.query.factory;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.SelectSeriesGroupEntity;
import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader;
+import org.apache.iotdb.cluster.utils.QPExecutorUtils;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
import org.apache.iotdb.db.query.reader.merge.PriorityMergeReaderByTimestamp;
import org.apache.iotdb.db.query.reader.sequence.SequenceDataReaderByTimestamp;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
/**
@@ -43,27 +49,35 @@ public class ClusterSeriesReaderFactory {
}
/**
- * Construct ReaderByTimestamp , include sequential data and unsequential data.
+ * Construct ReaderByTimestamp , include sequential data and unsequential data. And get all series dataType.
*
* @param paths selected series path
* @param context query context
* @return the list of EngineReaderByTimeStamp
*/
public static List<EngineReaderByTimeStamp> createReadersByTimestampOfSelectedPaths(
- List<Path> paths, QueryContext context, ClusterRpcSingleQueryManager queryManager)
- throws IOException, FileNodeManagerException {
+ List<Path> paths, QueryContext context, ClusterRpcSingleQueryManager queryManager, List<TSDataType> dataTypes)
+ throws IOException, FileNodeManagerException, PathErrorException {
- Map<Path, ClusterSelectSeriesReader> selectSeriesReaders = queryManager.getSelectSeriesReaders();
+ Map<String, SelectSeriesGroupEntity> selectSeriesEntityMap = queryManager
+ .getSelectSeriesGroupEntityMap();
List<EngineReaderByTimeStamp> readersOfSelectedSeries = new ArrayList<>();
+ //Mark filter series reader index group by group id
+ Map<String, Integer> selectSeriesReaderIndex = new HashMap<>();
for (Path path : paths) {
-
- if (selectSeriesReaders.containsKey(path)) {
- readersOfSelectedSeries.add(selectSeriesReaders.get(path));
+ String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice());
+ if (selectSeriesEntityMap.containsKey(groupId)) {
+ int index = selectSeriesReaderIndex.getOrDefault(groupId, 0);
+ ClusterSelectSeriesReader reader = selectSeriesEntityMap.get(groupId).getSelectSeriesReaders().get(index);
+ readersOfSelectedSeries.add(reader);
+ dataTypes.add(reader.getDataType());
+ selectSeriesReaderIndex.put(groupId, index + 1);
} else {
/** can handle series query locally **/
EngineReaderByTimeStamp readerByTimeStamp = createReaderByTimeStamp(path, context);
readersOfSelectedSeries.add(readerByTimeStamp);
+ dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
}
}
return readersOfSelectedSeries;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
index af4db31..905ce1b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
@@ -86,27 +86,15 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
// select path resource
/**
- * Query plans of select paths which are divided from queryPlan group by group id, it contains all
- * group id ,including local data group if it involves.
+ * Select series group entity group by data group, key is group id(only contain remote group id)
*/
- private Map<String, QueryPlan> selectPathPlans = new HashMap<>();
-
- /**
- * Key is group id (only contains remote group id), value is all select series in group id.
- */
- private Map<String, List<Path>> selectSeriesByGroupId = new HashMap<>();
-
- /**
- * Series reader of select paths (only contains remote series), key is series path , value is
- * reader
- */
- private Map<Path, ClusterSelectSeriesReader> selectSeriesReaders = new HashMap<>();
+ private Map<String, SelectSeriesGroupEntity> selectSeriesGroupEntityMap = new HashMap<>();
// filter path resource
/**
- * Filter group entity group by data group, key is group id(only contain remote group id)
+ * Filter series group entity group by data group, key is group id(only contain remote group id)
*/
- private Map<String, FilterGroupEntity> filterGroupEntityMap = new HashMap<>();
+ private Map<String, FilterSeriesGroupEntity> filterSeriesGroupEntityMap = new HashMap<>();
private static final ClusterConfig CLUSTER_CONF = ClusterDescriptor.getInstance().getConfig();
@@ -140,17 +128,18 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
private void initSeriesReader(int readDataConsistencyLevel)
throws RaftConnectionException, IOException {
// Init all series with data group of select series,if filter series has the same data group, init them together.
- for (Entry<String, QueryPlan> entry : selectPathPlans.entrySet()) {
+ for (Entry<String, SelectSeriesGroupEntity> entry : selectSeriesGroupEntityMap.entrySet()) {
String groupId = entry.getKey();
- QueryPlan queryPlan = entry.getValue();
+ SelectSeriesGroupEntity selectEntity = entry.getValue();
+ QueryPlan queryPlan = selectEntity.getQueryPlan();
if (!QPExecutorUtils.canHandleQueryByGroupId(groupId)) {
Map<PathType, QueryPlan> allQueryPlan = new EnumMap<>(PathType.class);
allQueryPlan.put(PathType.SELECT_PATH, queryPlan);
List<Filter> filterList = new ArrayList<>();
- if (filterGroupEntityMap.containsKey(groupId)) {
- FilterGroupEntity filterGroupEntity = filterGroupEntityMap.get(groupId);
- allQueryPlan.put(PathType.FILTER_PATH, filterGroupEntity.getQueryPlan());
- filterList = filterGroupEntity.getFilters();
+ if (filterSeriesGroupEntityMap.containsKey(groupId)) {
+ FilterSeriesGroupEntity filterSeriesGroupEntity = filterSeriesGroupEntityMap.get(groupId);
+ allQueryPlan.put(PathType.FILTER_PATH, filterSeriesGroupEntity.getQueryPlan());
+ filterList = filterSeriesGroupEntity.getFilters();
}
/** create request **/
BasicRequest request = InitSeriesReaderRequest
@@ -161,27 +150,29 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
handleInitReaderResponse(groupId, allQueryPlan, response);
} else {
dataGroupUsage.add(groupId);
- selectSeriesByGroupId.remove(groupId);
- if (filterGroupEntityMap.containsKey(groupId)) {
- filterGroupEntityMap.remove(groupId);
- }
+ selectSeriesGroupEntityMap.remove(groupId);
+ filterSeriesGroupEntityMap.remove(groupId);
}
}
//Init series reader with data groups of filter series, which don't exist in data groups list of select series.
- for (Entry<String, FilterGroupEntity> entry : filterGroupEntityMap.entrySet()) {
+ for (Entry<String, FilterSeriesGroupEntity> entry : filterSeriesGroupEntityMap.entrySet()) {
String groupId = entry.getKey();
- if (!selectPathPlans.containsKey(groupId)) {
+ if (!selectSeriesGroupEntityMap.containsKey(groupId) && !QPExecutorUtils
+ .canHandleQueryByGroupId(groupId)) {
Map<PathType, QueryPlan> allQueryPlan = new EnumMap<>(PathType.class);
- FilterGroupEntity filterGroupEntity = filterGroupEntityMap.get(groupId);
- allQueryPlan.put(PathType.FILTER_PATH, filterGroupEntity.getQueryPlan());
- List<Filter> filterList = filterGroupEntity.getFilters();
+ FilterSeriesGroupEntity filterSeriesGroupEntity = filterSeriesGroupEntityMap.get(groupId);
+ allQueryPlan.put(PathType.FILTER_PATH, filterSeriesGroupEntity.getQueryPlan());
+ List<Filter> filterList = filterSeriesGroupEntity.getFilters();
BasicRequest request = InitSeriesReaderRequest
.createInitialQueryRequest(groupId, taskId, readDataConsistencyLevel,
allQueryPlan, filterList);
InitSeriesReaderResponse response = (InitSeriesReaderResponse) ClusterRpcReaderUtils
.createClusterSeriesReader(groupId, request, this);
handleInitReaderResponse(groupId, allQueryPlan, response);
+ } else if (!selectSeriesGroupEntityMap.containsKey(groupId)) {
+ dataGroupUsage.add(groupId);
+ filterSeriesGroupEntityMap.remove(groupId);
}
}
}
@@ -201,7 +192,7 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
TSDataType dataType = seriesType.get(i);
ClusterSelectSeriesReader seriesReader = new ClusterSelectSeriesReader(groupId, seriesPath,
dataType, this);
- selectSeriesReaders.put(seriesPath, seriesReader);
+ selectSeriesGroupEntityMap.get(groupId).addSelectSeriesReader(seriesReader);
}
}
if (allQueryPlan.containsKey(PathType.FILTER_PATH)) {
@@ -213,10 +204,7 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
TSDataType dataType = seriesType.get(i);
ClusterFilterSeriesReader seriesReader = new ClusterFilterSeriesReader(groupId, seriesPath,
dataType, this);
- if (!filterGroupEntityMap.containsKey(groupId)) {
- filterGroupEntityMap.put(groupId, new FilterGroupEntity(groupId));
- }
- filterGroupEntityMap.get(groupId).addFilterSeriesReader(seriesReader);
+ filterSeriesGroupEntityMap.get(groupId).addFilterSeriesReader(seriesReader);
}
}
}
@@ -224,16 +212,15 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
@Override
public void fetchBatchDataForSelectPaths(String groupId) throws RaftConnectionException {
List<String> fetchDataSeries = new ArrayList<>();
- Map<String, List<Path>> seriesByGroupId;
- Map<Path, ClusterSelectSeriesReader> seriesReaders;
- seriesByGroupId = selectSeriesByGroupId;
- seriesReaders = selectSeriesReaders;
- if (seriesByGroupId.containsKey(groupId)) {
- List<Path> allFilterSeries = seriesByGroupId.get(groupId);
- for (Path series : allFilterSeries) {
- if (seriesReaders.get(series).enableFetchData()) {
- fetchDataSeries.add(series.getFullPath());
- }
+ List<Integer> selectSeriesIndexs = new ArrayList<>();
+ List<Path> selectSeries = selectSeriesGroupEntityMap.get(groupId).getSelectPaths();
+ List<ClusterSelectSeriesReader> seriesReaders = selectSeriesGroupEntityMap.get(groupId)
+ .getSelectSeriesReaders();
+ for (int i = 0; i < selectSeries.size(); i++) {
+ Path series = selectSeries.get(i);
+ if (seriesReaders.get(i).enableFetchData()) {
+ fetchDataSeries.add(series.getFullPath());
+ selectSeriesIndexs.add(i);
}
}
BasicRequest request = QuerySeriesDataRequest
@@ -241,7 +228,8 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
queryRounds++);
QuerySeriesDataResponse response = (QuerySeriesDataResponse) ClusterRpcReaderUtils
.handleQueryRequest(request, queryNodes.get(groupId), 0);
- handleFetchDataResponseForSelectPaths(fetchDataSeries, response);
+
+ handleFetchDataResponseForSelectPaths(groupId, selectSeriesIndexs, response);
}
@Override
@@ -258,42 +246,45 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
@Override
public void fetchBatchDataByTimestampForAllSelectPaths(List<Long> batchTimestamp)
throws RaftConnectionException {
- for (Entry<String, List<Path>> entry : selectSeriesByGroupId.entrySet()) {
+ for (Entry<String, SelectSeriesGroupEntity> entry : selectSeriesGroupEntityMap.entrySet()) {
String groupId = entry.getKey();
List<String> fetchDataFilterSeries = new ArrayList<>();
- entry.getValue().forEach(path -> fetchDataFilterSeries.add(path.getFullPath()));
+ entry.getValue().getSelectPaths()
+ .forEach(path -> fetchDataFilterSeries.add(path.getFullPath()));
BasicRequest request = QuerySeriesDataByTimestampRequest
.createRequest(groupId, queryRounds++, taskId, batchTimestamp, fetchDataFilterSeries);
QuerySeriesDataByTimestampResponse response = (QuerySeriesDataByTimestampResponse) ClusterRpcReaderUtils
.handleQueryRequest(request, queryNodes.get(groupId), 0);
- handleFetchDataByTimestampResponseForSelectPaths(fetchDataFilterSeries, response);
+ handleFetchDataByTimestampResponseForSelectPaths(groupId, fetchDataFilterSeries, response);
}
}
/**
* Handle response of fetching data, and add batch data to corresponding reader.
*/
- private void handleFetchDataByTimestampResponseForSelectPaths(List<String> fetchDataSeries,
+ private void handleFetchDataByTimestampResponseForSelectPaths(String groupId,
+ List<String> fetchDataSeries,
BasicQueryDataResponse response) {
List<BatchData> batchDataList = response.getSeriesBatchData();
+ List<ClusterSelectSeriesReader> selectSeriesReaders = selectSeriesGroupEntityMap.get(groupId)
+ .getSelectSeriesReaders();
for (int i = 0; i < fetchDataSeries.size(); i++) {
- String series = fetchDataSeries.get(i);
BatchData batchData = batchDataList.get(i);
- selectSeriesReaders.get(new Path(series))
- .addBatchData(batchData, true);
+ selectSeriesReaders.get(i).addBatchData(batchData, true);
}
}
/**
* Handle response of fetching data, and add batch data to corresponding reader.
*/
- private void handleFetchDataResponseForSelectPaths(List<String> fetchDataSeries,
- BasicQueryDataResponse response) {
+ private void handleFetchDataResponseForSelectPaths(String groupId,
+ List<Integer> selectSeriesIndexs, BasicQueryDataResponse response) {
List<BatchData> batchDataList = response.getSeriesBatchData();
- for (int i = 0; i < fetchDataSeries.size(); i++) {
- String series = fetchDataSeries.get(i);
+ List<ClusterSelectSeriesReader> selectSeriesReaders = selectSeriesGroupEntityMap.get(groupId)
+ .getSelectSeriesReaders();
+ for (int i = 0; i < selectSeriesIndexs.size(); i++) {
BatchData batchData = batchDataList.get(i);
- selectSeriesReaders.get(new Path(series))
+ selectSeriesReaders.get(selectSeriesIndexs.get(i))
.addBatchData(batchData, batchData.length() < CLUSTER_CONF.getBatchReadSize());
}
}
@@ -303,10 +294,11 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
*/
private void handleFetchDataResponseForFilterPaths(String groupId,
QuerySeriesDataResponse response) {
- FilterGroupEntity filterGroupEntity = filterGroupEntityMap.get(groupId);
- List<Path> fetchDataSeries = filterGroupEntity.getFilterPaths();
+ FilterSeriesGroupEntity filterSeriesGroupEntity = filterSeriesGroupEntityMap.get(groupId);
+ List<Path> fetchDataSeries = filterSeriesGroupEntity.getFilterPaths();
List<BatchData> batchDataList = response.getSeriesBatchData();
- List<ClusterFilterSeriesReader> filterReaders = filterGroupEntity.getFilterSeriesReaders();
+ List<ClusterFilterSeriesReader> filterReaders = filterSeriesGroupEntity
+ .getFilterSeriesReaders();
boolean remoteDataFinish = true;
for (int i = 0; i < batchDataList.size(); i++) {
if (batchDataList.get(i).length() != 0) {
@@ -323,11 +315,6 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
}
@Override
- public QueryPlan getSelectPathQueryPlan(String fullPath) {
- return selectPathPlans.get(fullPath);
- }
-
- @Override
public void setDataGroupReaderNode(String groupId, PeerId readerNode) {
queryNodes.put(groupId, readerNode);
}
@@ -375,19 +362,11 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
this.queryNodes.put(groupID, peerId);
}
- public Map<String, QueryPlan> getSelectPathPlans() {
- return selectPathPlans;
- }
-
- public Map<String, List<Path>> getSelectSeriesByGroupId() {
- return selectSeriesByGroupId;
- }
-
- public Map<Path, ClusterSelectSeriesReader> getSelectSeriesReaders() {
- return selectSeriesReaders;
+ public Map<String, SelectSeriesGroupEntity> getSelectSeriesGroupEntityMap() {
+ return selectSeriesGroupEntityMap;
}
- public Map<String, FilterGroupEntity> getFilterGroupEntityMap() {
- return filterGroupEntityMap;
+ public Map<String, FilterSeriesGroupEntity> getFilterSeriesGroupEntityMap() {
+ return filterSeriesGroupEntityMap;
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/FilterGroupEntity.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/FilterSeriesGroupEntity.java
similarity index 97%
copy from cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/FilterGroupEntity.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/FilterSeriesGroupEntity.java
index 326af11..19407a0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/FilterGroupEntity.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/FilterSeriesGroupEntity.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
/**
* Filter entities of a data group, concluding QueryPlan, filters, all filter paths and filter readers
*/
-public class FilterGroupEntity {
+public class FilterSeriesGroupEntity {
/**
* Group id
@@ -62,7 +62,7 @@ public class FilterGroupEntity {
*/
private List<ClusterFilterSeriesReader> filterSeriesReaders;
- public FilterGroupEntity(String groupId) {
+ public FilterSeriesGroupEntity(String groupId) {
this.groupId = groupId;
this.filterPaths = new ArrayList<>();
this.filters = new ArrayList<>();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java
index c4aec9c..d6ca0d7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java
@@ -69,13 +69,6 @@ public interface IClusterRpcSingleQueryManager {
throws RaftConnectionException;
/**
- * Get query plan of select path
- *
- * @param fullPath Timeseries full path in select paths
- */
- QueryPlan getSelectPathQueryPlan(String fullPath);
-
- /**
* Set reader node of a data group
*
* @param groupId data group id
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/FilterGroupEntity.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/SelectSeriesGroupEntity.java
similarity index 55%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/FilterGroupEntity.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/SelectSeriesGroupEntity.java
index 326af11..9f35117 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/FilterGroupEntity.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/SelectSeriesGroupEntity.java
@@ -21,15 +21,14 @@ package org.apache.iotdb.cluster.query.manager.coordinatornode;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterFilterSeriesReader;
+import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
/**
- * Filter entities of a data group, concluding QueryPlan, filters, all filter paths and filter readers
+ * Select series entity entities of a data group, concluding QueryPlan, all select paths and series readers
*/
-public class FilterGroupEntity {
-
+public class SelectSeriesGroupEntity {
/**
* Group id
*/
@@ -41,32 +40,24 @@ public class FilterGroupEntity {
private QueryPlan queryPlan;
/**
- * Filters of filter path.
- */
- private List<Filter> filters;
-
- /**
*
- * all filter series
+ * all select series
* <p>
- * Note: It may contain multiple series in a complicated tree
- * for example: select * from root.vehicle where d0.s0 > 10 and d0.s0 < 101 or time = 12,
- * filter tree: <code>[[[[root.vehicle.d0.s0:time == 12] || [root.vehicle.d0.s1:time == 12]] || [root.vehicle.d1.s2:time == 12]] || [root.vehicle.d1.s3:time == 12]]</code>
+ * Note: It may contain multiple series in a query
+ * for example: select sum(s0), max(s0) from root.vehicle.d0 where s0 > 10
* </p>
*/
- private List<Path> filterPaths;
-
+ private List<Path> selectPaths;
/**
* Series reader of filter paths (only contains remote series)
*/
- private List<ClusterFilterSeriesReader> filterSeriesReaders;
+ private List<ClusterSelectSeriesReader> selectSeriesReaders;
- public FilterGroupEntity(String groupId) {
+ public SelectSeriesGroupEntity(String groupId) {
this.groupId = groupId;
- this.filterPaths = new ArrayList<>();
- this.filters = new ArrayList<>();
- this.filterSeriesReaders = new ArrayList<>();
+ this.selectPaths = new ArrayList<>();
+ this.selectSeriesReaders = new ArrayList<>();
}
public String getGroupId() {
@@ -85,27 +76,19 @@ public class FilterGroupEntity {
this.queryPlan = queryPlan;
}
- public List<Filter> getFilters() {
- return filters;
- }
-
- public void addFilter(Filter filter) {
- this.filters.add(filter);
- }
-
- public List<Path> getFilterPaths() {
- return filterPaths;
+ public List<Path> getSelectPaths() {
+ return selectPaths;
}
- public void addFilterPaths(Path filterPath) {
- this.filterPaths.add(filterPath);
+ public void addSelectPaths(Path selectPath) {
+ this.selectPaths.add(selectPath);
}
- public List<ClusterFilterSeriesReader> getFilterSeriesReaders() {
- return filterSeriesReaders;
+ public List<ClusterSelectSeriesReader> getSelectSeriesReaders() {
+ return selectSeriesReaders;
}
- public void addFilterSeriesReader(ClusterFilterSeriesReader filterSeriesReader) {
- this.filterSeriesReaders.add(filterSeriesReader);
+ public void addSelectSeriesReader(ClusterSelectSeriesReader selectSeriesReader) {
+ this.selectSeriesReaders.add(selectSeriesReader);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/AbstractClusterPointReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/AbstractClusterPointReader.java
index 3f73160..c0012a1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/AbstractClusterPointReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/AbstractClusterPointReader.java
@@ -71,4 +71,6 @@ public abstract class AbstractClusterPointReader implements IPointReader {
}
return null;
}
+
+ public abstract void addBatchData(BatchData batchData, boolean remoteDataFinish);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterFilterSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterFilterSeriesReader.java
index 805d3af..0c0287e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterFilterSeriesReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterFilterSeriesReader.java
@@ -95,14 +95,6 @@ public class ClusterFilterSeriesReader extends AbstractClusterPointReader {
//Do nothing
}
- public Path getSeriesPath() {
- return seriesPath;
- }
-
- public void setSeriesPath(Path seriesPath) {
- this.seriesPath = seriesPath;
- }
-
public TSDataType getDataType() {
return dataType;
}
@@ -111,14 +103,7 @@ public class ClusterFilterSeriesReader extends AbstractClusterPointReader {
this.dataType = dataType;
}
- public BatchData getCurrentBatchData() {
- return currentBatchData;
- }
-
- public void setCurrentBatchData(BatchData currentBatchData) {
- this.currentBatchData = currentBatchData;
- }
-
+ @Override
public void addBatchData(BatchData batchData, boolean remoteDataFinish) {
batchDataList.addLast(batchData);
this.remoteDataFinish = remoteDataFinish;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterSelectSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterSelectSeriesReader.java
index 0a507d5..c640b53 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterSelectSeriesReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterSelectSeriesReader.java
@@ -119,14 +119,6 @@ public class ClusterSelectSeriesReader extends AbstractClusterPointReader implem
batchDataList = null;
}
- public Path getSeriesPath() {
- return seriesPath;
- }
-
- public void setSeriesPath(Path seriesPath) {
- this.seriesPath = seriesPath;
- }
-
public TSDataType getDataType() {
return dataType;
}
@@ -135,27 +127,12 @@ public class ClusterSelectSeriesReader extends AbstractClusterPointReader implem
this.dataType = dataType;
}
- public BatchData getCurrentBatchData() {
- return currentBatchData;
- }
-
- public void setCurrentBatchData(BatchData currentBatchData) {
- this.currentBatchData = currentBatchData;
- }
-
+ @Override
public void addBatchData(BatchData batchData, boolean remoteDataFinish) {
batchDataList.addLast(batchData);
this.remoteDataFinish = remoteDataFinish;
}
- public boolean isRemoteDataFinish() {
- return remoteDataFinish;
- }
-
- public void setRemoteDataFinish(boolean remoteDataFinish) {
- this.remoteDataFinish = remoteDataFinish;
- }
-
/**
* Check if this series need to fetch data from remote query node
*/
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/timegenerator/ClusterNodeConstructor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/timegenerator/ClusterNodeConstructor.java
index 639dce8..2b3ab18 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/timegenerator/ClusterNodeConstructor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/timegenerator/ClusterNodeConstructor.java
@@ -25,7 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
-import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterGroupEntity;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity;
import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterFilterSeriesReader;
import org.apache.iotdb.cluster.utils.QPExecutorUtils;
import org.apache.iotdb.db.exception.FileNodeManagerException;
@@ -65,7 +65,7 @@ public class ClusterNodeConstructor extends AbstractNodeConstructor {
* Init filter series reader
*/
private void init(ClusterRpcSingleQueryManager queryManager) {
- Map<String, FilterGroupEntity> filterGroupEntityMap = queryManager.getFilterGroupEntityMap();
+ Map<String, FilterSeriesGroupEntity> filterGroupEntityMap = queryManager.getFilterSeriesGroupEntityMap();
filterGroupEntityMap.forEach(
(key, value) -> filterSeriesReadersByGroupId.put(key, value.getFilterSeriesReaders()));
filterSeriesReadersByGroupId.forEach((key, value) -> filterSeriesReaderIndex.put(key, 0));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java
index a0ee256..0f05cf2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.iotdb.cluster.query.utils;
import org.apache.iotdb.cluster.query.manager.common.FillBatchData;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ExpressionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ExpressionUtils.java
index 0024138..4089e9b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ExpressionUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ExpressionUtils.java
@@ -26,7 +26,7 @@ import static org.apache.iotdb.tsfile.read.expression.ExpressionType.TRUE;
import java.util.List;
import java.util.Map;
import org.apache.iotdb.cluster.query.expression.TrueExpression;
-import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterGroupEntity;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity;
import org.apache.iotdb.cluster.utils.QPExecutorUtils;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -46,17 +46,17 @@ public class ExpressionUtils {
* Get all series path of expression group by group id
*/
public static void getAllExpressionSeries(IExpression expression,
- Map<String, FilterGroupEntity> filterGroupEntityMap)
+ Map<String, FilterSeriesGroupEntity> filterGroupEntityMap)
throws PathErrorException {
if (expression.getType() == ExpressionType.SERIES) {
Path path = ((SingleSeriesExpression) expression).getSeriesPath();
String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice());
if (!filterGroupEntityMap.containsKey(groupId)) {
- filterGroupEntityMap.put(groupId, new FilterGroupEntity(groupId));
+ filterGroupEntityMap.put(groupId, new FilterSeriesGroupEntity(groupId));
}
- FilterGroupEntity filterGroupEntity = filterGroupEntityMap.get(groupId);
- filterGroupEntity.addFilterPaths(path);
- filterGroupEntity.addFilter(((SingleSeriesExpression) expression).getFilter());
+ FilterSeriesGroupEntity filterSeriesGroupEntity = filterGroupEntityMap.get(groupId);
+ filterSeriesGroupEntity.addFilterPaths(path);
+ filterSeriesGroupEntity.addFilter(((SingleSeriesExpression) expression).getFilter());
} else if (expression.getType() == OR || expression.getType() == AND) {
getAllExpressionSeries(((IBinaryExpression) expression).getLeft(), filterGroupEntityMap);
getAllExpressionSeries(((IBinaryExpression) expression).getRight(), filterGroupEntityMap);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java
index 5fbd30c..3a2746f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java
@@ -25,7 +25,8 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
-import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterGroupEntity;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.SelectSeriesGroupEntity;
import org.apache.iotdb.cluster.utils.QPExecutorUtils;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
@@ -84,24 +85,24 @@ public class QueryPlanPartitionUtils {
private static void splitQueryPlanBySelectPath(ClusterRpcSingleQueryManager singleQueryManager)
throws PathErrorException {
QueryPlan queryPlan = singleQueryManager.getOriginQueryPlan();
- Map<String, List<Path>> selectSeriesByGroupId = singleQueryManager.getSelectSeriesByGroupId();
- Map<String, QueryPlan> selectPathPlans = singleQueryManager.getSelectPathPlans();
+ // split query plan by select path
+ Map<String, SelectSeriesGroupEntity> selectGroupEntityMap = singleQueryManager
+ .getSelectSeriesGroupEntityMap();
List<Path> selectPaths = queryPlan.getPaths();
for (Path path : selectPaths) {
String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice());
- if (!selectSeriesByGroupId.containsKey(groupId)) {
- selectSeriesByGroupId.put(groupId, new ArrayList<>());
+ if (!selectGroupEntityMap.containsKey(groupId)) {
+ selectGroupEntityMap.put(groupId, new SelectSeriesGroupEntity(groupId));
}
- selectSeriesByGroupId.get(groupId).add(path);
+ selectGroupEntityMap.get(groupId).addSelectPaths(path);
}
- for (Entry<String, List<Path>> entry : selectSeriesByGroupId.entrySet()) {
- String groupId = entry.getKey();
- List<Path> paths = entry.getValue();
+ for (SelectSeriesGroupEntity entity : selectGroupEntityMap.values()) {
+ List<Path> paths = entity.getSelectPaths();
QueryPlan subQueryPlan = new QueryPlan();
subQueryPlan.setProposer(queryPlan.getProposer());
subQueryPlan.setPaths(paths);
subQueryPlan.setExpression(queryPlan.getExpression());
- selectPathPlans.put(groupId, subQueryPlan);
+ entity.setQueryPlan(subQueryPlan);
}
}
@@ -113,12 +114,12 @@ public class QueryPlanPartitionUtils {
throws PathErrorException {
QueryPlan queryPlan = singleQueryManager.getOriginQueryPlan();
// split query plan by filter path
- Map<String, FilterGroupEntity> filterGroupEntityMap = singleQueryManager
- .getFilterGroupEntityMap();
+ Map<String, FilterSeriesGroupEntity> filterGroupEntityMap = singleQueryManager
+ .getFilterSeriesGroupEntityMap();
IExpression expression = queryPlan.getExpression();
ExpressionUtils.getAllExpressionSeries(expression, filterGroupEntityMap);
- for (FilterGroupEntity filterGroupEntity : filterGroupEntityMap.values()) {
- List<Path> filterSeriesList = filterGroupEntity.getFilterPaths();
+ for (FilterSeriesGroupEntity filterSeriesGroupEntity : filterGroupEntityMap.values()) {
+ List<Path> filterSeriesList = filterSeriesGroupEntity.getFilterPaths();
// create filter sub query plan
QueryPlan subQueryPlan = new QueryPlan();
subQueryPlan.setPaths(filterSeriesList);
@@ -127,7 +128,7 @@ public class QueryPlanPartitionUtils {
if (subExpression.getType() != ExpressionType.TRUE) {
subQueryPlan.setExpression(subExpression);
}
- filterGroupEntity.setQueryPlan(subQueryPlan);
+ filterSeriesGroupEntity.setQueryPlan(subQueryPlan);
}
}
@@ -157,29 +158,30 @@ public class QueryPlanPartitionUtils {
AggregationPlan queryPlan = (AggregationPlan) singleQueryManager.getOriginQueryPlan();
List<Path> selectPaths = queryPlan.getPaths();
List<String> aggregations = queryPlan.getAggregations();
- Map<String, List<Path>> selectSeriesByGroupId = singleQueryManager.getSelectSeriesByGroupId();
Map<String, List<String>> selectAggregationByGroupId = new HashMap<>();
- Map<String, QueryPlan> selectPathPlans = singleQueryManager.getSelectPathPlans();
+ Map<String, SelectSeriesGroupEntity> selectGroupEntityMap = singleQueryManager
+ .getSelectSeriesGroupEntityMap();
for (int i = 0; i < selectPaths.size(); i++) {
Path path = selectPaths.get(i);
String aggregation = aggregations.get(i);
String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice());
- if (!selectSeriesByGroupId.containsKey(groupId)) {
- selectSeriesByGroupId.put(groupId, new ArrayList<>());
+ if (!selectGroupEntityMap.containsKey(groupId)) {
+ selectGroupEntityMap.put(groupId, new SelectSeriesGroupEntity(groupId));
selectAggregationByGroupId.put(groupId, new ArrayList<>());
}
selectAggregationByGroupId.get(groupId).add(aggregation);
- selectSeriesByGroupId.get(groupId).add(path);
+ selectGroupEntityMap.get(groupId).addSelectPaths(path);
}
- for (Entry<String, List<Path>> entry : selectSeriesByGroupId.entrySet()) {
+ for (Entry<String, SelectSeriesGroupEntity> entry : selectGroupEntityMap.entrySet()) {
String groupId = entry.getKey();
- List<Path> paths = entry.getValue();
+ SelectSeriesGroupEntity entity = entry.getValue();
+ List<Path> paths = entity.getSelectPaths();
AggregationPlan subQueryPlan = new AggregationPlan();
subQueryPlan.setProposer(queryPlan.getProposer());
subQueryPlan.setPaths(paths);
subQueryPlan.setExpression(queryPlan.getExpression());
subQueryPlan.setAggregations(selectAggregationByGroupId.get(groupId));
- selectPathPlans.put(groupId, subQueryPlan);
+ entity.setQueryPlan(subQueryPlan);
}
}
@@ -200,25 +202,24 @@ public class QueryPlanPartitionUtils {
throws PathErrorException {
FillQueryPlan fillQueryPlan = (FillQueryPlan) singleQueryManager.getOriginQueryPlan();
List<Path> selectPaths = fillQueryPlan.getPaths();
- Map<String, List<Path>> selectSeriesByGroupId = singleQueryManager.getSelectSeriesByGroupId();
- Map<String, QueryPlan> selectPathPlans = singleQueryManager.getSelectPathPlans();
+ Map<String, SelectSeriesGroupEntity> selectGroupEntityMap = singleQueryManager
+ .getSelectSeriesGroupEntityMap();
for (Path path : selectPaths) {
String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice());
- if (!selectSeriesByGroupId.containsKey(groupId)) {
- selectSeriesByGroupId.put(groupId, new ArrayList<>());
+ if (!selectGroupEntityMap.containsKey(groupId)) {
+ selectGroupEntityMap.put(groupId, new SelectSeriesGroupEntity(groupId));
}
- selectSeriesByGroupId.get(groupId).add(path);
+ selectGroupEntityMap.get(groupId).addSelectPaths(path);
}
- for (Entry<String, List<Path>> entry : selectSeriesByGroupId.entrySet()) {
- String groupId = entry.getKey();
- List<Path> paths = entry.getValue();
+ for (SelectSeriesGroupEntity entity : selectGroupEntityMap.values()) {
+ List<Path> paths = entity.getSelectPaths();
FillQueryPlan subQueryPlan = new FillQueryPlan();
subQueryPlan.setProposer(fillQueryPlan.getProposer());
subQueryPlan.setPaths(paths);
subQueryPlan.setExpression(fillQueryPlan.getExpression());
subQueryPlan.setQueryTime(fillQueryPlan.getQueryTime());
subQueryPlan.setFillType(new EnumMap<>(fillQueryPlan.getFillType()));
- selectPathPlans.put(groupId, subQueryPlan);
+ entity.setQueryPlan(subQueryPlan);
}
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterRpcManagerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterRpcManagerTest.java
index b800fbf..4745553 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterRpcManagerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterRpcManagerTest.java
@@ -39,7 +39,8 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.entity.Server;
import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcQueryManager;
import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
-import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterGroupEntity;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.SelectSeriesGroupEntity;
import org.apache.iotdb.cluster.utils.EnvironmentUtils;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.jdbc.Config;
@@ -261,25 +262,8 @@ public class ClusterRpcManagerTest {
assertEquals(taskId, singleManager.getTaskId());
// select path plans
- Map<String, QueryPlan> selectPathPlans = singleManager.getSelectPathPlans();
- assertEquals(1, selectPathPlans.size());
- for (QueryPlan queryPlan : selectPathPlans.values()) {
- List<Path> paths = queryPlan.getPaths();
- List<Path> correctPaths = new ArrayList<>();
- correctPaths.add(new Path("root.vehicle.d0.s0"));
- correctPaths.add(new Path("root.vehicle.d0.s1"));
- correctPaths.add(new Path("root.vehicle.d0.s3"));
- assertEquals(correctPaths, paths);
- assertNull(queryPlan.getExpression());
- }
-
- // select series by group id
- assertEquals(0, singleManager.getSelectSeriesByGroupId().size());
-
- // select series reader
- assertTrue(singleManager
- .getSelectSeriesReaders().isEmpty());
-
+ Map<String, SelectSeriesGroupEntity> selectSeriesGroupEntityMap = singleManager.getSelectSeriesGroupEntityMap();
+ assertTrue(selectSeriesGroupEntityMap.isEmpty());
}
statement.close();
}
@@ -304,27 +288,11 @@ public class ClusterRpcManagerTest {
assertEquals(taskId, singleManager.getTaskId());
// select path plans
- Map<String, QueryPlan> selectPathPlans = singleManager.getSelectPathPlans();
- assertEquals(1, selectPathPlans.size());
- for (QueryPlan queryPlan : selectPathPlans.values()) {
- List<Path> paths = queryPlan.getPaths();
- List<Path> correctPaths = new ArrayList<>();
- correctPaths.add(new Path("root.vehicle.d0.s0"));
- correctPaths.add(new Path("root.vehicle.d0.s1"));
- correctPaths.add(new Path("root.vehicle.d0.s3"));
- assertEquals(correctPaths, paths);
- assertNotNull(queryPlan.getExpression());
- }
-
- // select series by group id
- assertTrue(singleManager.getSelectSeriesByGroupId().isEmpty());
-
- // select series reader
- assertTrue(singleManager
- .getSelectSeriesReaders().isEmpty());
+ Map<String, SelectSeriesGroupEntity> selectSeriesGroupEntityMap = singleManager.getSelectSeriesGroupEntityMap();
+ assertTrue(selectSeriesGroupEntityMap.isEmpty());
// filter path plans
- Map<String, FilterGroupEntity> filterGroupEntityMap = singleManager.getFilterGroupEntityMap();
+ Map<String, FilterSeriesGroupEntity> filterGroupEntityMap = singleManager.getFilterSeriesGroupEntityMap();
assertTrue(filterGroupEntityMap.isEmpty());
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtilsTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtilsTest.java
index a0d409b..363ef98 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtilsTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtilsTest.java
@@ -39,8 +39,11 @@ import org.apache.iotdb.cluster.entity.Server;
import org.apache.iotdb.cluster.qp.executor.ClusterQueryProcessExecutor;
import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcQueryManager;
import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
-import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterGroupEntity;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.SelectSeriesGroupEntity;
import org.apache.iotdb.cluster.utils.EnvironmentUtils;
+import org.apache.iotdb.cluster.utils.QPExecutorUtils;
+import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
import org.apache.iotdb.db.qp.QueryProcessor;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.jdbc.Config;
@@ -60,6 +63,9 @@ public class QueryPlanPartitionUtilsTest {
private static ClusterRpcQueryManager manager = ClusterRpcQueryManager.getInstance();
private ClusterQueryProcessExecutor queryDataExecutor = new ClusterQueryProcessExecutor();
private QueryProcessor queryProcessor = new QueryProcessor(queryDataExecutor);
+ private static final PhysicalNode localNode = new PhysicalNode(CLUSTER_CONFIG.getIp(),
+ CLUSTER_CONFIG.getPort());
+
private static final String URL = "127.0.0.1:6667/";
@@ -105,6 +111,7 @@ public class QueryPlanPartitionUtilsTest {
EnvironmentUtils.cleanEnv();
EnvironmentUtils.closeStatMonitor();
EnvironmentUtils.closeMemControl();
+ QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
CLUSTER_CONFIG.createAllPath();
server = Server.getInstance();
server.start();
@@ -115,6 +122,7 @@ public class QueryPlanPartitionUtilsTest {
@After
public void tearDown() throws Exception {
server.stop();
+ QPExecutorUtils.setLocalNodeAddr(localNode.getIp(), localNode.getPort());
EnvironmentUtils.cleanEnv();
}
@@ -230,14 +238,14 @@ public class QueryPlanPartitionUtilsTest {
}
@Test
- public void splitQueryPlanWithoutValueFilter() throws Exception{
+ public void splitQueryPlanWithoutValueFilter() throws Exception {
try (Connection connection = DriverManager
.getConnection(Config.IOTDB_URL_PREFIX + URL, "root", "root")) {
insertData(connection, createSQLs, insertSQLs);
initCorrectResult();
- for(int i = 0 ; i < queryStatementsWithoutFilters.length; i++) {
+ for (int i = 0; i < queryStatementsWithoutFilters.length; i++) {
String queryStatementsWithoutFilter = queryStatementsWithoutFilters[i];
- try(Statement statement = connection.createStatement()) {
+ try (Statement statement = connection.createStatement()) {
boolean hasResultSet = statement.execute(queryStatementsWithoutFilter);
assertTrue(hasResultSet);
ResultSet resultSet = statement.getResultSet();
@@ -256,14 +264,15 @@ public class QueryPlanPartitionUtilsTest {
assertEquals(taskId, String.format("%s:%d", LOCAL_ADDR, jobId));
ClusterRpcSingleQueryManager singleQueryManager = ClusterRpcQueryManager.getInstance()
.getSingleQuery(jobId);
- assertTrue(singleQueryManager.getFilterGroupEntityMap().isEmpty());
- Map<String, QueryPlan> selectPathPlans = singleQueryManager.getSelectPathPlans();
- assertFalse(selectPathPlans.isEmpty());
- for(Entry<String, QueryPlan> entry1: selectPathPlans.entrySet()){
- QueryPlan queryPlan = entry1.getValue();
+ assertTrue(singleQueryManager.getFilterSeriesGroupEntityMap().isEmpty());
+ Map<String, SelectSeriesGroupEntity> selectSeriesGroupEntityMap = singleQueryManager
+ .getSelectSeriesGroupEntityMap();
+ assertFalse(selectSeriesGroupEntityMap.isEmpty());
+ for (SelectSeriesGroupEntity entity : selectSeriesGroupEntityMap.values()) {
+ QueryPlan queryPlan = entity.getQueryPlan();
QueryPlan correctQueryPlan = withoutFilterResults.get(i + 1);
assertTrue(correctQueryPlan.getPaths().containsAll(queryPlan.getPaths()));
- assertEquals(correctQueryPlan.getExpression(),queryPlan.getExpression());
+ assertEquals(correctQueryPlan.getExpression(), queryPlan.getExpression());
assertEquals(correctQueryPlan.isQuery(), queryPlan.isQuery());
assertEquals(correctQueryPlan.getOperatorType(), queryPlan.getOperatorType());
assertEquals(correctQueryPlan.getAggregations(), queryPlan.getAggregations());
@@ -275,14 +284,14 @@ public class QueryPlanPartitionUtilsTest {
}
@Test
- public void splitQueryPlanWithValueFilter() throws Exception{
+ public void splitQueryPlanWithValueFilter() throws Exception {
try (Connection connection = DriverManager
.getConnection(Config.IOTDB_URL_PREFIX + URL, "root", "root")) {
insertData(connection, createSQLs, insertSQLs);
initCorrectResult();
- for(int i = 0 ; i < queryStatementsWithFilters.length; i++) {
+ for (int i = 0; i < queryStatementsWithFilters.length; i++) {
String queryStatementsWithoutFilter = queryStatementsWithFilters[i];
- try(Statement statement = connection.createStatement()) {
+ try (Statement statement = connection.createStatement()) {
boolean hasResultSet = statement.execute(queryStatementsWithoutFilter);
assertTrue(hasResultSet);
ResultSet resultSet = statement.getResultSet();
@@ -301,21 +310,24 @@ public class QueryPlanPartitionUtilsTest {
assertEquals(taskId, String.format("%s:%d", LOCAL_ADDR, jobId));
ClusterRpcSingleQueryManager singleQueryManager = ClusterRpcQueryManager.getInstance()
.getSingleQuery(jobId);
- assertTrue(singleQueryManager.getFilterGroupEntityMap().isEmpty());
- Map<String, QueryPlan> selectPathPlans = singleQueryManager.getSelectPathPlans();
- assertFalse(selectPathPlans.isEmpty());
- for(Entry<String, QueryPlan> entry1 : selectPathPlans.entrySet()) {
- QueryPlan queryPlan = entry1.getValue();
+ assertFalse(singleQueryManager.getFilterSeriesGroupEntityMap().isEmpty());
+ Map<String, SelectSeriesGroupEntity> selectSeriesGroupEntityMap = singleQueryManager
+ .getSelectSeriesGroupEntityMap();
+ assertFalse(selectSeriesGroupEntityMap.isEmpty());
+ for (SelectSeriesGroupEntity entity : selectSeriesGroupEntityMap.values()) {
+ QueryPlan queryPlan = entity.getQueryPlan();
QueryPlan correctQueryPlan = withFilterSelectResults.get(i + 1);
assertTrue(correctQueryPlan.getPaths().containsAll(queryPlan.getPaths()));
- assertEquals(correctQueryPlan.getExpression().getType(), queryPlan.getExpression().getType());
+ assertEquals(correctQueryPlan.getExpression().getType(),
+ queryPlan.getExpression().getType());
assertEquals(correctQueryPlan.isQuery(), queryPlan.isQuery());
assertEquals(correctQueryPlan.getOperatorType(), queryPlan.getOperatorType());
assertEquals(correctQueryPlan.getAggregations(), queryPlan.getAggregations());
}
- Map<String, FilterGroupEntity> filterGroupEntityMap = singleQueryManager.getFilterGroupEntityMap();
- for (FilterGroupEntity filterGroupEntity:filterGroupEntityMap.values()) {
- QueryPlan queryPlan = filterGroupEntity.getQueryPlan();
+ Map<String, FilterSeriesGroupEntity> filterGroupEntityMap = singleQueryManager
+ .getFilterSeriesGroupEntityMap();
+ for (FilterSeriesGroupEntity filterSeriesGroupEntity : filterGroupEntityMap.values()) {
+ QueryPlan queryPlan = filterSeriesGroupEntity.getQueryPlan();
QueryPlan correctQueryPlan = withFilterFilterResults.get(i + 1);
assertTrue(correctQueryPlan.getPaths().containsAll(queryPlan.getPaths()));
assertEquals(correctQueryPlan.getExpression().getType(),