You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ea...@apache.org on 2019/05/20 08:35:16 UTC

[incubator-iotdb] branch cluster updated: improve robustness of query metadata

This is an automated email from the ASF dual-hosted git repository.

east pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster by this push:
     new 618cc39  improve robustness of query metadata
618cc39 is described below

commit 618cc392dc4ac0881d73b9b4620fce06660391ec
Author: mdf369 <95...@qq.com>
AuthorDate: Mon May 20 16:34:57 2019 +0800

    improve robustness of query metadata
---
 .../cluster/qp/executor/QueryMetadataExecutor.java | 148 +++++++++++++++++++--
 .../org/apache/iotdb/cluster/utils/RaftUtils.java  |  38 ++++++
 2 files changed, 172 insertions(+), 14 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 60117aa..9a3f61d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
 import org.apache.iotdb.cluster.qp.task.SingleQPTask;
 import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterConstant;
@@ -123,7 +124,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
    *
    * @param groupId data group id
    */
-  private void handleTimseriesQuery(String groupId, List<String> pathList, List<List<String>> res)
+  private void  handleTimseriesQuery(String groupId, List<String> pathList, List<List<String>> res)
       throws ProcessorException, InterruptedException {
     QueryTimeSeriesRequest request = new QueryTimeSeriesRequest(groupId,
         getReadMetadataConsistencyLevel(), pathList);
@@ -136,12 +137,35 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
       LOGGER.debug("Execute show timeseries {} statement locally for group {} by sending request to local node.", pathList, groupId);
       holder = this.server.getServerId();
     } else {
-      holder = RaftUtils.getRandomPeerID(groupId);
+      holder = RaftUtils.getPeerIDInOrder(groupId);
     }
     try {
+      LOGGER.debug("Send show timeseries {} task for group {} to node {}.", pathList, groupId, holder);
       res.addAll(queryTimeSeries(task, holder));
     } catch (RaftConnectionException e) {
-      throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+      boolean success = false;
+      while (!success) {
+        PeerId nextNode = null;
+        try {
+          nextNode = RaftUtils.getPeerIDInOrder(groupId);
+          if (holder.equals(nextNode)) {
+            break;
+          }
+          LOGGER.debug("Previous task fail, then send show timeseries {} task for group {} to node {}.", pathList, groupId, nextNode);
+          task.resetTask();
+          task.setTaskState(TaskState.INITIAL);
+          res.addAll(queryTimeSeries(task, nextNode));
+          LOGGER.debug("Show timeseries {} task for group {} to node {} succeed.", pathList, groupId, nextNode);
+          success = true;
+        } catch (RaftConnectionException e1) {
+          LOGGER.debug("Show timeseries {} task for group {} to node {} fail.", pathList, groupId, nextNode);
+          continue;
+        }
+      }
+      LOGGER.debug("The final result for show timeseries {} task is {}", pathList, success);
+      if (!success) {
+        throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+      }
     }
   }
 
@@ -164,12 +188,35 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
         LOGGER.debug("Execute show metadata in string statement locally for group {} by sending request to local node.", groupId);
         holder = this.server.getServerId();
       } else {
-        holder = RaftUtils.getRandomPeerID(groupId);
+        holder = RaftUtils.getPeerIDInOrder(groupId);
       }
       try {
+        LOGGER.debug("Send show metadata in string task for group {} to node {}.", groupId, holder);
         asyncSendNonQuerySingleTask(task, holder, 0);
       } catch (RaftConnectionException e) {
-        throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+        boolean success = false;
+        while (!success) {
+          PeerId nextNode = null;
+          try {
+            nextNode = RaftUtils.getPeerIDInOrder(groupId);
+            if (holder.equals(nextNode)) {
+              break;
+            }
+            LOGGER.debug("Previous task fail, then send show metadata in string task for group {} to node {}.", groupId, nextNode);
+            task.resetTask();
+            task.setTaskState(TaskState.INITIAL);
+            asyncSendNonQuerySingleTask(task, nextNode, 0);
+            LOGGER.debug("Show metadata in string task for group {} to node {} succeed.", groupId, nextNode);
+            success = true;
+          } catch (RaftConnectionException e1) {
+            LOGGER.debug("Show metadata in string task for group {} to node {} fail.", groupId, nextNode);
+            continue;
+          }
+        }
+        LOGGER.debug("The final result for show metadata in string task is {}", success);
+        if (!success) {
+          throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+        }
       }
     }
     for (int i = 0; i < taskList.size(); i++) {
@@ -177,7 +224,11 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
       task.await();
       BasicResponse response = task.getResponse();
       if (response == null || !response.isSuccess()) {
-        throw new ProcessorException();
+        String errorMessage = "response is null";
+        if (response != null && response.getErrorMsg() != null) {
+          errorMessage = response.getErrorMsg();
+        }
+        throw new ProcessorException("Execute show metadata in string statement fail because " + errorMessage);
       }
       metadataList.add(((QueryMetadataInStringResponse)response).getMetadata());
     }
