You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/16 12:27:05 UTC
[incubator-iotdb] 14/19: add single query manager
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster_read
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit e0439b68e6801d2969a2f9e9b3969e81ecbc207a
Author: lta <li...@163.com>
AuthorDate: Tue Apr 16 11:23:05 2019 +0800
add single query manager
---
.../cluster/qp/executor/AbstractQPExecutor.java | 105 +-------------
.../cluster/qp/executor/NonQueryExecutor.java | 17 +--
.../cluster/qp/executor/QueryMetadataExecutor.java | 17 +--
.../apache/iotdb/cluster/qp/task/BatchQPTask.java | 3 +-
...xecutorWithTimeGenerator.java => PathType.java} | 6 +-
.../executor/ClusterExecutorWithTimeGenerator.java | 64 +++++++++
.../ClusterExecutorWithoutTimeGenerator.java | 145 +++++++++++---------
.../executor/ClusterQueryProcessExecutor.java | 1 +
.../executor/ClusterQueryRouter.java | 39 +++++-
.../ClusterRpcReaderFactory.java} | 12 +-
.../manager/ClusterRpcQueryManager.java | 4 +-
.../manager/ClusterSingleQueryManager.java | 152 +++++++++++++++++++--
.../manager/IClusterSingleQueryManager.java | 6 +-
...hReader.java => ClusterRpcBatchDataReader.java} | 9 +-
...AllDataReader.java => ClusterSeriesReader.java} | 37 ++++-
.../cluster/service/TSServiceClusterImpl.java | 40 +-----
.../iotdb/cluster/utils/QPExecutorUtils.java | 120 ++++++++++++++++
.../org/apache/iotdb/cluster/utils/RaftUtils.java | 25 ++++
.../apache/iotdb/cluster/utils/hash/Router.java | 7 +
.../db/query/executor/IEngineQueryRouter.java | 2 +-
20 files changed, 564 insertions(+), 247 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
index 8f635c4..e5df083 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
@@ -51,17 +51,8 @@ public abstract class AbstractQPExecutor {
private static final ClusterConfig CLUSTER_CONFIG = ClusterDescriptor.getInstance().getConfig();
- /**
- * Raft as client manager.
- */
- private static final RaftNodeAsClientManager CLIENT_MANAGER = RaftNodeAsClientManager
- .getInstance();
-
protected Router router = Router.getInstance();
- private PhysicalNode localNode = new PhysicalNode(CLUSTER_CONFIG.getIp(),
- CLUSTER_CONFIG.getPort());
-
protected MManager mManager = MManager.getInstance();
protected final Server server = Server.getInstance();
@@ -87,84 +78,6 @@ public abstract class AbstractQPExecutor {
private int readDataConsistencyLevel = CLUSTER_CONFIG.getReadDataConsistencyLevel();
/**
- * Get Storage Group Name by device name
- */
- protected String getStroageGroupByDevice(String device) throws PathErrorException {
- String storageGroup;
- try {
- storageGroup = MManager.getInstance().getFileNameByPath(device);
- } catch (PathErrorException e) {
- throw new PathErrorException(String.format("File level of %s doesn't exist.", device));
- }
- return storageGroup;
- }
-
- /**
- * Get all Storage Group Names by path
- */
- public List<String> getAllStroageGroupsByPath(String path) throws PathErrorException {
- List<String> storageGroupList;
- try {
- storageGroupList = mManager.getAllFileNamesByPath(path);
- } catch (PathErrorException e) {
- throw new PathErrorException(String.format("File level of %s doesn't exist.", path));
- }
- return storageGroupList;
- }
-
- /**
- * Classify the input storage group list by which data group it belongs to.
- *
- * @return key is groupId, value is all SGs belong to this data group
- */
- protected Map<String, Set<String>> classifySGByGroupId(List<String> sgList) {
- Map<String, Set<String>> map = new HashMap<>();
- for (int i = 0; i < sgList.size(); i++) {
- String sg = sgList.get(i);
- String groupId = getGroupIdBySG(sg);
- if (map.containsKey(groupId)) {
- map.get(groupId).add(sg);
- } else {
- Set<String> set = new HashSet<>();
- set.add(sg);
- map.put(groupId, set);
- }
- }
- return map;
- }
-
- /**
- * Get raft group id by storage group name
- */
- protected String getGroupIdBySG(String storageGroup) {
- return router.getGroupID(router.routeGroup(storageGroup));
- }
-
- /**
- * Check if the non query command can execute in local. 1. If this node belongs to the storage
- * group 2. If this node is leader.
- */
- public boolean canHandleNonQueryByGroupId(String groupId) {
- boolean canHandle = false;
- if(groupId.equals(ClusterConfig.METADATA_GROUP_ID)){
- canHandle = ((MetadataRaftHolder) (server.getMetadataHolder())).getFsm().isLeader();
- }else {
- if (router.containPhysicalNodeByGroupId(groupId, localNode) && RaftUtils
- .getPhysicalNodeFrom(RaftUtils.getLeaderPeerID(groupId)).equals(localNode)) {
- canHandle = true;
- }
- }
- return canHandle;
- }
-
- /**
- * Check if the query command can execute in local. Check if this node belongs to the group id
- */
- protected boolean canHandleQueryByGroupId(String groupId) {
- return router.containPhysicalNodeByGroupId(groupId, localNode);
- }
-
- /**
* Async handle QPTask by QPTask and leader id
*
* @param task request QPTask
@@ -185,32 +98,18 @@ public abstract class AbstractQPExecutor {
* @param leader leader node of the group
* @param taskRetryNum Retry time of the task
*/
- public void asyncSendNonQuerySingleTask(SingleQPTask task, PeerId leader, int taskRetryNum)
+ protected void asyncSendNonQuerySingleTask(SingleQPTask task, PeerId leader, int taskRetryNum)
throws RaftConnectionException {
if (taskRetryNum >= TASK_MAX_RETRY) {
throw new RaftConnectionException(String.format("QPTask retries reach the upper bound %s",
TASK_MAX_RETRY));
}
- NodeAsClient client = getRaftNodeAsClient();
+ NodeAsClient client = RaftUtils.getRaftNodeAsClient();
/** Call async method **/
client.asyncHandleRequest(task.getRequest(), leader, task);
}
/**
- * try to get raft rpc client
- */
- private NodeAsClient getRaftNodeAsClient() throws RaftConnectionException {
- NodeAsClient client = CLIENT_MANAGER.getRaftNodeAsClient();
- if (client == null) {
- throw new RaftConnectionException(String
- .format("Raft inner rpc clients have reached the max numbers %s",
- CLUSTER_CONFIG.getMaxNumOfInnerRpcClient() + CLUSTER_CONFIG
- .getMaxQueueNumOfInnerRpcClient()));
- }
- return client;
- }
-
- /**
* Synchronous get task response. If it's redirected or status is exception, the task needs to be
* resent. Note: If status is Exception, it marks that an exception occurred during the task is
* being sent instead of executed.
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index f90bc66..ecafe4e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
import org.apache.iotdb.cluster.rpc.raft.response.DataGroupNonQueryResponse;
import org.apache.iotdb.cluster.rpc.raft.response.MetaGroupNonQueryResponse;
import org.apache.iotdb.cluster.service.TSServiceClusterImpl.BatchResult;
+import org.apache.iotdb.cluster.utils.QPExecutorUtils;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
@@ -196,16 +197,16 @@ public class NonQueryExecutor extends AbstractQPExecutor {
switch (plan.getOperatorType()) {
case DELETE:
storageGroup = getStorageGroupFromDeletePlan((DeletePlan) plan);
- groupId = getGroupIdBySG(storageGroup);
+ groupId = router.getGroupIdBySG(storageGroup);
break;
case UPDATE:
Path path = ((UpdatePlan) plan).getPath();
- storageGroup = getStroageGroupByDevice(path.getDevice());
- groupId = getGroupIdBySG(storageGroup);
+ storageGroup = QPExecutorUtils.getStroageGroupByDevice(path.getDevice());
+ groupId = router.getGroupIdBySG(storageGroup);
break;
case INSERT:
- storageGroup = getStroageGroupByDevice(((InsertPlan) plan).getDeviceId());
- groupId = getGroupIdBySG(storageGroup);
+ storageGroup = QPExecutorUtils.getStroageGroupByDevice(((InsertPlan) plan).getDeviceId());
+ groupId = router.getGroupIdBySG(storageGroup);
break;
case CREATE_ROLE:
case DELETE_ROLE:
@@ -284,8 +285,8 @@ public class NonQueryExecutor extends AbstractQPExecutor {
case ADD_PATH:
case DELETE_PATH:
String deviceId = path.getDevice();
- String storageGroup = getStroageGroupByDevice(deviceId);
- groupId = getGroupIdBySG(storageGroup);
+ String storageGroup = QPExecutorUtils.getStroageGroupByDevice(deviceId);
+ groupId = router.getGroupIdBySG(storageGroup);
break;
case SET_FILE_LEVEL:
boolean fileLevelExist = mManager.checkStorageLevelOfMTree(path.getFullPath());
@@ -319,7 +320,7 @@ public class NonQueryExecutor extends AbstractQPExecutor {
currentTask.set(qpTask);
/** Check if the plan can be executed locally. **/
- if (canHandleNonQueryByGroupId(groupId)) {
+ if (QPExecutorUtils.canHandleNonQueryByGroupId(groupId)) {
return handleNonQueryRequestLocally(groupId, qpTask);
} else {
PeerId leader = RaftUtils.getLeaderPeerID(groupId);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 0702548..aea0656 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.cluster.rpc.raft.response.QueryPathsResponse;
import org.apache.iotdb.cluster.rpc.raft.response.QuerySeriesTypeResponse;
import org.apache.iotdb.cluster.rpc.raft.response.QueryStorageGroupResponse;
import org.apache.iotdb.cluster.rpc.raft.response.QueryTimeSeriesResponse;
+import org.apache.iotdb.cluster.utils.QPExecutorUtils;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
@@ -81,7 +82,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
if (storageGroupList.isEmpty()) {
return new ArrayList<>();
} else {
- Map<String, Set<String>> groupIdSGMap = classifySGByGroupId(storageGroupList);
+ Map<String, Set<String>> groupIdSGMap = QPExecutorUtils.classifySGByGroupId(storageGroupList);
for (Entry<String, Set<String>> entry : groupIdSGMap.entrySet()) {
List<String> paths = getSubQueryPaths(entry.getValue(), path);
String groupId = entry.getKey();
@@ -130,7 +131,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
LOGGER.debug("Execute show timeseries {} statement for group {}.", pathList, groupId);
PeerId holder;
/** Check if the plan can be executed locally. **/
- if (canHandleQueryByGroupId(groupId)) {
+ if (QPExecutorUtils.canHandleQueryByGroupId(groupId)) {
LOGGER.debug("Execute show timeseries {} statement locally for group {} by sending request to local node.", pathList, groupId);
holder = this.server.getServerId();
} else {
@@ -158,7 +159,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
LOGGER.debug("Execute show metadata in string statement for group {}.", groupId);
PeerId holder;
/** Check if the plan can be executed locally. **/
- if (canHandleQueryByGroupId(groupId)) {
+ if (QPExecutorUtils.canHandleQueryByGroupId(groupId)) {
LOGGER.debug("Execute show metadata in string statement locally for group {} by sending request to local node.", groupId);
holder = this.server.getServerId();
} else {
@@ -197,7 +198,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
LOGGER.debug("Execute query metadata statement for group {}.", groupId);
PeerId holder;
/** Check if the plan can be executed locally. **/
- if (canHandleQueryByGroupId(groupId)) {
+ if (QPExecutorUtils.canHandleQueryByGroupId(groupId)) {
LOGGER.debug("Execute query metadata statement locally for group {} by sending request to local node.", groupId);
holder = this.server.getServerId();
} else {
@@ -232,7 +233,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
if (storageGroupList.size() != 1) {
throw new PathErrorException("path " + path + " is not valid.");
} else {
- String groupId = getGroupIdBySG(storageGroupList.get(0));
+ String groupId = router.getGroupIdBySG(storageGroupList.get(0));
QuerySeriesTypeRequest request = new QuerySeriesTypeRequest(groupId,
readMetadataConsistencyLevel, path);
SingleQPTask task = new SingleQPTask(false, request);
@@ -240,7 +241,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
LOGGER.debug("Execute get series type for {} statement for group {}.", path, groupId);
PeerId holder;
/** Check if the plan can be executed locally. **/
- if (canHandleQueryByGroupId(groupId)) {
+ if (QPExecutorUtils.canHandleQueryByGroupId(groupId)) {
LOGGER.debug("Execute get series type for {} statement locally for group {} by sending request to local node.", path, groupId);
holder = this.server.getServerId();
} else {
@@ -265,7 +266,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
if (storageGroupList.isEmpty()) {
return new ArrayList<>();
} else {
- Map<String, Set<String>> groupIdSGMap = classifySGByGroupId(storageGroupList);
+ Map<String, Set<String>> groupIdSGMap = QPExecutorUtils.classifySGByGroupId(storageGroupList);
for (Entry<String, Set<String>> entry : groupIdSGMap.entrySet()) {
List<String> paths = getSubQueryPaths(entry.getValue(), path);
String groupId = entry.getKey();
@@ -289,7 +290,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
LOGGER.debug("Execute get paths for {} statement for group {}.", pathList, groupId);
PeerId holder;
/** Check if the plan can be executed locally. **/
- if (canHandleQueryByGroupId(groupId)) {
+ if (QPExecutorUtils.canHandleQueryByGroupId(groupId)) {
LOGGER.debug("Execute get paths for {} statement locally for group {} by sending request to local node.", pathList, groupId);
holder = this.server.getServerId();
} else {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
index 67c7362..d224bae 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.cluster.qp.executor.NonQueryExecutor;
import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
import org.apache.iotdb.cluster.rpc.raft.response.DataGroupNonQueryResponse;
import org.apache.iotdb.cluster.service.TSServiceClusterImpl.BatchResult;
+import org.apache.iotdb.cluster.utils.QPExecutorUtils;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -121,7 +122,7 @@ public class BatchQPTask extends MultiQPTask {
String groupId = entry.getKey();
SingleQPTask subTask = entry.getValue();
Future<?> taskThread;
- if (executor.canHandleNonQueryByGroupId(groupId)) {
+ if (QPExecutorUtils.canHandleNonQueryByGroupId(groupId)) {
taskThread = QPTaskManager.getInstance()
.submit(() -> executeLocalSubTask(subTask, groupId));
} else {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/PathType.java
similarity index 87%
copy from cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/query/PathType.java
index 8bbff0b..d25bb86 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/PathType.java
@@ -16,8 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.query.coordinatornode.executor;
-
-public class ClusterExecutorWithTimeGenerator {
+package org.apache.iotdb.cluster.query;
+public enum PathType {
+ SELECT_PATH, FILTER_PATH
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java
index 8bbff0b..d1ca472 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java
@@ -18,6 +18,70 @@
*/
package org.apache.iotdb.cluster.query.coordinatornode.executor;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.cluster.query.coordinatornode.manager.IClusterSingleQueryManager;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.dataset.EngineDataSetWithTimeGenerator;
+import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
+import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
+import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
public class ClusterExecutorWithTimeGenerator {
+ private QueryExpression queryExpression;
+ private IClusterSingleQueryManager queryManager;
+
+ ClusterExecutorWithTimeGenerator(QueryExpression queryExpression, IClusterSingleQueryManager queryManager) {
+ this.queryExpression = queryExpression;
+ this.queryManager = queryManager;
+ }
+
+ /**
+ * execute query.
+ *
+ * @return QueryDataSet object
+ * @throws IOException IOException
+ * @throws FileNodeManagerException FileNodeManagerException
+ */
+ public QueryDataSet execute(QueryContext context) throws FileNodeManagerException {
+
+ QueryResourceManager.getInstance()
+ .beginQueryOfGivenQueryPaths(context.getJobId(), queryExpression.getSelectedSeries());
+ QueryResourceManager.getInstance()
+ .beginQueryOfGivenExpression(context.getJobId(), queryExpression.getExpression());
+
+ EngineTimeGenerator timestampGenerator;
+ List<EngineReaderByTimeStamp> readersOfSelectedSeries;
+ try {
+ timestampGenerator = new EngineTimeGenerator(queryExpression.getExpression(), context);
+ readersOfSelectedSeries = SeriesReaderFactory
+ .getByTimestampReadersOfSelectedPaths(queryExpression.getSelectedSeries(), context);
+ } catch (IOException ex) {
+ throw new FileNodeManagerException(ex);
+ }
+
+ List<TSDataType> dataTypes = new ArrayList<>();
+
+ for (Path path : queryExpression.getSelectedSeries()) {
+ try {
+ dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
+ } catch (PathErrorException e) {
+ throw new FileNodeManagerException(e);
+ }
+
+ }
+ return new EngineDataSetWithTimeGenerator(queryExpression.getSelectedSeries(), dataTypes,
+ timestampGenerator,
+ readersOfSelectedSeries);
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithoutTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithoutTimeGenerator.java
index 0392bce..7bd8008 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithoutTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithoutTimeGenerator.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.cluster.query.coordinatornode.executor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterSingleQueryManager;
+import org.apache.iotdb.cluster.query.coordinatornode.reader.ClusterSeriesReader;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
@@ -42,9 +45,12 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
public class ClusterExecutorWithoutTimeGenerator {
private QueryExpression queryExpression;
+ private ClusterSingleQueryManager queryManager;
- public ClusterExecutorWithoutTimeGenerator(QueryExpression queryExpression) {
+ public ClusterExecutorWithoutTimeGenerator(QueryExpression queryExpression,
+ ClusterSingleQueryManager queryManager) {
this.queryExpression = queryExpression;
+ this.queryManager = queryManager;
}
/**
@@ -62,37 +68,37 @@ public class ClusterExecutorWithoutTimeGenerator {
.beginQueryOfGivenQueryPaths(context.getJobId(), queryExpression.getSelectedSeries());
for (Path path : queryExpression.getSelectedSeries()) {
-
- QueryDataSource queryDataSource = QueryResourceManager.getInstance().getQueryDataSource(path,
- context);
-
- // add data type
- try {
- dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
- } catch (PathErrorException e) {
- throw new FileNodeManagerException(e);
- }
-
- // sequence reader for one sealed tsfile
- SequenceDataReader tsFilesReader;
- try {
- tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
- timeFilter, context);
- } catch (IOException e) {
- throw new FileNodeManagerException(e);
- }
-
- // unseq reader for all chunk groups in unSeqFile
- PriorityMergeReader unSeqMergeReader;
- try {
- unSeqMergeReader = SeriesReaderFactory.getInstance()
- .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), timeFilter);
- } catch (IOException e) {
- throw new FileNodeManagerException(e);
- }
-
- // merge sequence data with unsequence data.
- readersOfSelectedSeries.add(new AllDataReader(tsFilesReader, unSeqMergeReader));
+//
+// QueryDataSource queryDataSource = QueryResourceManager.getInstance().getQueryDataSource(path,
+// context);
+//
+// // add data type
+// try {
+// dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
+// } catch (PathErrorException e) {
+// throw new FileNodeManagerException(e);
+// }
+//
+// // sequence reader for one sealed tsfile
+// SequenceDataReader tsFilesReader;
+// try {
+// tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
+// timeFilter, context);
+// } catch (IOException e) {
+// throw new FileNodeManagerException(e);
+// }
+//
+// // unseq reader for all chunk groups in unSeqFile
+// PriorityMergeReader unSeqMergeReader;
+// try {
+// unSeqMergeReader = SeriesReaderFactory.getInstance()
+// .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), timeFilter);
+// } catch (IOException e) {
+// throw new FileNodeManagerException(e);
+// }
+//
+// // merge sequence data with unsequence data.
+// readersOfSelectedSeries.add(new AllDataReader(tsFilesReader, unSeqMergeReader));
}
try {
@@ -112,43 +118,54 @@ public class ClusterExecutorWithoutTimeGenerator {
List<IPointReader> readersOfSelectedSeries = new ArrayList<>();
List<TSDataType> dataTypes = new ArrayList<>();
- QueryResourceManager.getInstance()
- .beginQueryOfGivenQueryPaths(context.getJobId(), queryExpression.getSelectedSeries());
-
+ Map<String, ClusterSeriesReader> selectPathReaders = queryManager.getSelectPathReaders();
+ List<Path> paths = new ArrayList<>();
for (Path path : queryExpression.getSelectedSeries()) {
- QueryDataSource queryDataSource = QueryResourceManager.getInstance().getQueryDataSource(path,
- context);
-
- // add data type
- try {
- dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
- } catch (PathErrorException e) {
- throw new FileNodeManagerException(e);
+ if(selectPathReaders.containsKey(path.toString())){
+ ClusterSeriesReader reader = selectPathReaders.get(path.toString());
+ readersOfSelectedSeries.add(reader);
+ dataTypes.add(reader.getDataType());
+ } else {
+ QueryDataSource queryDataSource = QueryResourceManager.getInstance()
+ .getQueryDataSource(path,
+ context);
+
+ // add data type
+ try {
+ dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
+ } catch (PathErrorException e) {
+ throw new FileNodeManagerException(e);
+ }
+
+ // sequence insert data
+ SequenceDataReader tsFilesReader;
+ try {
+ tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
+ null, context);
+ } catch (IOException e) {
+ throw new FileNodeManagerException(e);
+ }
+
+ // unseq insert data
+ PriorityMergeReader unSeqMergeReader;
+ try {
+ unSeqMergeReader = SeriesReaderFactory.getInstance()
+ .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), null);
+ } catch (IOException e) {
+ throw new FileNodeManagerException(e);
+ }
+
+ // merge sequence data with unsequence data.
+ readersOfSelectedSeries.add(new AllDataReader(tsFilesReader, unSeqMergeReader));
+
+ paths.add(path);
}
-
- // sequence insert data
- SequenceDataReader tsFilesReader;
- try {
- tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
- null, context);
- } catch (IOException e) {
- throw new FileNodeManagerException(e);
- }
-
- // unseq insert data
- PriorityMergeReader unSeqMergeReader;
- try {
- unSeqMergeReader = SeriesReaderFactory.getInstance()
- .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), null);
- } catch (IOException e) {
- throw new FileNodeManagerException(e);
- }
-
- // merge sequence data with unsequence data.
- readersOfSelectedSeries.add(new AllDataReader(tsFilesReader, unSeqMergeReader));
}
+ QueryResourceManager.getInstance()
+ .beginQueryOfGivenQueryPaths(context.getJobId(), paths);
+
try {
return new EngineDataSetWithoutTimeGenerator(queryExpression.getSelectedSeries(), dataTypes,
readersOfSelectedSeries);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryProcessExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryProcessExecutor.java
index 7b51a4f..e7e58f8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryProcessExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryProcessExecutor.java
@@ -110,6 +110,7 @@ public class ClusterQueryProcessExecutor extends QueryProcessExecutor {
this.fetchSize.set(fetchSize);
}
+ @Override
public IEngineQueryRouter getQueryRouter() {
return queryRouter;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java
index c0a853b..8fa1a95 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java
@@ -21,6 +21,10 @@ package org.apache.iotdb.cluster.query.coordinatornode.executor;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterRpcQueryManager;
+import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterSingleQueryManager;
+import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterSingleQueryManager.QueryType;
+import org.apache.iotdb.cluster.query.coordinatornode.manager.IClusterSingleQueryManager;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
@@ -30,8 +34,10 @@ import org.apache.iotdb.db.query.fill.IFill;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.ExpressionType;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -39,8 +45,37 @@ public class ClusterQueryRouter implements IEngineQueryRouter {
@Override
public QueryDataSet query(QueryExpression queryExpression, QueryContext context)
- throws FileNodeManagerException {
- return null;
+ throws FileNodeManagerException, PathErrorException {
+
+ ClusterSingleQueryManager queryManager = ClusterRpcQueryManager.getInstance()
+ .getSingleQuery(context.getJobId());
+ if (queryExpression.hasQueryFilter()) {
+ try {
+ IExpression optimizedExpression = ExpressionOptimizer.getInstance()
+ .optimize(queryExpression.getExpression(), queryExpression.getSelectedSeries());
+ queryExpression.setExpression(optimizedExpression);
+
+ if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
+ queryManager.init(QueryType.GLOBAL_TIME);
+ ClusterExecutorWithoutTimeGenerator engineExecutor =
+ new ClusterExecutorWithoutTimeGenerator(queryExpression, queryManager);
+ return engineExecutor.executeWithGlobalTimeFilter(context);
+ } else {
+ queryManager.init(QueryType.FILTER);
+ ClusterExecutorWithTimeGenerator engineExecutor = new ClusterExecutorWithTimeGenerator(
+ queryExpression, queryManager);
+ return engineExecutor.execute(context);
+ }
+
+ } catch (QueryFilterOptimizationException e) {
+ throw new FileNodeManagerException(e);
+ }
+ } else {
+ queryManager.init(QueryType.NO_FILTER);
+ ClusterExecutorWithoutTimeGenerator engineExecutor =
+ new ClusterExecutorWithoutTimeGenerator(queryExpression, queryManager);
+ return engineExecutor.executeWithoutFilter(context);
+ }
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/factory/ClusterRpcReaderFactory.java
similarity index 64%
copy from cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/factory/ClusterRpcReaderFactory.java
index 8bbff0b..27444af 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/factory/ClusterRpcReaderFactory.java
@@ -16,8 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.query.coordinatornode.executor;
+package org.apache.iotdb.cluster.query.coordinatornode.factory;
-public class ClusterExecutorWithTimeGenerator {
+import com.alipay.sofa.jraft.entity.PeerId;
+import java.util.Map;
+import org.apache.iotdb.cluster.query.coordinatornode.reader.ClusterSeriesReader;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+public class ClusterRpcReaderFactory {
+
+ public static Map<String, ClusterSeriesReader> createClusterSeriesReader(String groupId, PeerId peerId, QueryPlan queryPlan){
+return null;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterRpcQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterRpcQueryManager.java
index 3e9a68c..f33fe05 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterRpcQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterRpcQueryManager.java
@@ -29,7 +29,7 @@ public class ClusterRpcQueryManager{
/**
* Key is group id, value is manager of a client query.
*/
- ConcurrentHashMap<Long, IClusterSingleQueryManager> singleQueryManagerMap = new ConcurrentHashMap<>();
+ ConcurrentHashMap<Long, ClusterSingleQueryManager> singleQueryManagerMap = new ConcurrentHashMap<>();
/**
* Add a query
@@ -41,7 +41,7 @@ public class ClusterRpcQueryManager{
/**
* Get query manager by group id
*/
- public IClusterSingleQueryManager getSingleQuery(long jobId) {
+ public ClusterSingleQueryManager getSingleQuery(long jobId) {
return singleQueryManagerMap.get(jobId);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterSingleQueryManager.java
index 156b9a5..90c1ac5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterSingleQueryManager.java
@@ -19,10 +19,20 @@
package org.apache.iotdb.cluster.query.coordinatornode.manager;
import com.alipay.sofa.jraft.entity.PeerId;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.iotdb.cluster.query.coordinatornode.factory.ClusterRpcReaderFactory;
+import org.apache.iotdb.cluster.query.coordinatornode.reader.ClusterSeriesReader;
+import org.apache.iotdb.cluster.utils.QPExecutorUtils;
+import org.apache.iotdb.cluster.utils.RaftUtils;
+import org.apache.iotdb.cluster.utils.hash.Router;
+import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.tsfile.read.common.Path;
public class ClusterSingleQueryManager implements IClusterSingleQueryManager {
@@ -34,7 +44,7 @@ public class ClusterSingleQueryManager implements IClusterSingleQueryManager {
/**
* Origin query plan parsed by QueryProcessor
*/
- private QueryPlan originPhysicalPlan;
+ private QueryPlan queryPlan;
/**
* Represent selected reader nodes, key is group id and value is selected peer id
@@ -42,25 +52,107 @@ public class ClusterSingleQueryManager implements IClusterSingleQueryManager {
private Map<String, PeerId> readerNodes = new HashMap<>();
/**
- * Physical plans of select paths which are divided from originPhysicalPlan
+ * Query plans of select paths which are divided from queryPlan group by group id
*/
private Map<String, QueryPlan> selectPathPlans = new HashMap<>();
+ private Map<String, ClusterSeriesReader> selectPathReaders = new HashMap<>();
+
/**
- * Physical plans of filter paths which are divided from originPhysicalPlan
+ * Physical plans of filter paths which are divided from queryPlan group by group id
*/
private Map<String, QueryPlan> filterPathPlans = new HashMap<>();
public ClusterSingleQueryManager(long jobId,
- QueryPlan originPhysicalPlan) {
+ QueryPlan queryPlan) {
this.jobId = jobId;
- this.originPhysicalPlan = originPhysicalPlan;
+ this.queryPlan = queryPlan;
}
@Override
- public void dividePhysicalPlan() {
-// List<Path>
-// MManager.getInstance().getFileNameByPath()
+ public void init(QueryType queryType) throws PathErrorException {
+ switch (queryType) {
+ case NO_FILTER:
+ divideNoFilterPhysicalPlan();
+ break;
+ case GLOBAL_TIME:
+ divideGlobalTimePhysicalPlan();
+ break;
+ case FILTER:
+ divideFilterPhysicalPlan();
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ }
+ initSelectedPathPlan();
+ initFilterPathPlan();
+ }
+
+ public enum QueryType {
+ NO_FILTER, GLOBAL_TIME, FILTER
+ }
+
+ /**
+ * Divide no-fill type query plan by group id
+ */
+ private void divideNoFilterPhysicalPlan() throws PathErrorException {
+ List<Path> selectPaths = queryPlan.getPaths();
+ Map<String, List<Path>> pathsByGroupId = new HashMap<>();
+ for (Path path : selectPaths) {
+ String storageGroup = QPExecutorUtils.getStroageGroupByDevice(path.getDevice());
+ String groupId = Router.getInstance().getGroupIdBySG(storageGroup);
+ if (pathsByGroupId.containsKey(groupId)) {
+ pathsByGroupId.put(groupId, new ArrayList<>());
+ }
+ pathsByGroupId.get(groupId).add(path);
+ }
+ for (Entry<String, List<Path>> entry : pathsByGroupId.entrySet()) {
+ String groupId = entry.getKey();
+ List<Path> paths = entry.getValue();
+ QueryPlan subQueryPlan = new QueryPlan();
+ subQueryPlan.setProposer(queryPlan.getProposer());
+ subQueryPlan.setPaths(paths);
+ selectPathPlans.put(groupId, subQueryPlan);
+ }
+ }
+
+ private void divideGlobalTimePhysicalPlan() {
+
+ }
+
+ private void divideFilterPhysicalPlan() {
+
+ }
+
+ /**
+ * Init select path
+ */
+ private void initSelectedPathPlan(){
+ if(!selectPathPlans.isEmpty()){
+ for(Entry<String, QueryPlan> entry: selectPathPlans.entrySet()){
+ String groupId = entry.getKey();
+ QueryPlan queryPlan = entry.getValue();
+ if(!canHandleQueryLocally(groupId)) {
+ PeerId randomPeer = RaftUtils.getRandomPeerID(groupId);
+ readerNodes.put(groupId, randomPeer);
+ Map<String, ClusterSeriesReader> selectPathReaders = ClusterRpcReaderFactory
+ .createClusterSeriesReader(groupId, randomPeer, queryPlan);
+ for (Path path : queryPlan.getPaths()) {
+ selectPathReaders.put(path.getFullPath(), selectPathReaders.get(path.getFullPath()));
+ }
+ }
+ }
+ }
+ }
+
+ private void initFilterPathPlan(){
+ if(!filterPathPlans.isEmpty()){
+
+ }
+ }
+
+ private boolean canHandleQueryLocally(String groupId){
+ return QPExecutorUtils.canHandleQueryByGroupId(groupId);
}
@Override
@@ -96,11 +188,47 @@ public class ClusterSingleQueryManager implements IClusterSingleQueryManager {
this.jobId = jobId;
}
- public PhysicalPlan getOriginPhysicalPlan() {
- return originPhysicalPlan;
+ public PhysicalPlan getQueryPlan() {
+ return queryPlan;
+ }
+
+ public void setQueryPlan(QueryPlan queryPlan) {
+ this.queryPlan = queryPlan;
+ }
+
+ public Map<String, PeerId> getReaderNodes() {
+ return readerNodes;
+ }
+
+ public void setReaderNodes(
+ Map<String, PeerId> readerNodes) {
+ this.readerNodes = readerNodes;
+ }
+
+ public Map<String, QueryPlan> getSelectPathPlans() {
+ return selectPathPlans;
+ }
+
+ public void setSelectPathPlans(
+ Map<String, QueryPlan> selectPathPlans) {
+ this.selectPathPlans = selectPathPlans;
+ }
+
+ public Map<String, ClusterSeriesReader> getSelectPathReaders() {
+ return selectPathReaders;
+ }
+
+ public void setSelectPathReaders(
+ Map<String, ClusterSeriesReader> selectPathReaders) {
+ this.selectPathReaders = selectPathReaders;
+ }
+
+ public Map<String, QueryPlan> getFilterPathPlans() {
+ return filterPathPlans;
}
- public void setOriginPhysicalPlan(QueryPlan originPhysicalPlan) {
- this.originPhysicalPlan = originPhysicalPlan;
+ public void setFilterPathPlans(
+ Map<String, QueryPlan> filterPathPlans) {
+ this.filterPathPlans = filterPathPlans;
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterSingleQueryManager.java
index c5fffbc..dc84d32 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterSingleQueryManager.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.cluster.query.coordinatornode.manager;
import com.alipay.sofa.jraft.entity.PeerId;
+import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterSingleQueryManager.QueryType;
+import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
/**
@@ -28,8 +30,10 @@ public interface IClusterSingleQueryManager {
/**
* Divide physical plan into several sub physical plans according to timeseries full path.
+ * @param queryType
*/
- void dividePhysicalPlan();
+ void init(
+ QueryType queryType) throws PathErrorException;
/**
* Get physical plan of select path
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterRpcBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterRpcBatchDataReader.java
similarity index 81%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterRpcBatchReader.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterRpcBatchDataReader.java
index fc9eb8c..5b75c9d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterRpcBatchReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterRpcBatchDataReader.java
@@ -19,10 +19,17 @@
package org.apache.iotdb.cluster.query.coordinatornode.reader;
import java.io.IOException;
+import org.apache.iotdb.cluster.query.PathType;
import org.apache.iotdb.db.query.reader.IBatchReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
-public class ClusterRpcBatchReader implements IBatchReader {
+public class ClusterRpcBatchDataReader implements IBatchReader {
+
+ private String PeerId;
+ private String jobId;
+ private PathType type;
+ private BatchData batchData;
@Override
public boolean hasNext() throws IOException {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterAllDataReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterSeriesReader.java
similarity index 60%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterAllDataReader.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterSeriesReader.java
index 684ead0..438559e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterAllDataReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/reader/ClusterSeriesReader.java
@@ -22,10 +22,21 @@ import java.io.IOException;
import org.apache.iotdb.db.query.reader.IBatchReader;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
-public class ClusterAllDataReader implements IPointReader {
+public class ClusterSeriesReader implements IPointReader {
private IBatchReader rpcBatchReader;
+ private String fullPath;
+ private TSDataType dataType;
+
+ public ClusterSeriesReader(IBatchReader rpcBatchReader, String fullPath,
+ TSDataType dataType) {
+ this.rpcBatchReader = rpcBatchReader;
+ this.fullPath = fullPath;
+ this.dataType = dataType;
+ }
@Override
public TimeValuePair current() throws IOException {
@@ -46,4 +57,28 @@ public class ClusterAllDataReader implements IPointReader {
public void close() throws IOException {
}
+
+ public IBatchReader getRpcBatchReader() {
+ return rpcBatchReader;
+ }
+
+ public void setRpcBatchReader(IBatchReader rpcBatchReader) {
+ this.rpcBatchReader = rpcBatchReader;
+ }
+
+ public String getFullPath() {
+ return fullPath;
+ }
+
+ public void setFullPath(String fullPath) {
+ this.fullPath = fullPath;
+ }
+
+ public TSDataType getDataType() {
+ return dataType;
+ }
+
+ public void setDataType(TSDataType dataType) {
+ this.dataType = dataType;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
index a11fae4..79b2acd 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
@@ -38,10 +38,8 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
-import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.Metadata;
import org.apache.iotdb.db.qp.QueryProcessor;
-import org.apache.iotdb.db.qp.executor.OverflowQPExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
@@ -72,11 +70,6 @@ public class TSServiceClusterImpl extends TSServiceImpl {
private NonQueryExecutor nonQueryExecutor = new NonQueryExecutor();
private QueryMetadataExecutor queryMetadataExecutor = new QueryMetadataExecutor();
- /**
- * Key is query statement, Value is corresponding job id which is assigned in <class>QueryResourceManager</class>
- */
- private ThreadLocal<HashMap<String, Long>> queryJobIdMap = new ThreadLocal<>();
-
public TSServiceClusterImpl() throws IOException {
super();
}
@@ -278,18 +271,6 @@ public class TSServiceClusterImpl extends TSServiceImpl {
}
@Override
- public void recordANewQuery(String statement, PhysicalPlan physicalPlan) {
- long jobId = QueryResourceManager.getInstance().assignJobId();
- queryStatus.get().put(statement, physicalPlan);
- queryJobIdMap.get().put(statement, jobId);
- queryManager.addSingleQuery(jobId, (QueryPlan) physicalPlan);
- // refresh current queryRet for statement
- if (queryRet.get().containsKey(statement)) {
- queryRet.get().remove(statement);
- }
- }
-
- @Override
public void releaseQueryResource(TSCloseOperationReq req) throws FileNodeManagerException {
Map<Long, QueryContext> contextMap = contextMapLocal.get();
if (contextMap == null) {
@@ -310,35 +291,18 @@ public class TSServiceClusterImpl extends TSServiceImpl {
}
@Override
- public void clearAllStatusForCurrentRequest() {
- if (this.queryRet.get() != null) {
- this.queryRet.get().clear();
- }
-
- if (this.queryJobIdMap.get() != null) {
- this.queryJobIdMap.get().clear();
- }
-
- if (this.queryStatus.get() != null) {
- this.queryStatus.get().clear();
- }
- }
-
-
- @Override
public QueryDataSet createNewDataSet(String statement, int fetchSize, TSFetchResultsReq req)
throws PathErrorException, QueryFilterOptimizationException, FileNodeManagerException,
ProcessorException, IOException {
PhysicalPlan physicalPlan = queryStatus.get().get(statement);
processor.getExecutor().setFetchSize(fetchSize);
- long jobId = queryJobIdMap.get().get(statement);
+ long jobId = QueryResourceManager.getInstance().assignJobId();
QueryContext context = new QueryContext(jobId);
initContextMap();
contextMapLocal.get().put(req.queryId, context);
- queryManager.getSingleQuery(jobId).dividePhysicalPlan();
-
+ queryManager.addSingleQuery(jobId, (QueryPlan) physicalPlan);
QueryDataSet queryDataSet = processor.getExecutor().processQuery((QueryPlan) physicalPlan,
context);
queryRet.get().put(statement, queryDataSet);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/QPExecutorUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/QPExecutorUtils.java
new file mode 100644
index 0000000..63a5be6
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/QPExecutorUtils.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.utils;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.entity.Server;
+import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
+import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
+import org.apache.iotdb.cluster.utils.hash.Router;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.metadata.MManager;
+
+public class QPExecutorUtils {
+
+ private static final ClusterConfig CLUSTER_CONFIG = ClusterDescriptor.getInstance().getConfig();
+
+ private static final Router router = Router.getInstance();
+
+ private static final PhysicalNode localNode = new PhysicalNode(CLUSTER_CONFIG.getIp(),
+ CLUSTER_CONFIG.getPort());
+
+ private static final MManager mManager = MManager.getInstance();
+
+ private static final Server server = Server.getInstance();
+
+
+ /**
+ * Get Storage Group Name by device name
+ */
+ public static String getStroageGroupByDevice(String device) throws PathErrorException {
+ String storageGroup;
+ try {
+ storageGroup = MManager.getInstance().getFileNameByPath(device);
+ } catch (PathErrorException e) {
+ throw new PathErrorException(String.format("File level of %s doesn't exist.", device));
+ }
+ return storageGroup;
+ }
+
+ /**
+ * Get all Storage Group Names by path
+ */
+ public static List<String> getAllStroageGroupsByPath(String path) throws PathErrorException {
+ List<String> storageGroupList;
+ try {
+ storageGroupList = mManager.getAllFileNamesByPath(path);
+ } catch (PathErrorException e) {
+ throw new PathErrorException(String.format("File level of %s doesn't exist.", path));
+ }
+ return storageGroupList;
+ }
+
+ /**
+ * Classify the input storage group list by which data group it belongs to.
+ *
+ * @return key is groupId, value is all SGs belong to this data group
+ */
+ public static Map<String, Set<String>> classifySGByGroupId(List<String> sgList) {
+ Map<String, Set<String>> map = new HashMap<>();
+ for (int i = 0; i < sgList.size(); i++) {
+ String sg = sgList.get(i);
+ String groupId = router.getGroupIdBySG(sg);
+ if (map.containsKey(groupId)) {
+ map.get(groupId).add(sg);
+ } else {
+ Set<String> set = new HashSet<>();
+ set.add(sg);
+ map.put(groupId, set);
+ }
+ }
+ return map;
+ }
+
+ /**
+ * Check if the non query command can execute in local. 1. If this node belongs to the storage
+ * group 2. If this node is leader.
+ */
+ public static boolean canHandleNonQueryByGroupId(String groupId) {
+ boolean canHandle = false;
+ if(groupId.equals(ClusterConfig.METADATA_GROUP_ID)){
+ canHandle = ((MetadataRaftHolder) (server.getMetadataHolder())).getFsm().isLeader();
+ }else {
+ if (router.containPhysicalNodeByGroupId(groupId, localNode) && RaftUtils
+ .getPhysicalNodeFrom(RaftUtils.getLeaderPeerID(groupId)).equals(localNode)) {
+ canHandle = true;
+ }
+ }
+ return canHandle;
+ }
+
+ /**
+ * Check if the query command can execute in local. Check if this node belongs to the group id
+ */
+ public static boolean canHandleQueryByGroupId(String groupId) {
+ return router.containPhysicalNodeByGroupId(groupId, localNode);
+ }
+
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index c1a3270..5c3f3cb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -33,6 +33,8 @@ import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.exception.RaftConnectionException;
import org.apache.iotdb.cluster.qp.task.QPTask;
import org.apache.iotdb.cluster.qp.task.SingleQPTask;
import org.apache.iotdb.cluster.config.ClusterConfig;
@@ -40,7 +42,9 @@ import org.apache.iotdb.cluster.entity.Server;
import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
import org.apache.iotdb.cluster.entity.raft.RaftService;
+import org.apache.iotdb.cluster.rpc.raft.NodeAsClient;
import org.apache.iotdb.cluster.rpc.raft.closure.ResponseClosure;
+import org.apache.iotdb.cluster.rpc.raft.impl.RaftNodeAsClientManager;
import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
import org.apache.iotdb.cluster.rpc.raft.response.MetaGroupNonQueryResponse;
@@ -51,10 +55,17 @@ import org.slf4j.LoggerFactory;
public class RaftUtils {
+ private static final ClusterConfig CLUSTER_CONFIG = ClusterDescriptor.getInstance().getConfig();
+
private static final Logger LOGGER = LoggerFactory.getLogger(RaftUtils.class);
private static final Server server = Server.getInstance();
private static final Router router = Router.getInstance();
private static final AtomicInteger requestId = new AtomicInteger(0);
+ /**
+ * Raft as client manager.
+ */
+ private static final RaftNodeAsClientManager CLIENT_MANAGER = RaftNodeAsClientManager
+ .getInstance();
/**
* The cache will be update in two case: 1. When @onLeaderStart() method of state machine is
@@ -302,4 +313,18 @@ public class RaftUtils {
status.setCode(-1);
return status;
}
+
+ /**
+ * try to get raft rpc client
+ */
+ public static NodeAsClient getRaftNodeAsClient() throws RaftConnectionException {
+ NodeAsClient client = CLIENT_MANAGER.getRaftNodeAsClient();
+ if (client == null) {
+ throw new RaftConnectionException(String
+ .format("Raft inner rpc clients have reached the max numbers %s",
+ CLUSTER_CONFIG.getMaxNumOfInnerRpcClient() + CLUSTER_CONFIG
+ .getMaxQueueNumOfInnerRpcClient()));
+ }
+ return client;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
index 7c7b2be..12d3b01 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
@@ -236,4 +236,11 @@ public class Router {
public Set<String> getAllGroupId() {
return groupIdMapNodeCache.keySet();
}
+
+ /**
+ * Get raft group id by storage group name
+ */
+ public String getGroupIdBySG(String storageGroup) {
+ return getGroupID(routeGroup(storageGroup));
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java
index 1d0b91b..01c1aed 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/IEngineQueryRouter.java
@@ -40,7 +40,7 @@ public interface IEngineQueryRouter {
* Execute physical plan.
*/
QueryDataSet query(QueryExpression queryExpression, QueryContext context)
- throws FileNodeManagerException;
+ throws FileNodeManagerException, PathErrorException;
/**
* Execute aggregation query.