You are viewing a plain text version of this content. The canonical link was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2023/03/16 08:15:18 UTC

[iotdb] branch rel/1.1 updated: [To rel/1.1] Fix dispatch result collection logic (#9323)

This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new e3cd98ef9f [To rel/1.1] Fix dispatch result collection logic (#9323)
e3cd98ef9f is described below

commit e3cd98ef9f9b90ef83e09ede7579819988b21218
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Thu Mar 16 16:15:10 2023 +0800

    [To rel/1.1] Fix dispatch result collection logic (#9323)
---
 .../execution/executor/RegionWriteExecutor.java    | 149 ++++++++-------------
 .../metedata/write/CreateMultiTimeSeriesNode.java  |   6 +-
 .../plan/node/metedata/write/MeasurementGroup.java |  36 ++---
 .../db/mpp/plan/scheduler/AsyncPlanNodeSender.java |  34 +++++
 .../scheduler/FragmentInstanceDispatcherImpl.java  |  32 ++++-
 5 files changed, 132 insertions(+), 125 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
index 71e870cc67..69ca7a4f3b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
@@ -462,53 +462,10 @@ public class RegionWriteExecutor {
           }
           measurementGroup.removeMeasurements(failingMeasurementMap.keySet());
 
-          RegionExecutionResult executionResult =
-              super.visitInternalCreateTimeSeries(node, context);
-
-          if (failingStatus.isEmpty() && alreadyExistingStatus.isEmpty()) {
-            return executionResult;
-          }
-
-          TSStatus executionStatus = executionResult.getStatus();
-
-          // separate the measurement_already_exist exception and other exceptions process,
-          // measurement_already_exist exception is acceptable due to concurrent timeseries creation
-          if (failingStatus.isEmpty()) {
-            if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
-              if (executionStatus.getSubStatus().get(0).getCode()
-                  == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
-                // there's only measurement_already_exist exception
-                alreadyExistingStatus.addAll(executionStatus.getSubStatus());
-              } else {
-                failingStatus.addAll(executionStatus.getSubStatus());
-              }
-            } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-              failingStatus.add(executionStatus);
-            }
-          } else {
-            if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
-              if (executionStatus.getSubStatus().get(0).getCode()
-                  != TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
-                failingStatus.addAll(executionStatus.getSubStatus());
-              }
-            } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-              failingStatus.add(executionStatus);
-            }
-          }
-
-          RegionExecutionResult result = new RegionExecutionResult();
-          TSStatus status;
-          if (failingStatus.isEmpty()) {
-            status = RpcUtils.getStatus(alreadyExistingStatus);
-            result.setAccepted(true);
-          } else {
-            status = RpcUtils.getStatus(failingStatus);
-            result.setAccepted(false);
-          }
-
-          result.setMessage(status.getMessage());
-          result.setStatus(status);
-          return result;
+          return processExecutionResultOfInternalCreateSchema(
+              super.visitInternalCreateTimeSeries(node, context),
+              failingStatus,
+              alreadyExistingStatus);
         } finally {
           context.getRegionWriteValidationRWLock().writeLock().unlock();
         }
@@ -562,59 +519,65 @@ public class RegionWriteExecutor {
             measurementGroup.removeMeasurements(failingMeasurementMap.keySet());
           }
 
-          RegionExecutionResult executionResult =
-              super.visitInternalCreateMultiTimeSeries(node, context);
-
-          if (failingStatus.isEmpty() && alreadyExistingStatus.isEmpty()) {
-            return executionResult;
-          }
-
-          TSStatus executionStatus = executionResult.getStatus();
+          return processExecutionResultOfInternalCreateSchema(
+              super.visitInternalCreateMultiTimeSeries(node, context),
+              failingStatus,
+              alreadyExistingStatus);
+        } finally {
+          context.getRegionWriteValidationRWLock().writeLock().unlock();
+        }
+      } else {
+        return super.visitInternalCreateMultiTimeSeries(node, context);
+      }
+    }
 
-          // separate the measurement_already_exist exception and other exceptions process,
-          // measurement_already_exist exception is acceptable due to concurrent timeseries creation
-          if (failingStatus.isEmpty()) {
-            if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
-              if (executionStatus.getSubStatus().get(0).getCode()
-                  == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
-                // there's only measurement_already_exist exception
-                alreadyExistingStatus.addAll(executionStatus.getSubStatus());
-              } else {
-                failingStatus.addAll(executionStatus.getSubStatus());
-              }
-            } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-              failingStatus.add(executionStatus);
-            }
+    private RegionExecutionResult processExecutionResultOfInternalCreateSchema(
+        RegionExecutionResult executionResult,
+        List<TSStatus> failingStatus,
+        List<TSStatus> alreadyExistingStatus) {
+      TSStatus executionStatus = executionResult.getStatus();
+
+      // separate the measurement_already_exist exception and other exceptions process,
+      // measurement_already_exist exception is acceptable due to concurrent timeseries creation
+      if (failingStatus.isEmpty()) {
+        if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+          if (executionStatus.getSubStatus().get(0).getCode()
+              == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
+            // there's only measurement_already_exist exception
+            alreadyExistingStatus.addAll(executionStatus.getSubStatus());
           } else {
-            if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
-              if (executionStatus.getSubStatus().get(0).getCode()
-                  != TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
-                failingStatus.addAll(executionStatus.getSubStatus());
-              }
-            } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-              failingStatus.add(executionStatus);
-            }
+            failingStatus.addAll(executionStatus.getSubStatus());
           }
