You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/05/16 02:11:27 UTC

[iotdb] branch handle_redirection_during_dispatching created (now 79a642a9f61)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a change to branch handle_redirection_during_dispatching
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 79a642a9f61 handle redirection during dispatching

This branch includes the following new commits:

     new 79a642a9f61 handle redirection during dispatching

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: handle redirection during dispatching

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch handle_redirection_during_dispatching
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 79a642a9f6169bf898bf7dc003fc2a333833aadf
Author: Tian Jiang <jt...@163.com>
AuthorDate: Tue May 16 10:14:21 2023 +0800

    handle redirection during dispatching
---
 .../iotdb/commons/partition/ExecutorType.java      | 18 ++++++-
 .../iotdb/commons/partition/StorageExecutor.java   | 55 ++++++++++++++++++++--
 .../db/mpp/plan/scheduler/AsyncPlanNodeSender.java |  6 +++
 thrift-commons/src/main/thrift/common.thrift       |  1 +
 4 files changed, 75 insertions(+), 5 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
index 648762f7c4b..dafe6dfbc07 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
@@ -20,12 +20,17 @@
 package org.apache.iotdb.commons.partition;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 
-/** The interface is used to indicate where to execute a FragmentInstance */
+/**
+ * The interface is used to indicate where to execute a FragmentInstance
+ */
 public interface ExecutorType {
 
-  /** Indicate if ExecutorType is StorageExecutor */
+  /**
+   * Indicate if ExecutorType is StorageExecutor
+   */
   boolean isStorageExecutor();
 
   TDataNodeLocation getDataNodeLocation();
@@ -33,4 +38,13 @@ public interface ExecutorType {
   default TRegionReplicaSet getRegionReplicaSet() {
     throw new UnsupportedOperationException(getClass().getName());
   }
+
+  /**
+   * Try to update the preferred location to the given EndPoint in the ReplicaSet. Do nothing if the
+   * operation is not supported or the EndPoint is not found within this ReplicaSet.
+   *
+   * @param endPoint associated with the preferred location.
+   */
+  default void updatePreferredLocation(TEndPoint endPoint) {
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java
index a99b4dea07e..5e9774db33a 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java
@@ -19,15 +19,23 @@
 
 package org.apache.iotdb.commons.partition;
 
+import java.util.List;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 
 import javax.annotation.Nonnull;
 
 import java.util.Objects;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-/** StorageExecutor indicates execution of this query need data from StorageEngine */
+/**
+ * StorageExecutor indicates execution of this query need data from StorageEngine
+ */
 public class StorageExecutor implements ExecutorType {
+
+  private static final Logger logger = LoggerFactory.getLogger(StorageExecutor.class);
   private final TRegionReplicaSet regionReplicaSet;
 
   public StorageExecutor(@Nonnull TRegionReplicaSet regionReplicaSet) {
@@ -36,6 +44,13 @@ public class StorageExecutor implements ExecutorType {
 
   @Override
   public TDataNodeLocation getDataNodeLocation() {
+    if (regionReplicaSet.isSetPreferredLocation()) {
+      int preferredLocation = regionReplicaSet.getPreferredLocation();
+      if (preferredLocation >= 0
+          && preferredLocation < regionReplicaSet.getDataNodeLocationsSize()) {
+        return regionReplicaSet.getDataNodeLocations().get(preferredLocation);
+      }
+    }
     return regionReplicaSet.getDataNodeLocations().get(0);
   }
 
@@ -51,8 +66,12 @@ public class StorageExecutor implements ExecutorType {
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
     StorageExecutor that = (StorageExecutor) o;
     return Objects.equals(regionReplicaSet, that.regionReplicaSet);
   }
@@ -61,4 +80,34 @@ public class StorageExecutor implements ExecutorType {
   public int hashCode() {
     return Objects.hash(regionReplicaSet);
   }
+
+  @Override
+  public void updatePreferredLocation(TEndPoint endPoint) {
+    List<TDataNodeLocation> dataNodeLocations = regionReplicaSet.getDataNodeLocations();
+    int i = 0;
+    for (; i < dataNodeLocations.size(); i++) {
+      TDataNodeLocation dataNodeLocation = dataNodeLocations.get(i);
+      if (Objects.equals(dataNodeLocation.getClientRpcEndPoint(), endPoint)) {
+        break;
+      }
+      if (Objects.equals(dataNodeLocation.getDataRegionConsensusEndPoint(), endPoint)) {
+        break;
+      }
+      if (Objects.equals(dataNodeLocation.getSchemaRegionConsensusEndPoint(), endPoint)) {
+        break;
+      }
+      if (Objects.equals(dataNodeLocation.getMPPDataExchangeEndPoint(), endPoint)) {
+        break;
+      }
+      if (Objects.equals(dataNodeLocation.getInternalEndPoint(), endPoint)) {
+        break;
+      }
+    }
+
+    if (i < dataNodeLocations.size()) {
+      regionReplicaSet.setPreferredLocation(i);
+      logger.info("Preferred location of {} has been set to {}", regionReplicaSet,
+          regionReplicaSet.getDataNodeLocations().get(i));
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
index 69bb8d7e35c..a858a25977f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
@@ -131,6 +131,12 @@ public class AsyncPlanNodeSender {
           failureStatusList.add(status);
         }
       }
+
+      if (status != null && status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+        int instanceIndex = entry.getKey();
+        FragmentInstance fragmentInstance = instances.get(instanceIndex);
+        fragmentInstance.getExecutorType().updatePreferredLocation(status.getRedirectNode());
+      }
     }
     return failureStatusList;
   }
diff --git a/thrift-commons/src/main/thrift/common.thrift b/thrift-commons/src/main/thrift/common.thrift
index 2c07f7493c0..8cbaee3a725 100644
--- a/thrift-commons/src/main/thrift/common.thrift
+++ b/thrift-commons/src/main/thrift/common.thrift
@@ -57,6 +57,7 @@ struct TTimePartitionSlot {
 struct TRegionReplicaSet {
   1: required TConsensusGroupId regionId
   2: required list<TDataNodeLocation> dataNodeLocations
+  3: optional i32 preferredLocation
 }
 
 struct TNodeResource {