You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/05/16 02:11:28 UTC
[iotdb] 01/01: handle redirection during dispatching
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch handle_redirection_during_dispatching
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 79a642a9f6169bf898bf7dc003fc2a333833aadf
Author: Tian Jiang <jt...@163.com>
AuthorDate: Tue May 16 10:14:21 2023 +0800
handle redirection during dispatching
---
.../iotdb/commons/partition/ExecutorType.java | 18 ++++++-
.../iotdb/commons/partition/StorageExecutor.java | 55 ++++++++++++++++++++--
.../db/mpp/plan/scheduler/AsyncPlanNodeSender.java | 6 +++
thrift-commons/src/main/thrift/common.thrift | 1 +
4 files changed, 75 insertions(+), 5 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
index 648762f7c4b..dafe6dfbc07 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
@@ -20,12 +20,17 @@
package org.apache.iotdb.commons.partition;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-/** The interface is used to indicate where to execute a FragmentInstance */
+/**
+ * The interface is used to indicate where to execute a FragmentInstance
+ */
public interface ExecutorType {
- /** Indicate if ExecutorType is StorageExecutor */
+ /**
+ * Indicate if ExecutorType is StorageExecutor
+ */
boolean isStorageExecutor();
TDataNodeLocation getDataNodeLocation();
@@ -33,4 +38,13 @@ public interface ExecutorType {
default TRegionReplicaSet getRegionReplicaSet() {
throw new UnsupportedOperationException(getClass().getName());
}
+
+ /**
+ * Try to update the preferred location to the given EndPoint in the ReplicaSet. Do nothing if the
+ * operation is not supported or the EndPoint is not found within this ReplicaSet.
+ *
+ * @param endPoint associated with the preferred location.
+ */
+ default void updatePreferredLocation(TEndPoint endPoint) {
+ }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java
index a99b4dea07e..5e9774db33a 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java
@@ -19,15 +19,23 @@
package org.apache.iotdb.commons.partition;
+import java.util.List;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import javax.annotation.Nonnull;
import java.util.Objects;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-/** StorageExecutor indicates execution of this query need data from StorageEngine */
+/**
+ * StorageExecutor indicates execution of this query need data from StorageEngine
+ */
public class StorageExecutor implements ExecutorType {
+
+ private static final Logger logger = LoggerFactory.getLogger(StorageExecutor.class);
private final TRegionReplicaSet regionReplicaSet;
public StorageExecutor(@Nonnull TRegionReplicaSet regionReplicaSet) {
@@ -36,6 +44,13 @@ public class StorageExecutor implements ExecutorType {
@Override
public TDataNodeLocation getDataNodeLocation() {
+ if (regionReplicaSet.isSetPreferredLocation()) {
+ int preferredLocation = regionReplicaSet.getPreferredLocation();
+ if (preferredLocation >= 0
+ && preferredLocation < regionReplicaSet.getDataNodeLocationsSize()) {
+ return regionReplicaSet.getDataNodeLocations().get(preferredLocation);
+ }
+ }
return regionReplicaSet.getDataNodeLocations().get(0);
}
@@ -51,8 +66,12 @@ public class StorageExecutor implements ExecutorType {
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
StorageExecutor that = (StorageExecutor) o;
return Objects.equals(regionReplicaSet, that.regionReplicaSet);
}
@@ -61,4 +80,34 @@ public class StorageExecutor implements ExecutorType {
public int hashCode() {
return Objects.hash(regionReplicaSet);
}
+
+ @Override
+ public void updatePreferredLocation(TEndPoint endPoint) {
+ List<TDataNodeLocation> dataNodeLocations = regionReplicaSet.getDataNodeLocations();
+ int i = 0;
+ for (; i < dataNodeLocations.size(); i++) {
+ TDataNodeLocation dataNodeLocation = dataNodeLocations.get(i);
+ if (Objects.equals(dataNodeLocation.getClientRpcEndPoint(), endPoint)) {
+ break;
+ }
+ if (Objects.equals(dataNodeLocation.getDataRegionConsensusEndPoint(), endPoint)) {
+ break;
+ }
+ if (Objects.equals(dataNodeLocation.getSchemaRegionConsensusEndPoint(), endPoint)) {
+ break;
+ }
+ if (Objects.equals(dataNodeLocation.getMPPDataExchangeEndPoint(), endPoint)) {
+ break;
+ }
+ if (Objects.equals(dataNodeLocation.getInternalEndPoint(), endPoint)) {
+ break;
+ }
+ }
+
+ if (i < dataNodeLocations.size()) {
+ regionReplicaSet.setPreferredLocation(i);
+ logger.info("Preferred location of {} has been set to {}", regionReplicaSet,
+ regionReplicaSet.getDataNodeLocations().get(i));
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
index 69bb8d7e35c..a858a25977f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
@@ -131,6 +131,12 @@ public class AsyncPlanNodeSender {
failureStatusList.add(status);
}
}
+
+ if (status != null && status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ int instanceIndex = entry.getKey();
+ FragmentInstance fragmentInstance = instances.get(instanceIndex);
+ fragmentInstance.getExecutorType().updatePreferredLocation(status.getRedirectNode());
+ }
}
return failureStatusList;
}
diff --git a/thrift-commons/src/main/thrift/common.thrift b/thrift-commons/src/main/thrift/common.thrift
index 2c07f7493c0..8cbaee3a725 100644
--- a/thrift-commons/src/main/thrift/common.thrift
+++ b/thrift-commons/src/main/thrift/common.thrift
@@ -57,6 +57,7 @@ struct TTimePartitionSlot {
struct TRegionReplicaSet {
1: required TConsensusGroupId regionId
2: required list<TDataNodeLocation> dataNodeLocations
+ 3: optional i32 preferredLocation
}
struct TNodeResource {