You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ea...@apache.org on 2019/05/21 03:03:53 UTC

[incubator-iotdb] branch cluster updated: make nodetool get accurate leader of data group

This is an automated email from the ASF dual-hosted git repository.

east pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster by this push:
     new ce845f2  make nodetool get accurate leader of data group
     new 99ffb6b  Merge remote-tracking branch 'origin/cluster' into cluster
ce845f2 is described below

commit ce845f201300b6bf36b91d22778db51c160327e4
Author: mdf369 <95...@qq.com>
AuthorDate: Tue May 21 11:03:22 2019 +0800

    make nodetool get accurate leader of data group
---
 .../org/apache/iotdb/cluster/entity/Server.java    |  2 +
 .../cluster/qp/executor/AbstractQPExecutor.java    |  2 +-
 .../cluster/qp/executor/NonQueryExecutor.java      |  2 +-
 .../apache/iotdb/cluster/qp/task/BatchQPTask.java  |  2 +-
 .../nonquery/DataGroupNonQueryAsyncProcessor.java  |  2 +-
 .../nonquery/MetaGroupNonQueryAsyncProcessor.java  |  2 +-
 .../querymetric/QueryLeaderAsyncProcessor.java     | 45 ++++++++++++++++
 .../request/querymetric/QueryLeaderRequest.java    | 31 +++++++++++
 .../response/querymetric/QueryLeaderResponse.java  | 47 +++++++++++++++++
 .../iotdb/cluster/utils/QPExecutorUtils.java       |  2 +-
 .../org/apache/iotdb/cluster/utils/RaftUtils.java  | 60 ++++++++++++++++++++--
 .../apache/iotdb/cluster/utils/RaftUtilsTest.java  |  6 +--
 12 files changed, 189 insertions(+), 14 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index 1567bcd..70d2c26 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
 import org.apache.iotdb.cluster.rpc.raft.impl.RaftNodeAsClientManager;
 import org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryJobNumAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryLeaderAsyncProcessor;
 import org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryMetricAsyncProcessor;
 import org.apache.iotdb.cluster.rpc.raft.processor.nonquery.DataGroupNonQueryAsyncProcessor;
 import org.apache.iotdb.cluster.rpc.raft.processor.nonquery.MetaGroupNonQueryAsyncProcessor;
