You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ea...@apache.org on 2019/05/28 02:18:09 UTC
[incubator-iotdb] branch cluster updated: fix bug
This is an automated email from the ASF dual-hosted git repository.
east pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster by this push:
new 1af785f fix bug
1af785f is described below
commit 1af785fbda9e6c87ebc6eb2e0f4bdb8a4a9767d1
Author: mdf369 <95...@qq.com>
AuthorDate: Tue May 28 10:17:48 2019 +0800
fix bug
---
.../cluster/qp/executor/AbstractQPExecutor.java | 32 ++++++++++++++++-----
.../org/apache/iotdb/cluster/utils/RaftUtils.java | 33 ++++++++++++++++++++++
2 files changed, 58 insertions(+), 7 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
index 033bfca..21c7786 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.cluster.qp.executor;
import com.alipay.sofa.jraft.entity.PeerId;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
@@ -90,15 +92,17 @@ public abstract class AbstractQPExecutor {
* @param taskRetryNum Number of QPTask retries due to timeout and redirected.
* @return basic response
*/
- protected BasicResponse syncHandleSingleTaskGetRes(SingleQPTask task, int taskRetryNum, String taskInfo, String groupId)
+ private BasicResponse syncHandleSingleTaskGetRes(SingleQPTask task, int taskRetryNum, String taskInfo, String groupId, Set<PeerId> downNodeSet)
throws InterruptedException, RaftConnectionException {
PeerId firstNode = task.getTargetNode();
+ RaftUtils.updatePeerIDOrder(firstNode, groupId);
BasicResponse response = null;
try {
asyncSendSingleTask(task, taskRetryNum);
- response = syncGetSingleTaskRes(task, taskRetryNum, taskInfo, groupId);
+ response = syncGetSingleTaskRes(task, taskRetryNum, taskInfo, groupId, downNodeSet);
} catch (RaftConnectionException ex) {
boolean success = false;
+ downNodeSet.add(firstNode);
while (!success) {
PeerId nextNode = null;
try {
@@ -113,11 +117,12 @@ public abstract class AbstractQPExecutor {
task.setTargetNode(nextNode);
task.setTaskState(TaskState.INITIAL);
asyncSendSingleTask(task, taskRetryNum);
- response = syncGetSingleTaskRes(task, taskRetryNum, taskInfo, groupId);
+ response = syncGetSingleTaskRes(task, taskRetryNum, taskInfo, groupId, downNodeSet);
LOGGER.debug("{} task for group {} to node {} succeed.", taskInfo, groupId, nextNode);
success = true;
} catch (RaftConnectionException e1) {
LOGGER.debug("{} task for group {} to node {} fail.", taskInfo, groupId, nextNode);
+ downNodeSet.add(nextNode);
}
}
LOGGER.debug("The final result for {} task is {}", taskInfo, success);
@@ -128,6 +133,11 @@ public abstract class AbstractQPExecutor {
return response;
}
+  /**
+   * Sends {@code task} to its target node and waits for the response, starting with an empty set
+   * of nodes known to be unreachable. The five-argument overload adds nodes to that set as
+   * connections to them fail, and refuses to follow a redirect to a leader already in the set.
+   *
+   * @param task rpc task to send
+   * @param taskRetryNum number of QPTask retries already spent due to timeout and redirection
+   * @param taskInfo description of the task, used only in log messages
+   * @param groupId id of the raft group the task is addressed to
+   * @return basic response
+   * @throws RaftConnectionException propagated from the underlying send/receive path
+   * @throws InterruptedException if interrupted while awaiting the task
+   */
+  protected BasicResponse syncHandleSingleTaskGetRes(SingleQPTask task, int taskRetryNum,
+      String taskInfo, String groupId) throws RaftConnectionException, InterruptedException {
+    // A fresh set per top-level call: failures seen by earlier requests are not remembered.
+    Set<PeerId> unreachableNodes = new HashSet<>();
+    return syncHandleSingleTaskGetRes(task, taskRetryNum, taskInfo, groupId, unreachableNodes);
+  }
+
/**
* Asynchronous send rpc task via client
* @param task rpc task
@@ -149,7 +159,7 @@ public abstract class AbstractQPExecutor {
* @param task rpc task
* @param taskRetryNum Retry time of the task
*/
- private BasicResponse syncGetSingleTaskRes(SingleQPTask task, int taskRetryNum, String taskInfo, String groupId)
+ private BasicResponse syncGetSingleTaskRes(SingleQPTask task, int taskRetryNum, String taskInfo, String groupId, Set<PeerId> downNodeSet)
throws InterruptedException, RaftConnectionException {
task.await();
PeerId leader;
@@ -160,8 +170,16 @@ public abstract class AbstractQPExecutor {
} else if (task.getTaskState() == TaskState.REDIRECT) {
// redirect to the right leader
leader = PeerId.parsePeer(task.getResponse().getLeaderStr());
- LOGGER.debug("Redirect leader: {}, group id = {}", leader, task.getRequest().getGroupID());
- RaftUtils.updateRaftGroupLeader(task.getRequest().getGroupID(), leader);
+
+ if (downNodeSet.contains(leader)) {
+ LOGGER.debug("Redirect leader {} is down, group {} might be down.", leader, groupId);
+ throw new RaftConnectionException(
+ String.format("Can not connect to leader of remote node : %s", task.getTargetNode()));
+ } else {
+ LOGGER
+ .debug("Redirect leader: {}, group id = {}", leader, task.getRequest().getGroupID());
+ RaftUtils.updateRaftGroupLeader(task.getRequest().getGroupID(), leader);
+ }
} else {
RaftUtils.removeCachedRaftGroupLeader(groupId);
LOGGER.debug("Remove cached raft group leader of {}", groupId);
@@ -169,7 +187,7 @@ public abstract class AbstractQPExecutor {
}
task.setTargetNode(leader);
task.resetTask();
- return syncHandleSingleTaskGetRes(task, taskRetryNum + 1, taskInfo, groupId);
+ return syncHandleSingleTaskGetRes(task, taskRetryNum + 1, taskInfo, groupId, downNodeSet);
}
return task.getResponse();
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index f9901b8..44b8747 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -134,9 +134,42 @@ public class RaftUtils {
}
nodeIndexMap.get().put(groupId, (index + 1) % len);
+ LOGGER.debug("Get node {} for group {}", peerId, groupId);
+
return peerId;
}
+  /**
+   * Moves the cached node index of {@code groupId} so that it points just past {@code peerId}.
+   * The stored value is {@code (position of peerId + 1) % group size}, the same rotation the
+   * peer-selection code applies after handing out a node.
+   *
+   * <p>For the metadata group, membership is read from the metadata holder's {@code RaftService}
+   * peer list. For any other group, it is read from the router's physical nodes. If
+   * {@code peerId} is not a member of the group, only a warning is logged and the cached index
+   * is left untouched.
+   *
+   * @param peerId node whose successor should be tried next
+   * @param groupId id of the raft group whose cached index is updated
+   */
+  public static void updatePeerIDOrder(PeerId peerId, String groupId) {
+    int groupSize;
+    int position;
+    if (groupId.equals(ClusterConfig.METADATA_GROUP_ID)) {
+      List<PeerId> members =
+          ((RaftService) server.getMetadataHolder().getService()).getPeerIdList();
+      groupSize = members.size();
+      position = members.indexOf(peerId);
+    } else {
+      PhysicalNode[] members = router.getNodesByGroupId(groupId);
+      groupSize = members.length;
+      position = indexOfPhysicalNode(members, getPhysicalNodeFrom(peerId));
+    }
+
+    if (position == -1) {
+      LOGGER.warn(
+          "Fail to update order of node {} for group {}, because the group doesn't contain it.",
+          peerId, groupId);
+      return;
+    }
+    LOGGER.debug("Update order of node {} for group {}, current index is {}", peerId, groupId,
+        position);
+    // position is a valid index here, so groupSize > 0 and the modulo is safe.
+    nodeIndexMap.get().put(groupId, (position + 1) % groupSize);
+  }
+
+  /**
+   * Returns the index of the first element of {@code members} for which
+   * {@code members[i].equals(target)} holds, or -1 if there is none.
+   */
+  private static int indexOfPhysicalNode(PhysicalNode[] members, PhysicalNode target) {
+    int i = 0;
+    while (i < members.length) {
+      if (members[i].equals(target)) {
+        return i;
+      }
+      i++;
+    }
+    return -1;
+  }
+
/**
* Get peer id to send request. If groupLeaderCache has the group id, then return leader id of the
* group.Otherwise, random get a peer of the group.