You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/05/16 02:32:31 UTC

[iotdb] branch handle_redirection_during_dispatching updated: dispatch locally if the node is in the replica set

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch handle_redirection_during_dispatching
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/handle_redirection_during_dispatching by this push:
     new 4e46d302784 dispatch locally if the node is in the replica set
4e46d302784 is described below

commit 4e46d302784934956f1c0b6b8d740d1a07a3efff
Author: Tian Jiang <jt...@163.com>
AuthorDate: Tue May 16 10:35:25 2023 +0800

    dispatch locally if the node is in the replica set
---
 .../iotdb/commons/partition/StorageExecutor.java   | 15 +------
 .../apache/iotdb/commons/utils/ThriftUtils.java    | 51 ++++++++++++++++++++++
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 38 +++++++++++-----
 3 files changed, 81 insertions(+), 23 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java
index 5e9774db33a..d6af675d5d0 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import javax.annotation.Nonnull;
 
 import java.util.Objects;
+import org.apache.iotdb.commons.utils.ThriftUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -87,19 +88,7 @@ public class StorageExecutor implements ExecutorType {
     int i = 0;
     for (; i < dataNodeLocations.size(); i++) {
       TDataNodeLocation dataNodeLocation = dataNodeLocations.get(i);
-      if (Objects.equals(dataNodeLocation.getClientRpcEndPoint(), endPoint)) {
-        break;
-      }
-      if (Objects.equals(dataNodeLocation.getDataRegionConsensusEndPoint(), endPoint)) {
-        break;
-      }
-      if (Objects.equals(dataNodeLocation.getSchemaRegionConsensusEndPoint(), endPoint)) {
-        break;
-      }
-      if (Objects.equals(dataNodeLocation.getMPPDataExchangeEndPoint(), endPoint)) {
-        break;
-      }
-      if (Objects.equals(dataNodeLocation.getInternalEndPoint(), endPoint)) {
+      if (ThriftUtils.endPointInLocation(dataNodeLocation, endPoint)) {
         break;
       }
     }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftUtils.java b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftUtils.java
new file mode 100644
index 00000000000..f65ace4ec49
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.utils;
+
+import java.util.Objects;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+
+/**
+ * Helper methods that operate on Thrift-generated objects such as {@link TDataNodeLocation}.
+ */
+public final class ThriftUtils {
+  private ThriftUtils() {
+    // Static utility holder; never instantiated.
+  }
+
+  /**
+   * Checks whether {@code endPoint} equals any endpoint of {@code location}: client RPC, data
+   * region consensus, schema region consensus, MPP data exchange or internal endpoint.
+   *
+   * @param location the data node location to inspect; {@code null} never matches
+   * @param endPoint the endpoint to look for; {@code null} never matches, even unset fields
+   * @return true if at least one endpoint of {@code location} equals {@code endPoint}
+   */
+  public static boolean endPointInLocation(TDataNodeLocation location, TEndPoint endPoint) {
+    if (location == null || endPoint == null) {
+      return false;
+    }
+    return Objects.equals(location.getClientRpcEndPoint(), endPoint)
+        || Objects.equals(location.getDataRegionConsensusEndPoint(), endPoint)
+        || Objects.equals(location.getSchemaRegionConsensusEndPoint(), endPoint)
+        || Objects.equals(location.getMPPDataExchangeEndPoint(), endPoint)
+        || Objects.equals(location.getInternalEndPoint(), endPoint);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index cb64acaf6bd..d362d1fbfc2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -19,14 +19,19 @@
 
 package org.apache.iotdb.db.mpp.plan.scheduler;
 
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.partition.QueryExecutor;
+import org.apache.iotdb.commons.partition.StorageExecutor;
 import org.apache.iotdb.commons.service.metric.enums.PerformanceOverviewMetrics;
+import org.apache.iotdb.commons.utils.ThriftUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
@@ -69,8 +74,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
   private final ExecutorService writeOperationExecutor;
   private final QueryType type;
   private final MPPQueryContext queryContext;
-  private final String localhostIpAddr;
-  private final int localhostInternalPort;
+  private final TEndPoint localEndPoint;
   private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
       syncInternalServiceClientManager;
   private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
@@ -94,8 +98,9 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
     this.writeOperationExecutor = writeOperationExecutor;
     this.syncInternalServiceClientManager = syncInternalServiceClientManager;
     this.asyncInternalServiceClientManager = asyncInternalServiceClientManager;
-    this.localhostIpAddr = IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
-    this.localhostInternalPort = IoTDBDescriptor.getInstance().getConfig().getInternalPort();
+    this.localEndPoint = new TEndPoint(
+        IoTDBDescriptor.getInstance().getConfig().getInternalAddress(),
+        IoTDBDescriptor.getInstance().getConfig().getInternalPort());
   }
 
   @Override
@@ -170,8 +175,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
     List<FragmentInstance> localInstances = new ArrayList<>();
     List<FragmentInstance> remoteInstances = new ArrayList<>();
     for (FragmentInstance instance : instances) {
-      TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint();
-      if (isDispatchedToLocal(endPoint)) {
+      if (isDispatchedToLocal(instance)) {
         localInstances.add(instance);
       } else {
         remoteInstances.add(instance);
@@ -237,15 +241,28 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
   private void dispatchOneInstance(FragmentInstance instance)
       throws FragmentInstanceDispatchException {
     TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint();
-    if (isDispatchedToLocal(endPoint)) {
+    if (isDispatchedToLocal(instance)) {
       dispatchLocally(instance);
     } else {
       dispatchRemote(instance, endPoint);
     }
   }
 
-  private boolean isDispatchedToLocal(TEndPoint endPoint) {
-    return this.localhostIpAddr.equals(endPoint.getIp()) && localhostInternalPort == endPoint.port;
+  private boolean isDispatchedToLocal(FragmentInstance instance) {
+    // Only write requests are re-routed to a local replica: the consensus layer knows the
+    // preferred replica (leader/master/primary) more accurately than the PartitionCache.
+    // Reads keep their planned host; NOTE(review): assumed required since the plan's data
+    // exchange endpoints are computed against that host.
+    if (type == QueryType.WRITE && instance.getExecutorType().isStorageExecutor()) {
+      TRegionReplicaSet regionReplicaSet = instance.getExecutorType().getRegionReplicaSet();
+      for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
+        if (ThriftUtils.endPointInLocation(dataNodeLocation, localEndPoint)) {
+          return true;
+        }
+      }
+      return false;
+    }
+    return localEndPoint.equals(instance.getHostDataNode().getInternalEndPoint());
+  }
 
   private void dispatchRemote(FragmentInstance instance, TEndPoint endPoint)
@@ -381,5 +398,6 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
   }
 
   @Override
-  public void abort() {}
+  public void abort() {
+  }
 }