@@ -167,6 +168,7 @@ public class Server {
     rpcServer.registerUserProcessor(new QueryMetricAsyncProcessor());
     rpcServer.registerUserProcessor(new QueryJobNumAsyncProcessor());
     rpcServer.registerUserProcessor(new QueryStatusAsyncProcessor());
+    rpcServer.registerUserProcessor(new QueryLeaderAsyncProcessor());
   }
 
   public void stop() throws ProcessorException, InterruptedException {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
index 1f0648c..9841781 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
@@ -136,7 +136,7 @@ public abstract class AbstractQPExecutor {
         String groupId = task.getRequest().getGroupID();
         RaftUtils.removeCachedRaftGroupLeader(groupId);
         LOGGER.debug("Remove cached raft group leader of {}", groupId);
-        leader = RaftUtils.getLeaderPeerID(groupId);
+        leader = RaftUtils.getLocalLeaderPeerID(groupId);
       }
       task.resetTask();
       return asyncHandleNonQuerySingleTaskGetRes(task, leader, taskRetryNum + 1);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index 1420370..75fc3a8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -326,7 +326,7 @@ public class NonQueryExecutor extends AbstractQPExecutor {
     if (QPExecutorUtils.canHandleNonQueryByGroupId(groupId)) {
       return handleNonQueryRequestLocally(groupId, qpTask);
     } else {
-      PeerId leader = RaftUtils.getLeaderPeerID(groupId);
+      PeerId leader = RaftUtils.getLocalLeaderPeerID(groupId);
       return asyncHandleNonQueryTask(qpTask, leader);
     }
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
index 43edd67..667a55e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
@@ -126,7 +126,7 @@ public class BatchQPTask extends MultiQPTask {
         taskThread = QPTaskManager.getInstance()
             .submit(() -> executeLocalSubTask(subTask, groupId));
       } else {
-        PeerId leader = RaftUtils.getLeaderPeerID(groupId);
+        PeerId leader = RaftUtils.getLocalLeaderPeerID(groupId);
         taskThread = QPTaskManager.getInstance()
             .submit(() -> executeRpcSubTask(subTask, leader, groupId));
       }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/DataGroupNonQueryAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/DataGroupNonQueryAsyncProcessor.java
index d3537fe..c331714 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/DataGroupNonQueryAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/DataGroupNonQueryAsyncProcessor.java
@@ -49,7 +49,7 @@ public class DataGroupNonQueryAsyncProcessor extends
     String groupId = request.getGroupID();
     DataPartitionRaftHolder dataPartitionRaftHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
     if (!dataPartitionRaftHolder.getFsm().isLeader()) {
-      PeerId leader = RaftUtils.getLeaderPeerID(groupId);
+      PeerId leader = RaftUtils.getLocalLeaderPeerID(groupId);
       LOGGER.debug("Request need to redirect leader: {}, groupId : {} ", leader, groupId);
 
       DataGroupNonQueryResponse response = DataGroupNonQueryResponse
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/MetaGroupNonQueryAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/MetaGroupNonQueryAsyncProcessor.java
index 9f09bbb..95f9e32 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/MetaGroupNonQueryAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/MetaGroupNonQueryAsyncProcessor.java
@@ -49,7 +49,7 @@ public class MetaGroupNonQueryAsyncProcessor extends
     String groupId = request.getGroupID();
     MetadataRaftHolder metadataHolder = RaftUtils.getMetadataRaftHolder();
     if (!metadataHolder.getFsm().isLeader()) {
-      PeerId leader = RaftUtils.getLeaderPeerID(groupId);
+      PeerId leader = RaftUtils.getLocalLeaderPeerID(groupId);
       LOGGER.debug("Request need to redirect leader: {}, groupId : {} ", leader, groupId);
 
       MetaGroupNonQueryResponse response = MetaGroupNonQueryResponse
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetric/QueryLeaderAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetric/QueryLeaderAsyncProcessor.java
new file mode 100644
index 0000000..9c5a2bf
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/querymetric/QueryLeaderAsyncProcessor.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.processor.querymetric;
+
+import com.alipay.remoting.AsyncContext;
+import com.alipay.remoting.BizContext;
+import org.apache.iotdb.cluster.rpc.raft.processor.BasicAsyncUserProcessor;
+import org.apache.iotdb.cluster.rpc.raft.request.querymetric.QueryLeaderRequest;
+import org.apache.iotdb.cluster.rpc.raft.response.querymetric.QueryLeaderResponse;
+import org.apache.iotdb.cluster.utils.RaftUtils;
+
+public class QueryLeaderAsyncProcessor extends BasicAsyncUserProcessor<QueryLeaderRequest> {
+
+  @Override
+  public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
+      QueryLeaderRequest request) {
+    String groupId = request.getGroupID();
+
+    QueryLeaderResponse response = QueryLeaderResponse.createSuccessResponse(groupId,
+        RaftUtils.getLocalLeaderPeerID(groupId));
+    response.addResult(true);
+    asyncContext.sendResponse(response);
+  }
+
+  @Override
+  public String interest() {
+    return QueryLeaderRequest.class.getName();
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetric/QueryLeaderRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetric/QueryLeaderRequest.java
new file mode 100644
index 0000000..a3a2c06
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querymetric/QueryLeaderRequest.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.request.querymetric;
+
+import java.io.Serializable;
+import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
+
+public class QueryLeaderRequest extends BasicRequest implements Serializable {
+
+  private static final long serialVersionUID = 8438291563829380829L;
+
+  public QueryLeaderRequest(String groupID) {
+    super(groupID);
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetric/QueryLeaderResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetric/QueryLeaderResponse.java
new file mode 100644
index 0000000..ad536aa
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/response/querymetric/QueryLeaderResponse.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.raft.response.querymetric;
+
+import com.alipay.sofa.jraft.entity.PeerId;
+import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
+
+public class QueryLeaderResponse extends BasicResponse {
+
+  private PeerId leader;
+
+  private QueryLeaderResponse(String groupId, boolean redirected, String leaderStr,
+      String errorMsg) {
+    super(groupId, redirected, leaderStr, errorMsg);
+  }
+
+  public static QueryLeaderResponse createSuccessResponse(String groupId, PeerId leader) {
+    QueryLeaderResponse response = new QueryLeaderResponse(groupId, false, null,
+        null);
+    response.leader = leader;
+    return response;
+  }
+
+  public static QueryLeaderResponse createErrorResponse(String groupId, String errorMsg) {
+    return new QueryLeaderResponse(groupId, false, null, errorMsg);
+  }
+
+  public PeerId getLeader() {
+    return leader;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/QPExecutorUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/QPExecutorUtils.java
index 5a60351..811928c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/QPExecutorUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/QPExecutorUtils.java
@@ -124,7 +124,7 @@ public class QPExecutorUtils {
   public static boolean checkDataGroupLeader(String groupId) {
     boolean isLeader = false;
     if (router.containPhysicalNodeByGroupId(groupId, localNode) && RaftUtils
-        .getPhysicalNodeFrom(RaftUtils.getLeaderPeerID(groupId)).equals(localNode)) {
+        .getPhysicalNodeFrom(RaftUtils.getLocalLeaderPeerID(groupId)).equals(localNode)) {
       isLeader = true;
     }
     return isLeader;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index 5f33652..9b59d6f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -61,10 +61,12 @@ import org.apache.iotdb.cluster.rpc.raft.impl.RaftNodeAsClientManager;
 import org.apache.iotdb.cluster.rpc.raft.request.BasicNonQueryRequest;
 import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
 import org.apache.iotdb.cluster.rpc.raft.request.querymetric.QueryJobNumRequest;
+import org.apache.iotdb.cluster.rpc.raft.request.querymetric.QueryLeaderRequest;
 import org.apache.iotdb.cluster.rpc.raft.request.querymetric.QueryMetricRequest;
 import org.apache.iotdb.cluster.rpc.raft.request.querymetric.QueryStatusRequest;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
 import org.apache.iotdb.cluster.rpc.raft.response.querymetric.QueryJobNumResponse;
+import org.apache.iotdb.cluster.rpc.raft.response.querymetric.QueryLeaderResponse;
 import org.apache.iotdb.cluster.rpc.raft.response.querymetric.QueryMetricResponse;
 import org.apache.iotdb.cluster.rpc.raft.response.nonquery.DataGroupNonQueryResponse;
 import org.apache.iotdb.cluster.rpc.raft.response.nonquery.MetaGroupNonQueryResponse;
@@ -92,7 +94,7 @@ public class RaftUtils {
 
   /**
    * The cache will be update in two case: 1. When @onLeaderStart() method of state machine is
-   * called, the cache will be update. 2. When @getLeaderPeerID() in this class is called and cache
+   * called, the cache will be update. 2. When @getLocalLeaderPeerID() in this class is called and cache
    * don't have the key, it's will get random peer and update. 3. When @redirected of BasicRequest
    * is true, the task will be retry and the cache will update.
    */
@@ -161,12 +163,44 @@ public class RaftUtils {
    *
    * @return leader id
    */
-  public static PeerId getLeaderPeerID(String groupId) {
+  public static PeerId getLocalLeaderPeerID(String groupId) {
     if (!groupLeaderCache.containsKey(groupId)) {
       PeerId randomPeerId = getRandomPeerID(groupId);
       groupLeaderCache.put(groupId, randomPeerId);
     }
-    return groupLeaderCache.get(groupId);
+    PeerId leader = groupLeaderCache.get(groupId);
+    LOGGER.debug("Get local cached leader {} of group {}.", leader, groupId);
+    return leader;
+  }
+
+  /**
+   * Ask the given remote peer which node it currently records as leader of the given group.
+   * Returns null if the request cannot be sent, is interrupted, or does not finish successfully.
+   *
+   * @return leader id
+   */
+  public static PeerId getLeaderPeerIDFromRemoteNode(PeerId peerId, String groupId) {
+    QueryLeaderRequest request = new QueryLeaderRequest(groupId);
+    SingleQPTask task = new SingleQPTask(false, request);
+
+    LOGGER.debug("Execute get leader of group {} from node {}.", groupId, peerId);
+    try {
+      NodeAsClient client = RaftNodeAsClientManager.getInstance().getRaftNodeAsClient();
+      /** Call async method **/
+      client.asyncHandleRequest(task.getRequest(), peerId, task);
+
+      task.await();
+      PeerId leader = null;
+      if (task.getTaskState() == TaskState.FINISH) {
+        BasicResponse response = task.getResponse();
+        leader = response == null ? null : ((QueryLeaderResponse) response).getLeader();
+      }
+      LOGGER.debug("Get leader {} of group {} from node {}.", leader, groupId, peerId);
+      return leader;
+    } catch (RaftConnectionException | InterruptedException e) {
+      LOGGER.error("Fail to get leader of group {} from remote node {} because of {}.", groupId, peerId, e.getMessage());
+      return null;
+    }
   }
 
   /**
@@ -465,7 +499,23 @@ public class RaftUtils {
       groupId = router.getGroupID(group);
       nodes = getPeerIdArrayFrom(group);
     }
-    PeerId leader = RaftUtils.getLeaderPeerID(groupId);
+
+    PeerId leader = null;
+    for (PeerId node : nodes) {
+      LOGGER.debug("Try to get leader of group {} from node {}.", groupId, node);
+      leader = getLeaderPeerIDFromRemoteNode(node, groupId);
+      LOGGER.debug("Get leader {} of group {} from node {}.", leader, groupId, node);
+      if (leader != null) {
+        break;
+      }
+    }
+
+    if (leader == null) {
+      LOGGER.debug("Fail to get leader of group {} from all remote nodes, get it locally.", groupId);
+      leader = RaftUtils.getLocalLeaderPeerID(groupId);
+      LOGGER.debug("Get leader {} of group {} locally.", leader, groupId);
+    }
+
     for (int i = 0; i < nodes.length; i++) {
       if (leader.equals(nodes[i])) {
         PeerId t = nodes[i];
@@ -608,7 +658,7 @@ public class RaftUtils {
     SingleQPTask task = new SingleQPTask(false, request);
 
     LOGGER.debug("Execute get metric for {} statement for group {}.", metric, groupId);
-    PeerId holder = RaftUtils.getLeaderPeerID(groupId);
+    PeerId holder = RaftUtils.getLocalLeaderPeerID(groupId);
     LOGGER.debug("Get metric from node {}.", holder);
     try {
       NodeAsClient client = RaftNodeAsClientManager.getInstance().getRaftNodeAsClient();
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java
index e7a4201..b6866ad 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/utils/RaftUtilsTest.java
@@ -150,18 +150,18 @@ public class RaftUtilsTest {
     RaftUtils.clearRaftGroupLeader();
     PeerId metadtaLeader = PeerId.parsePeer(ipList[0]);
     RaftUtils.updateRaftGroupLeader(ClusterConfig.METADATA_GROUP_ID, metadtaLeader);
-    assertEquals(metadtaLeader, RaftUtils.getLeaderPeerID(ClusterConfig.METADATA_GROUP_ID));
+    assertEquals(metadtaLeader, RaftUtils.getLocalLeaderPeerID(ClusterConfig.METADATA_GROUP_ID));
 
     boolean[] isLeaderCached = {true, false, true, false, true};
     for (int i = 0; i < ipList.length; i++) {
       if (isLeaderCached[i]) {
         PeerId leaderExpeted = PeerId.parsePeer(ipList[(i + 1) % ipList.length]);
         RaftUtils.updateRaftGroupLeader(Router.DATA_GROUP_STR + i, leaderExpeted);
-        PeerId leaderActual = RaftUtils.getLeaderPeerID(Router.DATA_GROUP_STR + i);
+        PeerId leaderActual = RaftUtils.getLocalLeaderPeerID(Router.DATA_GROUP_STR + i);
         assertTrue(leaderExpeted.equals(leaderActual));
 
       } else {
-        PeerId leader = RaftUtils.getLeaderPeerID(Router.DATA_GROUP_STR + i);
+        PeerId leader = RaftUtils.getLocalLeaderPeerID(Router.DATA_GROUP_STR + i);
         boolean flag = false;
         for (int j = 0; j < replicator; j++) {
           String addr = ipList[(i + j) % ipList.length];