@@ -203,12 +254,35 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
         LOGGER.debug("Execute query metadata statement locally for group {} by sending request to local node.", groupId);
         holder = this.server.getServerId();
       } else {
-        holder = RaftUtils.getRandomPeerID(groupId);
+        holder = RaftUtils.getPeerIDInOrder(groupId);
       }
       try {
+        LOGGER.debug("Send query metadata task for group {} to node {}.", groupId, holder);
         asyncSendNonQuerySingleTask(task, holder, 0);
       } catch (RaftConnectionException e) {
-        throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+        boolean success = false;
+        while (!success) {
+          PeerId nextNode = null;
+          try {
+            nextNode = RaftUtils.getPeerIDInOrder(groupId);
+            if (holder.equals(nextNode)) {
+              break;
+            }
+            LOGGER.debug("Previous task fail, then send query metadata task for group {} to node {}.", groupId, nextNode);
+            task.resetTask();
+            task.setTaskState(TaskState.INITIAL);
+            asyncSendNonQuerySingleTask(task, nextNode, 0);
+            LOGGER.debug("Query metadata task for group {} to node {} succeed.", groupId, nextNode);
+            success = true;
+          } catch (RaftConnectionException e1) {
+            LOGGER.debug("Query metadata task for group {} to node {} fail.", groupId, nextNode);
+            continue;
+          }
+        }
+        LOGGER.debug("The final result for query metadata task is {}", success);
+        if (!success) {
+          throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+        }
       }
     }
     for (int i = 0; i < taskList.size(); i++) {
@@ -220,7 +294,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
         if (response != null && response.getErrorMsg() != null) {
           errorMessage = response.getErrorMsg();
         }
-        throw new ProcessorException("Execute query metadata statement false because " + errorMessage);
+        throw new ProcessorException("Execute query metadata statement fail because " + errorMessage);
       }
       metadatas[i] = ((QueryMetadataResponse)response).getMetadata();
     }
@@ -229,7 +303,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
 
   public TSDataType processSeriesTypeQuery(String path)
       throws InterruptedException, ProcessorException, PathErrorException {
-    TSDataType dataType;
+    TSDataType dataType = null;
     List<String> storageGroupList = mManager.getAllFileNamesByPath(path);
     if (storageGroupList.size() != 1) {
       throw new PathErrorException("path " + path + " is not valid.");
@@ -246,12 +320,35 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
         LOGGER.debug("Execute get series type for {} statement locally for group {} by sending request to local node.", path, groupId);
         holder = this.server.getServerId();
       } else {
-        holder = RaftUtils.getRandomPeerID(groupId);
+        holder = RaftUtils.getPeerIDInOrder(groupId);
       }
       try {
+        LOGGER.debug("Send get series type for {} task for group {} to node {}.", path, groupId, holder);
         dataType = querySeriesType(task, holder);
       } catch (RaftConnectionException e) {
-        throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+        boolean success = false;
+        while (!success) {
+          PeerId nextNode = null;
+          try {
+            nextNode = RaftUtils.getPeerIDInOrder(groupId);
+            if (holder.equals(nextNode)) {
+              break;
+            }
+            LOGGER.debug("Previous task fail, then send get series type for {} task for group {} to node {}.", path, groupId, nextNode);
+            task.resetTask();
+            task.setTaskState(TaskState.INITIAL);
+            dataType = querySeriesType(task, nextNode);
+            LOGGER.debug("Get series type for {} task for group {} to node {} succeed.", path, groupId, nextNode);
+            success = true;
+          } catch (RaftConnectionException e1) {
+            LOGGER.debug("Get series type for {} task for group {} to node {} fail.", path, groupId, nextNode);
+            continue;
+          }
+        }
+        LOGGER.debug("The final result for get series type for {} task is {}", path, success);
+        if (!success) {
+          throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+        }
       }
     }
     return dataType;
