You are viewing a plain text version of this content. The canonical link for it was a hyperlink on the original archive page and is not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2023/03/16 08:15:18 UTC
[iotdb] branch rel/1.1 updated: [To rel/1.1] Fix dispatch result collection logic (#9323)
This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new e3cd98ef9f [To rel/1.1] Fix dispatch result collection logic (#9323)
e3cd98ef9f is described below
commit e3cd98ef9f9b90ef83e09ede7579819988b21218
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Thu Mar 16 16:15:10 2023 +0800
[To rel/1.1] Fix dispatch result collection logic (#9323)
---
.../execution/executor/RegionWriteExecutor.java | 149 ++++++++-------------
.../metedata/write/CreateMultiTimeSeriesNode.java | 6 +-
.../plan/node/metedata/write/MeasurementGroup.java | 36 ++---
.../db/mpp/plan/scheduler/AsyncPlanNodeSender.java | 34 +++++
.../scheduler/FragmentInstanceDispatcherImpl.java | 32 ++++-
5 files changed, 132 insertions(+), 125 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
index 71e870cc67..69ca7a4f3b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
@@ -462,53 +462,10 @@ public class RegionWriteExecutor {
}
measurementGroup.removeMeasurements(failingMeasurementMap.keySet());
- RegionExecutionResult executionResult =
- super.visitInternalCreateTimeSeries(node, context);
-
- if (failingStatus.isEmpty() && alreadyExistingStatus.isEmpty()) {
- return executionResult;
- }
-
- TSStatus executionStatus = executionResult.getStatus();
-
- // separate the measurement_already_exist exception and other exceptions process,
- // measurement_already_exist exception is acceptable due to concurrent timeseries creation
- if (failingStatus.isEmpty()) {
- if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
- if (executionStatus.getSubStatus().get(0).getCode()
- == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
- // there's only measurement_already_exist exception
- alreadyExistingStatus.addAll(executionStatus.getSubStatus());
- } else {
- failingStatus.addAll(executionStatus.getSubStatus());
- }
- } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- failingStatus.add(executionStatus);
- }
- } else {
- if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
- if (executionStatus.getSubStatus().get(0).getCode()
- != TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
- failingStatus.addAll(executionStatus.getSubStatus());
- }
- } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- failingStatus.add(executionStatus);
- }
- }
-
- RegionExecutionResult result = new RegionExecutionResult();
- TSStatus status;
- if (failingStatus.isEmpty()) {
- status = RpcUtils.getStatus(alreadyExistingStatus);
- result.setAccepted(true);
- } else {
- status = RpcUtils.getStatus(failingStatus);
- result.setAccepted(false);
- }
-
- result.setMessage(status.getMessage());
- result.setStatus(status);
- return result;
+ return processExecutionResultOfInternalCreateSchema(
+ super.visitInternalCreateTimeSeries(node, context),
+ failingStatus,
+ alreadyExistingStatus);
} finally {
context.getRegionWriteValidationRWLock().writeLock().unlock();
}
@@ -562,59 +519,65 @@ public class RegionWriteExecutor {
measurementGroup.removeMeasurements(failingMeasurementMap.keySet());
}
- RegionExecutionResult executionResult =
- super.visitInternalCreateMultiTimeSeries(node, context);
-
- if (failingStatus.isEmpty() && alreadyExistingStatus.isEmpty()) {
- return executionResult;
- }
-
- TSStatus executionStatus = executionResult.getStatus();
+ return processExecutionResultOfInternalCreateSchema(
+ super.visitInternalCreateMultiTimeSeries(node, context),
+ failingStatus,
+ alreadyExistingStatus);
+ } finally {
+ context.getRegionWriteValidationRWLock().writeLock().unlock();
+ }
+ } else {
+ return super.visitInternalCreateMultiTimeSeries(node, context);
+ }
+ }
- // separate the measurement_already_exist exception and other exceptions process,
- // measurement_already_exist exception is acceptable due to concurrent timeseries creation
- if (failingStatus.isEmpty()) {
- if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
- if (executionStatus.getSubStatus().get(0).getCode()
- == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
- // there's only measurement_already_exist exception
- alreadyExistingStatus.addAll(executionStatus.getSubStatus());
- } else {
- failingStatus.addAll(executionStatus.getSubStatus());
- }
- } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- failingStatus.add(executionStatus);
- }
+ private RegionExecutionResult processExecutionResultOfInternalCreateSchema(
+ RegionExecutionResult executionResult,
+ List<TSStatus> failingStatus,
+ List<TSStatus> alreadyExistingStatus) {
+ TSStatus executionStatus = executionResult.getStatus();
+
+ // separate the measurement_already_exist exception and other exceptions process,
+ // measurement_already_exist exception is acceptable due to concurrent timeseries creation
+ if (failingStatus.isEmpty()) {
+ if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ if (executionStatus.getSubStatus().get(0).getCode()
+ == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
+ // there's only measurement_already_exist exception
+ alreadyExistingStatus.addAll(executionStatus.getSubStatus());
} else {
- if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
- if (executionStatus.getSubStatus().get(0).getCode()
- != TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
- failingStatus.addAll(executionStatus.getSubStatus());
- }
- } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- failingStatus.add(executionStatus);
- }
+ failingStatus.addAll(executionStatus.getSubStatus());
}
-
- RegionExecutionResult result = new RegionExecutionResult();
- TSStatus status;
- if (failingStatus.isEmpty()) {
- status = RpcUtils.getStatus(alreadyExistingStatus);
- result.setAccepted(true);
- } else {
- status = RpcUtils.getStatus(failingStatus);
- result.setAccepted(false);
+ } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ failingStatus.add(executionStatus);
+ }
+ } else {
+ if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ if (executionStatus.getSubStatus().get(0).getCode()
+ != TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
+ failingStatus.addAll(executionStatus.getSubStatus());
}
-
- result.setMessage(status.getMessage());
- result.setStatus(status);
- return result;
- } finally {
- context.getRegionWriteValidationRWLock().writeLock().unlock();
+ } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ failingStatus.add(executionStatus);
}
+ }
+
+ RegionExecutionResult result = new RegionExecutionResult();
+ TSStatus status;
+ if (failingStatus.isEmpty() && alreadyExistingStatus.isEmpty()) {
+ status = RpcUtils.SUCCESS_STATUS;
+ result.setAccepted(true);
+ } else if (failingStatus.isEmpty()) {
+ status = RpcUtils.getStatus(alreadyExistingStatus);
+ result.setAccepted(true);
} else {
- return super.visitInternalCreateMultiTimeSeries(node, context);
+ status = RpcUtils.getStatus(failingStatus);
+ result.setAccepted(false);
}
+
+ result.setMessage(status.getMessage());
+ result.setStatus(status);
+ return result;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java
index a739020e30..32445aee12 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java
@@ -83,8 +83,10 @@ public class CreateMultiTimeSeriesNode extends WritePlanNode {
measurementGroupMap.put(devicePath, measurementGroup);
}
- measurementGroup.addMeasurement(
- paths.get(i).getMeasurement(), dataTypes.get(i), encodings.get(i), compressors.get(i));
+ if (!measurementGroup.addMeasurement(
+ paths.get(i).getMeasurement(), dataTypes.get(i), encodings.get(i), compressors.get(i))) {
+ continue;
+ }
if (propsList != null) {
measurementGroup.addProps(propsList.get(i));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/MeasurementGroup.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/MeasurementGroup.java
index f5bd2ccc52..5123f0ba0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/MeasurementGroup.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/MeasurementGroup.java
@@ -28,6 +28,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -44,6 +45,8 @@ public class MeasurementGroup {
private List<Map<String, String>> tagsList;
private List<Map<String, String>> attributesList;
+ private final transient Set<String> measurementSet = new HashSet<>();
+
public List<String> getMeasurements() {
return measurements;
}
@@ -80,15 +83,20 @@ public class MeasurementGroup {
return attributesList;
}
- public void addMeasurement(
+ public boolean addMeasurement(
String measurement,
TSDataType dataType,
TSEncoding encoding,
CompressionType compressionType) {
+ if (measurementSet.contains(measurement)) {
+ return false;
+ }
measurements.add(measurement);
+ measurementSet.add(measurement);
dataTypes.add(dataType);
encodings.add(encoding);
compressors.add(compressionType);
+ return true;
}
public void addAlias(String alias) {
@@ -119,29 +127,6 @@ public class MeasurementGroup {
attributesList.add(attributes);
}
- public void removeMeasurement(int index) {
- measurements.remove(index);
- dataTypes.remove(index);
- encodings.remove(index);
- compressors.remove(index);
-
- if (aliasList != null) {
- aliasList.remove(index);
- }
-
- if (propsList != null) {
- propsList.remove(index);
- }
-
- if (tagsList != null) {
- tagsList.remove(index);
- }
-
- if (attributesList != null) {
- attributesList.remove(index);
- }
- }
-
public void removeMeasurements(Set<Integer> indexSet) {
int restSize = this.measurements.size() - indexSet.size();
List<String> measurements = new ArrayList<>(restSize);
@@ -156,6 +141,7 @@ public class MeasurementGroup {
for (int i = 0; i < this.measurements.size(); i++) {
if (indexSet.contains(i)) {
+ measurementSet.remove(this.measurements.get(i));
continue;
}
measurements.add(this.measurements.get(i));
@@ -217,6 +203,7 @@ public class MeasurementGroup {
MeasurementGroup subMeasurementGroup;
subMeasurementGroup = new MeasurementGroup();
subMeasurementGroup.measurements = measurements.subList(startIndex, endIndex);
+ subMeasurementGroup.measurementSet.addAll(subMeasurementGroup.measurements);
subMeasurementGroup.dataTypes = dataTypes.subList(startIndex, endIndex);
subMeasurementGroup.encodings = encodings.subList(startIndex, endIndex);
subMeasurementGroup.compressors = compressors.subList(startIndex, endIndex);
@@ -359,6 +346,7 @@ public class MeasurementGroup {
for (int i = 0; i < size; i++) {
measurements.add(ReadWriteIOUtils.readString(byteBuffer));
}
+ measurementSet.addAll(measurements);
dataTypes = new ArrayList<>();
for (int i = 0; i < size; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
index f19de2323f..29a5f80f62 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.mpp.plan.scheduler;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
@@ -32,6 +33,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -87,6 +89,38 @@ public class AsyncPlanNodeSender {
}
}
+ public List<TSStatus> getFailureStatusList() {
+ List<TSStatus> failureStatusList = new ArrayList<>();
+ TSStatus status;
+ for (Map.Entry<Integer, TSendPlanNodeResp> entry : instanceId2RespMap.entrySet()) {
+ status = entry.getValue().getStatus();
+ if (!entry.getValue().accepted) {
+ if (status == null) {
+ logger.warn(
+ "dispatch write failed. message: {}, node {}",
+ entry.getValue().message,
+ instances.get(entry.getKey()).getHostDataNode().getInternalEndPoint());
+ failureStatusList.add(
+ RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_ERROR, entry.getValue().getMessage()));
+ } else {
+ logger.warn(
+ "dispatch write failed. status: {}, code: {}, message: {}, node {}",
+ entry.getValue().status,
+ TSStatusCode.representOf(status.code),
+ entry.getValue().message,
+ instances.get(entry.getKey()).getHostDataNode().getInternalEndPoint());
+ failureStatusList.add(status);
+ }
+ } else {
+ // some expected and accepted status except SUCCESS_STATUS need to be returned
+ if (status != null && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ failureStatusList.add(status);
+ }
+ }
+ }
+ return failureStatusList;
+ }
+
public Future<FragInstanceDispatchResult> getResult() {
for (Map.Entry<Integer, TSendPlanNodeResp> entry : instanceId2RespMap.entrySet()) {
if (!entry.getValue().accepted) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 3226884b56..0719b86452 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -177,19 +177,21 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
AsyncPlanNodeSender asyncPlanNodeSender =
new AsyncPlanNodeSender(asyncInternalServiceClientManager, remoteInstances);
asyncPlanNodeSender.sendAll();
+
+ List<TSStatus> dataNodeFailureList = new ArrayList<>();
+
// sync dispatch to local
long localScheduleStartTime = System.nanoTime();
for (FragmentInstance localInstance : localInstances) {
try (SetThreadName threadName = new SetThreadName(localInstance.getId().getFullId())) {
dispatchOneInstance(localInstance);
} catch (FragmentInstanceDispatchException e) {
- return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus()));
+ dataNodeFailureList.add(e.getFailureStatus());
} catch (Throwable t) {
logger.warn("[DispatchFailed]", t);
- return immediateFuture(
- new FragInstanceDispatchResult(
- RpcUtils.getStatus(
- TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage())));
+ dataNodeFailureList.add(
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage()));
}
}
PerformanceOverviewMetricsManager.recordScheduleLocalCost(
@@ -205,7 +207,25 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
RpcUtils.getStatus(
TSStatusCode.INTERNAL_SERVER_ERROR, "Interrupted errors: " + e.getMessage())));
}
- return asyncPlanNodeSender.getResult();
+
+ dataNodeFailureList.addAll(asyncPlanNodeSender.getFailureStatusList());
+
+ if (dataNodeFailureList.isEmpty()) {
+ return immediateFuture(new FragInstanceDispatchResult(true));
+ }
+ if (instances.size() == 1) {
+ return immediateFuture(new FragInstanceDispatchResult(dataNodeFailureList.get(0)));
+ } else {
+ List<TSStatus> failureStatusList = new ArrayList<>();
+ for (TSStatus dataNodeFailure : dataNodeFailureList) {
+ if (dataNodeFailure.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ failureStatusList.addAll(dataNodeFailure.getSubStatus());
+ } else {
+ failureStatusList.add(dataNodeFailure);
+ }
+ }
+ return immediateFuture(new FragInstanceDispatchResult(RpcUtils.getStatus(failureStatusList)));
+ }
}
private void dispatchOneInstance(FragmentInstance instance)