You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/18 01:37:56 UTC

[incubator-iotdb] branch cluster_read updated: add comments

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_read
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster_read by this push:
     new 4fe63ca  add comments
4fe63ca is described below

commit 4fe63cabbfed86bc468cec8c6893a16abdda8afd
Author: lta <li...@163.com>
AuthorDate: Thu Apr 18 09:37:33 2019 +0800

    add comments
---
 .../java/org/apache/iotdb/cluster/query/PathType.java    |  9 ++++++++-
 .../java/org/apache/iotdb/cluster/query/QueryType.java   | 16 +++++++++++++++-
 .../iotdb/cluster/query/executor/ClusterQueryRouter.java |  7 ++++---
 .../query/factory/ClusterSeriesReaderFactory.java        |  2 +-
 .../manager/coordinatornode/ClusterRpcQueryManager.java  | 10 +++++++---
 .../coordinatornode/ClusterRpcSingleQueryManager.java    | 10 +++++-----
 .../coordinatornode/IClusterRpcSingleQueryManager.java   | 11 ++++++-----
 .../manager/querynode/ClusterLocalQueryManager.java      | 14 ++++++++++++++
 .../querynode/ClusterLocalSingleQueryManager.java        |  6 ++++--
 .../iotdb/cluster/rpc/raft/request/querydata/Stage.java  | 16 +++++++++++++++-
 .../org/apache/iotdb/cluster/utils/QPExecutorUtils.java  |  3 +++
 .../iotdb/cluster/utils/query/ClusterRpcReaderUtils.java | 16 ++++++++++++++--
 .../cluster/utils/query/QueryPlanPartitionUtils.java     |  4 ++--
 13 files changed, 98 insertions(+), 26 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/PathType.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/PathType.java
