You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/16 12:27:06 UTC
[incubator-iotdb] 15/19: add ClusterRpcReaderUtils
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster_read
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit d4863aad7f439c31bd967780aff9094ede628fde
Author: lta <li...@163.com>
AuthorDate: Tue Apr 16 16:17:00 2019 +0800
add ClusterRpcReaderUtils
---
.../org/apache/iotdb/cluster/entity/Server.java | 14 +--
.../cluster/entity/raft/DataStateMachine.java | 2 +-
.../cluster/entity/raft/MetadataStateManchine.java | 2 +-
.../cluster/qp/executor/AbstractQPExecutor.java | 17 ++--
.../executor/ClusterQueryProcessExecutor.java | 57 ++++++++++---
.../cluster/qp/executor/NonQueryExecutor.java | 8 +-
.../cluster/qp/executor/QueryMetadataExecutor.java | 38 ++++-----
.../apache/iotdb/cluster/qp/task/BatchQPTask.java | 2 +-
.../executor/ClusterExecutorWithTimeGenerator.java | 5 +-
.../ClusterExecutorWithoutTimeGenerator.java | 6 +-
.../executor/ClusterQueryRouter.java | 41 ++++++---
.../factory/ClusterRpcReaderFactory.java | 31 -------
.../manager/ClusterSingleQueryManager.java | 37 ++++----
.../manager/IClusterSingleQueryManager.java | 5 +-
.../reader/ClusterRpcBatchDataReader.java | 29 ++++++-
.../reader/ClusterSeriesReader.java | 2 +-
.../iotdb/cluster/rpc/raft/NodeAsClient.java | 8 +-
.../rpc/raft/impl/RaftNodeAsClientManager.java | 13 ++-
.../DataGroupNonQueryAsyncProcessor.java | 7 +-
.../MetaGroupNonQueryAsyncProcessor.java | 7 +-
.../QueryMetadataAsyncProcessor.java | 7 +-
.../QueryMetadataInStringAsyncProcessor.java | 7 +-
.../QueryPathsAsyncProcessor.java | 7 +-
.../QuerySeriesTypeAsyncProcessor.java | 7 +-
.../QueryTimeSeriesAsyncProcessor.java | 7 +-
.../rpc/raft/request/BasicQueryRequest.java | 4 +
.../{ => nonquery}/DataGroupNonQueryRequest.java | 5 +-
.../{ => nonquery}/MetaGroupNonQueryRequest.java | 5 +-
.../QuerySeriesDataRequest.java} | 27 ++++--
.../Stage.java} | 13 +--
.../QueryMetadataInStringRequest.java | 6 +-
.../{ => querymetadata}/QueryMetadataRequest.java | 6 +-
.../{ => querymetadata}/QueryPathsRequest.java | 6 +-
.../QuerySeriesTypeRequest.java | 6 +-
.../QueryStorageGroupRequest.java | 6 +-
.../QueryTimeSeriesRequest.java | 6 +-
.../{ => nonquery}/DataGroupNonQueryResponse.java | 4 +-
.../{ => nonquery}/MetaGroupNonQueryResponse.java | 4 +-
.../querydata/QuerySeriesDataResponse.java | 69 +++++++++++++++
.../QueryMetadataInStringResponse.java | 4 +-
.../{ => querymetadata}/QueryMetadataResponse.java | 3 +-
.../{ => querymetadata}/QueryPathsResponse.java | 3 +-
.../QuerySeriesTypeResponse.java | 3 +-
.../QueryStorageGroupResponse.java | 3 +-
.../QueryTimeSeriesResponse.java | 3 +-
.../cluster/service/TSServiceClusterImpl.java | 18 ++--
.../org/apache/iotdb/cluster/utils/RaftUtils.java | 2 +-
.../cluster/utils/query/ClusterRpcReaderUtils.java | 99 ++++++++++++++++++++++
.../query/QueryTask.java} | 39 ++++-----
.../apache/iotdb/cluster/utils/RaftUtilsTest.java | 2 +-
.../db/qp/executor/IQueryProcessExecutor.java | 1 -
.../iotdb/db/qp/executor/OverflowQPExecutor.java | 6 +-
.../iotdb/db/qp/executor/QueryProcessExecutor.java | 9 +-
.../apache/iotdb/tsfile/read/common/BatchData.java | 4 +-
54 files changed, 490 insertions(+), 242 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index a4c02cd..1f0e4e3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -32,13 +32,13 @@ import org.apache.iotdb.cluster.entity.metadata.MetadataHolder;
import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
import org.apache.iotdb.cluster.rpc.raft.impl.RaftNodeAsClientManager;
-import org.apache.iotdb.cluster.rpc.raft.processor.DataGroupNonQueryAsyncProcessor;
-import org.apache.iotdb.cluster.rpc.raft.processor.MetaGroupNonQueryAsyncProcessor;
-import org.apache.iotdb.cluster.rpc.raft.processor.QueryMetadataAsyncProcessor;
-import org.apache.iotdb.cluster.rpc.raft.processor.QueryMetadataInStringAsyncProcessor;
-import org.apache.iotdb.cluster.rpc.raft.processor.QueryPathsAsyncProcessor;
-import org.apache.iotdb.cluster.rpc.raft.processor.QuerySeriesTypeAsyncProcessor;
-import org.apache.iotdb.cluster.rpc.raft.processor.QueryTimeSeriesAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.nonquery.DataGroupNonQueryAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.nonquery.MetaGroupNonQueryAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QueryMetadataAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QueryMetadataInStringAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QueryPathsAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QuerySeriesTypeAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QueryTimeSeriesAsyncProcessor;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
import org.apache.iotdb.cluster.utils.hash.Router;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
index ebac074..e74db12 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
@@ -31,7 +31,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.cluster.rpc.raft.closure.ResponseClosure;
-import org.apache.iotdb.cluster.rpc.raft.request.DataGroupNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.nonquery.DataGroupNonQueryRequest;
import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.db.exception.PathErrorException;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
index 9592718..b45ab9c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
@@ -32,7 +32,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.cluster.rpc.raft.closure.ResponseClosure;
-import org.apache.iotdb.cluster.rpc.raft.request.MetaGroupNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.nonquery.MetaGroupNonQueryRequest;
import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.db.exception.PathErrorException;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
index e5df083..810cc6e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
@@ -70,12 +70,17 @@ public abstract class AbstractQPExecutor {
/**
* ReadMetadataConsistencyLevel: 1 Strong consistency, 2 Weak consistency
*/
- protected int readMetadataConsistencyLevel = CLUSTER_CONFIG.getReadMetadataConsistencyLevel();
+ private ThreadLocal<Integer> readMetadataConsistencyLevel = new ThreadLocal<>();
/**
* ReadDataConsistencyLevel: 1 Strong consistency, 2 Weak consistency
*/
- private int readDataConsistencyLevel = CLUSTER_CONFIG.getReadDataConsistencyLevel();
+ private ThreadLocal<Integer> readDataConsistencyLevel = new ThreadLocal<>();
+
+ public AbstractQPExecutor() {
+ readMetadataConsistencyLevel.set(CLUSTER_CONFIG.getReadMetadataConsistencyLevel());
+ readDataConsistencyLevel.set(CLUSTER_CONFIG.getReadDataConsistencyLevel());
+ }
/**
* Async handle QPTask by QPTask and leader id
@@ -141,7 +146,7 @@ public abstract class AbstractQPExecutor {
public void setReadMetadataConsistencyLevel(int level) throws ConsistencyLevelException {
if (level <= ClusterConstant.MAX_CONSISTENCY_LEVEL) {
- this.readMetadataConsistencyLevel = level;
+ readMetadataConsistencyLevel.set(level);
} else {
throw new ConsistencyLevelException(String.format("Consistency level %d not support", level));
}
@@ -149,17 +154,17 @@ public abstract class AbstractQPExecutor {
public void setReadDataConsistencyLevel(int level) throws ConsistencyLevelException {
if (level <= ClusterConstant.MAX_CONSISTENCY_LEVEL) {
- this.readDataConsistencyLevel = level;
+ readDataConsistencyLevel.set(level);
} else {
throw new ConsistencyLevelException(String.format("Consistency level %d not support", level));
}
}
public int getReadMetadataConsistencyLevel() {
- return readMetadataConsistencyLevel;
+ return readMetadataConsistencyLevel.get();
}
public int getReadDataConsistencyLevel() {
- return readDataConsistencyLevel;
+ return readDataConsistencyLevel.get();
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryProcessExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/ClusterQueryProcessExecutor.java
similarity index 67%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryProcessExecutor.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/ClusterQueryProcessExecutor.java
index e7e58f8..1b82583 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryProcessExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/ClusterQueryProcessExecutor.java
@@ -16,18 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.query.coordinatornode.executor;
+package org.apache.iotdb.cluster.qp.executor;
import java.io.IOException;
import java.util.List;
import java.util.Map;
-import org.apache.iotdb.cluster.qp.executor.QueryMetadataExecutor;
+import org.apache.iotdb.cluster.query.coordinatornode.executor.ClusterQueryRouter;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.qp.constant.SQLConstant;
-import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
+import org.apache.iotdb.db.qp.executor.IQueryProcessExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.executor.IEngineQueryRouter;
import org.apache.iotdb.db.query.fill.IFill;
@@ -35,34 +39,64 @@ import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Pair;
-public class ClusterQueryProcessExecutor extends QueryProcessExecutor {
+public class ClusterQueryProcessExecutor extends AbstractQPExecutor implements IQueryProcessExecutor {
- private IEngineQueryRouter queryRouter = new ClusterQueryRouter();
+ private ThreadLocal<Integer> fetchSize = new ThreadLocal<>();
+ private ClusterQueryRouter clusterQueryRouter = new ClusterQueryRouter();
private QueryMetadataExecutor queryMetadataExecutor = new QueryMetadataExecutor();
@Override
+ public QueryDataSet processQuery(QueryPlan queryPlan, QueryContext context)
+ throws IOException, FileNodeManagerException, PathErrorException,
+ QueryFilterOptimizationException, ProcessorException {
+
+ QueryExpression queryExpression = QueryExpression.create().setSelectSeries(queryPlan.getPaths())
+ .setExpression(queryPlan.getExpression());
+ clusterQueryRouter.setReadDataConsistencyLevel(getReadDataConsistencyLevel());
+ if (queryPlan instanceof GroupByPlan) {
+ GroupByPlan groupByPlan = (GroupByPlan) queryPlan;
+ return groupBy(groupByPlan.getPaths(), groupByPlan.getAggregations(),
+ groupByPlan.getExpression(), groupByPlan.getUnit(), groupByPlan.getOrigin(),
+ groupByPlan.getIntervals(), context);
+ }
+
+ if (queryPlan instanceof AggregationPlan) {
+ return aggregate(queryPlan.getPaths(), queryPlan.getAggregations(),
+ queryPlan.getExpression(), context);
+ }
+
+ if (queryPlan instanceof FillQueryPlan) {
+ FillQueryPlan fillQueryPlan = (FillQueryPlan) queryPlan;
+ return fill(queryPlan.getPaths(), fillQueryPlan.getQueryTime(),
+ fillQueryPlan.getFillType(), context);
+ }
+ return clusterQueryRouter.query(queryExpression, context);
+ }
+
+ @Override
public QueryDataSet aggregate(List<Path> paths, List<String> aggres, IExpression expression,
QueryContext context)
throws ProcessorException, IOException, PathErrorException, FileNodeManagerException, QueryFilterOptimizationException {
- return queryRouter.aggregate(paths, aggres, expression, context);
+ return clusterQueryRouter.aggregate(paths, aggres, expression, context);
}
@Override
public QueryDataSet groupBy(List<Path> paths, List<String> aggres, IExpression expression,
long unit, long origin, List<Pair<Long, Long>> intervals, QueryContext context)
throws ProcessorException, IOException, PathErrorException, FileNodeManagerException, QueryFilterOptimizationException {
- return queryRouter.groupBy(paths, aggres, expression, unit, origin, intervals, context);
+ return clusterQueryRouter.groupBy(paths, aggres, expression, unit, origin, intervals, context);
}
@Override
public QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillTypes,
QueryContext context)
throws ProcessorException, IOException, PathErrorException, FileNodeManagerException {
- return queryRouter.fill(fillPaths, queryTime, fillTypes, context);
+ return clusterQueryRouter.fill(fillPaths, queryTime, fillTypes, context);
}
@Override
@@ -111,11 +145,6 @@ public class ClusterQueryProcessExecutor extends QueryProcessExecutor {
}
@Override
- public IEngineQueryRouter getQueryRouter() {
- return queryRouter;
- }
-
- @Override
public boolean update(Path path, long startTime, long endTime, String value)
throws ProcessorException {
throw new UnsupportedOperationException();
@@ -144,6 +173,6 @@ public class ClusterQueryProcessExecutor extends QueryProcessExecutor {
@Override
public boolean processNonQuery(PhysicalPlan plan) throws ProcessorException {
- return false;
+ throw new UnsupportedOperationException();
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index ecafe4e..b6247c8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -38,11 +38,11 @@ import org.apache.iotdb.cluster.qp.task.BatchQPTask;
import org.apache.iotdb.cluster.qp.task.QPTask;
import org.apache.iotdb.cluster.qp.task.SingleQPTask;
import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
-import org.apache.iotdb.cluster.rpc.raft.request.DataGroupNonQueryRequest;
-import org.apache.iotdb.cluster.rpc.raft.request.MetaGroupNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.nonquery.DataGroupNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.nonquery.MetaGroupNonQueryRequest;
import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
-import org.apache.iotdb.cluster.rpc.raft.response.DataGroupNonQueryResponse;
-import org.apache.iotdb.cluster.rpc.raft.response.MetaGroupNonQueryResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.nonquery.DataGroupNonQueryResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.nonquery.MetaGroupNonQueryResponse;
import org.apache.iotdb.cluster.service.TSServiceClusterImpl.BatchResult;
import org.apache.iotdb.cluster.utils.QPExecutorUtils;
import org.apache.iotdb.cluster.utils.RaftUtils;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index aea0656..82325e1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -32,19 +32,19 @@ import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
import org.apache.iotdb.cluster.entity.raft.RaftService;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
-import org.apache.iotdb.cluster.rpc.raft.request.QueryMetadataInStringRequest;
-import org.apache.iotdb.cluster.rpc.raft.request.QueryMetadataRequest;
-import org.apache.iotdb.cluster.rpc.raft.request.QueryPathsRequest;
-import org.apache.iotdb.cluster.rpc.raft.request.QuerySeriesTypeRequest;
-import org.apache.iotdb.cluster.rpc.raft.request.QueryStorageGroupRequest;
-import org.apache.iotdb.cluster.rpc.raft.request.QueryTimeSeriesRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.querymetadata.QueryMetadataInStringRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.querymetadata.QueryMetadataRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.querymetadata.QueryPathsRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.querymetadata.QuerySeriesTypeRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.querymetadata.QueryStorageGroupRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.querymetadata.QueryTimeSeriesRequest;
import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
-import org.apache.iotdb.cluster.rpc.raft.response.QueryMetadataInStringResponse;
-import org.apache.iotdb.cluster.rpc.raft.response.QueryMetadataResponse;
-import org.apache.iotdb.cluster.rpc.raft.response.QueryPathsResponse;
-import org.apache.iotdb.cluster.rpc.raft.response.QuerySeriesTypeResponse;
-import org.apache.iotdb.cluster.rpc.raft.response.QueryStorageGroupResponse;
-import org.apache.iotdb.cluster.rpc.raft.response.QueryTimeSeriesResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QueryMetadataInStringResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QueryMetadataResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QueryPathsResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QuerySeriesTypeResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QueryStorageGroupResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QueryTimeSeriesResponse;
import org.apache.iotdb.cluster.utils.QPExecutorUtils;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.db.exception.PathErrorException;
@@ -125,7 +125,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
private void handleTimseriesQuery(String groupId, List<String> pathList, List<List<String>> res)
throws ProcessorException, InterruptedException {
QueryTimeSeriesRequest request = new QueryTimeSeriesRequest(groupId,
- readMetadataConsistencyLevel, pathList);
+ getReadMetadataConsistencyLevel(), pathList);
SingleQPTask task = new SingleQPTask(false, request);
LOGGER.debug("Execute show timeseries {} statement for group {}.", pathList, groupId);
@@ -152,7 +152,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
List<SingleQPTask> taskList = new ArrayList<>();
for (String groupId : groupIdSet) {
QueryMetadataInStringRequest request = new QueryMetadataInStringRequest(groupId,
- readMetadataConsistencyLevel);
+ getReadMetadataConsistencyLevel());
SingleQPTask task = new SingleQPTask(false, request);
taskList.add(task);
@@ -191,7 +191,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
List<SingleQPTask> taskList = new ArrayList<>();
for (String groupId : groupIdSet) {
QueryMetadataRequest request = new QueryMetadataRequest(groupId,
- readMetadataConsistencyLevel);
+ getReadMetadataConsistencyLevel());
SingleQPTask task = new SingleQPTask(false, request);
taskList.add(task);
@@ -235,7 +235,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
} else {
String groupId = router.getGroupIdBySG(storageGroupList.get(0));
QuerySeriesTypeRequest request = new QuerySeriesTypeRequest(groupId,
- readMetadataConsistencyLevel, path);
+ getReadMetadataConsistencyLevel(), path);
SingleQPTask task = new SingleQPTask(false, request);
LOGGER.debug("Execute get series type for {} statement for group {}.", path, groupId);
@@ -284,7 +284,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
private void handlePathsQuery(String groupId, List<String> pathList, List<String> res)
throws ProcessorException, InterruptedException {
QueryPathsRequest request = new QueryPathsRequest(groupId,
- readMetadataConsistencyLevel, pathList);
+ getReadMetadataConsistencyLevel(), pathList);
SingleQPTask task = new SingleQPTask(false, request);
LOGGER.debug("Execute get paths for {} statement for group {}.", pathList, groupId);
@@ -325,10 +325,10 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
private Set<String> queryStorageGroupLocally() throws InterruptedException {
final byte[] reqContext = RaftUtils.createRaftRequestContext();
QueryStorageGroupRequest request = new QueryStorageGroupRequest(
- ClusterConfig.METADATA_GROUP_ID, readMetadataConsistencyLevel);
+ ClusterConfig.METADATA_GROUP_ID, getReadMetadataConsistencyLevel());
SingleQPTask task = new SingleQPTask(false, request);
MetadataRaftHolder metadataHolder = (MetadataRaftHolder) server.getMetadataHolder();
- if (readMetadataConsistencyLevel == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
+ if (getReadMetadataConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
QueryStorageGroupResponse response;
try {
response = QueryStorageGroupResponse
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
index d224bae..43edd67 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.cluster.concurrent.pool.QPTaskManager;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
import org.apache.iotdb.cluster.qp.executor.NonQueryExecutor;
import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
-import org.apache.iotdb.cluster.rpc.raft.response.DataGroupNonQueryResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.nonquery.DataGroupNonQueryResponse;
import org.apache.iotdb.cluster.service.TSServiceClusterImpl.BatchResult;
import org.apache.iotdb.cluster.utils.QPExecutorUtils;
import org.apache.iotdb.cluster.utils.RaftUtils;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java
index d1ca472..68a14c8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java
@@ -40,10 +40,13 @@ public class ClusterExecutorWithTimeGenerator {
private QueryExpression queryExpression;
private IClusterSingleQueryManager queryManager;
+ private int readDataConsistencyLevel;
- ClusterExecutorWithTimeGenerator(QueryExpression queryExpression, IClusterSingleQueryManager queryManager) {
+ ClusterExecutorWithTimeGenerator(QueryExpression queryExpression,
+ IClusterSingleQueryManager queryManager, int readDataConsistencyLevel) {
this.queryExpression = queryExpression;
this.queryManager = queryManager;
+ this.readDataConsistencyLevel = readDataConsistencyLevel;
}
/**
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithoutTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithoutTimeGenerator.java
index 7bd8008..65b7d8f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithoutTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithoutTimeGenerator.java
@@ -46,11 +46,13 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
public class ClusterExecutorWithoutTimeGenerator {
private QueryExpression queryExpression;
private ClusterSingleQueryManager queryManager;
+ private int readDataConsistencyLevel;
public ClusterExecutorWithoutTimeGenerator(QueryExpression queryExpression,
- ClusterSingleQueryManager queryManager) {
+ ClusterSingleQueryManager queryManager, int readDataConsistencyLevel) {
this.queryExpression = queryExpression;
this.queryManager = queryManager;
+ this.readDataConsistencyLevel = readDataConsistencyLevel;
}
/**
@@ -126,7 +128,9 @@ public class ClusterExecutorWithoutTimeGenerator {
ClusterSeriesReader reader = selectPathReaders.get(path.toString());
readersOfSelectedSeries.add(reader);
dataTypes.add(reader.getDataType());
+
} else {
+ // can read series locally.
QueryDataSource queryDataSource = QueryResourceManager.getInstance()
.getQueryDataSource(path,
context);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java
index 8fa1a95..da5a395 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java
@@ -21,10 +21,10 @@ package org.apache.iotdb.cluster.query.coordinatornode.executor;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import org.apache.iotdb.cluster.exception.RaftConnectionException;
import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterRpcQueryManager;
import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterSingleQueryManager;
import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterSingleQueryManager.QueryType;
-import org.apache.iotdb.cluster.query.coordinatornode.manager.IClusterSingleQueryManager;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
@@ -43,38 +43,43 @@ import org.apache.iotdb.tsfile.utils.Pair;
public class ClusterQueryRouter implements IEngineQueryRouter {
+ private ThreadLocal<Integer> readDataConsistencyLevel = new ThreadLocal<>();
+
@Override
public QueryDataSet query(QueryExpression queryExpression, QueryContext context)
throws FileNodeManagerException, PathErrorException {
ClusterSingleQueryManager queryManager = ClusterRpcQueryManager.getInstance()
.getSingleQuery(context.getJobId());
- if (queryExpression.hasQueryFilter()) {
- try {
+ try {
+ if (queryExpression.hasQueryFilter()) {
+
IExpression optimizedExpression = ExpressionOptimizer.getInstance()
.optimize(queryExpression.getExpression(), queryExpression.getSelectedSeries());
queryExpression.setExpression(optimizedExpression);
if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
- queryManager.init(QueryType.GLOBAL_TIME);
+ queryManager.init(QueryType.GLOBAL_TIME, getReadDataConsistencyLevel());
ClusterExecutorWithoutTimeGenerator engineExecutor =
- new ClusterExecutorWithoutTimeGenerator(queryExpression, queryManager);
+ new ClusterExecutorWithoutTimeGenerator(queryExpression, queryManager,
+ getReadDataConsistencyLevel());
return engineExecutor.executeWithGlobalTimeFilter(context);
} else {
- queryManager.init(QueryType.FILTER);
+ queryManager.init(QueryType.FILTER, getReadDataConsistencyLevel());
ClusterExecutorWithTimeGenerator engineExecutor = new ClusterExecutorWithTimeGenerator(
- queryExpression, queryManager);
+ queryExpression, queryManager, getReadDataConsistencyLevel());
return engineExecutor.execute(context);
}
- } catch (QueryFilterOptimizationException e) {
- throw new FileNodeManagerException(e);
+ } else {
+ queryManager.init(QueryType.NO_FILTER, getReadDataConsistencyLevel());
+ ClusterExecutorWithoutTimeGenerator engineExecutor =
+ new ClusterExecutorWithoutTimeGenerator(queryExpression, queryManager,
+ getReadDataConsistencyLevel());
+ return engineExecutor.executeWithoutFilter(context);
}
- } else {
- queryManager.init(QueryType.NO_FILTER);
- ClusterExecutorWithoutTimeGenerator engineExecutor =
- new ClusterExecutorWithoutTimeGenerator(queryExpression, queryManager);
- return engineExecutor.executeWithoutFilter(context);
+ } catch (QueryFilterOptimizationException | IOException | RaftConnectionException e) {
+ throw new FileNodeManagerException(e);
}
}
@@ -98,4 +103,12 @@ public class ClusterQueryRouter implements IEngineQueryRouter {
QueryContext context) throws FileNodeManagerException, PathErrorException, IOException {
return null;
}
+
+ public int getReadDataConsistencyLevel() {
+ return readDataConsistencyLevel.get();
+ }
+
+ public void setReadDataConsistencyLevel(int readDataConsistencyLevel) {
+ this.readDataConsistencyLevel.set(readDataConsistencyLevel);
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/factory/ClusterRpcReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/factory/ClusterRpcReaderFactory.java
deleted file mode 100644
index 27444af..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/factory/ClusterRpcReaderFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.cluster.query.coordinatornode.factory;
-
-import com.alipay.sofa.jraft.entity.PeerId;
-import java.util.Map;
-import org.apache.iotdb.cluster.query.coordinatornode.reader.ClusterSeriesReader;
-import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
-
-public class ClusterRpcReaderFactory {
-
- public static Map<String, ClusterSeriesReader> createClusterSeriesReader(String groupId, PeerId peerId, QueryPlan queryPlan){
-return null;
- }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterSingleQueryManager.java
index 90c1ac5..d350a7c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterSingleQueryManager.java
@@ -19,16 +19,19 @@
package org.apache.iotdb.cluster.query.coordinatornode.manager;
import com.alipay.sofa.jraft.entity.PeerId;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.iotdb.cluster.query.coordinatornode.factory.ClusterRpcReaderFactory;
+import org.apache.iotdb.cluster.exception.RaftConnectionException;
+import org.apache.iotdb.cluster.query.PathType;
import org.apache.iotdb.cluster.query.coordinatornode.reader.ClusterSeriesReader;
import org.apache.iotdb.cluster.utils.QPExecutorUtils;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.cluster.utils.hash.Router;
+import org.apache.iotdb.cluster.utils.query.ClusterRpcReaderUtils;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
@@ -37,7 +40,7 @@ import org.apache.iotdb.tsfile.read.common.Path;
public class ClusterSingleQueryManager implements IClusterSingleQueryManager {
/**
- * Query job id assigned by QueryResourceManager.
+ * Query job id assigned by QueryResourceManager of coordinator node.
*/
private long jobId;
@@ -63,6 +66,8 @@ public class ClusterSingleQueryManager implements IClusterSingleQueryManager {
*/
private Map<String, QueryPlan> filterPathPlans = new HashMap<>();
+ private Map<String, ClusterSeriesReader> filterPathReaders = new HashMap<>();
+
public ClusterSingleQueryManager(long jobId,
QueryPlan queryPlan) {
this.jobId = jobId;
@@ -70,7 +75,8 @@ public class ClusterSingleQueryManager implements IClusterSingleQueryManager {
}
@Override
- public void init(QueryType queryType) throws PathErrorException {
+ public void init(QueryType queryType, int readDataConsistencyLevel)
+ throws PathErrorException, IOException, RaftConnectionException {
switch (queryType) {
case NO_FILTER:
divideNoFilterPhysicalPlan();
@@ -84,8 +90,8 @@ public class ClusterSingleQueryManager implements IClusterSingleQueryManager {
default:
throw new UnsupportedOperationException();
}
- initSelectedPathPlan();
- initFilterPathPlan();
+ initSelectedPathPlan(readDataConsistencyLevel);
+ initFilterPathPlan(readDataConsistencyLevel);
}
public enum QueryType {
@@ -127,26 +133,25 @@ public class ClusterSingleQueryManager implements IClusterSingleQueryManager {
/**
* Init select path
*/
- private void initSelectedPathPlan(){
- if(!selectPathPlans.isEmpty()){
- for(Entry<String, QueryPlan> entry: selectPathPlans.entrySet()){
+ private void initSelectedPathPlan(int readDataConsistencyLevel)
+ throws IOException, RaftConnectionException {
+ if (!selectPathPlans.isEmpty()) {
+ for (Entry<String, QueryPlan> entry : selectPathPlans.entrySet()) {
String groupId = entry.getKey();
QueryPlan queryPlan = entry.getValue();
- if(!canHandleQueryLocally(groupId)) {
+ if (!canHandleQueryLocally(groupId)) {
PeerId randomPeer = RaftUtils.getRandomPeerID(groupId);
readerNodes.put(groupId, randomPeer);
- Map<String, ClusterSeriesReader> selectPathReaders = ClusterRpcReaderFactory
- .createClusterSeriesReader(groupId, randomPeer, queryPlan);
- for (Path path : queryPlan.getPaths()) {
- selectPathReaders.put(path.getFullPath(), selectPathReaders.get(path.getFullPath()));
- }
+ this.selectPathReaders = ClusterRpcReaderUtils
+ .createClusterSeriesReader(groupId, randomPeer, queryPlan, readDataConsistencyLevel,
+ PathType.SELECT_PATH);
}
}
}
}
- private void initFilterPathPlan(){
- if(!filterPathPlans.isEmpty()){
+ private void initFilterPathPlan(int readDataConsistencyLevel) {
+ if (!filterPathPlans.isEmpty()) {
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterSingleQueryManager.java
index dc84d32..bc70e65 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterSingleQueryManager.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.cluster.query.coordinatornode.manager;
import com.alipay.sofa.jraft.entity.PeerId;
+import java.io.IOException;
+import org.apache.iotdb.cluster.exception.RaftConnectionException;
import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterSingleQueryManager.QueryType;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -33,7 +35,8 @@ public interface IClusterSingleQueryManager {
* @param queryType
*/
void init(
- QueryType queryType) throws PathErrorException;
+ QueryType queryType, int readDataConsistencyLevel)
+ throws PathErrorException, IOException, RaftConnectionException;
/**
* Get physical plan of select path
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterRpcBatchDataReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterRpcBatchDataReader.java
index 5b75c9d..ba36416 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterRpcBatchDataReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterRpcBatchDataReader.java
@@ -18,19 +18,42 @@
*/
package org.apache.iotdb.cluster.query.coordinatornode.reader;
+import com.alipay.sofa.jraft.entity.PeerId;
import java.io.IOException;
import org.apache.iotdb.cluster.query.PathType;
import org.apache.iotdb.db.query.reader.IBatchReader;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
public class ClusterRpcBatchDataReader implements IBatchReader {
- private String PeerId;
- private String jobId;
+ /**
+ * Remote query node
+ */
+ private PeerId peerId;
+
+ /**
+ * Job id in remote query node
+ */
+ private long jobId;
+
+ /**
+ * Path type
+ */
private PathType type;
+
+ /**
+ * Batch data
+ */
private BatchData batchData;
+ public ClusterRpcBatchDataReader(PeerId peerId, long jobId,
+ PathType type, BatchData batchData) {
+ this.peerId = peerId;
+ this.jobId = jobId;
+ this.type = type;
+ this.batchData = batchData;
+ }
+
@Override
public boolean hasNext() throws IOException {
return false;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterSeriesReader.java
index 438559e..3d022f8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterSeriesReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterSeriesReader.java
@@ -40,7 +40,7 @@ public class ClusterSeriesReader implements IPointReader {
@Override
public TimeValuePair current() throws IOException {
- return null;
+ throw new IOException("current() in ClusterSeriesReader is an empty method.");
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
index ed38584..cc89abb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
@@ -22,6 +22,7 @@ import com.alipay.sofa.jraft.entity.PeerId;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
import org.apache.iotdb.cluster.qp.task.SingleQPTask;
import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
+import org.apache.iotdb.cluster.utils.query.QueryTask;
/**
* Handle the request and process the result as a client with the current node
@@ -38,11 +39,10 @@ public interface NodeAsClient {
/**
* Synchronous processing requests
- * @param clientService client rpc service handle
- * @param leader leader node of the target group
- * @param qpTask single QPTask to be executed
+ * @param peerId leader node of the target group
+ *
*/
- void syncHandleRequest(BasicRequest request, PeerId leader, SingleQPTask qpTask)
+ QueryTask syncHandleRequest(BasicRequest request, PeerId peerId)
throws RaftConnectionException;
/**
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
index af77a1c..658b7a8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.cluster.qp.task.SingleQPTask;
import org.apache.iotdb.cluster.rpc.raft.NodeAsClient;
import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
+import org.apache.iotdb.cluster.utils.query.QueryTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -214,17 +215,13 @@ public class RaftNodeAsClientManager {
}
@Override
- public void syncHandleRequest(BasicRequest request, PeerId leader,
- SingleQPTask qpTask)
- throws RaftConnectionException {
+ public QueryTask syncHandleRequest(BasicRequest request, PeerId peerId) {
try {
BasicResponse response = (BasicResponse) boltClientService.getRpcClient()
- .invokeSync(leader.getEndpoint().toString(), request, TASK_TIMEOUT_MS);
- qpTask.run(response);
+ .invokeSync(peerId.getEndpoint().toString(), request, TASK_TIMEOUT_MS);
+ return new QueryTask(response, TaskState.FINISH);
} catch (RemotingException | InterruptedException e) {
- qpTask.setTaskState(TaskState.EXCEPTION);
- qpTask.run(null);
- throw new RaftConnectionException(e);
+ return new QueryTask(null, TaskState.EXCEPTION);
} finally {
releaseClient();
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/DataGroupNonQueryAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/DataGroupNonQueryAsyncProcessor.java
similarity index 90%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/DataGroupNonQueryAsyncProcessor.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/DataGroupNonQueryAsyncProcessor.java
index fb00c0d..de2d2ab 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/DataGroupNonQueryAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/DataGroupNonQueryAsyncProcessor.java
@@ -16,16 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.rpc.raft.processor;
+package org.apache.iotdb.cluster.rpc.raft.processor.nonquery;
import com.alipay.remoting.AsyncContext;
import com.alipay.remoting.BizContext;
import com.alipay.sofa.jraft.entity.PeerId;
import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
import org.apache.iotdb.cluster.entity.raft.RaftService;
-import org.apache.iotdb.cluster.rpc.raft.request.DataGroupNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.raft.processor.BasicAsyncUserProcessor;
+import org.apache.iotdb.cluster.rpc.raft.request.nonquery.DataGroupNonQueryRequest;
import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
-import org.apache.iotdb.cluster.rpc.raft.response.DataGroupNonQueryResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.nonquery.DataGroupNonQueryResponse;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/MetaGroupNonQueryAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/MetaGroupNonQueryAsyncProcessor.java
similarity index 89%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/MetaGroupNonQueryAsyncProcessor.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/MetaGroupNonQueryAsyncProcessor.java
index d6f6270..9f09bbb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/MetaGroupNonQueryAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/MetaGroupNonQueryAsyncProcessor.java
@@ -16,16 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.rpc.raft.processor;
+package org.apache.iotdb.cluster.rpc.raft.processor.nonquery;
import com.alipay.remoting.AsyncContext;
import com.alipay.remoting.BizContext;
import com.alipay.sofa.jraft.entity.PeerId;
import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
import org.apache.iotdb.cluster.entity.raft.RaftService;
-import org.apache.iotdb.cluster.rpc.raft.request.MetaGroupNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.raft.processor.BasicAsyncUserProcessor;
+import org.apache.iotdb.cluster.rpc.raft.request.nonquery.MetaGroupNonQueryRequest;
import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
-import org.apache.iotdb.cluster.rpc.raft.response.MetaGroupNonQueryResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.nonquery.MetaGroupNonQueryResponse;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryMetadataAsyncProcessor.java
similarity index 91%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataAsyncProcessor.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryMetadataAsyncProcessor.java
index 176fa33..36e657c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryMetadataAsyncProcessor.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.rpc.raft.processor;
+package org.apache.iotdb.cluster.rpc.raft.processor.querymetadata;
import com.alipay.remoting.AsyncContext;
import com.alipay.remoting.BizContext;
@@ -25,8 +25,9 @@ import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
import org.apache.iotdb.cluster.entity.raft.RaftService;
-import org.apache.iotdb.cluster.rpc.raft.request.QueryMetadataRequest;
-import org.apache.iotdb.cluster.rpc.raft.response.QueryMetadataResponse;
+import org.apache.iotdb.cluster.rpc.raft.processor.BasicAsyncUserProcessor;
+import org.apache.iotdb.cluster.rpc.raft.request.querymetadata.QueryMetadataRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QueryMetadataResponse;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.metadata.MManager;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataInStringAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryMetadataInStringAsyncProcessor.java
similarity index 90%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataInStringAsyncProcessor.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryMetadataInStringAsyncProcessor.java
index b80f4ae..8771eea 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetadataInStringAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryMetadataInStringAsyncProcessor.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.rpc.raft.processor;
+package org.apache.iotdb.cluster.rpc.raft.processor.querymetadata;
import com.alipay.remoting.AsyncContext;
import com.alipay.remoting.BizContext;
@@ -25,8 +25,9 @@ import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
import org.apache.iotdb.cluster.entity.raft.RaftService;
-import org.apache.iotdb.cluster.rpc.raft.request.QueryMetadataInStringRequest;
-import org.apache.iotdb.cluster.rpc.raft.response.QueryMetadataInStringResponse;
+import org.apache.iotdb.cluster.rpc.raft.processor.BasicAsyncUserProcessor;
+import org.apache.iotdb.cluster.rpc.raft.request.querymetadata.QueryMetadataInStringRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QueryMetadataInStringResponse;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.db.metadata.MManager;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryPathsAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryPathsAsyncProcessor.java
similarity index 92%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryPathsAsyncProcessor.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryPathsAsyncProcessor.java
index f54aba0..8e1e47b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryPathsAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryPathsAsyncProcessor.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.rpc.raft.processor;
+package org.apache.iotdb.cluster.rpc.raft.processor.querymetadata;
import com.alipay.remoting.AsyncContext;
import com.alipay.remoting.BizContext;
@@ -25,8 +25,9 @@ import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
import org.apache.iotdb.cluster.entity.raft.RaftService;
-import org.apache.iotdb.cluster.rpc.raft.request.QueryPathsRequest;
-import org.apache.iotdb.cluster.rpc.raft.response.QueryPathsResponse;
+import org.apache.iotdb.cluster.rpc.raft.processor.BasicAsyncUserProcessor;
+import org.apache.iotdb.cluster.rpc.raft.request.querymetadata.QueryPathsRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QueryPathsResponse;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.metadata.MManager;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QuerySeriesTypeAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QuerySeriesTypeAsyncProcessor.java
similarity index 91%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QuerySeriesTypeAsyncProcessor.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QuerySeriesTypeAsyncProcessor.java
index f0a4fc6..9e4b1c7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QuerySeriesTypeAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QuerySeriesTypeAsyncProcessor.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.rpc.raft.processor;
+package org.apache.iotdb.cluster.rpc.raft.processor.querymetadata;
import com.alipay.remoting.AsyncContext;
import com.alipay.remoting.BizContext;
@@ -25,8 +25,9 @@ import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
import org.apache.iotdb.cluster.entity.raft.RaftService;
-import org.apache.iotdb.cluster.rpc.raft.request.QuerySeriesTypeRequest;
-import org.apache.iotdb.cluster.rpc.raft.response.QuerySeriesTypeResponse;
+import org.apache.iotdb.cluster.rpc.raft.processor.BasicAsyncUserProcessor;
+import org.apache.iotdb.cluster.rpc.raft.request.querymetadata.QuerySeriesTypeRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QuerySeriesTypeResponse;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.metadata.MManager;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryTimeSeriesAsyncProcessor.java
similarity index 92%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryTimeSeriesAsyncProcessor.java
index c41fdcf..593f99d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryTimeSeriesAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetadata/QueryTimeSeriesAsyncProcessor.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.rpc.raft.processor;
+package org.apache.iotdb.cluster.rpc.raft.processor.querymetadata;
import com.alipay.remoting.AsyncContext;
import com.alipay.remoting.BizContext;
@@ -25,8 +25,9 @@ import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
import org.apache.iotdb.cluster.entity.raft.RaftService;
-import org.apache.iotdb.cluster.rpc.raft.request.QueryTimeSeriesRequest;
-import org.apache.iotdb.cluster.rpc.raft.response.QueryTimeSeriesResponse;
+import org.apache.iotdb.cluster.rpc.raft.processor.BasicAsyncUserProcessor;
+import org.apache.iotdb.cluster.rpc.raft.request.querymetadata.QueryTimeSeriesRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QueryTimeSeriesResponse;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.metadata.MManager;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/BasicQueryRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/BasicQueryRequest.java
index 2cf613f..0e4ea6d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/BasicQueryRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/BasicQueryRequest.java
@@ -30,6 +30,10 @@ public abstract class BasicQueryRequest extends BasicRequest {
this.readConsistencyLevel = readConsistencyLevel;
}
+ public BasicQueryRequest(String groupID) {
+ super(groupID);
+ }
+
public int getReadConsistencyLevel() {
return readConsistencyLevel;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/DataGroupNonQueryRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/nonquery/DataGroupNonQueryRequest.java
similarity index 86%
copy from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/DataGroupNonQueryRequest.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/nonquery/DataGroupNonQueryRequest.java
index c1bcf5f..9a5df22 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/DataGroupNonQueryRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/nonquery/DataGroupNonQueryRequest.java
@@ -16,17 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.rpc.raft.request;
+package org.apache.iotdb.cluster.rpc.raft.request.nonquery;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
+import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
/**
* Handle request to data group
*/
-public class DataGroupNonQueryRequest extends BasicRequest implements Serializable {
+public class DataGroupNonQueryRequest extends BasicRequest {
public DataGroupNonQueryRequest(String groupID, List<PhysicalPlan> physicalPlanBytes)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/MetaGroupNonQueryRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/nonquery/MetaGroupNonQueryRequest.java
similarity index 86%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/MetaGroupNonQueryRequest.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/nonquery/MetaGroupNonQueryRequest.java
index 69625ff..7b90e68 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/MetaGroupNonQueryRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/nonquery/MetaGroupNonQueryRequest.java
@@ -16,17 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.rpc.raft.request;
+package org.apache.iotdb.cluster.rpc.raft.request.nonquery;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
+import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
/**
* Handle request to metadata group leader
*/
-public class MetaGroupNonQueryRequest extends BasicRequest implements Serializable {
+public class MetaGroupNonQueryRequest extends BasicRequest {
public MetaGroupNonQueryRequest(String groupID, List<PhysicalPlan> plans)
throws IOException {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/DataGroupNonQueryRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataRequest.java
similarity index 56%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/DataGroupNonQueryRequest.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataRequest.java
index c1bcf5f..fd27997 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/DataGroupNonQueryRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataRequest.java
@@ -16,23 +16,34 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.rpc.raft.request;
+package org.apache.iotdb.cluster.rpc.raft.request.querydata;
import java.io.IOException;
-import java.io.Serializable;
import java.util.List;
+import org.apache.iotdb.cluster.query.PathType;
+import org.apache.iotdb.cluster.rpc.raft.request.BasicQueryRequest;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-/**
- * Handle request to data group
- */
-public class DataGroupNonQueryRequest extends BasicRequest implements Serializable {
+public class QuerySeriesDataRequest extends BasicQueryRequest {
+ private Stage stage;
+ private PathType pathType;
+ private List<String> paths;
- public DataGroupNonQueryRequest(String groupID, List<PhysicalPlan> physicalPlanBytes)
+ public QuerySeriesDataRequest(String groupID, int readConsistencyLevel,
+ List<PhysicalPlan> physicalPlanBytes, PathType pathType)
throws IOException {
- super(groupID);
+ super(groupID, readConsistencyLevel);
init(physicalPlanBytes);
+ stage = Stage.INITIAL;
+ this.pathType = pathType;
}
+ public QuerySeriesDataRequest(String groupID, List<String> paths, PathType pathType)
+ throws IOException {
+ super(groupID);
+ this.paths = paths;
+ stage = Stage.READ_DATA;
+ this.pathType = pathType;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/Stage.java
similarity index 73%
copy from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataRequest.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/Stage.java
index 2628fb6..53d2e2a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/Stage.java
@@ -16,13 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.rpc.raft.request;
+package org.apache.iotdb.cluster.rpc.raft.request.querydata;
-import java.io.Serializable;
-
-public class QueryMetadataRequest extends BasicQueryRequest implements Serializable {
-
- public QueryMetadataRequest(String groupID, int readConsistencyLevel) {
- super(groupID, readConsistencyLevel);
- }
-}
\ No newline at end of file
+public enum Stage {
+ INITIAL, READ_DATA
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataInStringRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryMetadataInStringRequest.java
similarity index 87%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataInStringRequest.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryMetadataInStringRequest.java
index 18471a6..0e23dab 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataInStringRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryMetadataInStringRequest.java
@@ -16,11 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.rpc.raft.request;
+package org.apache.iotdb.cluster.rpc.raft.request.querymetadata;
-import java.io.Serializable;
+import org.apache.iotdb.cluster.rpc.raft.request.BasicQueryRequest;
-public class QueryMetadataInStringRequest extends BasicQueryRequest implements Serializable {
+public class QueryMetadataInStringRequest extends BasicQueryRequest {
public QueryMetadataInStringRequest(String groupID, int readConsistencyLevel) {
super(groupID, readConsistencyLevel);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryMetadataRequest.java
similarity index 82%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataRequest.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryMetadataRequest.java
index 2628fb6..3958775 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryMetadataRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryMetadataRequest.java
@@ -16,11 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.rpc.raft.request;
+package org.apache.iotdb.cluster.rpc.raft.request.querymetadata;
-import java.io.Serializable;
+import org.apache.iotdb.cluster.rpc.raft.request.BasicQueryRequest;
-public class QueryMetadataRequest extends BasicQueryRequest implements Serializable {
+public class QueryMetadataRequest extends BasicQueryRequest {
public QueryMetadataRequest(String groupID, int readConsistencyLevel) {
super(groupID, readConsistencyLevel);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryPathsRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryPathsRequest.java
similarity index 84%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryPathsRequest.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryPathsRequest.java
index 2c600f4..49355ea 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryPathsRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryPathsRequest.java
@@ -16,12 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.rpc.raft.request;
+package org.apache.iotdb.cluster.rpc.raft.request.querymetadata;
-import java.io.Serializable;
import java.util.List;
+import org.apache.iotdb.cluster.rpc.raft.request.BasicQueryRequest;
-public class QueryPathsRequest extends BasicQueryRequest implements Serializable {
+public class QueryPathsRequest extends BasicQueryRequest {
private List<String> path;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QuerySeriesTypeRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QuerySeriesTypeRequest.java
similarity index 84%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QuerySeriesTypeRequest.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QuerySeriesTypeRequest.java
index c486576..4f700dc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QuerySeriesTypeRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QuerySeriesTypeRequest.java
@@ -16,11 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.rpc.raft.request;
+package org.apache.iotdb.cluster.rpc.raft.request.querymetadata;
-import java.io.Serializable;
+import org.apache.iotdb.cluster.rpc.raft.request.BasicQueryRequest;
-public class QuerySeriesTypeRequest extends BasicQueryRequest implements Serializable {
+public class QuerySeriesTypeRequest extends BasicQueryRequest {
private String path;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryStorageGroupRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryStorageGroupRequest.java
similarity index 88%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryStorageGroupRequest.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryStorageGroupRequest.java
index 037924f..2bcf187 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryStorageGroupRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryStorageGroupRequest.java
@@ -16,11 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.rpc.raft.request;
+package org.apache.iotdb.cluster.rpc.raft.request.querymetadata;
-import java.io.Serializable;
+import org.apache.iotdb.cluster.rpc.raft.request.BasicQueryRequest;
-public class QueryStorageGroupRequest extends BasicQueryRequest implements Serializable {
+public class QueryStorageGroupRequest extends BasicQueryRequest {
public QueryStorageGroupRequest(String groupID, int readConsistencyLevel) {
super(groupID, readConsistencyLevel);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryTimeSeriesRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryTimeSeriesRequest.java
similarity index 84%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryTimeSeriesRequest.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryTimeSeriesRequest.java
index 0106f18..0bad4fc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/QueryTimeSeriesRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetadata/QueryTimeSeriesRequest.java
@@ -16,12 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.rpc.raft.request;
+package org.apache.iotdb.cluster.rpc.raft.request.querymetadata;
-import java.io.Serializable;
import java.util.List;
+import org.apache.iotdb.cluster.rpc.raft.request.BasicQueryRequest;
-public class QueryTimeSeriesRequest extends BasicQueryRequest implements Serializable {
+public class QueryTimeSeriesRequest extends BasicQueryRequest {
private List<String> path;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/DataGroupNonQueryResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/nonquery/DataGroupNonQueryResponse.java
similarity index 92%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/DataGroupNonQueryResponse.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/nonquery/DataGroupNonQueryResponse.java
index 074f452..0950c0d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/DataGroupNonQueryResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/nonquery/DataGroupNonQueryResponse.java
@@ -16,7 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.rpc.raft.response;
+package org.apache.iotdb.cluster.rpc.raft.response.nonquery;
+
+import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
/**
* Handle response from data group leader
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/MetaGroupNonQueryResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/nonquery/MetaGroupNonQueryResponse.java
similarity index 91%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/MetaGroupNonQueryResponse.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/nonquery/MetaGroupNonQueryResponse.java
index f662e35..2ec1165 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/MetaGroupNonQueryResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/nonquery/MetaGroupNonQueryResponse.java
@@ -16,7 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.rpc.raft.response;
+package org.apache.iotdb.cluster.rpc.raft.response.nonquery;
+
+import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
/**
* Handle response from metadata group leader
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querydata/QuerySeriesDataResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querydata/QuerySeriesDataResponse.java
new file mode 100644
index 0000000..9f62617
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querydata/QuerySeriesDataResponse.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.response.querydata;
+
+import java.util.Map;
+import org.apache.iotdb.cluster.query.PathType;
+import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+
+public class QuerySeriesDataResponse extends BasicResponse {
+
+ private long jobId;
+ private PathType pathType;
+ private Map<String, TSDataType> seriesType;
+ private Map<String, BatchData> seriesBatchData;
+
+ public QuerySeriesDataResponse(String groupId, PathType pathType) {
+ super(groupId, false, null, null);
+ this.pathType = pathType;
+ }
+
+ public QuerySeriesDataResponse setJobId(long jobId) {
+ this.jobId = jobId;
+ return this;
+ }
+
+ public QuerySeriesDataResponse setSeriesType(Map<String, TSDataType> seriesType) {
+ this.seriesType = seriesType;
+ return this;
+ }
+
+ public QuerySeriesDataResponse seySeriesBatchData(Map<String, BatchData> seriesBatchData) {
+ this.seriesBatchData = seriesBatchData;
+ return this;
+ }
+
+ public long getJobId() {
+ return jobId;
+ }
+
+ public PathType getPathType() {
+ return pathType;
+ }
+
+ public Map<String, TSDataType> getSeriesType() {
+ return seriesType;
+ }
+
+ public Map<String, BatchData> getSeriesBatchData() {
+ return seriesBatchData;
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryMetadataInStringResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryMetadataInStringResponse.java
similarity index 92%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryMetadataInStringResponse.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryMetadataInStringResponse.java
index a3a963a..8967ea1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryMetadataInStringResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryMetadataInStringResponse.java
@@ -16,7 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.rpc.raft.response;
+package org.apache.iotdb.cluster.rpc.raft.response.querymetadata;
+
+import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
public class QueryMetadataInStringResponse extends BasicResponse {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryMetadataResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryMetadataResponse.java
similarity index 92%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryMetadataResponse.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryMetadataResponse.java
index 6c21798..3f0ad51 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryMetadataResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryMetadataResponse.java
@@ -16,8 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.rpc.raft.response;
+package org.apache.iotdb.cluster.rpc.raft.response.querymetadata;
+import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
import org.apache.iotdb.db.metadata.Metadata;
public class QueryMetadataResponse extends BasicResponse {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryPathsResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryPathsResponse.java
similarity index 92%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryPathsResponse.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryPathsResponse.java
index 29d659a..47f51f7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryPathsResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryPathsResponse.java
@@ -16,10 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.rpc.raft.response;
+package org.apache.iotdb.cluster.rpc.raft.response.querymetadata;
import java.util.ArrayList;
import java.util.List;
+import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
public class QueryPathsResponse extends BasicResponse {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QuerySeriesTypeResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QuerySeriesTypeResponse.java
similarity index 92%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QuerySeriesTypeResponse.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QuerySeriesTypeResponse.java
index e86e108..d772365 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QuerySeriesTypeResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QuerySeriesTypeResponse.java
@@ -16,8 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.rpc.raft.response;
+package org.apache.iotdb.cluster.rpc.raft.response.querymetadata;
+import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public class QuerySeriesTypeResponse extends BasicResponse {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryStorageGroupResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryStorageGroupResponse.java
similarity index 92%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryStorageGroupResponse.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryStorageGroupResponse.java
index 6abff89..4668e6d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryStorageGroupResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryStorageGroupResponse.java
@@ -16,9 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.rpc.raft.response;
+package org.apache.iotdb.cluster.rpc.raft.response.querymetadata;
import java.util.Set;
+import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
public class QueryStorageGroupResponse extends BasicResponse {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryTimeSeriesResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryTimeSeriesResponse.java
similarity index 92%
rename from cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryTimeSeriesResponse.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryTimeSeriesResponse.java
index edeb4c4..2950b1f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/QueryTimeSeriesResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetadata/QueryTimeSeriesResponse.java
@@ -16,10 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.rpc.raft.response;
+package org.apache.iotdb.cluster.rpc.raft.response.querymetadata;
import java.util.ArrayList;
import java.util.List;
+import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
public class QueryTimeSeriesResponse extends BasicResponse {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
index 79b2acd..d978f7a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
@@ -31,7 +31,7 @@ import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.exception.ConsistencyLevelException;
import org.apache.iotdb.cluster.qp.executor.NonQueryExecutor;
import org.apache.iotdb.cluster.qp.executor.QueryMetadataExecutor;
-import org.apache.iotdb.cluster.query.coordinatornode.executor.ClusterQueryProcessExecutor;
+import org.apache.iotdb.cluster.qp.executor.ClusterQueryProcessExecutor;
import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterRpcQueryManager;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.conf.IoTDBConstant;
@@ -65,11 +65,13 @@ public class TSServiceClusterImpl extends TSServiceImpl {
private static final Logger LOGGER = LoggerFactory.getLogger(TSServiceClusterImpl.class);
- private ClusterRpcQueryManager queryManager = ClusterRpcQueryManager.getInstance();
- private QueryProcessor processor = new QueryProcessor(new ClusterQueryProcessExecutor());
+ private ClusterQueryProcessExecutor queryDataExecutor = new ClusterQueryProcessExecutor();
private NonQueryExecutor nonQueryExecutor = new NonQueryExecutor();
private QueryMetadataExecutor queryMetadataExecutor = new QueryMetadataExecutor();
+ private ClusterRpcQueryManager queryManager = ClusterRpcQueryManager.getInstance();
+ private QueryProcessor queryProcessor = new QueryProcessor(queryDataExecutor);
+
public TSServiceClusterImpl() throws IOException {
super();
}
@@ -126,7 +128,7 @@ public class TSServiceClusterImpl extends TSServiceImpl {
/** find all valid physical plans **/
for (int i = 0; i < statements.size(); i++) {
try {
- PhysicalPlan plan = processor
+ PhysicalPlan plan = queryProcessor
.parseSQLToPhysicalPlan(statements.get(i), zoneIds.get());
plan.setProposer(username.get());
@@ -242,13 +244,13 @@ public class TSServiceClusterImpl extends TSServiceImpl {
if (Pattern.matches(ClusterConstant.SET_READ_METADATA_CONSISTENCY_LEVEL_PATTERN, statement)) {
String[] splits = statement.split("\\s+");
int level = Integer.parseInt(splits[splits.length - 1]);
- nonQueryExecutor.setReadMetadataConsistencyLevel(level);
+ queryMetadataExecutor.setReadMetadataConsistencyLevel(level);
return true;
} else if (Pattern
.matches(ClusterConstant.SET_READ_DATA_CONSISTENCY_LEVEL_PATTERN, statement)) {
String[] splits = statement.split("\\s+");
int level = Integer.parseInt(splits[splits.length - 1]);
- nonQueryExecutor.setReadDataConsistencyLevel(level);
+ queryDataExecutor.setReadDataConsistencyLevel(level);
return true;
} else {
return false;
@@ -295,7 +297,7 @@ public class TSServiceClusterImpl extends TSServiceImpl {
throws PathErrorException, QueryFilterOptimizationException, FileNodeManagerException,
ProcessorException, IOException {
PhysicalPlan physicalPlan = queryStatus.get().get(statement);
- processor.getExecutor().setFetchSize(fetchSize);
+ queryProcessor.getExecutor().setFetchSize(fetchSize);
long jobId = QueryResourceManager.getInstance().assignJobId();
QueryContext context = new QueryContext(jobId);
@@ -303,7 +305,7 @@ public class TSServiceClusterImpl extends TSServiceImpl {
contextMapLocal.get().put(req.queryId, context);
queryManager.addSingleQuery(jobId, (QueryPlan) physicalPlan);
- QueryDataSet queryDataSet = processor.getExecutor().processQuery((QueryPlan) physicalPlan,
+ QueryDataSet queryDataSet = queryProcessor.getExecutor().processQuery((QueryPlan) physicalPlan,
context);
queryRet.get().put(statement, queryDataSet);
return queryDataSet;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index 5c3f3cb..d2d5429 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -47,7 +47,7 @@ import org.apache.iotdb.cluster.rpc.raft.closure.ResponseClosure;
import org.apache.iotdb.cluster.rpc.raft.impl.RaftNodeAsClientManager;
import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
-import org.apache.iotdb.cluster.rpc.raft.response.MetaGroupNonQueryResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.nonquery.MetaGroupNonQueryResponse;
import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
import org.apache.iotdb.cluster.utils.hash.Router;
import org.slf4j.Logger;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/ClusterRpcReaderUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/ClusterRpcReaderUtils.java
new file mode 100644
index 0000000..35ff3fb
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/ClusterRpcReaderUtils.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.utils.query;
+
+import com.alipay.sofa.jraft.entity.PeerId;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.exception.RaftConnectionException;
+import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
+import org.apache.iotdb.cluster.query.PathType;
+import org.apache.iotdb.cluster.query.coordinatornode.reader.ClusterRpcBatchDataReader;
+import org.apache.iotdb.cluster.query.coordinatornode.reader.ClusterSeriesReader;
+import org.apache.iotdb.cluster.rpc.raft.NodeAsClient;
+import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataResponse;
+import org.apache.iotdb.cluster.utils.RaftUtils;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.reader.IBatchReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+
+public class ClusterRpcReaderUtils {
+
+ /**
+ * Count limit to redo a task
+ */
+ private static final int TASK_MAX_RETRY = ClusterDescriptor.getInstance().getConfig()
+ .getQpTaskRedoCount();
+
+ public static Map<String, ClusterSeriesReader> createClusterSeriesReader(String groupId,
+ PeerId peerId, QueryPlan queryPlan, int readDataConsistencyLevel, PathType pathType)
+ throws IOException, RaftConnectionException {
+
+ /** handle request **/
+ List<PhysicalPlan> physicalPlanList = new ArrayList<>();
+ physicalPlanList.add(queryPlan);
+ BasicRequest request = new QuerySeriesDataRequest(groupId, readDataConsistencyLevel,
+ physicalPlanList, pathType);
+ QuerySeriesDataResponse response = (QuerySeriesDataResponse) handleQueryRequest(request, peerId,
+ 0);
+
+ /** create cluster series reader **/
+ Map<String, ClusterSeriesReader> allSeriesReader = new HashMap<>();
+ Map<String, TSDataType> seriesType = response.getSeriesType();
+ Map<String, BatchData> seriesBatchData = response.getSeriesBatchData();
+ long jobId = response.getJobId();
+ for (Entry<String, TSDataType> entry : seriesType.entrySet()) {
+ String seriesPath = entry.getKey();
+ TSDataType dataType = entry.getValue();
+ IBatchReader batchDataReader = new ClusterRpcBatchDataReader(peerId, jobId,
+ PathType.SELECT_PATH, seriesBatchData.get(seriesPath));
+ ClusterSeriesReader seriesReader = new ClusterSeriesReader(batchDataReader, seriesPath,
+ dataType);
+ allSeriesReader.put(seriesPath, seriesReader);
+ }
+ return allSeriesReader;
+ }
+
+ private static BasicResponse handleQueryRequest(BasicRequest request, PeerId peerId,
+ int taskRetryNum)
+ throws RaftConnectionException {
+ if (taskRetryNum > TASK_MAX_RETRY) {
+ throw new RaftConnectionException(
+ String.format("Query request retries reach the upper bound %s",
+ TASK_MAX_RETRY));
+ }
+ NodeAsClient nodeAsClient = RaftUtils.getRaftNodeAsClient();
+ QueryTask queryTask = nodeAsClient.syncHandleRequest(request, peerId);
+ if (queryTask.getState() == TaskState.FINISH) {
+ return queryTask.getBasicResponse();
+ } else {
+ return handleQueryRequest(request, peerId, taskRetryNum + 1);
+ }
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterRpcBatchDataReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/QueryTask.java
similarity index 54%
copy from cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterRpcBatchDataReader.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/utils/query/QueryTask.java
index 5b75c9d..69f72b2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterRpcBatchDataReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/QueryTask.java
@@ -16,33 +16,34 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.query.coordinatornode.reader;
+package org.apache.iotdb.cluster.utils.query;
-import java.io.IOException;
-import org.apache.iotdb.cluster.query.PathType;
-import org.apache.iotdb.db.query.reader.IBatchReader;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
+import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
-public class ClusterRpcBatchDataReader implements IBatchReader {
+public class QueryTask {
+ private BasicResponse basicResponse;
+ private TaskState state;
- private String PeerId;
- private String jobId;
- private PathType type;
- private BatchData batchData;
+ public QueryTask(BasicResponse basicResponse,
+ TaskState state) {
+ this.basicResponse = basicResponse;
+ this.state = state;
+ }
- @Override
- public boolean hasNext() throws IOException {
- return false;
+ public BasicResponse getBasicResponse() {
+ return basicResponse;
}
- @Override
- public BatchData nextBatch() throws IOException {
- return null;
+ public void setBasicResponse(BasicResponse basicResponse) {
+ this.basicResponse = basicResponse;
}
- @Override
- public void close() throws IOException {
+ public TaskState getState() {
+ return state;
+ }
+ public void setState(TaskState state) {
+ this.state = state;
}
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java
index d35746d..85a692d 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java
@@ -40,7 +40,7 @@ import org.apache.iotdb.cluster.entity.raft.RaftService;
import org.apache.iotdb.cluster.qp.task.QPTask;
import org.apache.iotdb.cluster.qp.task.SingleQPTask;
import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
-import org.apache.iotdb.cluster.rpc.raft.request.DataGroupNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.nonquery.DataGroupNonQueryRequest;
import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
import org.apache.iotdb.cluster.utils.hash.Router;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
index 68cde76..44d0a58 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/IQueryProcessExecutor.java
@@ -140,5 +140,4 @@ public interface IQueryProcessExecutor {
void setFetchSize(int fetchSize);
- IEngineQueryRouter getQueryRouter();
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
index 58702d8..9a4566e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
@@ -185,14 +185,14 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
QueryContext context)
throws ProcessorException, FileNodeManagerException, QueryFilterOptimizationException,
PathErrorException, IOException {
- return getQueryRouter().aggregate(paths, aggres, expression, context);
+ return queryRouter.aggregate(paths, aggres, expression, context);
}
@Override
public QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillTypes,
QueryContext context)
throws ProcessorException, IOException, PathErrorException, FileNodeManagerException {
- return getQueryRouter().fill(fillPaths, queryTime, fillTypes, context);
+ return queryRouter.fill(fillPaths, queryTime, fillTypes, context);
}
@Override
@@ -200,7 +200,7 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
long unit, long origin, List<Pair<Long, Long>> intervals, QueryContext context)
throws ProcessorException, FileNodeManagerException, QueryFilterOptimizationException,
PathErrorException, IOException {
- return getQueryRouter().groupBy(paths, aggres, expression, unit, origin, intervals, context);
+ return queryRouter.groupBy(paths, aggres, expression, unit, origin, intervals, context);
}
@Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
index 54c750b..bef0328 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
@@ -43,7 +43,7 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
public abstract class QueryProcessExecutor implements IQueryProcessExecutor {
protected ThreadLocal<Integer> fetchSize = new ThreadLocal<>();
- private IEngineQueryRouter queryRouter = new EngineQueryRouter();
+ protected IEngineQueryRouter queryRouter = new EngineQueryRouter();
@Override
public QueryDataSet processQuery(QueryPlan queryPlan, QueryContext context)
@@ -69,7 +69,7 @@ public abstract class QueryProcessExecutor implements IQueryProcessExecutor {
return fill(queryPlan.getPaths(), fillQueryPlan.getQueryTime(),
fillQueryPlan.getFillType(), context);
}
- return getQueryRouter().query(queryExpression, context);
+ return queryRouter.query(queryExpression, context);
}
@Override
@@ -86,11 +86,6 @@ public abstract class QueryProcessExecutor implements IQueryProcessExecutor {
}
@Override
- public IEngineQueryRouter getQueryRouter() {
- return queryRouter;
- }
-
- @Override
public boolean delete(List<Path> paths, long deleteTime) throws ProcessorException {
try {
boolean result = true;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
index fac4765..aeb789e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.tsfile.read.common;
+import java.io.Serializable;
import java.util.ArrayList;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -28,8 +29,9 @@ import org.apache.iotdb.tsfile.utils.Binary;
* <code>BatchData</code> is a self-defined data structure which is optimized for different type of
* values. This class can be viewed as a collection which is more efficient than ArrayList.
*/
-public class BatchData {
+public class BatchData implements Serializable {
+ private static final long serialVersionUID = -4620310601188394839L;
private int timeCapacity = 1;
private int valueCapacity = 1;
private int emptyTimeCapacity = 1;