You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/05/22 11:33:20 UTC

[incubator-iotdb] branch cluster updated (61e8452 -> 411ea77)

This is an automated email from the ASF dual-hosted git repository.

lta pushed a change to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


    from 61e8452  remove useless code
     new 7ca00da  fix  some bug
     new 2ac9056  format codes
     new 411ea77  remove groupIdMapNodeCache

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/iotdb/cluster/entity/Server.java    | 34 ++++++++++------------
 .../cluster/entity/raft/DataStateMachine.java      |  2 +-
 .../cluster/qp/executor/QueryMetadataExecutor.java |  4 +--
 .../apache/iotdb/cluster/qp/task/BatchQPTask.java  | 10 +++----
 .../qp/task/{QueryTask.java => DataQueryTask.java} |  4 +--
 .../org/apache/iotdb/cluster/qp/task/QPTask.java   |  2 +-
 .../apache/iotdb/cluster/qp/task/SingleQPTask.java |  2 +-
 .../query/common/ClusterNullableBatchData.java     |  2 +-
 ...lusterGroupBySelectSeriesBatchReaderEntity.java |  7 +++--
 .../querynode/ClusterSelectSeriesBatchReader.java  |  3 --
 .../querynode/IClusterSeriesBatchReaderEntity.java |  4 +--
 .../cluster/query/utils/ClusterRpcReaderUtils.java | 14 +++------
 .../query/utils/ClusterTimeValuePairUtils.java     |  2 +-
 .../iotdb/cluster/rpc/raft/NodeAsClient.java       |  4 +--
 .../rpc/raft/impl/RaftNodeAsClientManager.java     | 16 +++++-----
 .../org/apache/iotdb/cluster/utils/RaftUtils.java  |  6 ++--
 .../iotdb/cluster/utils/hash/PhysicalNode.java     |  9 ++++++
 .../apache/iotdb/cluster/utils/hash/Router.java    | 26 +++--------------
 .../iotdb/cluster/utils/hash/RouterTest.java       |  2 +-
 19 files changed, 67 insertions(+), 86 deletions(-)
 rename cluster/src/main/java/org/apache/iotdb/cluster/qp/task/{QueryTask.java => DataQueryTask.java} (94%)


[incubator-iotdb] 03/03: remove groupIdMapNodeCache

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 411ea778316b8df7f6e5a0727dbb24af7fbc2e3a
Author: lta <li...@163.com>
AuthorDate: Wed May 22 19:33:06 2019 +0800

    remove groupIdMapNodeCache
---
 .../org/apache/iotdb/cluster/entity/Server.java    | 34 ++++++++++------------
 .../{QueryDataTask.java => DataQueryTask.java}     |  4 +--
 .../cluster/query/utils/ClusterRpcReaderUtils.java |  8 ++---
 .../iotdb/cluster/rpc/raft/NodeAsClient.java       |  4 +--
 .../rpc/raft/impl/RaftNodeAsClientManager.java     |  8 ++---
 .../iotdb/cluster/utils/hash/PhysicalNode.java     |  9 ++++++
 .../apache/iotdb/cluster/utils/hash/Router.java    | 26 +++--------------
 .../iotdb/cluster/utils/hash/RouterTest.java       |  2 +-
 8 files changed, 41 insertions(+), 54 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index 27b0e75..a84e389 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -32,9 +32,6 @@ import org.apache.iotdb.cluster.entity.metadata.MetadataHolder;
 import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
 import org.apache.iotdb.cluster.rpc.raft.impl.RaftNodeAsClientManager;
-import org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryJobNumAsyncProcessor;
-import org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryLeaderAsyncProcessor;
-import org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryMetricAsyncProcessor;
 import org.apache.iotdb.cluster.rpc.raft.processor.nonquery.DataGroupNonQueryAsyncProcessor;
 import org.apache.iotdb.cluster.rpc.raft.processor.nonquery.MetaGroupNonQueryAsyncProcessor;
 import org.apache.iotdb.cluster.rpc.raft.processor.querydata.CloseSeriesReaderSyncProcessor;
@@ -46,15 +43,18 @@ import org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QueryMetadataIn
 import org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QueryPathsAsyncProcessor;
 import org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QuerySeriesTypeAsyncProcessor;
 import org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QueryTimeSeriesAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryJobNumAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryLeaderAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryMetricAsyncProcessor;
 import org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryStatusAsyncProcessor;
