You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ea...@apache.org on 2019/05/20 08:35:16 UTC
[incubator-iotdb] branch cluster updated: improve robustness of
query metadata
This is an automated email from the ASF dual-hosted git repository.
east pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster by this push:
new 618cc39 improve robustness of query metadata
618cc39 is described below
commit 618cc392dc4ac0881d73b9b4620fce06660391ec
Author: mdf369 <95...@qq.com>
AuthorDate: Mon May 20 16:34:57 2019 +0800
improve robustness of query metadata
---
.../cluster/qp/executor/QueryMetadataExecutor.java | 148 +++++++++++++++++++--
.../org/apache/iotdb/cluster/utils/RaftUtils.java | 38 ++++++
2 files changed, 172 insertions(+), 14 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 60117aa..9a3f61d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
import org.apache.iotdb.cluster.qp.task.SingleQPTask;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterConstant;
@@ -123,7 +124,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
*
* @param groupId data group id
*/
- private void handleTimseriesQuery(String groupId, List<String> pathList, List<List<String>> res)
+ private void handleTimseriesQuery(String groupId, List<String> pathList, List<List<String>> res)
throws ProcessorException, InterruptedException {
QueryTimeSeriesRequest request = new QueryTimeSeriesRequest(groupId,
getReadMetadataConsistencyLevel(), pathList);
@@ -136,12 +137,35 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
LOGGER.debug("Execute show timeseries {} statement locally for group {} by sending request to local node.", pathList, groupId);
holder = this.server.getServerId();
} else {
- holder = RaftUtils.getRandomPeerID(groupId);
+ holder = RaftUtils.getPeerIDInOrder(groupId);
}
try {
+ LOGGER.debug("Send show timeseries {} task for group {} to node {}.", pathList, groupId, holder);
res.addAll(queryTimeSeries(task, holder));
} catch (RaftConnectionException e) {
- throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+ boolean success = false;
+ while (!success) {
+ PeerId nextNode = null;
+ try {
+ nextNode = RaftUtils.getPeerIDInOrder(groupId);
+ if (holder.equals(nextNode)) {
+ break;
+ }
+ LOGGER.debug("Previous task fail, then send show timeseries {} task for group {} to node {}.", pathList, groupId, nextNode);
+ task.resetTask();
+ task.setTaskState(TaskState.INITIAL);
+ res.addAll(queryTimeSeries(task, nextNode));
+ LOGGER.debug("Show timeseries {} task for group {} to node {} succeed.", pathList, groupId, nextNode);
+ success = true;
+ } catch (RaftConnectionException e1) {
+ LOGGER.debug("Show timeseries {} task for group {} to node {} fail.", pathList, groupId, nextNode);
+ continue;
+ }
+ }
+ LOGGER.debug("The final result for show timeseries {} task is {}", pathList, success);
+ if (!success) {
+ throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+ }
}
}
@@ -164,12 +188,35 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
LOGGER.debug("Execute show metadata in string statement locally for group {} by sending request to local node.", groupId);
holder = this.server.getServerId();
} else {
- holder = RaftUtils.getRandomPeerID(groupId);
+ holder = RaftUtils.getPeerIDInOrder(groupId);
}
try {
+ LOGGER.debug("Send show metadata in string task for group {} to node {}.", groupId, holder);
asyncSendNonQuerySingleTask(task, holder, 0);
} catch (RaftConnectionException e) {
- throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+ boolean success = false;
+ while (!success) {
+ PeerId nextNode = null;
+ try {
+ nextNode = RaftUtils.getPeerIDInOrder(groupId);
+ if (holder.equals(nextNode)) {
+ break;
+ }
+ LOGGER.debug("Previous task fail, then send show metadata in string task for group {} to node {}.", groupId, nextNode);
+ task.resetTask();
+ task.setTaskState(TaskState.INITIAL);
+ asyncSendNonQuerySingleTask(task, nextNode, 0);
+ LOGGER.debug("Show metadata in string task for group {} to node {} succeed.", groupId, nextNode);
+ success = true;
+ } catch (RaftConnectionException e1) {
+ LOGGER.debug("Show metadata in string task for group {} to node {} fail.", groupId, nextNode);
+ continue;
+ }
+ }
+ LOGGER.debug("The final result for show metadata in string task is {}", success);
+ if (!success) {
+ throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+ }
}
}
for (int i = 0; i < taskList.size(); i++) {
@@ -177,7 +224,11 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
task.await();
BasicResponse response = task.getResponse();
if (response == null || !response.isSuccess()) {
- throw new ProcessorException();
+ String errorMessage = "response is null";
+ if (response != null && response.getErrorMsg() != null) {
+ errorMessage = response.getErrorMsg();
+ }
+ throw new ProcessorException("Execute show metadata in string statement fail because " + errorMessage);
}
metadataList.add(((QueryMetadataInStringResponse)response).getMetadata());
}
@@ -203,12 +254,35 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
LOGGER.debug("Execute query metadata statement locally for group {} by sending request to local node.", groupId);
holder = this.server.getServerId();
} else {
- holder = RaftUtils.getRandomPeerID(groupId);
+ holder = RaftUtils.getPeerIDInOrder(groupId);
}
try {
+ LOGGER.debug("Send query metadata task for group {} to node {}.", groupId, holder);
asyncSendNonQuerySingleTask(task, holder, 0);
} catch (RaftConnectionException e) {
- throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+ boolean success = false;
+ while (!success) {
+ PeerId nextNode = null;
+ try {
+ nextNode = RaftUtils.getPeerIDInOrder(groupId);
+ if (holder.equals(nextNode)) {
+ break;
+ }
+ LOGGER.debug("Previous task fail, then send query metadata task for group {} to node {}.", groupId, nextNode);
+ task.resetTask();
+ task.setTaskState(TaskState.INITIAL);
+ asyncSendNonQuerySingleTask(task, nextNode, 0);
+ LOGGER.debug("Query metadata task for group {} to node {} succeed.", groupId, nextNode);
+ success = true;
+ } catch (RaftConnectionException e1) {
+ LOGGER.debug("Query metadata task for group {} to node {} fail.", groupId, nextNode);
+ continue;
+ }
+ }
+ LOGGER.debug("The final result for query metadata task is {}", success);
+ if (!success) {
+ throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+ }
}
}
for (int i = 0; i < taskList.size(); i++) {
@@ -220,7 +294,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
if (response != null && response.getErrorMsg() != null) {
errorMessage = response.getErrorMsg();
}
- throw new ProcessorException("Execute query metadata statement false because " + errorMessage);
+ throw new ProcessorException("Execute query metadata statement fail because " + errorMessage);
}
metadatas[i] = ((QueryMetadataResponse)response).getMetadata();
}
@@ -229,7 +303,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
public TSDataType processSeriesTypeQuery(String path)
throws InterruptedException, ProcessorException, PathErrorException {
- TSDataType dataType;
+ TSDataType dataType = null;
List<String> storageGroupList = mManager.getAllFileNamesByPath(path);
if (storageGroupList.size() != 1) {
throw new PathErrorException("path " + path + " is not valid.");
@@ -246,12 +320,35 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
LOGGER.debug("Execute get series type for {} statement locally for group {} by sending request to local node.", path, groupId);
holder = this.server.getServerId();
} else {
- holder = RaftUtils.getRandomPeerID(groupId);
+ holder = RaftUtils.getPeerIDInOrder(groupId);
}
try {
+ LOGGER.debug("Send get series type for {} task for group {} to node {}.", path, groupId, holder);
dataType = querySeriesType(task, holder);
} catch (RaftConnectionException e) {
- throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+ boolean success = false;
+ while (!success) {
+ PeerId nextNode = null;
+ try {
+ nextNode = RaftUtils.getPeerIDInOrder(groupId);
+ if (holder.equals(nextNode)) {
+ break;
+ }
+ LOGGER.debug("Previous task fail, then send get series type for {} task for group {} to node {}.", path, groupId, nextNode);
+ task.resetTask();
+ task.setTaskState(TaskState.INITIAL);
+ dataType = querySeriesType(task, nextNode);
+ LOGGER.debug("Get series type for {} task for group {} to node {} succeed.", path, groupId, nextNode);
+ success = true;
+ } catch (RaftConnectionException e1) {
+ LOGGER.debug("Get series type for {} task for group {} to node {} fail.", path, groupId, nextNode);
+ continue;
+ }
+ }
+ LOGGER.debug("The final result for get series type for {} task is {}", path, success);
+ if (!success) {
+ throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+ }
}
}
return dataType;
@@ -295,12 +392,35 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
LOGGER.debug("Execute get paths for {} statement locally for group {} by sending request to local node.", pathList, groupId);
holder = this.server.getServerId();
} else {
- holder = RaftUtils.getRandomPeerID(groupId);
+ holder = RaftUtils.getPeerIDInOrder(groupId);
}
try {
+ LOGGER.debug("Send get paths for {} task for group {} to node {}.", pathList, groupId, holder);
res.addAll(queryPaths(task, holder));
} catch (RaftConnectionException e) {
- throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+ boolean success = false;
+ while (!success) {
+ PeerId nextNode = null;
+ try {
+ nextNode = RaftUtils.getPeerIDInOrder(groupId);
+ if (holder.equals(nextNode)) {
+ break;
+ }
+ LOGGER.debug("Previous task fail, then send get paths for {} task for group {} to node {}.", pathList, groupId, nextNode);
+ task.resetTask();
+ task.setTaskState(TaskState.INITIAL);
+ res.addAll(queryPaths(task, nextNode));
+ LOGGER.debug("Get paths for {} task for group {} to node {} succeed.", pathList, groupId, nextNode);
+ success = true;
+ } catch (RaftConnectionException e1) {
+ LOGGER.debug("Get paths for {} task for group {} to node {} fail.", pathList, groupId, nextNode);
+ continue;
+ }
+ }
+ LOGGER.debug("The final result for get paths for {} task is {}", pathList, success);
+ if (!success) {
+ throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+ }
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index f1cb1fa..5f33652 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -98,6 +98,18 @@ public class RaftUtils {
*/
private static final ConcurrentHashMap<String, PeerId> groupLeaderCache = new ConcurrentHashMap<>();
+  // Per-thread cursor table: group id -> index of the next node to use for that group.
+  // Each thread's table is built on its first access, with a random start index per group.
+  private static ThreadLocal<Map<String, Integer>> nodeIndexMap =
+      new ThreadLocal<Map<String, Integer>>() {
+        @Override
+        protected Map<String, Integer> initialValue() {
+          Map<String, Integer> startIndexByGroup = new HashMap<>();
+          router.getAllGroupId().forEach(id ->
+              startIndexByGroup.put(id, getRandomInt(router.getNodesByGroupId(id).length)));
+          return startIndexByGroup;
+        }
+      };
+
private RaftUtils() {
}
@@ -118,6 +130,32 @@ public class RaftUtils {
}
/**
+   * Get the next peer of a group in round-robin order.
+   *
+   * <p>The calling thread's cursor for the group (kept in {@code nodeIndexMap}) selects the peer
+   * and is then advanced by one, wrapping around at the end of the member list. When the thread
+   * has no cursor for the group yet, a random start position is used.
+   *
+   * <p>An empty member list is not handled specially and still fails with an unchecked
+   * exception.
+   *
+   * @param groupId id of the group to pick a peer from; {@code ClusterConfig.METADATA_GROUP_ID}
+   *     selects among the peers of the metadata holder's raft service, any other id selects
+   *     among the physical nodes the router returns for that group
+   * @return id of the peer selected for this call
+   */
+  public static PeerId getPeerIDInOrder(String groupId) {
+    boolean metadataGroup = groupId.equals(ClusterConfig.METADATA_GROUP_ID);
+    List<PeerId> peerIdList = null;
+    PhysicalNode[] physicalNodes = null;
+    int len;
+    if (metadataGroup) {
+      RaftService service = (RaftService) server.getMetadataHolder().getService();
+      peerIdList = service.getPeerIdList();
+      len = peerIdList.size();
+    } else {
+      physicalNodes = router.getNodesByGroupId(groupId);
+      len = physicalNodes.length;
+    }
+
+    Map<String, Integer> cursors = nodeIndexMap.get();
+    Integer stored = cursors.get(groupId);
+    int index;
+    if (stored == null) {
+      // Only draw a random start position when this thread has no cursor for the group.
+      index = getRandomInt(len);
+    } else {
+      index = stored;
+    }
+    // The cursor may have been computed against a member list of a different size (it is seeded
+    // from the router's node array, and membership can differ or change), so fold it back into
+    // the valid range instead of indexing out of bounds.
+    index = index % len;
+
+    PeerId peerId = metadataGroup ? peerIdList.get(index) : getPeerIDFrom(physicalNodes[index]);
+    cursors.put(groupId, (index + 1) % len);
+
+    return peerId;
+  }
+
+ /**
* Get peer id to send request. If groupLeaderCache has the group id, then return leader id of the
* group.Otherwise, random get a peer of the group.
*