index d25bb86..045b1ee 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/PathType.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/PathType.java
@@ -19,5 +19,12 @@
 package org.apache.iotdb.cluster.query;
 
 public enum PathType {
-  SELECT_PATH, FILTER_PATH
+  /**
+   * Select paths in a query
+   */
+  SELECT_PATH,
+  /**
+   * Filter paths in a query
+   */
+  FILTER_PATH
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/QueryType.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/QueryType.java
index e3a8138..5bf8c53 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/QueryType.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/QueryType.java
@@ -18,6 +18,20 @@
  */
 package org.apache.iotdb.cluster.query;
 
+/**
+ * Type of query
+ */
 public enum QueryType {
-  NO_FILTER, GLOBAL_TIME, FILTER
+  /**
+   * Query with no filter
+   */
+  NO_FILTER,
+  /**
+   * Query with global time
+   */
+  GLOBAL_TIME,
+  /**
+   * Query with value filter
+   */
+  FILTER
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
index 65ac626..fec3ad1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
  */
 public class ClusterQueryRouter implements IEngineQueryRouter {
 
+
   private ThreadLocal<Integer> readDataConsistencyLevel = new ThreadLocal<>();
 
   @Override
@@ -89,7 +90,7 @@ public class ClusterQueryRouter implements IEngineQueryRouter {
   public QueryDataSet aggregate(List<Path> selectedSeries, List<String> aggres,
       IExpression expression, QueryContext context)
       throws QueryFilterOptimizationException, FileNodeManagerException, IOException, PathErrorException, ProcessorException {
-    return null;
+    throw new UnsupportedOperationException();
   }
 
   @Override
@@ -97,13 +98,13 @@ public class ClusterQueryRouter implements IEngineQueryRouter {
       IExpression expression, long unit, long origin, List<Pair<Long, Long>> intervals,
       QueryContext context)
       throws ProcessorException, QueryFilterOptimizationException, FileNodeManagerException, PathErrorException, IOException {
-    return null;
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillType,
       QueryContext context) throws FileNodeManagerException, PathErrorException, IOException {
-    return null;
+    throw new UnsupportedOperationException();
   }
 
   public int getReadDataConsistencyLevel() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/factory/ClusterSeriesReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/factory/ClusterSeriesReaderFactory.java
index d4dbb3c..e107bf5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/factory/ClusterSeriesReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/factory/ClusterSeriesReaderFactory.java
@@ -37,7 +37,7 @@ import org.apache.iotdb.tsfile.read.common.Path;
 public class ClusterSeriesReaderFactory {
 
   /**
-   * construct ByTimestampReader, include sequential data and unsequential data.
+   * Construct ByTimestampReader, include sequential data and unsequential data.
    *
    * @param paths selected series path
    * @param context query context
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcQueryManager.java
index cfce004..7597529 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcQueryManager.java
@@ -26,9 +26,10 @@ import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 
 /**
- * Manage all query in cluster
+ * Manage all query series reader resources which fetch data from remote query nodes in coordinator
+ * node
  */
-public class ClusterRpcQueryManager{
+public class ClusterRpcQueryManager {
 
   /**
    * Key is job id, value is task id.
@@ -41,12 +42,15 @@ public class ClusterRpcQueryManager{
   private static final ConcurrentHashMap<String, ClusterRpcSingleQueryManager> SINGLE_QUERY_MANAGER_MAP = new ConcurrentHashMap<>();
 
   /**
-   * Assign every query a work id
+   * Assign every query a work id, which uniquely identify a query in remote query node
    */
   private static final AtomicLong TASK_ID = new AtomicLong(0);
 
   private static final ClusterConfig CLUSTER_CONFIG = ClusterDescriptor.getInstance().getConfig();
 
+  /**
+   * Local address
+   */
   private static final String LOCAL_ADDR = String.format("%s:%d", CLUSTER_CONFIG.getIp(), CLUSTER_CONFIG.getPort());
 
   /**
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
index b6f4a74..d8ec4f6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
@@ -43,7 +43,7 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Path;
 
 /**
- * Manage all remote series reader resource in coordinator node.
+ * Manage all remote series reader resource in a query resource in coordinator node.
  */
 public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManager {
 
@@ -174,7 +174,7 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
   /**
    * Handle response of initial reader. In order to reduce the number of RPC communications,
    * fetching data from remote query node will fetch for all series in the same data group. If the
-   * cached data for specific series is exceed limit, ignore this fetching data process.
+   * cached data for specific series exceed limit, ignore this fetching data process of the series.
    */
   @Override
   public void fetchData(String groupId, PathType pathType) throws RaftConnectionException {
@@ -203,7 +203,7 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
   }
 
   /**
-   * Handle response of fetch data, add batch data to corresponding reader.
+   * Handle response of fetching data, and add batch data to corresponding reader.
    */
   private void handleFetchDataResponse(PathType pathType, List<String> fetchDataSeries,
       QuerySeriesDataResponse response) {
@@ -225,12 +225,12 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
   }
 
   @Override
-  public PhysicalPlan getSelectPathPhysicalPlan(String fullPath) {
+  public QueryPlan getSelectPathQueryPlan(String fullPath) {
     return selectPathPlans.get(fullPath);
   }
 
   @Override
-  public PhysicalPlan getFilterPathPhysicalPlan(String fullPath) {
+  public QueryPlan getFilterPathQueryPlan(String fullPath) {
     return filterPathPlans.get(fullPath);
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java
index ac5bc62..174f658 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcSingleQueryManager.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.cluster.query.PathType;
 import org.apache.iotdb.cluster.query.QueryType;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 
 /**
  * Manage a single query.
@@ -44,18 +45,18 @@ public interface IClusterRpcSingleQueryManager {
   void fetchData(String groupId, PathType pathType) throws RaftConnectionException;
 
   /**
-   * Get physical plan of select path
+   * Get query plan of select path
    *
    * @param fullPath Timeseries full path in select paths
    */
-  PhysicalPlan getSelectPathPhysicalPlan(String fullPath);
+  QueryPlan getSelectPathQueryPlan(String fullPath);
 
   /**
-   * Get physical plan of filter path
+   * Get query plan of filter path
    *
    * @param fullPath Timeseries full path in filter
    */
-  PhysicalPlan getFilterPathPhysicalPlan(String fullPath);
+  QueryPlan getFilterPathQueryPlan(String fullPath);
 
   /**
    * Set reader node of a data group
@@ -73,7 +74,7 @@ public interface IClusterRpcSingleQueryManager {
   PeerId getDataGroupReaderNode(String groupId);
 
   /**
-   * Release query resource
+   * Release query resource in remote query node
    */
   void releaseQueryResource() throws RaftConnectionException;
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
index 2de23d8..f22ef18 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
@@ -28,6 +28,9 @@ import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 
+/**
+ * Manage all local query resources which provide data for coordinator node in cluster query node.
+ */
 public class ClusterLocalQueryManager {
 
   /**
@@ -41,9 +44,15 @@ public class ClusterLocalQueryManager {
    */
   private static final ConcurrentHashMap<Long, ClusterLocalSingleQueryManager> SINGLE_QUERY_MANAGER_MAP = new ConcurrentHashMap<>();
 
+
   private ClusterLocalQueryManager() {
   }
 
+  /**
+   * Initially create query data set for coordinator node.
+   *
+   * @param request request for query data from coordinator node
+   */
   public void createQueryDataSet(QuerySeriesDataRequest request, QuerySeriesDataResponse response)
       throws IOException, FileNodeManagerException, PathErrorException, ProcessorException, QueryFilterOptimizationException {
     long jobId = QueryResourceManager.getInstance().assignJobId();
@@ -54,6 +63,11 @@ public class ClusterLocalQueryManager {
     SINGLE_QUERY_MANAGER_MAP.put(jobId, localQueryManager);
   }
 
+  /**
+   * Read batch data of all querying series in request and set response.
+   *
+   * @param request request of querying series
+   */
   public void readBatchData(QuerySeriesDataRequest request, QuerySeriesDataResponse response)
       throws IOException {
     long jobId = TASK_ID_MAP_JOB_ID.get(request.getTaskId());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
index d1b654c..2eaf068 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.cluster.query.manager.querynode;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -118,7 +117,10 @@ public class ClusterLocalSingleQueryManager {
   /**
    * Handle query with no filter or only global time filter
    *
-   * @param plan query plan
+   * @param plan plan query plan
+   * @param context query context
+   * @param response response for coordinator node
+   * @param pathType type of series
    */
   private void handleDataSetWithoutTimeGenerator(QueryPlan plan, QueryContext context,
       QuerySeriesDataResponse response, PathType pathType)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/Stage.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/Stage.java
index 0687927..b8c18fc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/Stage.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/Stage.java
@@ -18,6 +18,20 @@
  */
 package org.apache.iotdb.cluster.rpc.raft.request.querydata;
 
+/**
+ * Stage of querying series data from remote query node
+ */
 public enum Stage {
-  INITIAL, READ_DATA, CLOSE
+  /**
+   * Initially create corresponding series readers in remote query node
+   */
+  INITIAL,
+  /**
+   * Read batch data from series reader from remote query node.
+   */
+  READ_DATA,
+  /**
+   * Release series reader resource in remote query node
+   */
+  CLOSE
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/QPExecutorUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/QPExecutorUtils.java
index 63a5be6..8ee954c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/QPExecutorUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/QPExecutorUtils.java
@@ -32,6 +32,9 @@ import org.apache.iotdb.cluster.utils.hash.Router;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.metadata.MManager;
 
+/**
+ * Utils for QP executor
+ */
 public class QPExecutorUtils {
 
   private static final ClusterConfig CLUSTER_CONFIG = ClusterDescriptor.getInstance().getConfig();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/ClusterRpcReaderUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/ClusterRpcReaderUtils.java
index 7e5f54a..4b135d8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/ClusterRpcReaderUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/ClusterRpcReaderUtils.java
@@ -47,7 +47,7 @@ public class ClusterRpcReaderUtils {
       .getQpTaskRedoCount();
 
   /**
-   * Create cluster series reader and get the first batch data
+   * Create cluster series reader
    *
    * @param peerId query node to fetch data
    * @param readDataConsistencyLevel consistency level of read data
@@ -66,6 +66,14 @@ public class ClusterRpcReaderUtils {
     return handleQueryRequest(request, peerId, 0);
   }
 
+  /**
+   * Send query request to remote node and return response
+   *
+   * @param request query request
+   * @param peerId target remote query node
+   * @param taskRetryNum retry num of the request
+   * @return Response from remote query node
+   */
   private static BasicResponse handleQueryRequest(BasicRequest request, PeerId peerId,
       int taskRetryNum)
       throws RaftConnectionException {
@@ -91,7 +99,11 @@ public class ClusterRpcReaderUtils {
   }
 
   /**
-   * Release remote query resource
+   * Release remote query resources
+   *
+   * @param groupId data group id
+   * @param peerId target query node
+   * @param taskId unique task id
    */
   public static void releaseRemoteQueryResource(String groupId, PeerId peerId, String taskId)
       throws RaftConnectionException {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/QueryPlanPartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/QueryPlanPartitionUtils.java
index 3ac3108..8ab042d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/QueryPlanPartitionUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/query/QueryPlanPartitionUtils.java
@@ -31,7 +31,7 @@ import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.tsfile.read.common.Path;
 
 /**
- * Utils for spliting query plan to several sub query plan by group id.
+ * Utils for splitting query plan to several sub query plans by group id.
  */
 public class QueryPlanPartitionUtils {
 
@@ -71,7 +71,7 @@ public class QueryPlanPartitionUtils {
   }
 
   /**
-   * Split query plan with not only global time filter.
+   * Split query plan with value filter.
    */
   public static void splitQueryPlanWithValueFilter(ClusterRpcSingleQueryManager singleQueryManager) {