+import org.apache.iotdb.cluster.service.ClusterMonitor;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
 import org.apache.iotdb.cluster.utils.hash.Router;
-import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.service.RegisterManager;
-import org.apache.iotdb.cluster.service.ClusterMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -124,20 +124,16 @@ public class Server {
     Router router = Router.getInstance();
     PhysicalNode[][] groups = router.getGroupsNodes(serverId.getIp(), serverId.getPort());
 
-    try {
-      for (int i = 0; i < groups.length; i++) {
-        PhysicalNode[] group = groups[i];
-        String groupId = router.getGroupID(group);
-        DataPartitionHolder dataPartitionHolder = new DataPartitionRaftHolder(groupId,
-            RaftUtils.getPeerIdArrayFrom(group), serverId, rpcServer, false);
-        dataPartitionHolder.init();
-        dataPartitionHolder.start();
-        dataPartitionHolderMap.put(groupId, dataPartitionHolder);
-        LOGGER.info("{} group has started", groupId);
-        Router.getInstance().showPhysicalNodes(groupId);
-      }
-    }catch (Exception e){
-      e.printStackTrace();
+    for (int i = 0; i < groups.length; i++) {
+      PhysicalNode[] group = groups[i];
+      String groupId = router.getGroupID(group);
+      DataPartitionHolder dataPartitionHolder = new DataPartitionRaftHolder(groupId,
+          RaftUtils.getPeerIdArrayFrom(group), serverId, rpcServer, false);
+      dataPartitionHolder.init();
+      dataPartitionHolder.start();
+      dataPartitionHolderMap.put(groupId, dataPartitionHolder);
+      LOGGER.info("{} group has started", groupId);
+      Router.getInstance().showPhysicalNodes(groupId);
     }
 
     try {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QueryDataTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/DataQueryTask.java
similarity index 94%
rename from cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QueryDataTask.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/qp/task/DataQueryTask.java
index 6946925..3b905d8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QueryDataTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/DataQueryTask.java
@@ -21,11 +21,11 @@ package org.apache.iotdb.cluster.qp.task;
 import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
 
-public class QueryDataTask {
+public class DataQueryTask {
   private BasicResponse basicResponse;
   private TaskState state;
 
-  public QueryDataTask(BasicResponse basicResponse,
+  public DataQueryTask(BasicResponse basicResponse,
       TaskState state) {
     this.basicResponse = basicResponse;
     this.state = state;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
index 0cc9805..75c2381 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.entity.Server;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
-import org.apache.iotdb.cluster.qp.task.QueryDataTask;
+import org.apache.iotdb.cluster.qp.task.DataQueryTask;
 import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
 import org.apache.iotdb.cluster.rpc.raft.NodeAsClient;
 import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
@@ -93,9 +93,9 @@ public class ClusterRpcReaderUtils {
               TASK_MAX_RETRY));
     }
     NodeAsClient nodeAsClient = RaftUtils.getRaftNodeAsClient();
-    QueryDataTask queryDataTask = nodeAsClient.syncHandleRequest(request, peerId);
-    if (queryDataTask.getState() == TaskState.FINISH) {
-      return queryDataTask.getBasicResponse();
+    DataQueryTask dataQueryTask = nodeAsClient.syncHandleRequest(request, peerId);
+    if (dataQueryTask.getState() == TaskState.FINISH) {
+      return dataQueryTask.getBasicResponse();
     } else {
       return handleQueryRequest(request, peerId, taskRetryNum + 1);
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
index 865b6ef..994fc07 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.cluster.rpc.raft;
 
 import com.alipay.sofa.jraft.entity.PeerId;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
-import org.apache.iotdb.cluster.qp.task.QueryDataTask;
+import org.apache.iotdb.cluster.qp.task.DataQueryTask;
 import org.apache.iotdb.cluster.qp.task.SingleQPTask;
 import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
 
@@ -43,7 +43,7 @@ public interface NodeAsClient {
    *
    * @param peerId leader node of the target group
    */
-  QueryDataTask syncHandleRequest(BasicRequest request, PeerId peerId);
+  DataQueryTask syncHandleRequest(BasicRequest request, PeerId peerId);
 
   /**
    * Shut down client
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
index eafa843..1d32fd5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
@@ -33,7 +33,7 @@ import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
-import org.apache.iotdb.cluster.qp.task.QueryDataTask;
+import org.apache.iotdb.cluster.qp.task.DataQueryTask;
 import org.apache.iotdb.cluster.qp.task.SingleQPTask;
 import org.apache.iotdb.cluster.rpc.raft.NodeAsClient;
 import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
@@ -266,13 +266,13 @@ public class RaftNodeAsClientManager {
     }
 
     @Override
-    public QueryDataTask syncHandleRequest(BasicRequest request, PeerId peerId) {
+    public DataQueryTask syncHandleRequest(BasicRequest request, PeerId peerId) {
       try {
         BasicResponse response = (BasicResponse) boltClientService.getRpcClient()
             .invokeSync(peerId.getEndpoint().toString(), request, TASK_TIMEOUT_MS);
-        return new QueryDataTask(response, TaskState.FINISH);
+        return new DataQueryTask(response, TaskState.FINISH);
       } catch (RemotingException | InterruptedException e) {
-        return new QueryDataTask(null, TaskState.EXCEPTION);
+        return new DataQueryTask(null, TaskState.EXCEPTION);
       } finally {
         releaseClient(RaftNodeAsClient.this);
       }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/PhysicalNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/PhysicalNode.java
index 66544a8..84cf431 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/PhysicalNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/PhysicalNode.java
@@ -24,6 +24,7 @@ public class PhysicalNode {
 
   private String ip;
   private int port;
+  private String groupId;
 
   public PhysicalNode(String ip, int port) {
     this.ip = ip;
@@ -77,6 +78,14 @@ public class PhysicalNode {
     return port;
   }
 
+  public String getGroupId() {
+    return groupId;
+  }
+
+  public void setGroupId(String groupId) {
+    this.groupId = groupId;
+  }
+
   @OnlyForTest
   public void setIp(String ip) {
     this.ip = ip;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
index 0552950..cc2604a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
@@ -51,11 +51,6 @@ public class Router {
   private Map<PhysicalNode, PhysicalNode[][]> dataPartitionCache = new HashMap<>();
 
   /**
-   * Key is the first node of the group, value is group id.
-   */
-  private Map<PhysicalNode, String> nodeMapGroupIdCache = new HashMap<>();
-
-  /**
    * Key is group id, value is the first node of the group.
    */
   private Map<String, PhysicalNode> groupIdMapNodeCache = new HashMap<>();
@@ -68,6 +63,7 @@ public class Router {
   private HashFunction hashFunction = new MD5Hash();
 
   private final SortedMap<Integer, PhysicalNode> physicalRing = new TreeMap<>();
+
   private final SortedMap<Integer, VirtualNode> virtualRing = new TreeMap<>();
 
   private static class RouterHolder {
@@ -103,17 +99,9 @@ public class Router {
       if (len < replicator) {
         throw new ErrorConfigureExecption(String.format("Replicator number %d is greater "
             + "than cluster number %d", replicator, len));
-      } else if (len == replicator) {
-        PhysicalNode[][] val = new PhysicalNode[1][len];
-        nodeMapGroupIdCache.put(first, DATA_GROUP_STR + "0");
-        groupIdMapNodeCache.put(DATA_GROUP_STR + "0", first);
-        for (int j = 0; j < len; j++) {
-          val[0][j] = nodes[(i + j) % len];
-        }
-        dataPartitionCache.put(first, val);
-      }  else {
+      } else {
         PhysicalNode[][] val = new PhysicalNode[replicator][replicator];
-        nodeMapGroupIdCache.put(first, DATA_GROUP_STR + i);
+        first.setGroupId(DATA_GROUP_STR + i);
         groupIdMapNodeCache.put(DATA_GROUP_STR + i, first);
         for (int j = 0; j < replicator; j++) {
           for (int k = 0; k < replicator; k++) {
@@ -149,7 +137,7 @@ public class Router {
   }
 
   public String getGroupID(PhysicalNode[] nodes) {
-    return nodeMapGroupIdCache.get(nodes[0]);
+    return nodes[0].getGroupId();
   }
 
   public PhysicalNode[][] getGroupsNodes(String ip, int port) {
@@ -192,7 +180,6 @@ public class Router {
     virtualRing.clear();
     sgRouter.clear();
     dataPartitionCache.clear();
-    nodeMapGroupIdCache.clear();
     groupIdMapNodeCache.clear();
   }
 
@@ -210,11 +197,6 @@ public class Router {
     }
   }
 
-  public boolean containPhysicalNodeBySG(String storageGroup, PhysicalNode node) {
-    PhysicalNode[] nodes = routeGroup(storageGroup);
-    return Arrays.asList(nodes).contains(node);
-  }
-
   public boolean containPhysicalNodeByGroupId(String groupId, PhysicalNode node) {
     PhysicalNode[] nodes = getNodesByGroupId(groupId);
     return Arrays.asList(nodes).contains(node);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/utils/hash/RouterTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/utils/hash/RouterTest.java
index 5a515fe..3799119 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/utils/hash/RouterTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/utils/hash/RouterTest.java
@@ -111,7 +111,7 @@ public class RouterTest {
     assertPhysicalNodeEquals(expected1, router.routeGroup(sg1));
     // test cache
     assertPhysicalNodeEquals(expected1, router.routeGroup(sg1));
-    assertEquals(Router.DATA_GROUP_STR + "0", router.getGroupID(router.routeGroup(sg1)));
+    assertEquals(Router.DATA_GROUP_STR + "2", router.getGroupID(router.routeGroup(sg1)));
 
     String sg2 = "root.vehicle.d1";
     assertEquals(router.routeNode(sg2), new PhysicalNode("192.168.130.2", PORT));


[incubator-iotdb] 02/03: format codes

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 2ac9056dc94131ae33a7495c65806943655799aa
Author: lta <li...@163.com>
AuthorDate: Wed May 22 18:19:37 2019 +0800

    format codes
---
 .../iotdb/cluster/entity/raft/DataStateMachine.java      |  2 +-
 .../iotdb/cluster/qp/executor/QueryMetadataExecutor.java |  4 ++--
 .../org/apache/iotdb/cluster/qp/task/BatchQPTask.java    | 10 +++++-----
 .../java/org/apache/iotdb/cluster/qp/task/QPTask.java    |  2 +-
 .../qp/task/{QueryTask.java => QueryDataTask.java}       |  4 ++--
 .../org/apache/iotdb/cluster/qp/task/SingleQPTask.java   |  2 +-
 .../iotdb/cluster/query/utils/ClusterRpcReaderUtils.java | 14 ++++----------
 .../org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java  |  4 ++--
 .../cluster/rpc/raft/impl/RaftNodeAsClientManager.java   | 16 ++++++++--------
 .../java/org/apache/iotdb/cluster/utils/RaftUtils.java   |  6 +++---
 10 files changed, 29 insertions(+), 35 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
index b8c6f43..de46d3a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
@@ -116,7 +116,7 @@ public class DataStateMachine extends StateMachineAdapter {
         PhysicalPlan plan = PhysicalPlanLogTransfer.logToOperator(planByte);
 
         LOGGER.debug("OperatorType :{}", plan.getOperatorType());
-        /** If the request is to set path and sg of the path doesn't exist, it needs to run null-read in meta group to avoid out of data sync **/
+        /** If the request is to set path and sg of the path doesn't exist, it needs to receive null-read in meta group to avoid out of data sync **/
         if (plan.getOperatorType() == OperatorType.CREATE_TIMESERIES && !checkPathExistence(
             ((MetadataPlan) plan).getPath().getFullPath())) {
           RaftUtils.handleNullReadToMetaGroup(status);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 229009e..6043ce6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -452,7 +452,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
       QueryStorageGroupResponse response;
       response = QueryStorageGroupResponse
           .createSuccessResponse(metadataHolder.getFsm().getAllStorageGroups());
-      task.run(response);
+      task.receive(response);
     } else {
       ((RaftService) metadataHolder.getService()).getNode()
           .readIndex(reqContext, new ReadIndexClosure() {
@@ -466,7 +466,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
               } else {
                 response = QueryStorageGroupResponse.createErrorResponse(status.getErrorMsg());
               }
-              task.run(response);
+              task.receive(response);
             }
           });
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
index 667a55e..4562d77 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
@@ -91,7 +91,7 @@ public class BatchQPTask extends MultiQPTask {
    * @param basicResponse response from receiver
    */
   @Override
-  public void run(BasicResponse basicResponse) {
+  public void receive(BasicResponse basicResponse) {
     lock.lock();
     try {
       String groupId = basicResponse.getGroupId();
@@ -140,10 +140,10 @@ public class BatchQPTask extends MultiQPTask {
   private void executeLocalSubTask(QPTask subTask, String groupId) {
     try {
       executor.handleNonQueryRequestLocally(groupId, subTask);
-      this.run(subTask.getResponse());
+      this.receive(subTask.getResponse());
     } catch (InterruptedException e) {
       LOGGER.error("Handle sub task locally failed.");
-      this.run(DataGroupNonQueryResponse.createErrorResponse(groupId, e.getMessage()));
+      this.receive(DataGroupNonQueryResponse.createErrorResponse(groupId, e.getMessage()));
     }
   }
 
@@ -153,10 +153,10 @@ public class BatchQPTask extends MultiQPTask {
   private void executeRpcSubTask(SingleQPTask subTask, PeerId leader, String groupId) {
     try {
       executor.asyncHandleNonQueryTask(subTask, leader);
-      this.run(subTask.getResponse());
+      this.receive(subTask.getResponse());
     } catch (RaftConnectionException | InterruptedException e) {
       LOGGER.error("Async handle sub task failed.");
-      this.run(DataGroupNonQueryResponse.createErrorResponse(groupId, e.getMessage()));
+      this.receive(DataGroupNonQueryResponse.createErrorResponse(groupId, e.getMessage()));
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QPTask.java
index 96a517a..2ca1359 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QPTask.java
@@ -78,7 +78,7 @@ public abstract class QPTask {
    *
    * @param basicResponse response from receiver
    */
-  public abstract void run(BasicResponse basicResponse);
+  public abstract void receive(BasicResponse basicResponse);
 
   public boolean isSyncTask() {
     return isSyncTask;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QueryTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QueryDataTask.java
similarity index 94%
rename from cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QueryTask.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QueryDataTask.java
index f4cb4b5..6946925 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QueryTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QueryDataTask.java
@@ -21,11 +21,11 @@ package org.apache.iotdb.cluster.qp.task;
 import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
 
-public class QueryTask {
+public class QueryDataTask {
   private BasicResponse basicResponse;
   private TaskState state;
 
-  public QueryTask(BasicResponse basicResponse,
+  public QueryDataTask(BasicResponse basicResponse,
       TaskState state) {
     this.basicResponse = basicResponse;
     this.state = state;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java
index 805834e..b3d5f47 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java
@@ -41,7 +41,7 @@ public class SingleQPTask extends QPTask {
    * Process response. If it's necessary to redirect leader, redo the task.
    */
   @Override
-  public void run(BasicResponse response) {
+  public void receive(BasicResponse response) {
     if(taskState != TaskState.EXCEPTION) {
       this.response = response;
       if(response == null){
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
index d38ca83..0cc9805 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
@@ -24,17 +24,11 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.entity.Server;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
-import org.apache.iotdb.cluster.qp.task.QueryTask;
-import org.apache.iotdb.cluster.query.PathType;
+import org.apache.iotdb.cluster.qp.task.QueryDataTask;
 import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
 import org.apache.iotdb.cluster.rpc.raft.NodeAsClient;
 import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
-import org.apache.iotdb.cluster.rpc.raft.request.querydata.CloseSeriesReaderRequest;
-import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataByTimestampRequest;
-import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataRequest;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
-import org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataByTimestampResponse;
-import org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataResponse;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.cluster.utils.hash.Router;
 import org.slf4j.Logger;
@@ -99,9 +93,9 @@ public class ClusterRpcReaderUtils {
               TASK_MAX_RETRY));
     }
     NodeAsClient nodeAsClient = RaftUtils.getRaftNodeAsClient();
-    QueryTask queryTask = nodeAsClient.syncHandleRequest(request, peerId);
-    if (queryTask.getState() == TaskState.FINISH) {
-      return queryTask.getBasicResponse();
+    QueryDataTask queryDataTask = nodeAsClient.syncHandleRequest(request, peerId);
+    if (queryDataTask.getState() == TaskState.FINISH) {
+      return queryDataTask.getBasicResponse();
     } else {
       return handleQueryRequest(request, peerId, taskRetryNum + 1);
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
index b4a2f25..865b6ef 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.cluster.rpc.raft;
 
 import com.alipay.sofa.jraft.entity.PeerId;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
-import org.apache.iotdb.cluster.qp.task.QueryTask;
+import org.apache.iotdb.cluster.qp.task.QueryDataTask;
 import org.apache.iotdb.cluster.qp.task.SingleQPTask;
 import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
 
@@ -43,7 +43,7 @@ public interface NodeAsClient {
    *
    * @param peerId leader node of the target group
    */
-  QueryTask syncHandleRequest(BasicRequest request, PeerId peerId);
+  QueryDataTask syncHandleRequest(BasicRequest request, PeerId peerId);
 
   /**
    * Shut down client
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
index 19f1343..eafa843 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
@@ -33,7 +33,7 @@ import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
-import org.apache.iotdb.cluster.qp.task.QueryTask;
+import org.apache.iotdb.cluster.qp.task.QueryDataTask;
 import org.apache.iotdb.cluster.qp.task.SingleQPTask;
 import org.apache.iotdb.cluster.rpc.raft.NodeAsClient;
 import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
@@ -55,7 +55,7 @@ public class RaftNodeAsClientManager {
   private static final int TASK_TIMEOUT_MS = CLUSTER_CONFIG.getQpTaskTimeout();
 
   /**
-   * Max valid number of @NodeAsClient usage, represent the number can run simultaneously
+   * Max valid number of @NodeAsClient usage, represent the number can receive simultaneously
    * at the same time
    */
   private static final int MAX_VALID_CLIENT_NUM = CLUSTER_CONFIG.getMaxNumOfInnerRpcClient();
@@ -240,7 +240,7 @@ public class RaftNodeAsClientManager {
                   public void onResponse(Object result) {
                     BasicResponse response = (BasicResponse) result;
                     releaseClient(RaftNodeAsClient.this);
-                    qpTask.run(response);
+                    qpTask.receive(response);
                   }
 
                   @Override
@@ -248,7 +248,7 @@ public class RaftNodeAsClientManager {
                     LOGGER.error("Bolt rpc client occurs errors when handling Request", e);
                     qpTask.setTaskState(TaskState.EXCEPTION);
                     releaseClient(RaftNodeAsClient.this);
-                    qpTask.run(null);
+                    qpTask.receive(null);
                   }
 
                   @Override
@@ -260,19 +260,19 @@ public class RaftNodeAsClientManager {
         LOGGER.error(e.getMessage());
         qpTask.setTaskState(TaskState.EXCEPTION);
         releaseClient(RaftNodeAsClient.this);
-        qpTask.run(null);
+        qpTask.receive(null);
         throw new RaftConnectionException(e);
       }
     }
 
     @Override
-    public QueryTask syncHandleRequest(BasicRequest request, PeerId peerId) {
+    public QueryDataTask syncHandleRequest(BasicRequest request, PeerId peerId) {
       try {
         BasicResponse response = (BasicResponse) boltClientService.getRpcClient()
             .invokeSync(peerId.getEndpoint().toString(), request, TASK_TIMEOUT_MS);
-        return new QueryTask(response, TaskState.FINISH);
+        return new QueryDataTask(response, TaskState.FINISH);
       } catch (RemotingException | InterruptedException e) {
-        return new QueryTask(null, TaskState.EXCEPTION);
+        return new QueryDataTask(null, TaskState.EXCEPTION);
       } finally {
         releaseClient(RaftNodeAsClient.this);
       }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index 650dbe4..601b902 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -298,7 +298,7 @@ public class RaftUtils {
       if (!status.isOk()) {
         response.setErrorMsg(status.getErrorMsg());
       }
-      qpTask.run(response);
+      qpTask.receive(response);
     });
     task.setDone(closure);
     try {
@@ -399,7 +399,7 @@ public class RaftUtils {
                 status.setCode(-1);
                 status.setErrorMsg(status.getErrorMsg());
               }
-              nullReadTask.run(response);
+              nullReadTask.receive(response);
             }
           });
       nullReadTask.await();
@@ -435,7 +435,7 @@ public class RaftUtils {
                 status.setCode(-1);
                 status.setErrorMsg(status.getErrorMsg());
               }
-              nullReadTask.run(response);
+              nullReadTask.receive(response);
             }
           });
       nullReadTask.await();


[incubator-iotdb] 01/03: fix some bug

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 7ca00daa26f47e35b9765415942b4e409d5f9256
Author: lta <li...@163.com>
AuthorDate: Wed May 22 15:00:00 2019 +0800

    fix  some bug
---
 .../iotdb/cluster/query/common/ClusterNullableBatchData.java       | 2 +-
 .../querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java     | 7 +++++--
 .../query/reader/querynode/ClusterSelectSeriesBatchReader.java     | 3 ---
 .../query/reader/querynode/IClusterSeriesBatchReaderEntity.java    | 4 ++--
 .../iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java       | 2 +-
 5 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/common/ClusterNullableBatchData.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/common/ClusterNullableBatchData.java
index 8315a04..699db3e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/common/ClusterNullableBatchData.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/common/ClusterNullableBatchData.java
@@ -69,7 +69,7 @@ public class ClusterNullableBatchData extends BatchData {
     return timeValuePairList.size();
   }
 
-  public TimeValuePair getTimeValuePair() {
+  public TimeValuePair getCurrentTimeValuePair() {
     return index < length() ? timeValuePairList.get(index) : null;
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java
index 5c1d9d1..30ecf1b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java
@@ -26,13 +26,15 @@ import java.util.List;
 import org.apache.iotdb.cluster.query.common.ClusterNullableBatchData;
 import org.apache.iotdb.cluster.query.utils.ClusterTimeValuePairUtils;
 import org.apache.iotdb.db.query.dataset.groupby.GroupByWithOnlyTimeFilterDataSet;
-import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Field;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 
+/**
+ * Batch reader entity for select paths in group by query with only time filter.
+ */
 public class ClusterGroupBySelectSeriesBatchReaderEntity implements
     IClusterSeriesBatchReaderEntity {
 
@@ -73,7 +75,8 @@ public class ClusterGroupBySelectSeriesBatchReaderEntity implements
       for (int j = 0; j < paths.size(); j++) {
         ClusterNullableBatchData batchData = (ClusterNullableBatchData) batchDataList.get(j);
         Object value = fieldList.get(j).getObjectValue(dataTypes.get(j));
-        batchData.addTimeValuePair(fieldList.get(j).toString().equals("null") ? null : ClusterTimeValuePairUtils.getTimeValuePair(time, value,dataTypes.get(j)));
+        batchData.addTimeValuePair(fieldList.get(j).toString().equals("null") ? null
+            : ClusterTimeValuePairUtils.getTimeValuePair(time, value, dataTypes.get(j)));
       }
     }
     return batchDataList;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java
index 28b6346..cbbad2e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java
@@ -51,9 +51,6 @@ public class ClusterSelectSeriesBatchReader extends
     this.reader = reader;
   }
 
-  public ClusterSelectSeriesBatchReader() {
-  }
-
   @Override
   public boolean hasNext() throws IOException {
     return reader.hasNext();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterSeriesBatchReaderEntity.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterSeriesBatchReaderEntity.java
index e6e9e86..80e72b6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterSeriesBatchReaderEntity.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/IClusterSeriesBatchReaderEntity.java
@@ -23,14 +23,14 @@ import java.util.List;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 
 /**
- * Batch reader for filter series which is used in query node.
+ * Batch reader for series which is used in query node.
  */
 public interface IClusterSeriesBatchReaderEntity {
 
   boolean hasNext() throws IOException;
 
   /**
-   * Get next batch data of all filter series.
+   * Get next batch data of all series.
    */
   List<BatchData> nextBatchList() throws IOException;
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java
index d9c8d75..3141f99 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java
@@ -40,7 +40,7 @@ public class ClusterTimeValuePairUtils {
    */
   public static TimeValuePair getCurrentTimeValuePair(BatchData data) {
     if (data instanceof ClusterNullableBatchData) {
-      return ((ClusterNullableBatchData) data).getTimeValuePair();
+      return ((ClusterNullableBatchData) data).getCurrentTimeValuePair();
     } else {
       return TimeValuePairUtils.getCurrentTimeValuePair(data);
     }