You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/11/03 08:59:10 UTC
[iotdb] branch parallel_before updated: cache leader for batch plan
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch parallel_before
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/parallel_before by this push:
new e41e3b3 cache leader for batch plan
e41e3b3 is described below
commit e41e3b3600125a0f1b688c56287d4629a5a53ba7
Author: MrQuansy <10...@qq.com>
AuthorDate: Mon Nov 1 17:54:50 2021 +0800
cache leader for batch plan
cache leader for batch plan
---
.../iotdb/cluster/coordinator/Coordinator.java | 114 ++++++++++++---------
.../main/java/org/apache/iotdb/rpc/RpcUtils.java | 3 +-
2 files changed, 70 insertions(+), 47 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
index db0fce3..4acc8bb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
@@ -223,7 +223,7 @@ public class Coordinator {
if (!checkPrivilegeForBatchExecution(plan)) {
return concludeFinalStatus(
- plan, plan.getPaths().size(), true, null, false, null, Collections.emptyList());
+ plan, plan.getPaths().size(), true, false, false, null, Collections.emptyList());
}
// split the plan into sub-plans that each only involve one data group
@@ -484,7 +484,7 @@ public class Coordinator {
TSStatus[] subStatus = null;
boolean noFailure = true;
boolean isBatchFailure = false;
- EndPoint endPoint = null;
+ boolean isBatchRedirect = false;
int totalRowNum = parentPlan.getPaths().size();
// send sub-plans to each belonging data group and collect results
for (Map.Entry<PhysicalPlan, PartitionGroup> entry : planGroupMap.entrySet()) {
@@ -493,10 +493,9 @@ public class Coordinator {
noFailure = (tmpStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) && noFailure;
isBatchFailure =
(tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) || isBatchFailure;
- if (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
- if (parentPlan instanceof InsertTabletPlan) {
- totalRowNum = ((InsertTabletPlan) parentPlan).getRowCount();
- } else if (parentPlan instanceof InsertMultiTabletPlan) {
+ if (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()
+ || tmpStatus.isSetRedirectNode() && !(parentPlan instanceof CreateMultiTimeSeriesPlan)) {
+ if (parentPlan instanceof InsertMultiTabletPlan) {
// the subStatus is the two-dimensional array,
// The first dimension is the number of InsertTabletPlans,
// and the second dimension is the number of rows per InsertTabletPlan
@@ -518,25 +517,46 @@ public class Coordinator {
InsertTabletPlan tmpInsertTabletPlan = tmpMultiTabletPlan.getInsertTabletPlan(i);
int parentIndex = tmpMultiTabletPlan.getParentIndex(i);
int parentPlanRowCount = ((InsertMultiTabletPlan) parentPlan).getRowCount(parentIndex);
- if (subStatus[parentIndex].subStatus == null) {
- TSStatus[] tmpSubTsStatus = new TSStatus[parentPlanRowCount];
- Arrays.fill(tmpSubTsStatus, RpcUtils.SUCCESS_STATUS);
- subStatus[parentIndex].subStatus = Arrays.asList(tmpSubTsStatus);
+ if (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ subStatus[parentIndex] = tmpStatus.subStatus.get(i);
+ if (tmpStatus.subStatus.get(i).getCode()
+ == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ if (subStatus[parentIndex].subStatus == null) {
+ TSStatus[] tmpSubTsStatus = new TSStatus[parentPlanRowCount];
+ Arrays.fill(tmpSubTsStatus, RpcUtils.SUCCESS_STATUS);
+ subStatus[parentIndex].subStatus = Arrays.asList(tmpSubTsStatus);
+ }
+ TSStatus[] reorderTsStatus =
+ subStatus[parentIndex].subStatus.toArray(new TSStatus[] {});
+
+ PartitionUtils.reordering(
+ tmpInsertTabletPlan,
+ reorderTsStatus,
+ tmpStatus.subStatus.get(i).subStatus.toArray(new TSStatus[] {}));
+ subStatus[parentIndex].subStatus = Arrays.asList(reorderTsStatus);
+ }
+ if (tmpStatus.isSetRedirectNode()) {
+ if (tmpStatus.isSetRedirectNode()
+ && tmpInsertTabletPlan.getMaxTime()
+ == ((InsertMultiTabletPlan) parentPlan)
+ .getInsertTabletPlan(parentIndex)
+ .getMaxTime()) {
+ subStatus[parentIndex].setRedirectNode(tmpStatus.redirectNode);
+ isBatchRedirect = true;
+ }
+ }
+ } else if (tmpStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ if (tmpStatus.isSetRedirectNode()
+ && tmpInsertTabletPlan.getMaxTime()
+ == ((InsertMultiTabletPlan) parentPlan)
+ .getInsertTabletPlan(parentIndex)
+ .getMaxTime()) {
+ subStatus[parentIndex] =
+ StatusUtils.getStatus(RpcUtils.SUCCESS_STATUS, tmpStatus.redirectNode);
+ isBatchRedirect = true;
+ }
}
- TSStatus[] reorderTsStatus =
- subStatus[parentIndex].subStatus.toArray(new TSStatus[] {});
-
- PartitionUtils.reordering(
- tmpInsertTabletPlan,
- reorderTsStatus,
- tmpStatus.subStatus.toArray(new TSStatus[] {}));
- subStatus[parentIndex].subStatus = Arrays.asList(reorderTsStatus);
}
- } else if (parentPlan instanceof InsertTabletPlan) {
- PartitionUtils.reordering(
- (InsertTabletPlan) entry.getKey(),
- subStatus,
- tmpStatus.subStatus.toArray(new TSStatus[] {}));
} else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
CreateMultiTimeSeriesPlan subPlan = (CreateMultiTimeSeriesPlan) entry.getKey();
for (int i = 0; i < subPlan.getIndexes().size(); i++) {
@@ -544,8 +564,24 @@ public class Coordinator {
}
} else if (parentPlan instanceof InsertRowsPlan) {
InsertRowsPlan subPlan = (InsertRowsPlan) entry.getKey();
- for (int i = 0; i < subPlan.getInsertRowPlanIndexList().size(); i++) {
- subStatus[subPlan.getInsertRowPlanIndexList().get(i)] = tmpStatus.subStatus.get(i);
+ if (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ for (int i = 0; i < subPlan.getInsertRowPlanIndexList().size(); i++) {
+ subStatus[subPlan.getInsertRowPlanIndexList().get(i)] = tmpStatus.subStatus.get(i);
+ if (tmpStatus.isSetRedirectNode()) {
+ subStatus[subPlan.getInsertRowPlanIndexList().get(i)].setRedirectNode(
+ tmpStatus.getRedirectNode());
+ isBatchRedirect = true;
+ }
+ }
+ } else if (tmpStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ if (tmpStatus.isSetRedirectNode()) {
+ isBatchRedirect = true;
+ TSStatus redirectStatus =
+ StatusUtils.getStatus(RpcUtils.SUCCESS_STATUS, tmpStatus.getRedirectNode());
+ for (int i = 0; i < subPlan.getInsertRowPlanIndexList().size(); i++) {
+ subStatus[subPlan.getInsertRowPlanIndexList().get(i)] = redirectStatus;
+ }
+ }
}
}
}
@@ -560,28 +596,12 @@ public class Coordinator {
tmpStatus.getMessage(),
tmpStatus.subStatus));
}
-
- if (tmpStatus.isSetRedirectNode()) {
- boolean isLastInsertTabletPlan =
- parentPlan instanceof InsertTabletPlan
- && ((InsertTabletPlan) entry.getKey()).getMaxTime()
- == ((InsertTabletPlan) parentPlan).getMaxTime();
-
- boolean isLastInsertMultiTabletPlan =
- parentPlan instanceof InsertMultiTabletPlan
- && ((InsertMultiTabletPlan) entry.getKey()).getMaxTime()
- == ((InsertMultiTabletPlan) parentPlan).getMaxTime();
-
- if (isLastInsertTabletPlan || isLastInsertMultiTabletPlan) {
- endPoint = tmpStatus.getRedirectNode();
- }
- }
}
return concludeFinalStatus(
parentPlan,
totalRowNum,
noFailure,
- endPoint,
+ isBatchRedirect,
isBatchFailure,
subStatus,
errorCodePartitionGroups);
@@ -591,7 +611,7 @@ public class Coordinator {
PhysicalPlan parentPlan,
int totalRowNum,
boolean noFailure,
- EndPoint endPoint,
+ boolean isBatchRedirect,
boolean isBatchFailure,
TSStatus[] subStatus,
List<String> errorCodePartitionGroups) {
@@ -639,9 +659,11 @@ public class Coordinator {
TSStatus status;
if (noFailure) {
- status = StatusUtils.OK;
- if (endPoint != null) {
- status = StatusUtils.getStatus(status, endPoint);
+ if (isBatchRedirect) {
+ status = RpcUtils.getStatus(Arrays.asList(subStatus));
+ status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+ } else {
+ status = StatusUtils.OK;
}
} else if (isBatchFailure) {
status = RpcUtils.getStatus(Arrays.asList(subStatus));
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index 0ace7d4..dec3b78 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -98,7 +98,8 @@ public class RpcUtils {
public static void verifySuccessWithRedirectionForMultiDevices(
TSStatus status, List<String> devices) throws StatementExecutionException, RedirectException {
verifySuccess(status);
- if (status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ if (status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()
+ || status.getCode() == TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
Map<String, EndPoint> deviceEndPointMap = new HashMap<>();
List<TSStatus> statusSubStatus = status.getSubStatus();
for (int i = 0; i < statusSubStatus.size(); i++) {