You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ea...@apache.org on 2019/05/27 08:47:27 UTC
[incubator-iotdb] branch cluster updated: update logic of trying
all nodes based on update of NodeAsClient
This is an automated email from the ASF dual-hosted git repository.
east pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster by this push:
new 4916e26 update logic of trying all nodes based on update of NodeAsClient
4916e26 is described below
commit 4916e26deca556bc18a4eb6e6c7386716ab30880
Author: mdf369 <95...@qq.com>
AuthorDate: Mon May 27 16:47:12 2019 +0800
update logic of trying all nodes based on update of NodeAsClient
---
.../cluster/qp/executor/AbstractQPExecutor.java | 57 ++++++-
.../cluster/qp/executor/NonQueryExecutor.java | 51 +-----
.../cluster/qp/executor/QueryMetadataExecutor.java | 187 +++++----------------
.../apache/iotdb/cluster/qp/task/BatchQPTask.java | 40 ++++-
.../cluster/service/TSServiceClusterImpl.java | 4 +-
5 files changed, 131 insertions(+), 208 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
index faacfff..9b2da09 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
@@ -90,10 +90,42 @@ public abstract class AbstractQPExecutor {
* @param taskRetryNum Number of QPTask retries due to timeout and redirected.
* @return basic response
*/
- protected BasicResponse syncHandleNonQuerySingleTaskGetRes(SingleQPTask task, int taskRetryNum)
+ protected BasicResponse syncHandleSingleTaskGetRes(SingleQPTask task, int taskRetryNum, String taskInfo, String groupId)
throws InterruptedException, RaftConnectionException {
- asyncSendNonQuerySingleTask(task, taskRetryNum);
- return syncGetNonQueryRes(task, taskRetryNum);
+ PeerId firstNode = task.getTargetNode();
+ BasicResponse response = null;
+ try {
+ asyncSendSingleTask(task, taskRetryNum);
+ response = syncGetSingleTaskRes(task, taskRetryNum, taskInfo, groupId);
+ } catch (RaftConnectionException ex) {
+ boolean success = false;
+ while (!success) {
+ PeerId nextNode = null;
+ try {
+ nextNode = RaftUtils.getPeerIDInOrder(groupId);
+ if (firstNode.equals(nextNode)) {
+ break;
+ }
+ LOGGER.debug(
+ "Previous task fail, then send {} task for group {} to node {}.", taskInfo, groupId,
+ nextNode);
+ task.resetTask();
+ task.setTargetNode(nextNode);
+ task.setTaskState(TaskState.INITIAL);
+ asyncSendSingleTask(task, taskRetryNum);
+ response = syncGetSingleTaskRes(task, taskRetryNum, taskInfo, groupId);
+ LOGGER.debug("{} task for group {} to node {} succeed.", taskInfo, groupId, nextNode);
+ success = true;
+ } catch (RaftConnectionException e1) {
+ LOGGER.debug("{} task for group {} to node {} fail.", taskInfo, groupId, nextNode);
+ }
+ }
+ LOGGER.debug("The final result for {} task is {}", taskInfo, success);
+ if (!success) {
+ throw ex;
+ }
+ }
+ return response;
}
/**
@@ -101,7 +133,7 @@ public abstract class AbstractQPExecutor {
* @param task rpc task
* @param taskRetryNum Retry time of the task
*/
- protected void asyncSendNonQuerySingleTask(SingleQPTask task, int taskRetryNum)
+ protected void asyncSendSingleTask(SingleQPTask task, int taskRetryNum)
throws RaftConnectionException {
if (taskRetryNum >= TASK_MAX_RETRY) {
throw new RaftConnectionException(String.format("QPTask retries reach the upper bound %s",
@@ -117,7 +149,7 @@ public abstract class AbstractQPExecutor {
* @param task rpc task
* @param taskRetryNum Retry time of the task
*/
- private BasicResponse syncGetNonQueryRes(SingleQPTask task, int taskRetryNum)
+ private BasicResponse syncGetSingleTaskRes(SingleQPTask task, int taskRetryNum, String taskInfo, String groupId)
throws InterruptedException, RaftConnectionException {
task.await();
PeerId leader;
@@ -131,14 +163,13 @@ public abstract class AbstractQPExecutor {
LOGGER.debug("Redirect leader: {}, group id = {}", leader, task.getRequest().getGroupID());
RaftUtils.updateRaftGroupLeader(task.getRequest().getGroupID(), leader);
} else {
- String groupId = task.getRequest().getGroupID();
RaftUtils.removeCachedRaftGroupLeader(groupId);
LOGGER.debug("Remove cached raft group leader of {}", groupId);
leader = RaftUtils.getLocalLeaderPeerID(groupId);
}
task.setTargetNode(leader);
task.resetTask();
- return syncHandleNonQuerySingleTaskGetRes(task, taskRetryNum + 1);
+ return syncHandleSingleTaskGetRes(task, taskRetryNum + 1, taskInfo, groupId);
}
return task.getResponse();
}
@@ -174,4 +205,16 @@ public abstract class AbstractQPExecutor {
checkInitConsistencyLevel();
return readDataConsistencyLevel.get();
}
+
+ /**
+ * Synchronously handle a SingleQPTask for the given group and report whether it succeeded.
+ *
+ * @param task request SingleQPTask
+ * @return true if a response was received and it is successful, false otherwise
+ */
+ public boolean syncHandleSingleTask(SingleQPTask task, String taskInfo, String groupId)
+ throws RaftConnectionException, InterruptedException {
+ BasicResponse response = syncHandleSingleTaskGetRes(task, 0, taskInfo, groupId);
+ return response != null && response.isSuccess();
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index 7ba9ef7..b7dd242 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.cluster.entity.raft.RaftService;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
import org.apache.iotdb.cluster.qp.task.BatchQPTask;
import org.apache.iotdb.cluster.qp.task.QPTask;
-import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
import org.apache.iotdb.cluster.qp.task.SingleQPTask;
import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
import org.apache.iotdb.cluster.rpc.raft.request.nonquery.DataGroupNonQueryRequest;
@@ -327,43 +326,8 @@ public class NonQueryExecutor extends AbstractQPExecutor {
return handleNonQueryRequestLocally(groupId, qpTask);
} else {
PeerId leader = RaftUtils.getLocalLeaderPeerID(groupId);
- boolean res = false;
qpTask.setTargetNode(leader);
- try {
- res = syncHandleNonQueryTask(qpTask);
- } catch (RaftConnectionException ex) {
- boolean success = false;
- PeerId nextNode = RaftUtils.getPeerIDInOrder(groupId);
- PeerId firstNode = nextNode;
- boolean first = true;
- while (!success) {
- try {
- if (!first) {
- nextNode = RaftUtils.getPeerIDInOrder(groupId);
- if (firstNode.equals(nextNode)) {
- break;
- }
- }
- first = false;
- LOGGER.debug("Previous task fail, then send non-query task for group {} to node {}.", groupId, nextNode);
- qpTask.resetTask();
- qpTask.setTargetNode(nextNode);
- qpTask.setTaskState(TaskState.INITIAL);
- currentTask.set(qpTask);
- res = syncHandleNonQueryTask(qpTask);
- LOGGER.debug("Non-query task for group {} to node {} succeed.", groupId, nextNode);
- success = true;
- RaftUtils.updateRaftGroupLeader(groupId, nextNode);
- } catch (RaftConnectionException e1) {
- LOGGER.debug("Non-query task for group {} to node {} fail.", groupId, nextNode);
- }
- }
- LOGGER.debug("The final result for non-query task is {}", success);
- if (!success) {
- throw ex;
- }
- }
- return res;
+ return syncHandleSingleTask(qpTask, "non-query", groupId);
}
}
@@ -387,17 +351,4 @@ public class NonQueryExecutor extends AbstractQPExecutor {
/** Apply qpTask to Raft Node **/
return RaftUtils.executeRaftTaskForLocalProcessor(service, qpTask, response);
}
-
- /**
- * Async handle task by QPTask and leader id.
- *
- * @param task request QPTask
- * @return request result
- */
- public boolean syncHandleNonQueryTask(SingleQPTask task)
- throws RaftConnectionException, InterruptedException {
- BasicResponse response = syncHandleNonQuerySingleTaskGetRes(task, 0);
- return response != null && response.isSuccess();
- }
-
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 625269e..a2528c6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -22,6 +22,7 @@ import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import com.alipay.sofa.jraft.entity.PeerId;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -31,6 +32,7 @@ import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
import org.apache.iotdb.cluster.entity.raft.RaftService;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
+import org.apache.iotdb.cluster.qp.task.BatchQPTask;
import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
import org.apache.iotdb.cluster.qp.task.SingleQPTask;
import org.apache.iotdb.cluster.rpc.raft.request.querymetadata.QueryMetadataInStringRequest;
@@ -46,6 +48,7 @@ import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QueryPathsRespon
import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QuerySeriesTypeResponse;
import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QueryStorageGroupResponse;
import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QueryTimeSeriesResponse;
+import org.apache.iotdb.cluster.service.TSServiceClusterImpl.BatchResult;
import org.apache.iotdb.cluster.utils.QPExecutorUtils;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.db.exception.PathErrorException;
@@ -145,50 +148,34 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
try {
LOGGER.debug("Send show timeseries {} task for group {} to node {}.", pathList, groupId,
holder);
- res.addAll(queryTimeSeries(task));
+ res.addAll(queryTimeSeries(task, pathList, groupId));
} catch (RaftConnectionException e) {
- boolean success = false;
- while (!success) {
- PeerId nextNode = null;
- try {
- nextNode = RaftUtils.getPeerIDInOrder(groupId);
- if (holder.equals(nextNode)) {
- break;
- }
- LOGGER.debug(
- "Previous task fail, then send show timeseries {} task for group {} to node {}.",
- pathList, groupId, nextNode);
- task.resetTask();
- task.setTargetNode(holder);
- task.setTaskState(TaskState.INITIAL);
- res.addAll(queryTimeSeries(task));
- LOGGER
- .debug("Show timeseries {} task for group {} to node {} succeed.", pathList, groupId,
- nextNode);
- success = true;
- } catch (RaftConnectionException e1) {
- LOGGER.debug("Show timeseries {} task for group {} to node {} fail.", pathList, groupId,
- nextNode);
- }
- }
- LOGGER.debug("The final result for show timeseries {} task is {}", pathList, success);
- if (!success) {
- throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
- }
+ throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
}
}
+ private List<List<String>> queryTimeSeries(SingleQPTask task, List<String> pathList, String groupId)
+ throws InterruptedException, RaftConnectionException {
+ BasicResponse response = syncHandleSingleTaskGetRes(task, 0, "show timeseries " + pathList, groupId);
+ return response == null ? new ArrayList<>()
+ : ((QueryTimeSeriesResponse) response).getTimeSeries();
+ }
+
public String processMetadataInStringQuery()
throws InterruptedException, ProcessorException {
Set<String> groupIdSet = router.getAllGroupId();
List<String> metadataList = new ArrayList<>(groupIdSet.size());
- List<SingleQPTask> taskList = new ArrayList<>();
+
+ BatchResult batchResult = new BatchResult(true, new StringBuilder(), new int[groupIdSet.size()]);
+ Map<String, List<Integer>> planIndexMap = new HashMap<>();
+ Map<String, SingleQPTask> subTaskMap = new HashMap<>();
+
+ int index = 0;
for (String groupId : groupIdSet) {
QueryMetadataInStringRequest request = new QueryMetadataInStringRequest(groupId,
getReadMetadataConsistencyLevel());
SingleQPTask task = new SingleQPTask(false, request);
- taskList.add(task);
LOGGER.debug("Execute show metadata in string statement for group {}.", groupId);
PeerId holder;
@@ -202,43 +189,17 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
holder = RaftUtils.getPeerIDInOrder(groupId);
}
task.setTargetNode(holder);
- try {
- LOGGER.debug("Send show metadata in string task for group {} to node {}.", groupId, holder);
- asyncSendNonQuerySingleTask(task, 0);
- } catch (RaftConnectionException e) {
- boolean success = false;
- while (!success) {
- PeerId nextNode = null;
- try {
- nextNode = RaftUtils.getPeerIDInOrder(groupId);
- if (holder.equals(nextNode)) {
- break;
- }
- LOGGER.debug(
- "Previous task fail, then send show metadata in string task for group {} to node {}.",
- groupId, nextNode);
- task.resetTask();
- task.setTargetNode(nextNode);
- task.setTaskState(TaskState.INITIAL);
- asyncSendNonQuerySingleTask(task, 0);
- LOGGER.debug("Show metadata in string task for group {} to node {} succeed.", groupId,
- nextNode);
- success = true;
- } catch (RaftConnectionException e1) {
- LOGGER.debug("Show metadata in string task for group {} to node {} fail.", groupId,
- nextNode);
- }
- }
- LOGGER.debug("The final result for show metadata in string task is {}", success);
- if (!success) {
- throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
- }
- }
+ subTaskMap.put(groupId, task);
+ planIndexMap.computeIfAbsent(groupId, l -> new ArrayList<>()).add(index++);
}
- for (int i = 0; i < taskList.size(); i++) {
- SingleQPTask task = taskList.get(i);
- task.await();
- BasicResponse response = task.getResponse();
+
+ BatchQPTask batchTask = new BatchQPTask(subTaskMap.size(), batchResult, subTaskMap, planIndexMap);
+ currentTask.set(batchTask);
+ batchTask.executeQueryMetadataBy(this, "show metadata in string");
+ batchTask.await();
+
+ for (SingleQPTask subTask : subTaskMap.values()) {
+ BasicResponse response = subTask.getResponse();
if (response == null || !response.isSuccess()) {
String errorMessage = "response is null";
if (response != null && response.getErrorMsg() != null) {
@@ -278,7 +239,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
task.setTargetNode(holder);
try {
LOGGER.debug("Send query metadata task for group {} to node {}.", groupId, holder);
- asyncSendNonQuerySingleTask(task, 0);
+ asyncSendSingleTask(task, 0);
} catch (RaftConnectionException e) {
boolean success = false;
while (!success) {
@@ -294,7 +255,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
task.resetTask();
task.setTargetNode(nextNode);
task.setTaskState(TaskState.INITIAL);
- asyncSendNonQuerySingleTask(task, 0);
+ asyncSendSingleTask(task, 0);
LOGGER.debug("Query metadata task for group {} to node {} succeed.", groupId, nextNode);
success = true;
} catch (RaftConnectionException e1) {
@@ -352,41 +313,21 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
try {
LOGGER.debug("Send get series type for {} task for group {} to node {}.", path, groupId,
holder);
- dataType = querySeriesType(task);
+ dataType = querySeriesType(task, path, groupId);
} catch (RaftConnectionException e) {
- boolean success = false;
- while (!success) {
- PeerId nextNode = null;
- try {
- nextNode = RaftUtils.getPeerIDInOrder(groupId);
- if (holder.equals(nextNode)) {
- break;
- }
- LOGGER.debug(
- "Previous task fail, then send get series type for {} task for group {} to node {}.",
- path, groupId, nextNode);
- task.resetTask();
- task.setTargetNode(nextNode);
- task.setTaskState(TaskState.INITIAL);
- dataType = querySeriesType(task);
- LOGGER.debug("Get series type for {} task for group {} to node {} succeed.", path,
- groupId, nextNode);
- success = true;
- } catch (RaftConnectionException e1) {
- LOGGER.debug("Get series type for {} task for group {} to node {} fail.", path, groupId,
- nextNode);
- continue;
- }
- }
- LOGGER.debug("The final result for get series type for {} task is {}", path, success);
- if (!success) {
- throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
- }
+ throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
}
}
return dataType;
}
+ private TSDataType querySeriesType(SingleQPTask task, String path, String groupId)
+ throws InterruptedException, RaftConnectionException {
+ BasicResponse response = syncHandleSingleTaskGetRes(task, 0, "get series type for " + path, groupId);
+ return response == null ? null
+ : ((QuerySeriesTypeResponse) response).getDataType();
+ }
+
/**
* Handle show timeseries <path> statement
*/
@@ -433,50 +374,17 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
try {
LOGGER
.debug("Send get paths for {} task for group {} to node {}.", pathList, groupId, holder);
- res.addAll(queryPaths(task));
+ res.addAll(queryPaths(task, pathList, groupId));
} catch (RaftConnectionException e) {
- boolean success = false;
- while (!success) {
- PeerId nextNode = null;
- try {
- nextNode = RaftUtils.getPeerIDInOrder(groupId);
- if (holder.equals(nextNode)) {
- break;
- }
- LOGGER
- .debug("Previous task fail, then send get paths for {} task for group {} to node {}.",
- pathList, groupId, nextNode);
- task.setTargetNode(nextNode);
- task.resetTask();
- task.setTaskState(TaskState.INITIAL);
- res.addAll(queryPaths(task));
- LOGGER.debug("Get paths for {} task for group {} to node {} succeed.", pathList, groupId,
- nextNode);
- success = true;
- } catch (RaftConnectionException e1) {
- LOGGER.debug("Get paths for {} task for group {} to node {} fail.", pathList, groupId,
- nextNode);
- }
- }
- LOGGER.debug("The final result for get paths for {} task is {}", pathList, success);
- if (!success) {
- throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
- }
+ throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
}
}
- private List<List<String>> queryTimeSeries(SingleQPTask task)
+ private List<String> queryPaths(SingleQPTask task, List<String> pathList, String groupId)
throws InterruptedException, RaftConnectionException {
- BasicResponse response = syncHandleNonQuerySingleTaskGetRes(task, 0);
+ BasicResponse response = syncHandleSingleTaskGetRes(task, 0, "get paths for " + pathList, groupId);
return response == null ? new ArrayList<>()
- : ((QueryTimeSeriesResponse) response).getTimeSeries();
- }
-
- private TSDataType querySeriesType(SingleQPTask task)
- throws InterruptedException, RaftConnectionException {
- BasicResponse response = syncHandleNonQuerySingleTaskGetRes(task, 0);
- return response == null ? null
- : ((QuerySeriesTypeResponse) response).getDataType();
+ : ((QueryPathsResponse) response).getPaths();
}
/**
@@ -516,13 +424,6 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
return ((QueryStorageGroupResponse) task.getResponse()).getStorageGroups();
}
- private List<String> queryPaths(SingleQPTask task)
- throws InterruptedException, RaftConnectionException {
- BasicResponse response = syncHandleNonQuerySingleTaskGetRes(task, 0);
- return response == null ? new ArrayList<>()
- : ((QueryPathsResponse) response).getPaths();
- }
-
/**
* Combine multiple metadata in String format into single String
*
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
index b881549..8548879 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
@@ -28,7 +28,9 @@ import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.iotdb.cluster.concurrent.pool.QPTaskThreadManager;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
+import org.apache.iotdb.cluster.qp.executor.AbstractQPExecutor;
import org.apache.iotdb.cluster.qp.executor.NonQueryExecutor;
+import org.apache.iotdb.cluster.qp.executor.QueryMetadataExecutor;
import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
import org.apache.iotdb.cluster.rpc.raft.response.nonquery.DataGroupNonQueryResponse;
import org.apache.iotdb.cluster.service.TSServiceClusterImpl.BatchResult;
@@ -66,7 +68,7 @@ public class BatchQPTask extends MultiQPTask {
*/
private ReentrantLock lock = new ReentrantLock();
- private NonQueryExecutor executor;
+ private AbstractQPExecutor executor;
public BatchQPTask(int taskNum, BatchResult result, Map<String, SingleQPTask> taskMap,
Map<String, List<Integer>> planIndexMap) {
@@ -90,8 +92,6 @@ public class BatchQPTask extends MultiQPTask {
String groupId = basicResponse.getGroupId();
List<Boolean> results = basicResponse.getResults();
List<Integer> indexList = planIndexMap.get(groupId);
- List<String> errorMsgList = ((DataGroupNonQueryResponse) basicResponse).getErrorMsgList();
- int errorMsgIndex = 0;
for (int i = 0; i < indexList.size(); i++) {
if (i >= results.size()) {
resultArray[indexList.get(i)] = Statement.EXECUTE_FAILED;
@@ -101,19 +101,47 @@ public class BatchQPTask extends MultiQPTask {
resultArray[indexList.get(i)] = Statement.SUCCESS_NO_INFO;
} else {
resultArray[indexList.get(i)] = Statement.EXECUTE_FAILED;
- batchResult.addBatchErrorMessage(indexList.get(i), errorMsgList.get(errorMsgIndex++));
+ batchResult.addBatchErrorMessage(indexList.get(i), basicResponse.getErrorMsg());
}
}
}
if (!basicResponse.isSuccess()) {
batchResult.setAllSuccessful(false);
}
+ } catch (Exception ex) {
+ ex.printStackTrace();
} finally {
lock.unlock();
}
taskCountDownLatch.countDown();
}
+ public void executeQueryMetadataBy(QueryMetadataExecutor executor, String taskInfo) {
+ this.executor = executor;
+
+ for (Entry<String, SingleQPTask> entry : taskMap.entrySet()) {
+ String groupId = entry.getKey();
+ SingleQPTask subTask = entry.getValue();
+ Future<?> taskThread;
+ taskThread = QPTaskThreadManager.getInstance()
+ .submit(() -> executeRpcSubQueryMetadataTask(subTask, taskInfo, groupId));
+ taskThreadMap.put(groupId, taskThread);
+ }
+ }
+
+ /**
+ * Execute RPC sub task
+ */
+ private void executeRpcSubQueryMetadataTask(SingleQPTask subTask, String taskInfo, String groupId) {
+ try {
+ executor.syncHandleSingleTask(subTask, taskInfo, groupId);
+ this.receive(subTask.getResponse());
+ } catch (RaftConnectionException | InterruptedException e) {
+ LOGGER.error("Async handle sub {} task failed.", taskInfo);
+ this.receive(DataGroupNonQueryResponse.createErrorResponse(groupId, e.getMessage()));
+ }
+ }
+
public void executeBy(NonQueryExecutor executor) {
this.executor = executor;
@@ -139,7 +167,7 @@ public class BatchQPTask extends MultiQPTask {
*/
private void executeLocalSubTask(QPTask subTask, String groupId) {
try {
- executor.handleNonQueryRequestLocally(groupId, subTask);
+ ((NonQueryExecutor) executor).handleNonQueryRequestLocally(groupId, subTask);
this.receive(subTask.getResponse());
} catch (InterruptedException e) {
LOGGER.error("Handle sub task locally failed.");
@@ -152,7 +180,7 @@ public class BatchQPTask extends MultiQPTask {
*/
private void executeRpcSubTask(SingleQPTask subTask, String groupId) {
try {
- executor.syncHandleNonQueryTask(subTask);
+ executor.syncHandleSingleTask(subTask, "sub non-query", groupId);
this.receive(subTask.getResponse());
} catch (RaftConnectionException | InterruptedException e) {
LOGGER.error("Async handle sub task failed.");
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
index 1da5072..7ae40c2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
@@ -204,13 +204,13 @@ public class TSServiceClusterImpl extends TSServiceImpl {
/**
* Present batch results.
*/
- public class BatchResult {
+ public static class BatchResult {
private boolean isAllSuccessful;
private StringBuilder batchErrorMessage;
private int[] resultArray;
- private BatchResult(boolean isAllSuccessful, StringBuilder batchErrorMessage,
+ public BatchResult(boolean isAllSuccessful, StringBuilder batchErrorMessage,
int[] resultArray) {
this.isAllSuccessful = isAllSuccessful;
this.batchErrorMessage = batchErrorMessage;