You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/05/21 01:51:34 UTC
[incubator-iotdb] 07/11: add it test of aggregation function
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 9487dbfc0a68fb62efa5270cc4fb080ea4e2a3a0
Author: lta <li...@163.com>
AuthorDate: Mon May 20 17:27:57 2019 +0800
add it test of aggregation function
---
.../query/{manager => }/common/FillBatchData.java | 2 +-
.../executor/ClusterAggregateEngineExecutor.java | 1 -
.../ClusterRpcSingleQueryManager.java | 26 ++---
.../IClusterRpcSingleQueryManager.java | 4 +-
.../coordinatornode/SelectSeriesGroupEntity.java | 1 -
.../querynode/ClusterLocalSingleQueryManager.java | 76 +++++++++------
.../coordinatornode/ClusterFilterSeriesReader.java | 2 +-
.../ClusterFillSelectSeriesBatchReader.java | 2 +-
...a => ClusterFilterSeriesBatchReaderEntity.java} | 6 +-
...a => ClusterSelectSeriesBatchReaderEntity.java} | 42 +++++++--
... => IClusterFilterSeriesBatchReaderEntity.java} | 2 +-
.../query/utils/ClusterTimeValuePairUtils.java | 2 +-
.../QuerySeriesDataByTimestampRequest.java | 17 +---
.../request/querydata/QuerySeriesDataRequest.java | 16 ++--
.../integration/IoTDBAggregationLargeDataIT.java | 3 +-
.../integration/IoTDBAggregationSmallDataIT.java | 94 ------------------
.../query/manager/ClusterLocalManagerTest.java | 105 +++++++++++----------
17 files changed, 160 insertions(+), 241 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/common/FillBatchData.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/common/FillBatchData.java
similarity index 97%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/manager/common/FillBatchData.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/common/FillBatchData.java
index 3e128e3..2d17d0c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/common/FillBatchData.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/common/FillBatchData.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.query.manager.common;
+package org.apache.iotdb.cluster.query.common;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.BatchData;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java
index b34afa1..2cf4e87 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java
@@ -69,7 +69,6 @@ import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor {
private ClusterRpcSingleQueryManager queryManager;
- private static final ClusterConfig CLUSTER_CONF = ClusterDescriptor.getInstance().getConfig();
public ClusterAggregateEngineExecutor(List<Path> selectedSeries, List<String> aggres,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
index 905ce1b..05bf9df 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
@@ -211,29 +211,26 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
@Override
public void fetchBatchDataForSelectPaths(String groupId) throws RaftConnectionException {
- List<String> fetchDataSeries = new ArrayList<>();
- List<Integer> selectSeriesIndexs = new ArrayList<>();
+ List<Integer> fetchDataSeriesIndexs = new ArrayList<>();
List<Path> selectSeries = selectSeriesGroupEntityMap.get(groupId).getSelectPaths();
List<ClusterSelectSeriesReader> seriesReaders = selectSeriesGroupEntityMap.get(groupId)
.getSelectSeriesReaders();
for (int i = 0; i < selectSeries.size(); i++) {
- Path series = selectSeries.get(i);
if (seriesReaders.get(i).enableFetchData()) {
- fetchDataSeries.add(series.getFullPath());
- selectSeriesIndexs.add(i);
+ fetchDataSeriesIndexs.add(i);
}
}
BasicRequest request = QuerySeriesDataRequest
- .createFetchDataRequest(groupId, taskId, PathType.SELECT_PATH, fetchDataSeries,
+ .createFetchDataRequest(groupId, taskId, PathType.SELECT_PATH, fetchDataSeriesIndexs,
queryRounds++);
QuerySeriesDataResponse response = (QuerySeriesDataResponse) ClusterRpcReaderUtils
.handleQueryRequest(request, queryNodes.get(groupId), 0);
- handleFetchDataResponseForSelectPaths(groupId, selectSeriesIndexs, response);
+ handleFetchDataResponseForSelectPaths(groupId, fetchDataSeriesIndexs, response);
}
@Override
- public void fetchBatchDataForFilterPaths(String groupId) throws RaftConnectionException {
+ public void fetchBatchDataForAllFilterPaths(String groupId) throws RaftConnectionException {
BasicRequest request = QuerySeriesDataRequest
.createFetchDataRequest(groupId, taskId, PathType.FILTER_PATH, null, queryRounds++);
QuerySeriesDataResponse response = (QuerySeriesDataResponse) ClusterRpcReaderUtils
@@ -248,27 +245,22 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
throws RaftConnectionException {
for (Entry<String, SelectSeriesGroupEntity> entry : selectSeriesGroupEntityMap.entrySet()) {
String groupId = entry.getKey();
- List<String> fetchDataFilterSeries = new ArrayList<>();
- entry.getValue().getSelectPaths()
- .forEach(path -> fetchDataFilterSeries.add(path.getFullPath()));
BasicRequest request = QuerySeriesDataByTimestampRequest
- .createRequest(groupId, queryRounds++, taskId, batchTimestamp, fetchDataFilterSeries);
+ .createRequest(groupId, queryRounds++, taskId, batchTimestamp);
QuerySeriesDataByTimestampResponse response = (QuerySeriesDataByTimestampResponse) ClusterRpcReaderUtils
.handleQueryRequest(request, queryNodes.get(groupId), 0);
- handleFetchDataByTimestampResponseForSelectPaths(groupId, fetchDataFilterSeries, response);
+ handleFetchDataByTimestampResponseForSelectPaths(groupId, response);
}
}
/**
* Handle response of fetching data, and add batch data to corresponding reader.
*/
- private void handleFetchDataByTimestampResponseForSelectPaths(String groupId,
- List<String> fetchDataSeries,
- BasicQueryDataResponse response) {
+ private void handleFetchDataByTimestampResponseForSelectPaths(String groupId, BasicQueryDataResponse response) {
List<BatchData> batchDataList = response.getSeriesBatchData();
List<ClusterSelectSeriesReader> selectSeriesReaders = selectSeriesGroupEntityMap.get(groupId)
.getSelectSeriesReaders();
- for (int i = 0; i < fetchDataSeries.size(); i++) {
+ for (int i = 0; i < selectSeriesReaders.size(); i++) {
BatchData batchData = batchDataList.get(i);
selectSeriesReaders.get(i).addBatchData(batchData, true);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java
index d6ca0d7..19d8f25 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java
@@ -53,11 +53,11 @@ public interface IClusterRpcSingleQueryManager {
void fetchBatchDataForSelectPaths(String groupId) throws RaftConnectionException;
/**
- * Fetch data for filter path.
+ * Fetch data for all filter paths.
*
* @param groupId data group id
*/
- void fetchBatchDataForFilterPaths(String groupId) throws RaftConnectionException;
+ void fetchBatchDataForAllFilterPaths(String groupId) throws RaftConnectionException;
/**
* Fetch batch data for all select paths by batch timestamp. If target data can be fetched, skip
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/SelectSeriesGroupEntity.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/SelectSeriesGroupEntity.java
index 9f35117..1de26bd 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/SelectSeriesGroupEntity.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/SelectSeriesGroupEntity.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.cluster.query.manager.coordinatornode;
import java.util.ArrayList;
import java.util.List;
-import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterFilterSeriesReader;
import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.tsfile.read.common.Path;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
index 0f2cf62..8799be2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
@@ -30,10 +30,11 @@ import org.apache.iotdb.cluster.query.PathType;
import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory;
import org.apache.iotdb.cluster.query.reader.querynode.AbstractClusterSelectSeriesBatchReader;
import org.apache.iotdb.cluster.query.reader.querynode.ClusterFillSelectSeriesBatchReader;
-import org.apache.iotdb.cluster.query.reader.querynode.ClusterFilterSeriesBatchReader;
+import org.apache.iotdb.cluster.query.reader.querynode.ClusterFilterSeriesBatchReaderEntity;
import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReader;
import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReaderByTimestamp;
-import org.apache.iotdb.cluster.query.reader.querynode.IClusterFilterSeriesBatchReader;
+import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReaderEntity;
+import org.apache.iotdb.cluster.query.reader.querynode.IClusterFilterSeriesBatchReaderEntity;
import org.apache.iotdb.cluster.rpc.raft.request.querydata.InitSeriesReaderRequest;
import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataByTimestampRequest;
import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataRequest;
@@ -79,6 +80,11 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
private String groupId;
/**
+ * Marks whether this manager has already been initialized.
+ */
+ private boolean isInit = false;
+
+ /**
* Timer of Query, if the time is up, close query resource.
*/
private ScheduledFuture<?> queryTimer;
@@ -94,14 +100,14 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
private long queryRound = -1;
/**
- * Key is series full path, value is reader of select series
+ * Select reader entity
*/
- private Map<String, AbstractClusterSelectSeriesBatchReader> selectSeriesReaders = new HashMap<>();
+ private ClusterSelectSeriesBatchReaderEntity selectReaderEntity;
/**
- * Filter reader
+ * Filter reader entity
*/
- private IClusterFilterSeriesBatchReader filterReader;
+ private IClusterFilterSeriesBatchReaderEntity filterReaderEntity;
/**
* Key is series full path, value is data type of series
@@ -127,11 +133,17 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
@Override
public InitSeriesReaderResponse createSeriesReader(InitSeriesReaderRequest request)
throws IOException, PathErrorException, FileNodeManagerException, ProcessorException, QueryFilterOptimizationException, ClassNotFoundException {
+ if (isInit) {
+ throw new IOException(String
+ .format("ClusterLocalSingleQueryManager has already initialized. Job id = %s", jobId));
+ }
+ isInit = true;
this.groupId = request.getGroupID();
InitSeriesReaderResponse response = new InitSeriesReaderResponse(groupId);
QueryContext context = new QueryContext(jobId);
Map<PathType, QueryPlan> queryPlanMap = request.getAllQueryPlan();
if (queryPlanMap.containsKey(PathType.SELECT_PATH)) {
+ selectReaderEntity = new ClusterSelectSeriesBatchReaderEntity();
QueryPlan plan = queryPlanMap.get(PathType.SELECT_PATH);
if (plan instanceof GroupByPlan) {
throw new UnsupportedOperationException();
@@ -179,8 +191,9 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
fill.setDataType(dataType);
fill.setQueryTime(fillQueryPlan.getQueryTime());
fill.constructReaders(queryDataSource, context);
- selectSeriesReaders.put(path.getFullPath(),
- new ClusterFillSelectSeriesBatchReader(dataType, fill.getFillResult()));
+ selectReaderEntity.addPath(path.getFullPath());
+ selectReaderEntity
+ .addReaders(new ClusterFillSelectSeriesBatchReader(dataType, fill.getFillResult()));
dataTypeMap.put(path.getFullPath(), dataType);
}
@@ -195,7 +208,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
*/
private void handleAggreSeriesReader(QueryPlan queryPlan, QueryContext context,
InitSeriesReaderResponse response)
- throws FileNodeManagerException, PathErrorException, IOException, QueryFilterOptimizationException, ProcessorException {
+ throws FileNodeManagerException, PathErrorException, IOException, ProcessorException {
if (queryPlan.getExpression() == null
|| queryPlan.getExpression().getType() == ExpressionType.GLOBAL_TIME) {
handleAggreSeriesReaderWithoutTimeGenerator(queryPlan, context, response);
@@ -227,7 +240,8 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
for (int i = 0; i < selectedPaths.size(); i++) {
Path path = selectedPaths.get(i);
- selectSeriesReaders.put(path.getFullPath(),
+ selectReaderEntity.addPath(path.getFullPath());
+ selectReaderEntity.addReaders(
new ClusterSelectSeriesBatchReader(dataTypes.get(i), readers.get(i)));
dataTypeMap.put(path.getFullPath(), dataTypes.get(i));
}
@@ -275,8 +289,8 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
String fullPath = paths.get(i).getFullPath();
IPointReader reader = AbstractExecutorWithoutTimeGenerator
.createSeriesReader(context, paths.get(i), dataTypes, timeFilter);
- selectSeriesReaders
- .put(fullPath, new ClusterSelectSeriesBatchReader(dataTypes.get(i), reader));
+ selectReaderEntity.addPath(fullPath);
+ selectReaderEntity.addReaders(new ClusterSelectSeriesBatchReader(dataTypes.get(i), reader));
dataTypeMap.put(fullPath, dataTypes.get(i));
}
response.getSeriesDataTypes().put(PathType.SELECT_PATH, dataTypes);
@@ -298,7 +312,8 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
dataTypeMap.put(paths.get(i).getFullPath(), dataTypes.get(i));
}
response.getSeriesDataTypes().put(pathType, dataTypes);
- filterReader = new ClusterFilterSeriesBatchReader(queryDataSet, paths, request.getFilterList());
+ filterReaderEntity = new ClusterFilterSeriesBatchReaderEntity(queryDataSet, paths,
+ request.getFilterList());
}
/**
@@ -318,9 +333,9 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
EngineReaderByTimeStamp readerByTimeStamp = ClusterSeriesReaderFactory
.createReaderByTimeStamp(path, context);
TSDataType dataType = MManager.getInstance().getSeriesType(path.getFullPath());
- selectSeriesReaders
- .put(path.getFullPath(),
- new ClusterSelectSeriesBatchReaderByTimestamp(readerByTimeStamp, dataType));
+ selectReaderEntity.addPath(path.getFullPath());
+ selectReaderEntity
+ .addReaders(new ClusterSelectSeriesBatchReaderByTimestamp(readerByTimeStamp, dataType));
dataTypeMap.put(path.getFullPath(), dataType);
dataTypeList.add(dataType);
}
@@ -336,10 +351,9 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
if (targetQueryRounds != this.queryRound) {
this.queryRound = targetQueryRounds;
PathType pathType = request.getPathType();
- List<String> paths = request.getSeriesPaths();
List<BatchData> batchDataList;
if (pathType == PathType.SELECT_PATH) {
- batchDataList = readSelectSeriesBatchData(paths);
+ batchDataList = readSelectSeriesBatchData(request.getSeriesPathIndexs());
} else {
batchDataList = readFilterSeriesBatchData();
}
@@ -355,13 +369,12 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
throws IOException {
resetQueryTimer();
QuerySeriesDataByTimestampResponse response = new QuerySeriesDataByTimestampResponse(groupId);
- List<String> fetchDataSeries = request.getFetchDataSeries();
long targetQueryRounds = request.getQueryRounds();
if (targetQueryRounds != this.queryRound) {
this.queryRound = targetQueryRounds;
+ List<AbstractClusterSelectSeriesBatchReader> readers = selectReaderEntity.getAllReaders();
List<BatchData> batchDataList = new ArrayList<>();
- for (String series : fetchDataSeries) {
- AbstractClusterSelectSeriesBatchReader reader = selectSeriesReaders.get(series);
+ for (AbstractClusterSelectSeriesBatchReader reader : readers) {
batchDataList.add(reader.nextBatch(request.getBatchTimestamp()));
}
cachedBatchDataResult = batchDataList;
@@ -378,14 +391,15 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
}
/**
- * Read batch data of select series
+ * Read batch data of the select series located at the given series indexes.
*
- * @param paths all series to query
+ * @param seriesIndexs indexes of all series to query
*/
- private List<BatchData> readSelectSeriesBatchData(List<String> paths) throws IOException {
+ private List<BatchData> readSelectSeriesBatchData(List<Integer> seriesIndexs) throws IOException {
List<BatchData> batchDataList = new ArrayList<>();
- for (String fullPath : paths) {
- batchDataList.add(selectSeriesReaders.get(fullPath).nextBatch());
+ for (int index : seriesIndexs) {
+ AbstractClusterSelectSeriesBatchReader reader = selectReaderEntity.getReaderByIndex(index);
+ batchDataList.add(reader.nextBatch());
}
return batchDataList;
}
@@ -396,7 +410,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
* @return batch data of all filter series
*/
private List<BatchData> readFilterSeriesBatchData() throws IOException {
- return filterReader.nextBatchList();
+ return filterReaderEntity.nextBatchList();
}
public String getGroupId() {
@@ -417,12 +431,12 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
return queryRound;
}
- public Map<String, AbstractClusterSelectSeriesBatchReader> getSelectSeriesReaders() {
- return selectSeriesReaders;
+ public ClusterSelectSeriesBatchReaderEntity getSelectReaderEntity() {
+ return selectReaderEntity;
}
- public IClusterFilterSeriesBatchReader getFilterReader() {
- return filterReader;
+ public IClusterFilterSeriesBatchReaderEntity getFilterReaderEntity() {
+ return filterReaderEntity;
}
public Map<String, TSDataType> getDataTypeMap() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterFilterSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterFilterSeriesReader.java
index 0c0287e..9d60ae2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterFilterSeriesReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterFilterSeriesReader.java
@@ -83,7 +83,7 @@ public class ClusterFilterSeriesReader extends AbstractClusterPointReader {
@Override
protected void updateCurrentBatchData() throws RaftConnectionException {
if (batchDataList.isEmpty() && !remoteDataFinish) {
- queryManager.fetchBatchDataForFilterPaths(groupId);
+ queryManager.fetchBatchDataForAllFilterPaths(groupId);
}
if (!batchDataList.isEmpty()) {
currentBatchData = batchDataList.removeFirst();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java
index 55639a1..fadd92f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.cluster.query.reader.querynode;
import java.io.IOException;
-import org.apache.iotdb.cluster.query.manager.common.FillBatchData;
+import org.apache.iotdb.cluster.query.common.FillBatchData;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReaderEntity.java
similarity index 93%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReader.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReaderEntity.java
index 1cd357e..65f8c1c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReaderEntity.java
@@ -32,9 +32,9 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
/**
- * Batch reader for all filter paths.
+ * Batch reader entity for all filter paths.
*/
-public class ClusterFilterSeriesBatchReader implements IClusterFilterSeriesBatchReader {
+public class ClusterFilterSeriesBatchReaderEntity implements IClusterFilterSeriesBatchReaderEntity {
private List<Path> allFilterPath;
@@ -44,7 +44,7 @@ public class ClusterFilterSeriesBatchReader implements IClusterFilterSeriesBatch
private static final ClusterConfig CLUSTER_CONF = ClusterDescriptor.getInstance().getConfig();
- public ClusterFilterSeriesBatchReader(QueryDataSet queryDataSet, List<Path> allFilterPath,
+ public ClusterFilterSeriesBatchReaderEntity(QueryDataSet queryDataSet, List<Path> allFilterPath,
List<Filter> filters) {
this.queryDataSet = queryDataSet;
this.allFilterPath = allFilterPath;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderEntity.java
similarity index 52%
copy from cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReader.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderEntity.java
index 218d68b..f0dea38 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderEntity.java
@@ -18,19 +18,45 @@
*/
package org.apache.iotdb.cluster.query.reader.querynode;
-import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
-import org.apache.iotdb.tsfile.read.common.BatchData;
/**
- * Batch reader for filter series which is used in query node.
+ * Batch reader entity for all select paths.
*/
-public interface IClusterFilterSeriesBatchReader {
-
- boolean hasNext() throws IOException;
+public class ClusterSelectSeriesBatchReaderEntity {
+ /**
+ * All select paths
+ */
+ List<String> paths;
/**
- * Get next batch data of all filter series.
+ * All select readers
*/
- List<BatchData> nextBatchList() throws IOException;
+ List<AbstractClusterSelectSeriesBatchReader> readers;
+
+ public ClusterSelectSeriesBatchReaderEntity() {
+ paths = new ArrayList<>();
+ readers = new ArrayList<>();
+ }
+
+ public void addPath(String path) {
+ this.paths.add(path);
+ }
+
+ public void addReaders(AbstractClusterSelectSeriesBatchReader reader) {
+ this.readers.add(reader);
+ }
+
+ public List<AbstractClusterSelectSeriesBatchReader> getAllReaders() {
+ return readers;
+ }
+
+ public AbstractClusterSelectSeriesBatchReader getReaderByIndex(int index){
+ return readers.get(index);
+ }
+
+ public List<String> getAllPaths() {
+ return paths;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReaderEntity.java
similarity index 95%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReader.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReaderEntity.java
index 218d68b..a045e2a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterFilterSeriesBatchReaderEntity.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
/**
* Batch reader for filter series which is used in query node.
*/
-public interface IClusterFilterSeriesBatchReader {
+public interface IClusterFilterSeriesBatchReaderEntity {
boolean hasNext() throws IOException;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java
index 0f05cf2..7525368 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.cluster.query.utils;
-import org.apache.iotdb.cluster.query.manager.common.FillBatchData;
+import org.apache.iotdb.cluster.query.common.FillBatchData;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.TimeValuePairUtils;
import org.apache.iotdb.tsfile.read.common.BatchData;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataByTimestampRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataByTimestampRequest.java
index 351e6eb..cbcef15 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataByTimestampRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataByTimestampRequest.java
@@ -39,21 +39,16 @@ public class QuerySeriesDataByTimestampRequest extends BasicQueryRequest {
*/
private List<Long> batchTimestamp;
- /**
- * Series to fetch data from remote query node
- */
- private List<String> fetchDataSeries;
-
private QuerySeriesDataByTimestampRequest(String groupID) {
super(groupID);
}
- public static QuerySeriesDataByTimestampRequest createRequest(String groupId, long queryRounds, String taskId, List<Long> batchTimestamp, List<String> fetchDataSeries){
+ public static QuerySeriesDataByTimestampRequest createRequest(String groupId, long queryRounds,
+ String taskId, List<Long> batchTimestamp) {
QuerySeriesDataByTimestampRequest request = new QuerySeriesDataByTimestampRequest(groupId);
request.queryRounds = queryRounds;
request.taskId = taskId;
request.batchTimestamp = batchTimestamp;
- request.fetchDataSeries = fetchDataSeries;
return request;
}
@@ -80,12 +75,4 @@ public class QuerySeriesDataByTimestampRequest extends BasicQueryRequest {
public void setBatchTimestamp(List<Long> batchTimestamp) {
this.batchTimestamp = batchTimestamp;
}
-
- public List<String> getFetchDataSeries() {
- return fetchDataSeries;
- }
-
- public void setFetchDataSeries(List<String> fetchDataSeries) {
- this.fetchDataSeries = fetchDataSeries;
- }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataRequest.java
index 554b8c1..e0fc23c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataRequest.java
@@ -46,9 +46,9 @@ public class QuerySeriesDataRequest extends BasicQueryRequest {
private PathType pathType;
/**
- * Key is series type, value is series list
+ * List of series path indexes.
*/
- private List<String> seriesPaths = new ArrayList<>();
+ private List<Integer> seriesPathIndexs = new ArrayList<>();
private QuerySeriesDataRequest(String groupID, String taskId) {
super(groupID);
@@ -56,10 +56,10 @@ public class QuerySeriesDataRequest extends BasicQueryRequest {
}
public static QuerySeriesDataRequest createFetchDataRequest(String groupId, String taskId,
- PathType pathType, List<String> seriesPaths, long queryRounds) {
+ PathType pathType, List<Integer> seriesPathIndexs, long queryRounds) {
QuerySeriesDataRequest request = new QuerySeriesDataRequest(groupId, taskId);
request.pathType = pathType;
- request.seriesPaths = seriesPaths;
+ request.seriesPathIndexs = seriesPathIndexs;
request.queryRounds = queryRounds;
return request;
}
@@ -88,11 +88,7 @@ public class QuerySeriesDataRequest extends BasicQueryRequest {
this.pathType = pathType;
}
- public List<String> getSeriesPaths() {
- return seriesPaths;
- }
-
- public void setSeriesPaths(List<String> seriesPaths) {
- this.seriesPaths = seriesPaths;
+ public List<Integer> getSeriesPathIndexs() {
+ return seriesPathIndexs;
}
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationLargeDataIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationLargeDataIT.java
index 494029c..45ab923 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationLargeDataIT.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationLargeDataIT.java
@@ -163,7 +163,6 @@ public class IoTDBAggregationLargeDataIT {
}
@Test
- @Ignore
public void remoteTest() throws ClassNotFoundException, SQLException {
QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
insertSQL();
@@ -183,7 +182,7 @@ public class IoTDBAggregationLargeDataIT {
maxTimeAggreWithMultiFilterTest();
minValueAggreWithMultiFilterTest();
maxValueAggreWithMultiFilterTest();
-// meanAggreWithMultiFilterTest();
+ meanAggreWithMultiFilterTest();
sumAggreWithMultiFilterTest();
firstAggreWithMultiFilterTest();
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java
index 02dc01f..162c5ac 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java
@@ -181,13 +181,6 @@ public class IoTDBAggregationSmallDataIT {
}
@Test
- @Ignore
- public void countOnlyTimeFilterRemoteTest() throws ClassNotFoundException, SQLException {
- QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
- countOnlyTimeFilterTest();
- }
-
- @Test
public void functionsNoFilterTest() throws ClassNotFoundException, SQLException {
String[] retArray = new String[]{
"0,4,0,6,1",
@@ -286,13 +279,6 @@ public class IoTDBAggregationSmallDataIT {
}
@Test
- @Ignore
- public void functionsNoFilterRemoteTest() throws ClassNotFoundException, SQLException {
- QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
- functionsNoFilterTest();
- }
-
- @Test
public void lastAggreWithSingleFilterTest() throws ClassNotFoundException, SQLException {
String[] retArray = new String[]{
"0,22222,55555"
@@ -322,13 +308,6 @@ public class IoTDBAggregationSmallDataIT {
}
@Test
- @Ignore
- public void lastAggreWithSingleFilterRemoteTest() throws ClassNotFoundException, SQLException {
- QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
- lastAggreWithSingleFilterTest();
- }
-
- @Test
public void firstAggreWithSingleFilterTest() throws ClassNotFoundException, SQLException {
String[] retArray = new String[]{
"0,99,180"
@@ -358,13 +337,6 @@ public class IoTDBAggregationSmallDataIT {
}
@Test
- @Ignore
- public void firstAggreWithSingleFilterRemoteTest() throws ClassNotFoundException, SQLException {
- QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
- firstAggreWithSingleFilterTest();
- }
-
- @Test
public void sumAggreWithSingleFilterTest() throws ClassNotFoundException, SQLException {
String[] retArray = new String[]{
"0,22321.0,55934.0,1029"
@@ -394,13 +366,6 @@ public class IoTDBAggregationSmallDataIT {
}
@Test
- @Ignore
- public void sumAggreWithSingleFilterRemoteTest() throws ClassNotFoundException, SQLException {
- QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
- sumAggreWithSingleFilterTest();
- }
-
- @Test
public void meanAggreWithSingleFilterTest() throws ClassNotFoundException, SQLException {
String[] retArray = new String[]{
"0,11160.5,18645,206"
@@ -430,13 +395,6 @@ public class IoTDBAggregationSmallDataIT {
}
@Test
- @Ignore
- public void meanAggreWithSingleFilterRemoteTest() throws ClassNotFoundException, SQLException {
- QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
- meanAggreWithSingleFilterTest();
- }
-
- @Test
public void countAggreWithSingleFilterTest() throws ClassNotFoundException, SQLException {
String[] retArray = new String[]{
"0,2,3,5,1,0"
@@ -468,13 +426,6 @@ public class IoTDBAggregationSmallDataIT {
}
@Test
- @Ignore
- public void countAggreWithSingleFilterRemoteTest() throws ClassNotFoundException, SQLException {
- QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
- countAggreWithSingleFilterTest();
- }
-
- @Test
public void minTimeAggreWithSingleFilterTest() throws ClassNotFoundException, SQLException {
String[] retArray = new String[]{
"0,104,1,2,101,100"
@@ -507,13 +458,6 @@ public class IoTDBAggregationSmallDataIT {
}
@Test
- @Ignore
- public void minTimeAggreWithSingleFilterRemoteTest() throws ClassNotFoundException, SQLException {
- QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
- minTimeAggreWithSingleFilterTest();
- }
-
- @Test
public void maxTimeAggreWithSingleFilterTest() throws ClassNotFoundException, SQLException {
String[] retArray = new String[]{
"0,105,105,105,102,100"
@@ -546,13 +490,6 @@ public class IoTDBAggregationSmallDataIT {
}
@Test
- @Ignore
- public void maxTimeAggreWithSingleFilterRemoteTest() throws ClassNotFoundException, SQLException {
- QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
- maxTimeAggreWithSingleFilterTest();
- }
-
- @Test
public void minValueAggreWithSingleFilterTest() throws ClassNotFoundException, SQLException {
String[] retArray = new String[]{
"0,90,180,2.22,ddddd,true"
@@ -588,14 +525,6 @@ public class IoTDBAggregationSmallDataIT {
}
@Test
- @Ignore
- public void minValueAggreWithSingleFilterRemoteTest()
- throws ClassNotFoundException, SQLException {
- QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
- minValueAggreWithSingleFilterTest();
- }
-
- @Test
public void maxValueAggreWithSingleFilterTest() throws ClassNotFoundException, SQLException {
String[] retArray = new String[]{
"0,99,50000,11.11,fffff,true"
@@ -630,14 +559,6 @@ public class IoTDBAggregationSmallDataIT {
}
@Test
- @Ignore
- public void maxValueAggreWithSingleFilterRemoteTest()
- throws ClassNotFoundException, SQLException {
- QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
- maxValueAggreWithSingleFilterTest();
- }
-
- @Test
public void countAggreWithMultiMultiFilterTest() {
String[] retArray = new String[]{
"0,2",
@@ -667,14 +588,6 @@ public class IoTDBAggregationSmallDataIT {
}
@Test
- @Ignore
- public void countAggreWithMultiMultiFilterRemoteTest()
- throws ClassNotFoundException, SQLException {
- QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
- countAggreWithMultiMultiFilterTest();
- }
-
- @Test
public void selectAllSQLTest() throws ClassNotFoundException, SQLException {
//d0s0,d0s1,d0s2,d0s3,d1s0
String[] retArray = new String[]{
@@ -739,13 +652,6 @@ public class IoTDBAggregationSmallDataIT {
}
}
- @Test
- @Ignore
- public void selectAllSQLRemoteTest() throws ClassNotFoundException, SQLException {
- QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
- selectAllSQLTest();
- }
-
private static void insertSQL() {
try (Connection connection = DriverManager.getConnection
(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root")) {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterLocalManagerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterLocalManagerTest.java
index b822831..e71e489 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterLocalManagerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterLocalManagerTest.java
@@ -42,7 +42,8 @@ import org.apache.iotdb.cluster.query.manager.querynode.ClusterLocalSingleQueryM
import org.apache.iotdb.cluster.query.reader.querynode.AbstractClusterSelectSeriesBatchReader;
import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReader;
import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReaderByTimestamp;
-import org.apache.iotdb.cluster.query.reader.querynode.ClusterFilterSeriesBatchReader;
+import org.apache.iotdb.cluster.query.reader.querynode.ClusterFilterSeriesBatchReaderEntity;
+import org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReaderEntity;
import org.apache.iotdb.cluster.utils.EnvironmentUtils;
import org.apache.iotdb.cluster.utils.QPExecutorUtils;
import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
@@ -219,15 +220,15 @@ public class ClusterLocalManagerTest {
assertNotNull(singleQueryManager);
assertEquals((long) map.get(taskId), singleQueryManager.getJobId());
assertEquals(0, singleQueryManager.getQueryRound());
- assertNull(singleQueryManager.getFilterReader());
- Map<String, AbstractClusterSelectSeriesBatchReader> selectSeriesReaders = singleQueryManager
- .getSelectSeriesReaders();
- assertEquals(3, selectSeriesReaders.size());
+ assertNull(singleQueryManager.getFilterReaderEntity());
+ ClusterSelectSeriesBatchReaderEntity selectSeriesBatchReaderEntity = singleQueryManager.getSelectReaderEntity();
+ assertEquals(3, selectSeriesBatchReaderEntity.getAllReaders().size());
Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap();
- for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : selectSeriesReaders.entrySet()) {
- String path = entry.getKey();
- TSDataType dataType = typeMap.get(path);
- AbstractClusterSelectSeriesBatchReader clusterBatchReader = entry.getValue();
+ List<AbstractClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders();
+ List<String> paths = selectSeriesBatchReaderEntity.getAllPaths();
+ for (int i =0 ; i < readers.size(); i++) {
+ TSDataType dataType = typeMap.get(paths.get(i));
+ AbstractClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i);
assertNotNull(((ClusterSelectSeriesBatchReader) clusterBatchReader).getReader());
assertEquals(dataType,
((ClusterSelectSeriesBatchReader) clusterBatchReader).getDataType());
@@ -246,15 +247,15 @@ public class ClusterLocalManagerTest {
assertNotNull(singleQueryManager);
assertEquals((long) map.get(taskId), singleQueryManager.getJobId());
assertEquals(0, singleQueryManager.getQueryRound());
- assertNull(singleQueryManager.getFilterReader());
- Map<String, AbstractClusterSelectSeriesBatchReader> selectSeriesReaders = singleQueryManager
- .getSelectSeriesReaders();
- assertEquals(3, selectSeriesReaders.size());
+ assertNull(singleQueryManager.getFilterReaderEntity());
+ ClusterSelectSeriesBatchReaderEntity selectSeriesBatchReaderEntity = singleQueryManager.getSelectReaderEntity();
+ assertEquals(3, selectSeriesBatchReaderEntity.getAllReaders().size());
Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap();
- for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : selectSeriesReaders.entrySet()) {
- String path = entry.getKey();
- TSDataType dataType = typeMap.get(path);
- AbstractClusterSelectSeriesBatchReader clusterBatchReader = entry.getValue();
+ List<AbstractClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders();
+ List<String> paths = selectSeriesBatchReaderEntity.getAllPaths();
+ for (int i =0 ; i < readers.size(); i++) {
+ TSDataType dataType = typeMap.get(paths.get(i));
+ AbstractClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i);
assertNotNull(((ClusterSelectSeriesBatchReader) clusterBatchReader).getReader());
assertEquals(dataType,
((ClusterSelectSeriesBatchReader) clusterBatchReader).getDataType());
@@ -273,15 +274,15 @@ public class ClusterLocalManagerTest {
assertNotNull(singleQueryManager);
assertEquals((long) map.get(taskId), singleQueryManager.getJobId());
assertEquals(0, singleQueryManager.getQueryRound());
- assertNull(singleQueryManager.getFilterReader());
- Map<String, AbstractClusterSelectSeriesBatchReader> selectSeriesReaders = singleQueryManager
- .getSelectSeriesReaders();
- assertEquals(3, selectSeriesReaders.size());
+ assertNull(singleQueryManager.getFilterReaderEntity());
+ ClusterSelectSeriesBatchReaderEntity selectSeriesBatchReaderEntity = singleQueryManager.getSelectReaderEntity();
+ assertEquals(3, selectSeriesBatchReaderEntity.getAllReaders().size());
Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap();
- for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : selectSeriesReaders.entrySet()) {
- String path = entry.getKey();
- TSDataType dataType = typeMap.get(path);
- AbstractClusterSelectSeriesBatchReader clusterBatchReader = entry.getValue();
+ List<AbstractClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders();
+ List<String> paths = selectSeriesBatchReaderEntity.getAllPaths();
+ for (int i =0 ; i < readers.size(); i++) {
+ TSDataType dataType = typeMap.get(paths.get(i));
+ AbstractClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i);
assertNotNull(((ClusterSelectSeriesBatchReader) clusterBatchReader).getReader());
assertEquals(dataType,
((ClusterSelectSeriesBatchReader) clusterBatchReader).getDataType());
@@ -310,22 +311,22 @@ public class ClusterLocalManagerTest {
assertNotNull(singleQueryManager);
assertEquals((long) map.get(taskId), singleQueryManager.getJobId());
assertEquals(3, singleQueryManager.getQueryRound());
- ClusterFilterSeriesBatchReader filterReader = (ClusterFilterSeriesBatchReader) singleQueryManager.getFilterReader();
+ ClusterFilterSeriesBatchReaderEntity filterReader = (ClusterFilterSeriesBatchReaderEntity) singleQueryManager.getFilterReaderEntity();
assertNotNull(filterReader);
List<Path> allFilterPaths = new ArrayList<>();
allFilterPaths.add(new Path("root.vehicle.d0.s0"));
assertTrue(allFilterPaths.containsAll(filterReader.getAllFilterPath()));
assertNotNull(filterReader.getQueryDataSet());
- Map<String, AbstractClusterSelectSeriesBatchReader> selectSeriesReaders = singleQueryManager
- .getSelectSeriesReaders();
- assertNotNull(selectSeriesReaders);
- assertEquals(3, selectSeriesReaders.size());
+ ClusterSelectSeriesBatchReaderEntity selectSeriesBatchReaderEntity = singleQueryManager.getSelectReaderEntity();
+ assertNotNull(selectSeriesBatchReaderEntity.getAllReaders());
+ assertEquals(3, selectSeriesBatchReaderEntity.getAllReaders().size());
Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap();
- for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : selectSeriesReaders.entrySet()) {
- String path = entry.getKey();
- TSDataType dataType = typeMap.get(path);
- AbstractClusterSelectSeriesBatchReader clusterBatchReader = entry.getValue();
+ List<AbstractClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders();
+ List<String> paths = selectSeriesBatchReaderEntity.getAllPaths();
+ for (int i =0 ; i < readers.size(); i++) {
+ TSDataType dataType = typeMap.get(paths.get(i));
+ AbstractClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i);
assertNotNull(((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getReaderByTimeStamp());
assertEquals(dataType,
((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getDataType());
@@ -344,22 +345,22 @@ public class ClusterLocalManagerTest {
assertNotNull(singleQueryManager);
assertEquals((long) map.get(taskId), singleQueryManager.getJobId());
assertEquals(3, singleQueryManager.getQueryRound());
- ClusterFilterSeriesBatchReader filterReader = (ClusterFilterSeriesBatchReader) singleQueryManager.getFilterReader();
+ ClusterFilterSeriesBatchReaderEntity filterReader = (ClusterFilterSeriesBatchReaderEntity) singleQueryManager.getFilterReaderEntity();
assertNotNull(filterReader);
List<Path> allFilterPaths = new ArrayList<>();
allFilterPaths.add(new Path("root.vehicle.d0.s0"));
assertTrue(allFilterPaths.containsAll(filterReader.getAllFilterPath()));
assertNotNull(filterReader.getQueryDataSet());
- Map<String, AbstractClusterSelectSeriesBatchReader> selectSeriesReaders = singleQueryManager
- .getSelectSeriesReaders();
- assertNotNull(selectSeriesReaders);
- assertEquals(3, selectSeriesReaders.size());
+ ClusterSelectSeriesBatchReaderEntity selectSeriesBatchReaderEntity = singleQueryManager.getSelectReaderEntity();
+ List<AbstractClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders();
+ assertNotNull(readers);
+ assertEquals(3, readers.size());
Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap();
- for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : selectSeriesReaders.entrySet()) {
- String path = entry.getKey();
- TSDataType dataType = typeMap.get(path);
- AbstractClusterSelectSeriesBatchReader clusterBatchReader = entry.getValue();
+ List<String> paths = selectSeriesBatchReaderEntity.getAllPaths();
+ for (int i =0 ; i < readers.size(); i++) {
+ TSDataType dataType = typeMap.get(paths.get(i));
+ AbstractClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i);
assertNotNull(((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getReaderByTimeStamp());
assertEquals(dataType,
((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getDataType());
@@ -378,22 +379,22 @@ public class ClusterLocalManagerTest {
assertNotNull(singleQueryManager);
assertEquals((long) map.get(taskId), singleQueryManager.getJobId());
assertEquals(3, singleQueryManager.getQueryRound());
- ClusterFilterSeriesBatchReader filterReader = (ClusterFilterSeriesBatchReader) singleQueryManager.getFilterReader();
+ ClusterFilterSeriesBatchReaderEntity filterReader = (ClusterFilterSeriesBatchReaderEntity) singleQueryManager.getFilterReaderEntity();
assertNotNull(filterReader);
List<Path> allFilterPaths = new ArrayList<>();
allFilterPaths.add(new Path("root.vehicle.d0.s0"));
assertTrue(allFilterPaths.containsAll(filterReader.getAllFilterPath()));
assertNotNull(filterReader.getQueryDataSet());
- Map<String, AbstractClusterSelectSeriesBatchReader> selectSeriesReaders = singleQueryManager
- .getSelectSeriesReaders();
- assertNotNull(selectSeriesReaders);
- assertEquals(3, selectSeriesReaders.size());
+ ClusterSelectSeriesBatchReaderEntity selectSeriesBatchReaderEntity = singleQueryManager.getSelectReaderEntity();
+ List<AbstractClusterSelectSeriesBatchReader> readers = selectSeriesBatchReaderEntity.getAllReaders();
+ assertNotNull(readers);
+ assertEquals(3, readers.size());
Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap();
- for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : selectSeriesReaders.entrySet()) {
- String path = entry.getKey();
- TSDataType dataType = typeMap.get(path);
- AbstractClusterSelectSeriesBatchReader clusterBatchReader = entry.getValue();
+ List<String> paths = selectSeriesBatchReaderEntity.getAllPaths();
+ for (int i =0 ; i < readers.size(); i++) {
+ TSDataType dataType = typeMap.get(paths.get(i));
+ AbstractClusterSelectSeriesBatchReader clusterBatchReader = readers.get(i);
assertNotNull(((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getReaderByTimeStamp());
assertEquals(dataType,
((ClusterSelectSeriesBatchReaderByTimestamp) clusterBatchReader).getDataType());