You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/16 12:27:07 UTC
[incubator-iotdb] 16/19: add query series data processor
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster_read
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 446cec98fd748105ba4c73728e655d62f2b307b6
Author: lta <li...@163.com>
AuthorDate: Tue Apr 16 19:02:17 2019 +0800
add query series data processor
---
.../iotdb/cluster/config/ClusterConstant.java | 2 +
.../org/apache/iotdb/cluster/entity/Server.java | 29 ++++-
.../cluster/qp/executor/NonQueryExecutor.java | 5 +-
.../executor/ClusterExecutorWithTimeGenerator.java | 6 +-
.../ClusterExecutorWithoutTimeGenerator.java | 6 +-
.../executor/ClusterQueryRouter.java | 6 +-
.../manager/ClusterRpcQueryManager.java | 8 +-
...ager.java => ClusterRpcSingleQueryManager.java} | 4 +-
...ger.java => IClusterRpcSingleQueryManager.java} | 4 +-
.../manager/ClusterLocalQueryManager.java | 74 +++++++++++
.../manager/ClusterLocalSingleQueryManager.java | 142 +++++++++++++++++++++
.../querydata/QuerySeriesDataSyncProcessor.java | 62 +++++++++
.../request/querydata/QuerySeriesDataRequest.java | 36 +++++-
.../cluster/rpc/raft/request/querydata/Stage.java | 2 +-
.../querydata/QuerySeriesDataResponse.java | 25 ++--
.../org/apache/iotdb/cluster/utils/RaftUtils.java | 41 +++++-
.../cluster/utils/query/ClusterRpcReaderUtils.java | 14 +-
.../dataset/EngineDataSetWithoutTimeGenerator.java | 4 +
18 files changed, 422 insertions(+), 48 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
index 5aca9b0..b78ce56 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
@@ -37,4 +37,6 @@ public class ClusterConstant {
*/
public static final int CLOSE_QP_SUB_TASK_BLOCK_TIMEOUT = 1000;
+ public static final int BATCH_READ_SIZE = 1000;
+
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index 1f0e4e3..acd57f8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
import org.apache.iotdb.cluster.rpc.raft.impl.RaftNodeAsClientManager;
import org.apache.iotdb.cluster.rpc.raft.processor.nonquery.DataGroupNonQueryAsyncProcessor;
import org.apache.iotdb.cluster.rpc.raft.processor.nonquery.MetaGroupNonQueryAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.querydata.QuerySeriesDataSyncProcessor;
import org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QueryMetadataAsyncProcessor;
import org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QueryMetadataInStringAsyncProcessor;
import org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QueryPathsAsyncProcessor;
@@ -95,13 +96,9 @@ public class Server {
RpcServer rpcServer = new RpcServer(serverId.getPort());
RaftRpcServerFactory.addRaftRequestProcessors(rpcServer);
- rpcServer.registerUserProcessor(new DataGroupNonQueryAsyncProcessor());
- rpcServer.registerUserProcessor(new MetaGroupNonQueryAsyncProcessor());
- rpcServer.registerUserProcessor(new QueryTimeSeriesAsyncProcessor());
- rpcServer.registerUserProcessor(new QueryMetadataInStringAsyncProcessor());
- rpcServer.registerUserProcessor(new QueryMetadataAsyncProcessor());
- rpcServer.registerUserProcessor(new QuerySeriesTypeAsyncProcessor());
- rpcServer.registerUserProcessor(new QueryPathsAsyncProcessor());
+ registerNonQueryProcessor(rpcServer);
+ registerQueryMetadataProcessor(rpcServer);
+ registerQueryDataProcessor(rpcServer);
metadataHolder = new MetadataRaftHolder(peerIds, serverId, rpcServer, true);
metadataHolder.init();
@@ -127,6 +124,24 @@ public class Server {
}
+ private void registerNonQueryProcessor(RpcServer rpcServer) {
+ rpcServer.registerUserProcessor(new DataGroupNonQueryAsyncProcessor());
+ rpcServer.registerUserProcessor(new MetaGroupNonQueryAsyncProcessor());
+ }
+
+ private void registerQueryMetadataProcessor(RpcServer rpcServer) {
+ rpcServer.registerUserProcessor(new QueryTimeSeriesAsyncProcessor());
+ rpcServer.registerUserProcessor(new QueryMetadataInStringAsyncProcessor());
+ rpcServer.registerUserProcessor(new QueryMetadataAsyncProcessor());
+ rpcServer.registerUserProcessor(new QuerySeriesTypeAsyncProcessor());
+ rpcServer.registerUserProcessor(new QueryPathsAsyncProcessor());
+ rpcServer.registerUserProcessor(new QuerySeriesTypeAsyncProcessor());
+ }
+
+ private void registerQueryDataProcessor(RpcServer rpcServer) {
+ rpcServer.registerUserProcessor(new QuerySeriesDataSyncProcessor());
+ }
+
public void stop() throws ProcessorException {
QPTaskManager.getInstance().close(true, ClusterConstant.CLOSE_QP_SUB_TASK_BLOCK_TIMEOUT);
iotdb.deactivate();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index b6247c8..1420370 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -100,10 +100,13 @@ public class NonQueryExecutor extends AbstractQPExecutor {
* @param batchResult batch result
*/
public void processBatch(PhysicalPlan[] physicalPlans, BatchResult batchResult)
- throws InterruptedException {
+ throws InterruptedException, ProcessorException {
Status nullReadTaskStatus = Status.OK();
RaftUtils.handleNullReadToMetaGroup(nullReadTaskStatus);
+ if(!nullReadTaskStatus.isOk()){
+ throw new ProcessorException("Null read while processing batch failed");
+ }
nullReaderEnable = false;
/** 1. Classify physical plans by group id **/
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java
index 68a14c8..6c0354a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithTimeGenerator.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.cluster.query.coordinatornode.executor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.iotdb.cluster.query.coordinatornode.manager.IClusterSingleQueryManager;
+import org.apache.iotdb.cluster.query.coordinatornode.manager.IClusterRpcSingleQueryManager;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.metadata.MManager;
@@ -39,11 +39,11 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
public class ClusterExecutorWithTimeGenerator {
private QueryExpression queryExpression;
- private IClusterSingleQueryManager queryManager;
+ private IClusterRpcSingleQueryManager queryManager;
private int readDataConsistencyLevel;
ClusterExecutorWithTimeGenerator(QueryExpression queryExpression,
- IClusterSingleQueryManager queryManager, int readDataConsistencyLevel) {
+ IClusterRpcSingleQueryManager queryManager, int readDataConsistencyLevel) {
this.queryExpression = queryExpression;
this.queryManager = queryManager;
this.readDataConsistencyLevel = readDataConsistencyLevel;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithoutTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithoutTimeGenerator.java
index 65b7d8f..e52e604 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithoutTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterExecutorWithoutTimeGenerator.java
@@ -22,7 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterSingleQueryManager;
+import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterRpcSingleQueryManager;
import org.apache.iotdb.cluster.query.coordinatornode.reader.ClusterSeriesReader;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.FileNodeManagerException;
@@ -45,11 +45,11 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
public class ClusterExecutorWithoutTimeGenerator {
private QueryExpression queryExpression;
- private ClusterSingleQueryManager queryManager;
+ private ClusterRpcSingleQueryManager queryManager;
private int readDataConsistencyLevel;
public ClusterExecutorWithoutTimeGenerator(QueryExpression queryExpression,
- ClusterSingleQueryManager queryManager, int readDataConsistencyLevel) {
+ ClusterRpcSingleQueryManager queryManager, int readDataConsistencyLevel) {
this.queryExpression = queryExpression;
this.queryManager = queryManager;
this.readDataConsistencyLevel = readDataConsistencyLevel;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java
index da5a395..6f07e58 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java
@@ -23,8 +23,8 @@ import java.util.List;
import java.util.Map;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterRpcQueryManager;
-import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterSingleQueryManager;
-import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterSingleQueryManager.QueryType;
+import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterRpcSingleQueryManager;
+import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterRpcSingleQueryManager.QueryType;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
@@ -49,7 +49,7 @@ public class ClusterQueryRouter implements IEngineQueryRouter {
public QueryDataSet query(QueryExpression queryExpression, QueryContext context)
throws FileNodeManagerException, PathErrorException {
- ClusterSingleQueryManager queryManager = ClusterRpcQueryManager.getInstance()
+ ClusterRpcSingleQueryManager queryManager = ClusterRpcQueryManager.getInstance()
.getSingleQuery(context.getJobId());
try {
if (queryExpression.hasQueryFilter()) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterRpcQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterRpcQueryManager.java
index f33fe05..bdebd1a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterRpcQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterRpcQueryManager.java
@@ -27,21 +27,21 @@ import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
public class ClusterRpcQueryManager{
/**
- * Key is group id, value is manager of a client query.
+ * Key is job id, value is manager of a client query.
*/
- ConcurrentHashMap<Long, ClusterSingleQueryManager> singleQueryManagerMap = new ConcurrentHashMap<>();
+ ConcurrentHashMap<Long, ClusterRpcSingleQueryManager> singleQueryManagerMap = new ConcurrentHashMap<>();
/**
* Add a query
*/
public void addSingleQuery(long jobId, QueryPlan physicalPlan){
- singleQueryManagerMap.put(jobId, new ClusterSingleQueryManager(jobId, physicalPlan));
+ singleQueryManagerMap.put(jobId, new ClusterRpcSingleQueryManager(jobId, physicalPlan));
}
/**
* Get query manager by group id
*/
- public ClusterSingleQueryManager getSingleQuery(long jobId) {
+ public ClusterRpcSingleQueryManager getSingleQuery(long jobId) {
return singleQueryManagerMap.get(jobId);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterRpcSingleQueryManager.java
similarity index 98%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterSingleQueryManager.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterRpcSingleQueryManager.java
index d350a7c..ae982a1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/ClusterRpcSingleQueryManager.java
@@ -37,7 +37,7 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.tsfile.read.common.Path;
-public class ClusterSingleQueryManager implements IClusterSingleQueryManager {
+public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManager {
/**
* Query job id assigned by QueryResourceManager of coordinator node.
@@ -68,7 +68,7 @@ public class ClusterSingleQueryManager implements IClusterSingleQueryManager {
private Map<String, ClusterSeriesReader> filterPathReaders = new HashMap<>();
- public ClusterSingleQueryManager(long jobId,
+ public ClusterRpcSingleQueryManager(long jobId,
QueryPlan queryPlan) {
this.jobId = jobId;
this.queryPlan = queryPlan;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterRpcSingleQueryManager.java
similarity index 96%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterSingleQueryManager.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterRpcSingleQueryManager.java
index bc70e65..01cdb8a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/manager/IClusterRpcSingleQueryManager.java
@@ -21,14 +21,14 @@ package org.apache.iotdb.cluster.query.coordinatornode.manager;
import com.alipay.sofa.jraft.entity.PeerId;
import java.io.IOException;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
-import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterSingleQueryManager.QueryType;
+import org.apache.iotdb.cluster.query.coordinatornode.manager.ClusterRpcSingleQueryManager.QueryType;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
/**
* Manage a single query.
*/
-public interface IClusterSingleQueryManager {
+public interface IClusterRpcSingleQueryManager {
/**
* Divide physical plan into several sub physical plans according to timeseries full path.
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/querynode/manager/ClusterLocalQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/querynode/manager/ClusterLocalQueryManager.java
new file mode 100644
index 0000000..f23e70e
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/querynode/manager/ClusterLocalQueryManager.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.querynode.manager;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataResponse;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+
+public class ClusterLocalQueryManager {
+
+ /**
+ * Key is job id, value is manager of a client query.
+ */
+ ConcurrentHashMap<Long, ClusterLocalSingleQueryManager> singleQueryManagerMap = new ConcurrentHashMap<>();
+
+ private ClusterLocalQueryManager() {
+ }
+
+ public void createQueryDataSet(QuerySeriesDataRequest request, QuerySeriesDataResponse response)
+ throws IOException, FileNodeManagerException, PathErrorException, ProcessorException, QueryFilterOptimizationException {
+ long jobId = QueryResourceManager.getInstance().assignJobId();
+ response.setJobId(jobId);
+ ClusterLocalSingleQueryManager localQueryManager = new ClusterLocalSingleQueryManager(jobId);
+ localQueryManager.init(request, response);
+ singleQueryManagerMap.put(jobId, localQueryManager);
+ }
+
+ public void readBatchData(QuerySeriesDataRequest request, QuerySeriesDataResponse response)
+ throws IOException {
+ long jobId = request.getJobId();
+ singleQueryManagerMap.get(jobId).readBatchData(request, response);
+ }
+
+ public void close(long jobId) throws FileNodeManagerException {
+ if(singleQueryManagerMap.containsKey(jobId)){
+ singleQueryManagerMap.remove(jobId).close();
+ }
+ }
+
+ public static final ClusterLocalQueryManager getInstance() {
+ return ClusterLocalQueryManager.ClusterLocalQueryManagerHolder.INSTANCE;
+ }
+
+ private static class ClusterLocalQueryManagerHolder {
+
+ private static final ClusterLocalQueryManager INSTANCE = new ClusterLocalQueryManager();
+
+ private ClusterLocalQueryManagerHolder() {
+
+ }
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/querynode/manager/ClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/querynode/manager/ClusterLocalSingleQueryManager.java
new file mode 100644
index 0000000..cf9c9f5
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/querynode/manager/ClusterLocalSingleQueryManager.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.querynode.manager;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataResponse;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.qp.executor.OverflowQPExecutor;
+import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
+import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.db.writelog.transfer.PhysicalPlanLogTransfer;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.ExpressionType;
+
+public class ClusterLocalSingleQueryManager {
+
+ private long jobId;
+
+ /**
+ * Key is series full path, value is reader of series
+ */
+ private Map<String, IPointReader> seriesReaders = new HashMap<>();
+
+ /**
+ * Key is series full path, value is data type of series
+ */
+ private Map<String, TSDataType> dataTypeMap = new HashMap<>();
+
+ private QueryProcessExecutor queryProcessExecutor = new OverflowQPExecutor();
+
+ public ClusterLocalSingleQueryManager(long jobId) {
+ this.jobId = jobId;
+ }
+
+ /**
+ * Initialize this manager by creating the series readers.
+ */
+ public void init(QuerySeriesDataRequest request, QuerySeriesDataResponse response)
+ throws IOException, PathErrorException, FileNodeManagerException, ProcessorException, QueryFilterOptimizationException {
+ List<byte[]> planBytes = request.getPhysicalPlanBytes();
+ for (byte[] planByte : planBytes) {
+ QueryPlan plan = (QueryPlan) PhysicalPlanLogTransfer.logToOperator(planByte);
+ if (plan instanceof GroupByPlan) {
+ throw new UnsupportedOperationException();
+ } else if (plan instanceof AggregationPlan) {
+ throw new UnsupportedOperationException();
+ } else {
+ QueryContext context = new QueryContext(jobId);
+ if (plan.getExpression() == null
+ || plan.getExpression().getType() == ExpressionType.GLOBAL_TIME) {
+ handleDataSetWithoutTimeGenerator(plan, context, request, response);
+ } else {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+ }
+ }
+
+ private void handleDataSetWithoutTimeGenerator(QueryPlan plan, QueryContext context,
+ QuerySeriesDataRequest request, QuerySeriesDataResponse response)
+ throws PathErrorException, QueryFilterOptimizationException, FileNodeManagerException, ProcessorException, IOException {
+ EngineDataSetWithoutTimeGenerator queryDataSet = (EngineDataSetWithoutTimeGenerator) queryProcessExecutor
+ .processQuery(plan, context);
+ List<Path> paths = plan.getPaths();
+ List<IPointReader> readers = queryDataSet.getReaders();
+ List<TSDataType> dataTypes = queryDataSet.getDataTypes();
+ for (int i = 0; i < paths.size(); i++) {
+ String fullPath = paths.get(i).getFullPath();
+ IPointReader reader = readers.get(i);
+ seriesReaders.put(fullPath, reader);
+ dataTypeMap.put(fullPath, dataTypes.get(i));
+ }
+ response.setSeriesType(dataTypes);
+ readBatchData(request, response);
+ }
+
+ /**
+ * Read batch data
+ */
+ public void readBatchData(QuerySeriesDataRequest request, QuerySeriesDataResponse response)
+ throws IOException {
+ List<String> paths = request.getPaths();
+ List<BatchData> batchDataList = new ArrayList<>();
+ for (String fullPath : paths) {
+ BatchData batchData = new BatchData(dataTypeMap.get(fullPath));
+ IPointReader reader = seriesReaders.get(fullPath);
+ for (int i = 0; i < ClusterConstant.BATCH_READ_SIZE; i++) {
+ if (reader.hasNext()) {
+ TimeValuePair pair = reader.next();
+ batchData.putTime(pair.getTimestamp());
+ batchData.putAnObject(pair.getValue().getValue());
+ } else {
+ break;
+ }
+ }
+ batchDataList.add(batchData);
+ }
+ response.setSeriesBatchData(batchDataList);
+ }
+
+ /**
+ * Release query resource
+ */
+ public void close() throws FileNodeManagerException {
+ QueryResourceManager.getInstance().endQueryForGivenJob(jobId);
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querydata/QuerySeriesDataSyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querydata/QuerySeriesDataSyncProcessor.java
new file mode 100644
index 0000000..d2064ba
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querydata/QuerySeriesDataSyncProcessor.java
@@ -0,0 +1,62 @@
+package org.apache.iotdb.cluster.rpc.raft.processor.querydata;
+
+import com.alipay.remoting.BizContext;
+import com.alipay.sofa.jraft.Status;
+import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.query.PathType;
+import org.apache.iotdb.cluster.query.querynode.manager.ClusterLocalQueryManager;
+import org.apache.iotdb.cluster.rpc.raft.processor.BasicSyncUserProcessor;
+import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.querydata.Stage;
+import org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataResponse;
+import org.apache.iotdb.cluster.utils.RaftUtils;
+import org.apache.iotdb.db.exception.ProcessorException;
+
+public class QuerySeriesDataSyncProcessor extends
+ BasicSyncUserProcessor<QuerySeriesDataRequest> {
+
+ @Override
+ public Object handleRequest(BizContext bizContext, QuerySeriesDataRequest request)
+ throws Exception {
+ Stage stage = request.getStage();
+ String groupId = request.getGroupID();
+ PathType pathType = request.getPathType();
+ QuerySeriesDataResponse response = new QuerySeriesDataResponse(groupId, pathType);
+ switch (stage) {
+ case INITIAL:
+ handleNullRead(request.getReadConsistencyLevel(), groupId);
+ ClusterLocalQueryManager.getInstance().createQueryDataSet(request, response);
+ break;
+ case READ_DATA:
+ ClusterLocalQueryManager.getInstance().readBatchData(request, response);
+ break;
+ case CLOSE:
+ ClusterLocalQueryManager.getInstance().close(request.getJobId());
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ }
+ return response;
+ }
+
+ /**
+ * It's necessary to do null read while creating query data set with a strong consistency level.
+ *
+ * @param readConsistencyLevel read consistency level
+ * @param groupId group id
+ */
+ private void handleNullRead(int readConsistencyLevel, String groupId) throws ProcessorException {
+ if (readConsistencyLevel == ClusterConstant.STRONG_CONSISTENCY_LEVEL) {
+ Status nullReadTaskStatus = Status.OK();
+ RaftUtils.handleNullReadToDataGroup(nullReadTaskStatus, groupId);
+ if (!nullReadTaskStatus.isOk()) {
+ throw new ProcessorException("Null read to data group failed");
+ }
+ }
+ }
+
+ @Override
+ public String interest() {
+ return QuerySeriesDataSyncProcessor.class.getName();
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataRequest.java
index fd27997..4974c46 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/QuerySeriesDataRequest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
public class QuerySeriesDataRequest extends BasicQueryRequest {
private Stage stage;
+ private long jobId;
private PathType pathType;
private List<String> paths;
@@ -39,11 +40,44 @@ public class QuerySeriesDataRequest extends BasicQueryRequest {
this.pathType = pathType;
}
- public QuerySeriesDataRequest(String groupID, List<String> paths, PathType pathType)
+ public QuerySeriesDataRequest(String groupID, long jobId, List<String> paths, PathType pathType)
throws IOException {
super(groupID);
this.paths = paths;
stage = Stage.READ_DATA;
+ this.jobId = jobId;
this.pathType = pathType;
}
+
+ public Stage getStage() {
+ return stage;
+ }
+
+ public void setStage(Stage stage) {
+ this.stage = stage;
+ }
+
+ public PathType getPathType() {
+ return pathType;
+ }
+
+ public void setPathType(PathType pathType) {
+ this.pathType = pathType;
+ }
+
+ public List<String> getPaths() {
+ return paths;
+ }
+
+ public void setPaths(List<String> paths) {
+ this.paths = paths;
+ }
+
+ public long getJobId() {
+ return jobId;
+ }
+
+ public void setJobId(long jobId) {
+ this.jobId = jobId;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/Stage.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/Stage.java
index 53d2e2a..0687927 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/Stage.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/Stage.java
@@ -19,5 +19,5 @@
package org.apache.iotdb.cluster.rpc.raft.request.querydata;
public enum Stage {
- INITIAL, READ_DATA
+ INITIAL, READ_DATA, CLOSE
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querydata/QuerySeriesDataResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querydata/QuerySeriesDataResponse.java
index 9f62617..b77a3f3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querydata/QuerySeriesDataResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querydata/QuerySeriesDataResponse.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.cluster.rpc.raft.response.querydata;
-import java.util.Map;
+import java.util.List;
import org.apache.iotdb.cluster.query.PathType;
import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -28,27 +28,30 @@ public class QuerySeriesDataResponse extends BasicResponse {
private long jobId;
private PathType pathType;
- private Map<String, TSDataType> seriesType;
- private Map<String, BatchData> seriesBatchData;
+ private List<TSDataType> seriesType;
+ private List<BatchData> seriesBatchData;
public QuerySeriesDataResponse(String groupId, PathType pathType) {
super(groupId, false, null, null);
this.pathType = pathType;
}
- public QuerySeriesDataResponse setJobId(long jobId) {
+ public void setJobId(long jobId) {
this.jobId = jobId;
- return this;
}
- public QuerySeriesDataResponse setSeriesType(Map<String, TSDataType> seriesType) {
+ public void setPathType(PathType pathType) {
+ this.pathType = pathType;
+ }
+
+ public void setSeriesType(
+ List<TSDataType> seriesType) {
this.seriesType = seriesType;
- return this;
}
- public QuerySeriesDataResponse seySeriesBatchData(Map<String, BatchData> seriesBatchData) {
+ public void setSeriesBatchData(
+ List<BatchData> seriesBatchData) {
this.seriesBatchData = seriesBatchData;
- return this;
}
public long getJobId() {
@@ -59,11 +62,11 @@ public class QuerySeriesDataResponse extends BasicResponse {
return pathType;
}
- public Map<String, TSDataType> getSeriesType() {
+ public List<TSDataType> getSeriesType() {
return seriesType;
}
- public Map<String, BatchData> getSeriesBatchData() {
+ public List<BatchData> getSeriesBatchData() {
return seriesBatchData;
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index d2d5429..f007027 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.cluster.rpc.raft.closure.ResponseClosure;
import org.apache.iotdb.cluster.rpc.raft.impl.RaftNodeAsClientManager;
import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.nonquery.DataGroupNonQueryResponse;
import org.apache.iotdb.cluster.rpc.raft.response.nonquery.MetaGroupNonQueryResponse;
import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
import org.apache.iotdb.cluster.utils.hash.Router;
@@ -273,18 +274,16 @@ public class RaftUtils {
/**
* Handle null-read process in metadata group if the request is to set path.
- *
- * @param status status to return result if this node is leader of the data group
*/
public static void handleNullReadToMetaGroup(Status status) {
SingleQPTask nullReadTask = new SingleQPTask(false, null);
handleNullReadToMetaGroup(status, server, nullReadTask);
}
- public static void handleNullReadToMetaGroup(Status status, Server server,
+ private static void handleNullReadToMetaGroup(Status status, Server server,
SingleQPTask nullReadTask) {
try {
- LOGGER.debug("Handle null-read in meta group for adding path request.");
+ LOGGER.debug("Handle null-read in meta group for metadata request.");
final byte[] reqContext = RaftUtils.createRaftRequestContext();
MetadataRaftHolder metadataRaftHolder = (MetadataRaftHolder) server.getMetadataHolder();
((RaftService) metadataRaftHolder.getService()).getNode()
@@ -307,6 +306,40 @@ public class RaftUtils {
}
}
+ /**
+ * Handle the null-read process in a data group on behalf of a read request.
+ * Creates a new SingleQPTask and delegates to the private overload, passing the
+ * {@code server} visible in this class.
+ *
+ * @param status status object passed through to the private overload, which reports errors
+ * through it
+ * @param groupId id of the data group whose partition holder is used for the null-read
+ */
+ public static void handleNullReadToDataGroup(Status status, String groupId) {
+ SingleQPTask nullReadTask = new SingleQPTask(false, null);
+ handleNullReadToDataGroup(status, server, nullReadTask, groupId);
+ }
+
+ /**
+ * Issue a read-index request on the raft node of the given data group, then wait on the
+ * null-read task, which is run by the read-index callback.
+ *
+ * @param status result holder of the caller; set to code -1 with an error message when the
+ * read-index callback reports a non-ok status or the waiting thread is interrupted
+ * @param server server used to look up the data partition holder of the group
+ * @param nullReadTask task that is run by the callback and awaited by this method
+ * @param groupId id of the data group to read from
+ */
+ private static void handleNullReadToDataGroup(Status status, Server server,
+ SingleQPTask nullReadTask, String groupId) {
+ try {
+ LOGGER.debug("Handle null-read in data group for reading.");
+ final byte[] reqContext = RaftUtils.createRaftRequestContext();
+ DataPartitionRaftHolder dataPartitionRaftHolder = (DataPartitionRaftHolder) server.getDataPartitionHolder(groupId);
+ ((RaftService) dataPartitionRaftHolder.getService()).getNode()
+ .readIndex(reqContext, new ReadIndexClosure() {
+ @Override
+ public void run(Status readStatus, long index, byte[] reqCtx) {
+ BasicResponse response = DataGroupNonQueryResponse
+ .createEmptyResponse(groupId);
+ if (!readStatus.isOk()) {
+ // Report the failure through the caller's status. The callback parameter must not
+ // be named "status": that would shadow the caller's object and the error would
+ // never reach the caller.
+ status.setCode(-1);
+ status.setErrorMsg(readStatus.getErrorMsg());
+ }
+ nullReadTask.run(response);
+ }
+ });
+ nullReadTask.await();
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag so code further up the stack can still observe it.
+ Thread.currentThread().interrupt();
+ status.setCode(-1);
+ status.setErrorMsg(e.getMessage());
+ }
+ }
+
public static Status createErrorStatus(String errorMsg){
Status status = new Status();
status.setErrorMsg(errorMsg);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/ClusterRpcReaderUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/ClusterRpcReaderUtils.java
index 35ff3fb..510112c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/ClusterRpcReaderUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/ClusterRpcReaderUtils.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.query.reader.IBatchReader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
public class ClusterRpcReaderUtils {
@@ -65,14 +66,15 @@ public class ClusterRpcReaderUtils {
/** create cluster series reader **/
Map<String, ClusterSeriesReader> allSeriesReader = new HashMap<>();
- Map<String, TSDataType> seriesType = response.getSeriesType();
- Map<String, BatchData> seriesBatchData = response.getSeriesBatchData();
+ List<Path> paths = queryPlan.getPaths();
+ List<TSDataType> seriesType = response.getSeriesType();
+ List<BatchData> seriesBatchData = response.getSeriesBatchData();
long jobId = response.getJobId();
- for (Entry<String, TSDataType> entry : seriesType.entrySet()) {
- String seriesPath = entry.getKey();
- TSDataType dataType = entry.getValue();
+ for (int i =0 ; i < paths.size(); i++) {
+ String seriesPath = paths.get(i).getFullPath();
+ TSDataType dataType = seriesType.get(i);
IBatchReader batchDataReader = new ClusterRpcBatchDataReader(peerId, jobId,
- PathType.SELECT_PATH, seriesBatchData.get(seriesPath));
+ pathType, seriesBatchData.get(i));
ClusterSeriesReader seriesReader = new ClusterSeriesReader(batchDataReader, seriesPath,
dataType);
allSeriesReader.put(seriesPath, seriesReader);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithoutTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithoutTimeGenerator.java
index 73fc71f..bc9bb08 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithoutTimeGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithoutTimeGenerator.java
@@ -154,4 +154,8 @@ public class EngineDataSetWithoutTimeGenerator extends QueryDataSet {
timeSet.remove(t);
return t;
}
+
+ /**
+ * Expose the point readers held by this data set.
+ *
+ * @return the {@code readers} list itself, not a copy
+ */
+ public List<IPointReader> getReaders() {
+ return this.readers;
+ }
}