-
-          RegionExecutionResult result = new RegionExecutionResult();
-          TSStatus status;
-          if (failingStatus.isEmpty()) {
-            status = RpcUtils.getStatus(alreadyExistingStatus);
-            result.setAccepted(true);
-          } else {
-            status = RpcUtils.getStatus(failingStatus);
-            result.setAccepted(false);
+        } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          failingStatus.add(executionStatus);
+        }
+      } else {
+        if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+          if (executionStatus.getSubStatus().get(0).getCode()
+              != TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
+            failingStatus.addAll(executionStatus.getSubStatus());
           }
-
-          result.setMessage(status.getMessage());
-          result.setStatus(status);
-          return result;
-        } finally {
-          context.getRegionWriteValidationRWLock().writeLock().unlock();
+        } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          failingStatus.add(executionStatus);
         }
+      }
+
+      RegionExecutionResult result = new RegionExecutionResult();
+      TSStatus status;
+      if (failingStatus.isEmpty() && alreadyExistingStatus.isEmpty()) {
+        status = RpcUtils.SUCCESS_STATUS;
+        result.setAccepted(true);
+      } else if (failingStatus.isEmpty()) {
+        status = RpcUtils.getStatus(alreadyExistingStatus);
+        result.setAccepted(true);
       } else {
-        return super.visitInternalCreateMultiTimeSeries(node, context);
+        status = RpcUtils.getStatus(failingStatus);
+        result.setAccepted(false);
       }
+
+      result.setMessage(status.getMessage());
+      result.setStatus(status);
+      return result;
     }
 
     @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java
index a739020e30..32445aee12 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java
@@ -83,8 +83,10 @@ public class CreateMultiTimeSeriesNode extends WritePlanNode {
         measurementGroupMap.put(devicePath, measurementGroup);
       }
 
