You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ea...@apache.org on 2019/05/28 02:18:09 UTC

[incubator-iotdb] branch cluster updated: fix bug

This is an automated email from the ASF dual-hosted git repository.

east pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster by this push:
     new 1af785f  fix bug
1af785f is described below

commit 1af785fbda9e6c87ebc6eb2e0f4bdb8a4a9767d1
Author: mdf369 <95...@qq.com>
AuthorDate: Tue May 28 10:17:48 2019 +0800

    fix bug
---
 .../cluster/qp/executor/AbstractQPExecutor.java    | 32 ++++++++++++++++-----
 .../org/apache/iotdb/cluster/utils/RaftUtils.java  | 33 ++++++++++++++++++++++
 2 files changed, 58 insertions(+), 7 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
index 033bfca..21c7786 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
@@ -19,6 +19,8 @@
 package org.apache.iotdb.cluster.qp.executor;
 
 import com.alipay.sofa.jraft.entity.PeerId;
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
@@ -90,15 +92,17 @@ public abstract class AbstractQPExecutor {
    * @param taskRetryNum Number of QPTask retries due to timeout and redirected.
    * @return basic response
    */
-  protected BasicResponse syncHandleSingleTaskGetRes(SingleQPTask task, int taskRetryNum, String taskInfo, String groupId)
+  private BasicResponse syncHandleSingleTaskGetRes(SingleQPTask task, int taskRetryNum, String taskInfo, String groupId, Set<PeerId> downNodeSet)
       throws InterruptedException, RaftConnectionException {
     PeerId firstNode = task.getTargetNode();
+    RaftUtils.updatePeerIDOrder(firstNode, groupId);
     BasicResponse response = null;
     try {
       asyncSendSingleTask(task, taskRetryNum);
-      response = syncGetSingleTaskRes(task, taskRetryNum, taskInfo, groupId);
+      response = syncGetSingleTaskRes(task, taskRetryNum, taskInfo, groupId, downNodeSet);
     } catch (RaftConnectionException ex) {
       boolean success = false;
+      downNodeSet.add(firstNode);
       while (!success) {
         PeerId nextNode = null;
         try {
@@ -113,11 +117,12 @@ public abstract class AbstractQPExecutor {
           task.setTargetNode(nextNode);
           task.setTaskState(TaskState.INITIAL);
           asyncSendSingleTask(task, taskRetryNum);
-          response = syncGetSingleTaskRes(task, taskRetryNum, taskInfo, groupId);
+          response = syncGetSingleTaskRes(task, taskRetryNum, taskInfo, groupId, downNodeSet);
           LOGGER.debug("{} task for group {} to node {} succeed.", taskInfo, groupId, nextNode);
           success = true;
         } catch (RaftConnectionException e1) {
           LOGGER.debug("{} task for group {} to node {} fail.", taskInfo, groupId, nextNode);
+          downNodeSet.add(nextNode);
         }
       }
       LOGGER.debug("The final result for {} task is {}", taskInfo, success);
@@ -128,6 +133,11 @@ public abstract class AbstractQPExecutor {
     return response;
   }
 
+  protected BasicResponse syncHandleSingleTaskGetRes(SingleQPTask task, int taskRetryNum, String taskInfo, String groupId)
+      throws RaftConnectionException, InterruptedException {
+    return syncHandleSingleTaskGetRes(task, taskRetryNum, taskInfo, groupId, new HashSet<>());
+  }
+
   /**
    * Asynchronous send rpc task via client
    *  @param task rpc task
@@ -149,7 +159,7 @@ public abstract class AbstractQPExecutor {
    * @param task rpc task
    * @param taskRetryNum Retry time of the task
    */
-  private BasicResponse syncGetSingleTaskRes(SingleQPTask task, int taskRetryNum, String taskInfo, String groupId)
+  private BasicResponse syncGetSingleTaskRes(SingleQPTask task, int taskRetryNum, String taskInfo, String groupId, Set<PeerId> downNodeSet)
       throws InterruptedException, RaftConnectionException {
     task.await();
     PeerId leader;
@@ -160,8 +170,16 @@ public abstract class AbstractQPExecutor {
       } else if (task.getTaskState() == TaskState.REDIRECT) {
         // redirect to the right leader
         leader = PeerId.parsePeer(task.getResponse().getLeaderStr());
-        LOGGER.debug("Redirect leader: {}, group id = {}", leader, task.getRequest().getGroupID());
-        RaftUtils.updateRaftGroupLeader(task.getRequest().getGroupID(), leader);
+
+        if (downNodeSet.contains(leader)) {
+          LOGGER.debug("Redirect leader {} is down, group {} might be down.", leader, groupId);
+          throw new RaftConnectionException(
+              String.format("Can not connect to leader of remote node : %s", task.getTargetNode()));
+        } else {
+          LOGGER
+              .debug("Redirect leader: {}, group id = {}", leader, task.getRequest().getGroupID());
+          RaftUtils.updateRaftGroupLeader(task.getRequest().getGroupID(), leader);
+        }
       } else {
         RaftUtils.removeCachedRaftGroupLeader(groupId);
         LOGGER.debug("Remove cached raft group leader of {}", groupId);
@@ -169,7 +187,7 @@ public abstract class AbstractQPExecutor {
       }
       task.setTargetNode(leader);
       task.resetTask();
-      return syncHandleSingleTaskGetRes(task, taskRetryNum + 1, taskInfo, groupId);
+      return syncHandleSingleTaskGetRes(task, taskRetryNum + 1, taskInfo, groupId, downNodeSet);
     }
     return task.getResponse();
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index f9901b8..44b8747 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -134,9 +134,42 @@ public class RaftUtils {
     }
     nodeIndexMap.get().put(groupId, (index + 1) % len);
 
+    LOGGER.debug("Get node {} for group {}", peerId, groupId);
+
     return peerId;
   }
 
+  public static void updatePeerIDOrder(PeerId peerId, String groupId) {
+    int index = -1;
+    int len;
+    if (groupId.equals(ClusterConfig.METADATA_GROUP_ID)) {
+      RaftService service = (RaftService) server.getMetadataHolder().getService();
+      List<PeerId> peerIdList = service.getPeerIdList();
+      len = peerIdList.size();
+      index = peerIdList.indexOf(peerId);
+    } else {
+      PhysicalNode[] physicalNodes = router.getNodesByGroupId(groupId);
+      len = physicalNodes.length;
+      PhysicalNode node = getPhysicalNodeFrom(peerId);
+      for (int i = 0; i < physicalNodes.length; i++) {
+        if (physicalNodes[i].equals(node)) {
+          index = i;
+          break;
+        }
+      }
+    }
+
+    if (index == -1) {
+      LOGGER.warn(
+          "Fail to update order of node {} for group {}, because the group doesn't contain it.",
+          peerId, groupId);
+    } else {
+      LOGGER.debug("Update order of node {} for group {}, current index is {}", peerId, groupId,
+          index);
+      nodeIndexMap.get().put(groupId, (index + 1) % len);
+    }
+  }
+
   /**
    * Get peer id to send request. If groupLeaderCache has the group id, then return leader id of the
    * group.Otherwise, random get a peer of the group.