You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/18 01:37:56 UTC
[incubator-iotdb] branch cluster_read updated: add comments
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster_read
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_read by this push:
new 4fe63ca add comments
4fe63ca is described below
commit 4fe63cabbfed86bc468cec8c6893a16abdda8afd
Author: lta <li...@163.com>
AuthorDate: Thu Apr 18 09:37:33 2019 +0800
add comments
---
.../java/org/apache/iotdb/cluster/query/PathType.java | 9 ++++++++-
.../java/org/apache/iotdb/cluster/query/QueryType.java | 16 +++++++++++++++-
.../iotdb/cluster/query/executor/ClusterQueryRouter.java | 7 ++++---
.../query/factory/ClusterSeriesReaderFactory.java | 2 +-
.../manager/coordinatornode/ClusterRpcQueryManager.java | 10 +++++++---
.../coordinatornode/ClusterRpcSingleQueryManager.java | 10 +++++-----
.../coordinatornode/IClusterRpcSingleQueryManager.java | 11 ++++++-----
.../manager/querynode/ClusterLocalQueryManager.java | 14 ++++++++++++++
.../querynode/ClusterLocalSingleQueryManager.java | 6 ++++--
.../iotdb/cluster/rpc/raft/request/querydata/Stage.java | 16 +++++++++++++++-
.../org/apache/iotdb/cluster/utils/QPExecutorUtils.java | 3 +++
.../iotdb/cluster/utils/query/ClusterRpcReaderUtils.java | 16 ++++++++++++++--
.../cluster/utils/query/QueryPlanPartitionUtils.java | 4 ++--
13 files changed, 98 insertions(+), 26 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/PathType.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/PathType.java
index d25bb86..045b1ee 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/PathType.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/PathType.java
@@ -19,5 +19,12 @@
package org.apache.iotdb.cluster.query;
public enum PathType {
- SELECT_PATH, FILTER_PATH
+ /**
+ * Select paths in a query
+ */
+ SELECT_PATH,
+ /**
+ * Filter paths in a query
+ */
+ FILTER_PATH
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/QueryType.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/QueryType.java
index e3a8138..5bf8c53 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/QueryType.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/QueryType.java
@@ -18,6 +18,20 @@
*/
package org.apache.iotdb.cluster.query;
+/**
+ * Type of query
+ */
public enum QueryType {
- NO_FILTER, GLOBAL_TIME, FILTER
+ /**
+ * Query with no filter
+ */
+ NO_FILTER,
+ /**
+ * Query with global time
+ */
+ GLOBAL_TIME,
+ /**
+ * Query with value filter
+ */
+ FILTER
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
index 65ac626..fec3ad1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
*/
public class ClusterQueryRouter implements IEngineQueryRouter {
+
private ThreadLocal<Integer> readDataConsistencyLevel = new ThreadLocal<>();
@Override
@@ -89,7 +90,7 @@ public class ClusterQueryRouter implements IEngineQueryRouter {
public QueryDataSet aggregate(List<Path> selectedSeries, List<String> aggres,
IExpression expression, QueryContext context)
throws QueryFilterOptimizationException, FileNodeManagerException, IOException, PathErrorException, ProcessorException {
- return null;
+ throw new UnsupportedOperationException();
}
@Override
@@ -97,13 +98,13 @@ public class ClusterQueryRouter implements IEngineQueryRouter {
IExpression expression, long unit, long origin, List<Pair<Long, Long>> intervals,
QueryContext context)
throws ProcessorException, QueryFilterOptimizationException, FileNodeManagerException, PathErrorException, IOException {
- return null;
+ throw new UnsupportedOperationException();
}
@Override
public QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillType,
QueryContext context) throws FileNodeManagerException, PathErrorException, IOException {
- return null;
+ throw new UnsupportedOperationException();
}
public int getReadDataConsistencyLevel() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/factory/ClusterSeriesReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/factory/ClusterSeriesReaderFactory.java
index d4dbb3c..e107bf5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/factory/ClusterSeriesReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/factory/ClusterSeriesReaderFactory.java
@@ -37,7 +37,7 @@ import org.apache.iotdb.tsfile.read.common.Path;
public class ClusterSeriesReaderFactory {
/**
- * construct ByTimestampReader, include sequential data and unsequential data.
+ * Construct ByTimestampReader, include sequential data and unsequential data.
*
* @param paths selected series path
* @param context query context
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcQueryManager.java
index cfce004..7597529 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcQueryManager.java
@@ -26,9 +26,10 @@ import org.apache.iotdb.cluster.exception.RaftConnectionException;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
/**
- * Manage all query in cluster
+ * Manage all query series reader resources which fetch data from remote query nodes in coordinator
+ * node
*/
-public class ClusterRpcQueryManager{
+public class ClusterRpcQueryManager {
/**
* Key is job id, value is task id.
@@ -41,12 +42,15 @@ public class ClusterRpcQueryManager{
private static final ConcurrentHashMap<String, ClusterRpcSingleQueryManager> SINGLE_QUERY_MANAGER_MAP = new ConcurrentHashMap<>();
/**
- * Assign every query a work id
+ * Assign every query a work id, which uniquely identifies a query in remote query node
*/
private static final AtomicLong TASK_ID = new AtomicLong(0);
private static final ClusterConfig CLUSTER_CONFIG = ClusterDescriptor.getInstance().getConfig();
+ /**
+ * Local address
+ */
private static final String LOCAL_ADDR = String.format("%s:%d", CLUSTER_CONFIG.getIp(), CLUSTER_CONFIG.getPort());
/**
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
index b6f4a74..d8ec4f6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
@@ -43,7 +43,7 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Path;
/**
- * Manage all remote series reader resource in coordinator node.
+ * Manage all remote series reader resource in a query resource in coordinator node.
*/
public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManager {
@@ -174,7 +174,7 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
/**
* Handle response of initial reader. In order to reduce the number of RPC communications,
* fetching data from remote query node will fetch for all series in the same data group. If the
- * cached data for specific series is exceed limit, ignore this fetching data process.
+ * cached data for a specific series exceeds the limit, ignore this fetching data process of the series.
*/
@Override
public void fetchData(String groupId, PathType pathType) throws RaftConnectionException {
@@ -203,7 +203,7 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
}
/**
- * Handle response of fetch data, add batch data to corresponding reader.
+ * Handle response of fetching data, and add batch data to corresponding reader.
*/
private void handleFetchDataResponse(PathType pathType, List<String> fetchDataSeries,
QuerySeriesDataResponse response) {
@@ -225,12 +225,12 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
}
@Override
- public PhysicalPlan getSelectPathPhysicalPlan(String fullPath) {
+ public QueryPlan getSelectPathQueryPlan(String fullPath) {
return selectPathPlans.get(fullPath);
}
@Override
- public PhysicalPlan getFilterPathPhysicalPlan(String fullPath) {
+ public QueryPlan getFilterPathQueryPlan(String fullPath) {
return filterPathPlans.get(fullPath);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java
index ac5bc62..174f658 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.cluster.query.PathType;
import org.apache.iotdb.cluster.query.QueryType;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
/**
* Manage a single query.
@@ -44,18 +45,18 @@ public interface IClusterRpcSingleQueryManager {
void fetchData(String groupId, PathType pathType) throws RaftConnectionException;
/**
- * Get physical plan of select path
+ * Get query plan of select path
*
* @param fullPath Timeseries full path in select paths
*/
- PhysicalPlan getSelectPathPhysicalPlan(String fullPath);
+ QueryPlan getSelectPathQueryPlan(String fullPath);
/**
- * Get physical plan of filter path
+ * Get query plan of filter path
*
* @param fullPath Timeseries full path in filter
*/
- PhysicalPlan getFilterPathPhysicalPlan(String fullPath);
+ QueryPlan getFilterPathQueryPlan(String fullPath);
/**
* Set reader node of a data group
@@ -73,7 +74,7 @@ public interface IClusterRpcSingleQueryManager {
PeerId getDataGroupReaderNode(String groupId);
/**
- * Release query resource
+ * Release query resource in remote query node
*/
void releaseQueryResource() throws RaftConnectionException;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
index 2de23d8..f22ef18 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
@@ -28,6 +28,9 @@ import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+/**
+ * Manage all local query resources which provide data for coordinator node in cluster query node.
+ */
public class ClusterLocalQueryManager {
/**
@@ -41,9 +44,15 @@ public class ClusterLocalQueryManager {
*/
private static final ConcurrentHashMap<Long, ClusterLocalSingleQueryManager> SINGLE_QUERY_MANAGER_MAP = new ConcurrentHashMap<>();
+
private ClusterLocalQueryManager() {
}
+ /**
+ * Initially create query data set for coordinator node.
+ *
+ * @param request request for query data from coordinator node
+ */
public void createQueryDataSet(QuerySeriesDataRequest request, QuerySeriesDataResponse response)
throws IOException, FileNodeManagerException, PathErrorException, ProcessorException, QueryFilterOptimizationException {
long jobId = QueryResourceManager.getInstance().assignJobId();
@@ -54,6 +63,11 @@ public class ClusterLocalQueryManager {
SINGLE_QUERY_MANAGER_MAP.put(jobId, localQueryManager);
}
+ /**
+ * Read batch data of all querying series in request and set response.
+ *
+ * @param request request of querying series
+ */
public void readBatchData(QuerySeriesDataRequest request, QuerySeriesDataResponse response)
throws IOException {
long jobId = TASK_ID_MAP_JOB_ID.get(request.getTaskId());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
index d1b654c..2eaf068 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.cluster.query.manager.querynode;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -118,7 +117,10 @@ public class ClusterLocalSingleQueryManager {
/**
* Handle query with no filter or only global time filter
*
- * @param plan query plan
+ * @param plan query plan
+ * @param context query context
+ * @param response response for coordinator node
+ * @param pathType type of series
*/
private void handleDataSetWithoutTimeGenerator(QueryPlan plan, QueryContext context,
QuerySeriesDataResponse response, PathType pathType)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/Stage.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/Stage.java
index 0687927..b8c18fc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/Stage.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/Stage.java
@@ -18,6 +18,20 @@
*/
package org.apache.iotdb.cluster.rpc.raft.request.querydata;
+/**
+ * Stage of querying series data from remote query node
+ */
public enum Stage {
- INITIAL, READ_DATA, CLOSE
+ /**
+ * Initially create corresponding series readers in remote query node
+ */
+ INITIAL,
+ /**
+ * Read batch data from series reader from remote query node.
+ */
+ READ_DATA,
+ /**
+ * Release series reader resource in remote query node
+ */
+ CLOSE
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/QPExecutorUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/QPExecutorUtils.java
index 63a5be6..8ee954c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/QPExecutorUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/QPExecutorUtils.java
@@ -32,6 +32,9 @@ import org.apache.iotdb.cluster.utils.hash.Router;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.metadata.MManager;
+/**
+ * Utils for QP executor
+ */
public class QPExecutorUtils {
private static final ClusterConfig CLUSTER_CONFIG = ClusterDescriptor.getInstance().getConfig();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/ClusterRpcReaderUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/ClusterRpcReaderUtils.java
index 7e5f54a..4b135d8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/ClusterRpcReaderUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/ClusterRpcReaderUtils.java
@@ -47,7 +47,7 @@ public class ClusterRpcReaderUtils {
.getQpTaskRedoCount();
/**
- * Create cluster series reader and get the first batch data
+ * Create cluster series reader
*
* @param peerId query node to fetch data
* @param readDataConsistencyLevel consistency level of read data
@@ -66,6 +66,14 @@ public class ClusterRpcReaderUtils {
return handleQueryRequest(request, peerId, 0);
}
+ /**
+ * Send query request to remote node and return response
+ *
+ * @param request query request
+ * @param peerId target remote query node
+ * @param taskRetryNum retry num of the request
+ * @return Response from remote query node
+ */
private static BasicResponse handleQueryRequest(BasicRequest request, PeerId peerId,
int taskRetryNum)
throws RaftConnectionException {
@@ -91,7 +99,11 @@ public class ClusterRpcReaderUtils {
}
/**
- * Release remote query resource
+ * Release remote query resources
+ *
+ * @param groupId data group id
+ * @param peerId target query node
+ * @param taskId unique task id
*/
public static void releaseRemoteQueryResource(String groupId, PeerId peerId, String taskId)
throws RaftConnectionException {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/QueryPlanPartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/QueryPlanPartitionUtils.java
index 3ac3108..8ab042d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/QueryPlanPartitionUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/QueryPlanPartitionUtils.java
@@ -31,7 +31,7 @@ import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.tsfile.read.common.Path;
/**
- * Utils for spliting query plan to several sub query plan by group id.
+ * Utils for splitting query plan to several sub query plans by group id.
*/
public class QueryPlanPartitionUtils {
@@ -71,7 +71,7 @@ public class QueryPlanPartitionUtils {
}
/**
- * Split query plan with not only global time filter.
+ * Split query plan with value filter.
*/
public static void splitQueryPlanWithValueFilter(ClusterRpcSingleQueryManager singleQueryManager) {