-      measurementGroup.addMeasurement(
-          paths.get(i).getMeasurement(), dataTypes.get(i), encodings.get(i), compressors.get(i));
+      if (!measurementGroup.addMeasurement(
+          paths.get(i).getMeasurement(), dataTypes.get(i), encodings.get(i), compressors.get(i))) {
+        continue;
+      }
 
       if (propsList != null) {
         measurementGroup.addProps(propsList.get(i));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/MeasurementGroup.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/MeasurementGroup.java
index f5bd2ccc52..5123f0ba0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/MeasurementGroup.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/MeasurementGroup.java
@@ -28,6 +28,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -44,6 +45,8 @@ public class MeasurementGroup {
   private List<Map<String, String>> tagsList;
   private List<Map<String, String>> attributesList;
 
+  private final transient Set<String> measurementSet = new HashSet<>();
+
   public List<String> getMeasurements() {
     return measurements;
   }
@@ -80,15 +83,20 @@ public class MeasurementGroup {
     return attributesList;
   }
 
-  public void addMeasurement(
+  public boolean addMeasurement(
       String measurement,
       TSDataType dataType,
       TSEncoding encoding,
       CompressionType compressionType) {
+    if (measurementSet.contains(measurement)) {
+      return false;
+    }
     measurements.add(measurement);
+    measurementSet.add(measurement);
     dataTypes.add(dataType);
     encodings.add(encoding);
     compressors.add(compressionType);
+    return true;
   }
 
   public void addAlias(String alias) {
@@ -119,29 +127,6 @@ public class MeasurementGroup {
     attributesList.add(attributes);
   }
 
-  public void removeMeasurement(int index) {
-    measurements.remove(index);
-    dataTypes.remove(index);
-    encodings.remove(index);
-    compressors.remove(index);
-
-    if (aliasList != null) {
-      aliasList.remove(index);
-    }
-
-    if (propsList != null) {
-      propsList.remove(index);
-    }
-
-    if (tagsList != null) {
-      tagsList.remove(index);
-    }
-
-    if (attributesList != null) {
-      attributesList.remove(index);
-    }
-  }
-
   public void removeMeasurements(Set<Integer> indexSet) {
     int restSize = this.measurements.size() - indexSet.size();
     List<String> measurements = new ArrayList<>(restSize);
@@ -156,6 +141,7 @@ public class MeasurementGroup {
 
     for (int i = 0; i < this.measurements.size(); i++) {
       if (indexSet.contains(i)) {
+        measurementSet.remove(this.measurements.get(i));
         continue;
       }
       measurements.add(this.measurements.get(i));
@@ -217,6 +203,7 @@ public class MeasurementGroup {
     MeasurementGroup subMeasurementGroup;
     subMeasurementGroup = new MeasurementGroup();
     subMeasurementGroup.measurements = measurements.subList(startIndex, endIndex);
+    subMeasurementGroup.measurementSet.addAll(subMeasurementGroup.measurements);
     subMeasurementGroup.dataTypes = dataTypes.subList(startIndex, endIndex);
     subMeasurementGroup.encodings = encodings.subList(startIndex, endIndex);
     subMeasurementGroup.compressors = compressors.subList(startIndex, endIndex);
@@ -359,6 +346,7 @@ public class MeasurementGroup {
     for (int i = 0; i < size; i++) {
       measurements.add(ReadWriteIOUtils.readString(byteBuffer));
     }
+    measurementSet.addAll(measurements);
 
     dataTypes = new ArrayList<>();
     for (int i = 0; i < size; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
index f19de2323f..29a5f80f62 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.plan.scheduler;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
@@ -32,6 +33,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -87,6 +89,38 @@ public class AsyncPlanNodeSender {
     }
   }
 
+  public List<TSStatus> getFailureStatusList() {
+    List<TSStatus> failureStatusList = new ArrayList<>();
+    TSStatus status;
+    for (Map.Entry<Integer, TSendPlanNodeResp> entry : instanceId2RespMap.entrySet()) {
+      status = entry.getValue().getStatus();
+      if (!entry.getValue().accepted) {
+        if (status == null) {
+          logger.warn(
+              "dispatch write failed. message: {}, node {}",
+              entry.getValue().message,
+              instances.get(entry.getKey()).getHostDataNode().getInternalEndPoint());
+          failureStatusList.add(
+              RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_ERROR, entry.getValue().getMessage()));
+        } else {
+          logger.warn(
+              "dispatch write failed. status: {}, code: {}, message: {}, node {}",
+              entry.getValue().status,
+              TSStatusCode.representOf(status.code),
+              entry.getValue().message,
+              instances.get(entry.getKey()).getHostDataNode().getInternalEndPoint());
+          failureStatusList.add(status);
+        }
+      } else {
+        // some expected and accepted status except SUCCESS_STATUS need to be returned
+        if (status != null && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          failureStatusList.add(status);
+        }
+      }
+    }
+    return failureStatusList;
+  }
+
   public Future<FragInstanceDispatchResult> getResult() {
     for (Map.Entry<Integer, TSendPlanNodeResp> entry : instanceId2RespMap.entrySet()) {
       if (!entry.getValue().accepted) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 3226884b56..0719b86452 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -177,19 +177,21 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
     AsyncPlanNodeSender asyncPlanNodeSender =
         new AsyncPlanNodeSender(asyncInternalServiceClientManager, remoteInstances);
     asyncPlanNodeSender.sendAll();
+
+    List<TSStatus> dataNodeFailureList = new ArrayList<>();
+
     // sync dispatch to local
     long localScheduleStartTime = System.nanoTime();
     for (FragmentInstance localInstance : localInstances) {
       try (SetThreadName threadName = new SetThreadName(localInstance.getId().getFullId())) {
         dispatchOneInstance(localInstance);
       } catch (FragmentInstanceDispatchException e) {
-        return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus()));
+        dataNodeFailureList.add(e.getFailureStatus());
       } catch (Throwable t) {
         logger.warn("[DispatchFailed]", t);
-        return immediateFuture(
-            new FragInstanceDispatchResult(
-                RpcUtils.getStatus(
-                    TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage())));
+        dataNodeFailureList.add(
+            RpcUtils.getStatus(
+                TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage()));
       }
     }
     PerformanceOverviewMetricsManager.recordScheduleLocalCost(
@@ -205,7 +207,25 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
               RpcUtils.getStatus(
                   TSStatusCode.INTERNAL_SERVER_ERROR, "Interrupted errors: " + e.getMessage())));
     }
-    return asyncPlanNodeSender.getResult();
+
+    dataNodeFailureList.addAll(asyncPlanNodeSender.getFailureStatusList());
+
+    if (dataNodeFailureList.isEmpty()) {
+      return immediateFuture(new FragInstanceDispatchResult(true));
+    }
+    if (instances.size() == 1) {
+      return immediateFuture(new FragInstanceDispatchResult(dataNodeFailureList.get(0)));
+    } else {
+      List<TSStatus> failureStatusList = new ArrayList<>();
+      for (TSStatus dataNodeFailure : dataNodeFailureList) {
+        if (dataNodeFailure.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+          failureStatusList.addAll(dataNodeFailure.getSubStatus());
+        } else {
+          failureStatusList.add(dataNodeFailure);
+        }
+      }
+      return immediateFuture(new FragInstanceDispatchResult(RpcUtils.getStatus(failureStatusList)));
+    }
   }
 
   private void dispatchOneInstance(FragmentInstance instance)