@@ -295,12 +392,35 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
       LOGGER.debug("Execute get paths for {} statement locally for group {} by sending request to local node.", pathList, groupId);
       holder = this.server.getServerId();
     } else {
-      holder = RaftUtils.getRandomPeerID(groupId);
+      holder = RaftUtils.getPeerIDInOrder(groupId);
     }
     try {
+      LOGGER.debug("Send get paths for {} task for group {} to node {}.", pathList, groupId, holder);
       res.addAll(queryPaths(task, holder));
     } catch (RaftConnectionException e) {
-      throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+      boolean success = false;
+      while (!success) {
+        PeerId nextNode = null;
+        try {
+          nextNode = RaftUtils.getPeerIDInOrder(groupId);
+          if (holder.equals(nextNode)) {
+            break;
+          }
+          LOGGER.debug("Previous task fail, then send get paths for {} task for group {} to node {}.", pathList, groupId, nextNode);
+          task.resetTask();
+          task.setTaskState(TaskState.INITIAL);
+          res.addAll(queryPaths(task, nextNode));
+          LOGGER.debug("Get paths for {} task for group {} to node {} succeed.", pathList, groupId, nextNode);
+          success = true;
+        } catch (RaftConnectionException e1) {
+          LOGGER.debug("Get paths for {} task for group {} to node {} fail.", pathList, groupId, nextNode);
+          continue;
+        }
+      }
+      LOGGER.debug("The final result for get paths for {} task is {}", pathList, success);
+      if (!success) {
+        throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+      }
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index f1cb1fa..5f33652 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -98,6 +98,18 @@ public class RaftUtils {
    */
   private static final ConcurrentHashMap<String, PeerId> groupLeaderCache = new ConcurrentHashMap<>();
 
+  private static ThreadLocal<Map<String, Integer>> nodeIndexMap = new ThreadLocal<Map<String, Integer>>() { // per-thread map: group id -> index of the next node to hand out
+    @Override
+    protected Map<String, Integer> initialValue() {
+      Map<String, Integer> map = new HashMap<>();
+      router.getAllGroupId().forEach(groupId -> { // seed one entry for every group id the router reports
+        PhysicalNode[] physicalNodes = router.getNodesByGroupId(groupId);
+        map.put(groupId, getRandomInt(physicalNodes.length)); // starting index from getRandomInt (presumably random in [0, length) - confirm)
+      });
+      return map;
+    }
+  };
+
   private RaftUtils() {
   }
 
@@ -118,6 +130,32 @@ public class RaftUtils {
   }
 
   /**
+   * Get the next peer ID of the given group in round-robin order, using a per-thread index.
+   * @param groupId id of the group whose next peer is requested
+   * @return node id
+   */
+  public static PeerId getPeerIDInOrder(String groupId) {
+    int index;
+    PeerId peerId;
+    int len;
+    if (groupId.equals(ClusterConfig.METADATA_GROUP_ID)) { // metadata group: peers come from the metadata holder's RaftService
+      RaftService service = (RaftService) server.getMetadataHolder().getService();
+      List<PeerId> peerIdList = service.getPeerIdList();
+      len = peerIdList.size();
+      index = nodeIndexMap.get().getOrDefault(groupId, getRandomInt(peerIdList.size())); // falls back to getRandomInt when this thread has no stored index for the group
+      peerId = peerIdList.get(index); // NOTE(review): a stored index could be >= len if the peer list shrank since it was saved - confirm
+    } else { // any other group: nodes come from the router
+      PhysicalNode[] physicalNodes = router.getNodesByGroupId(groupId);
+      len = physicalNodes.length;
+      index = nodeIndexMap.get().getOrDefault(groupId, getRandomInt(physicalNodes.length));
+      peerId = getPeerIDFrom(physicalNodes[index]);
+    }
+    nodeIndexMap.get().put(groupId, (index + 1) % len); // advance this thread's index for the group, wrapping around at len
+
+    return peerId;
+  }
+
+  /**
    * Get peer id to send request. If groupLeaderCache has the group id, then return leader id of the
    * group.Otherwise, random get a peer of the group.
    *