You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/05/16 02:11:28 UTC

[iotdb] 01/01: handle redirection during dispatching

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch handle_redirection_during_dispatching
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 79a642a9f6169bf898bf7dc003fc2a333833aadf
Author: Tian Jiang <jt...@163.com>
AuthorDate: Tue May 16 10:14:21 2023 +0800

    handle redirection during dispatching
---
 .../iotdb/commons/partition/ExecutorType.java      | 18 ++++++-
 .../iotdb/commons/partition/StorageExecutor.java   | 55 ++++++++++++++++++++--
 .../db/mpp/plan/scheduler/AsyncPlanNodeSender.java |  6 +++
 thrift-commons/src/main/thrift/common.thrift       |  1 +
 4 files changed, 75 insertions(+), 5 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
index 648762f7c4b..dafe6dfbc07 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
@@ -20,12 +20,13 @@
 package org.apache.iotdb.commons.partition;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 
 /** The interface is used to indicate where to execute a FragmentInstance */
 public interface ExecutorType {
 
   /** Indicate if ExecutorType is StorageExecutor */
   boolean isStorageExecutor();
 
   TDataNodeLocation getDataNodeLocation();
@@ -33,4 +34,14 @@ public interface ExecutorType {
   default TRegionReplicaSet getRegionReplicaSet() {
     throw new UnsupportedOperationException(getClass().getName());
   }
+
+  /**
+   * Try to update the preferred location to the given EndPoint in the ReplicaSet. Do nothing if the
+   * operation is not supported or the EndPoint is not found within this ReplicaSet.
+   *
+   * <p>The default implementation does nothing.
+   *
+   * @param endPoint endpoint of the DataNode that should become the preferred location
+   */
+  default void updatePreferredLocation(TEndPoint endPoint) {}
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java
index a99b4dea07e..5e9774db33a 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java
@@ -19,15 +19,23 @@
 
 package org.apache.iotdb.commons.partition;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nonnull;
 
+import java.util.List;
 import java.util.Objects;
 
 /** StorageExecutor indicates execution of this query need data from StorageEngine */
 public class StorageExecutor implements ExecutorType {
+
+  private static final Logger logger = LoggerFactory.getLogger(StorageExecutor.class);
+
   private final TRegionReplicaSet regionReplicaSet;
 
   public StorageExecutor(@Nonnull TRegionReplicaSet regionReplicaSet) {
@@ -36,6 +44,15 @@ public class StorageExecutor implements ExecutorType {
 
   @Override
   public TDataNodeLocation getDataNodeLocation() {
+    // Return the replica recorded by updatePreferredLocation() when the stored index is in range;
+    // otherwise fall back to the first replica.
+    if (regionReplicaSet.isSetPreferredLocation()) {
+      int preferredLocation = regionReplicaSet.getPreferredLocation();
+      if (preferredLocation >= 0
+          && preferredLocation < regionReplicaSet.getDataNodeLocationsSize()) {
+        return regionReplicaSet.getDataNodeLocations().get(preferredLocation);
+      }
+    }
     return regionReplicaSet.getDataNodeLocations().get(0);
   }
 
@@ -51,8 +68,12 @@ public class StorageExecutor implements ExecutorType {
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
     StorageExecutor that = (StorageExecutor) o;
     return Objects.equals(regionReplicaSet, that.regionReplicaSet);
   }
@@ -61,4 +82,43 @@ public class StorageExecutor implements ExecutorType {
   public int hashCode() {
     return Objects.hash(regionReplicaSet);
   }
+
+  /**
+   * Make the replica that owns {@code endPoint} the preferred location, so that {@link
+   * #getDataNodeLocation()} returns it from now on. A replica matches when any of its endpoints
+   * (client RPC, internal, MPP data exchange, data/schema region consensus) equals {@code
+   * endPoint}. Does nothing if {@code endPoint} is null or no replica matches.
+   *
+   * <p>NOTE(review): this mutates the wrapped {@link TRegionReplicaSet}, which also drives {@link
+   * #equals(Object)} and {@link #hashCode()}; thrift-generated equality presumably includes set
+   * optional fields, so the hash may change. Confirm this executor is never used as a hash key
+   * and that the replica set is not shared with code that does not expect the preference.
+   *
+   * @param endPoint an endpoint of the DataNode to prefer; may be null (ignored)
+   */
+  @Override
+  public void updatePreferredLocation(TEndPoint endPoint) {
+    List<TDataNodeLocation> dataNodeLocations = regionReplicaSet.getDataNodeLocations();
+    if (endPoint == null || dataNodeLocations == null) {
+      return;
+    }
+    for (int i = 0; i < dataNodeLocations.size(); i++) {
+      TDataNodeLocation dataNodeLocation = dataNodeLocations.get(i);
+      if (hasEndPoint(dataNodeLocation, endPoint)) {
+        regionReplicaSet.setPreferredLocation(i);
+        logger.info(
+            "Preferred location of {} has been set to {}", regionReplicaSet, dataNodeLocation);
+        return;
+      }
+    }
+  }
+
+  /** Returns whether any endpoint of {@code location} equals the non-null {@code endPoint}. */
+  private static boolean hasEndPoint(TDataNodeLocation location, TEndPoint endPoint) {
+    return endPoint.equals(location.getClientRpcEndPoint())
+        || endPoint.equals(location.getInternalEndPoint())
+        || endPoint.equals(location.getMPPDataExchangeEndPoint())
+        || endPoint.equals(location.getDataRegionConsensusEndPoint())
+        || endPoint.equals(location.getSchemaRegionConsensusEndPoint());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
index 69bb8d7e35c..a858a25977f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
@@ -131,6 +131,16 @@ public class AsyncPlanNodeSender {
           failureStatusList.add(status);
         }
       }
+
+      // On REDIRECTION_RECOMMEND, remember the recommended replica as the preferred location of
+      // this instance's executor so that later dispatches of the instance can target it.
+      if (status != null
+          && status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
+          && status.isSetRedirectNode()) {
+        int instanceIndex = entry.getKey();
+        FragmentInstance fragmentInstance = instances.get(instanceIndex);
+        fragmentInstance.getExecutorType().updatePreferredLocation(status.getRedirectNode());
+      }
     }
     return failureStatusList;
   }
diff --git a/thrift-commons/src/main/thrift/common.thrift b/thrift-commons/src/main/thrift/common.thrift
index 2c07f7493c0..8cbaee3a725 100644
--- a/thrift-commons/src/main/thrift/common.thrift
+++ b/thrift-commons/src/main/thrift/common.thrift
@@ -57,6 +57,7 @@ struct TTimePartitionSlot {
 struct TRegionReplicaSet {
   1: required TConsensusGroupId regionId
   2: required list<TDataNodeLocation> dataNodeLocations
+  3: optional i32 preferredLocation // index into dataNodeLocations of the replica to prefer
 }
 
 struct TNodeResource {