You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/11/03 08:59:10 UTC

[iotdb] branch parallel_before updated: cache leader for batch plan

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch parallel_before
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/parallel_before by this push:
     new e41e3b3  cache leader for batch plan
e41e3b3 is described below

commit e41e3b3600125a0f1b688c56287d4629a5a53ba7
Author: MrQuansy <10...@qq.com>
AuthorDate: Mon Nov 1 17:54:50 2021 +0800

    cache leader for batch plan
    
    cache leader for batch plan
---
 .../iotdb/cluster/coordinator/Coordinator.java     | 114 ++++++++++++---------
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   |   3 +-
 2 files changed, 70 insertions(+), 47 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
index db0fce3..4acc8bb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
@@ -223,7 +223,7 @@ public class Coordinator {
 
     if (!checkPrivilegeForBatchExecution(plan)) {
       return concludeFinalStatus(
-          plan, plan.getPaths().size(), true, null, false, null, Collections.emptyList());
+          plan, plan.getPaths().size(), true, false, false, null, Collections.emptyList());
     }
 
     // split the plan into sub-plans that each only involve one data group
@@ -484,7 +484,7 @@ public class Coordinator {
     TSStatus[] subStatus = null;
     boolean noFailure = true;
     boolean isBatchFailure = false;
-    EndPoint endPoint = null;
+    boolean isBatchRedirect = false;
     int totalRowNum = parentPlan.getPaths().size();
     // send sub-plans to each belonging data group and collect results
     for (Map.Entry<PhysicalPlan, PartitionGroup> entry : planGroupMap.entrySet()) {
@@ -493,10 +493,9 @@ public class Coordinator {
       noFailure = (tmpStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) && noFailure;
       isBatchFailure =
           (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) || isBatchFailure;
-      if (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
-        if (parentPlan instanceof InsertTabletPlan) {
-          totalRowNum = ((InsertTabletPlan) parentPlan).getRowCount();
-        } else if (parentPlan instanceof InsertMultiTabletPlan) {
+      if (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()
+          || tmpStatus.isSetRedirectNode() && !(parentPlan instanceof CreateMultiTimeSeriesPlan)) {
+        if (parentPlan instanceof InsertMultiTabletPlan) {
           // the subStatus is the two-dimensional array,
           // The first dimension is the number of InsertTabletPlans,
           // and the second dimension is the number of rows per InsertTabletPlan
@@ -518,25 +517,46 @@ public class Coordinator {
             InsertTabletPlan tmpInsertTabletPlan = tmpMultiTabletPlan.getInsertTabletPlan(i);
             int parentIndex = tmpMultiTabletPlan.getParentIndex(i);
             int parentPlanRowCount = ((InsertMultiTabletPlan) parentPlan).getRowCount(parentIndex);
-            if (subStatus[parentIndex].subStatus == null) {
-              TSStatus[] tmpSubTsStatus = new TSStatus[parentPlanRowCount];
-              Arrays.fill(tmpSubTsStatus, RpcUtils.SUCCESS_STATUS);
-              subStatus[parentIndex].subStatus = Arrays.asList(tmpSubTsStatus);
+            if (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+              subStatus[parentIndex] = tmpStatus.subStatus.get(i);
+              if (tmpStatus.subStatus.get(i).getCode()
+                  == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+                if (subStatus[parentIndex].subStatus == null) {
+                  TSStatus[] tmpSubTsStatus = new TSStatus[parentPlanRowCount];
+                  Arrays.fill(tmpSubTsStatus, RpcUtils.SUCCESS_STATUS);
+                  subStatus[parentIndex].subStatus = Arrays.asList(tmpSubTsStatus);
+                }
+                TSStatus[] reorderTsStatus =
+                    subStatus[parentIndex].subStatus.toArray(new TSStatus[] {});
+
+                PartitionUtils.reordering(
+                    tmpInsertTabletPlan,
+                    reorderTsStatus,
+                    tmpStatus.subStatus.get(i).subStatus.toArray(new TSStatus[] {}));
+                subStatus[parentIndex].subStatus = Arrays.asList(reorderTsStatus);
+              }
+              if (tmpStatus.isSetRedirectNode()) {
+                if (tmpStatus.isSetRedirectNode()
+                    && tmpInsertTabletPlan.getMaxTime()
+                        == ((InsertMultiTabletPlan) parentPlan)
+                            .getInsertTabletPlan(parentIndex)
+                            .getMaxTime()) {
+                  subStatus[parentIndex].setRedirectNode(tmpStatus.redirectNode);
+                  isBatchRedirect = true;
+                }
+              }
+            } else if (tmpStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+              if (tmpStatus.isSetRedirectNode()
+                  && tmpInsertTabletPlan.getMaxTime()
+                      == ((InsertMultiTabletPlan) parentPlan)
+                          .getInsertTabletPlan(parentIndex)
+                          .getMaxTime()) {
+                subStatus[parentIndex] =
+                    StatusUtils.getStatus(RpcUtils.SUCCESS_STATUS, tmpStatus.redirectNode);
+                isBatchRedirect = true;
+              }
             }
-            TSStatus[] reorderTsStatus =
-                subStatus[parentIndex].subStatus.toArray(new TSStatus[] {});
-
-            PartitionUtils.reordering(
-                tmpInsertTabletPlan,
-                reorderTsStatus,
-                tmpStatus.subStatus.toArray(new TSStatus[] {}));
-            subStatus[parentIndex].subStatus = Arrays.asList(reorderTsStatus);
           }
-        } else if (parentPlan instanceof InsertTabletPlan) {
-          PartitionUtils.reordering(
-              (InsertTabletPlan) entry.getKey(),
-              subStatus,
-              tmpStatus.subStatus.toArray(new TSStatus[] {}));
         } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
           CreateMultiTimeSeriesPlan subPlan = (CreateMultiTimeSeriesPlan) entry.getKey();
           for (int i = 0; i < subPlan.getIndexes().size(); i++) {
@@ -544,8 +564,24 @@ public class Coordinator {
           }
         } else if (parentPlan instanceof InsertRowsPlan) {
           InsertRowsPlan subPlan = (InsertRowsPlan) entry.getKey();
-          for (int i = 0; i < subPlan.getInsertRowPlanIndexList().size(); i++) {
-            subStatus[subPlan.getInsertRowPlanIndexList().get(i)] = tmpStatus.subStatus.get(i);
+          if (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+            for (int i = 0; i < subPlan.getInsertRowPlanIndexList().size(); i++) {
+              subStatus[subPlan.getInsertRowPlanIndexList().get(i)] = tmpStatus.subStatus.get(i);
+              if (tmpStatus.isSetRedirectNode()) {
+                subStatus[subPlan.getInsertRowPlanIndexList().get(i)].setRedirectNode(
+                    tmpStatus.getRedirectNode());
+                isBatchRedirect = true;
+              }
+            }
+          } else if (tmpStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            if (tmpStatus.isSetRedirectNode()) {
+              isBatchRedirect = true;
+              TSStatus redirectStatus =
+                  StatusUtils.getStatus(RpcUtils.SUCCESS_STATUS, tmpStatus.getRedirectNode());
+              for (int i = 0; i < subPlan.getInsertRowPlanIndexList().size(); i++) {
+                subStatus[subPlan.getInsertRowPlanIndexList().get(i)] = redirectStatus;
+              }
+            }
           }
         }
       }
@@ -560,28 +596,12 @@ public class Coordinator {
                 tmpStatus.getMessage(),
                 tmpStatus.subStatus));
       }
-
-      if (tmpStatus.isSetRedirectNode()) {
-        boolean isLastInsertTabletPlan =
-            parentPlan instanceof InsertTabletPlan
-                && ((InsertTabletPlan) entry.getKey()).getMaxTime()
-                    == ((InsertTabletPlan) parentPlan).getMaxTime();
-
-        boolean isLastInsertMultiTabletPlan =
-            parentPlan instanceof InsertMultiTabletPlan
-                && ((InsertMultiTabletPlan) entry.getKey()).getMaxTime()
-                    == ((InsertMultiTabletPlan) parentPlan).getMaxTime();
-
-        if (isLastInsertTabletPlan || isLastInsertMultiTabletPlan) {
-          endPoint = tmpStatus.getRedirectNode();
-        }
-      }
     }
     return concludeFinalStatus(
         parentPlan,
         totalRowNum,
         noFailure,
-        endPoint,
+        isBatchRedirect,
         isBatchFailure,
         subStatus,
         errorCodePartitionGroups);
@@ -591,7 +611,7 @@ public class Coordinator {
       PhysicalPlan parentPlan,
       int totalRowNum,
       boolean noFailure,
-      EndPoint endPoint,
+      boolean isBatchRedirect,
       boolean isBatchFailure,
       TSStatus[] subStatus,
       List<String> errorCodePartitionGroups) {
@@ -639,9 +659,11 @@ public class Coordinator {
 
     TSStatus status;
     if (noFailure) {
-      status = StatusUtils.OK;
-      if (endPoint != null) {
-        status = StatusUtils.getStatus(status, endPoint);
+      if (isBatchRedirect) {
+        status = RpcUtils.getStatus(Arrays.asList(subStatus));
+        status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+      } else {
+        status = StatusUtils.OK;
       }
     } else if (isBatchFailure) {
       status = RpcUtils.getStatus(Arrays.asList(subStatus));
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index 0ace7d4..dec3b78 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -98,7 +98,8 @@ public class RpcUtils {
   public static void verifySuccessWithRedirectionForMultiDevices(
       TSStatus status, List<String> devices) throws StatementExecutionException, RedirectException {
     verifySuccess(status);
-    if (status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+    if (status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()
+        || status.getCode() == TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
       Map<String, EndPoint> deviceEndPointMap = new HashMap<>();
       List<TSStatus> statusSubStatus = status.getSubStatus();
       for (int i = 0; i < statusSubStatus.size(); i++) {