You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/05/16 02:32:31 UTC
[iotdb] branch handle_redirection_during_dispatching updated: dispatch locally if the node is in the replica set
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch handle_redirection_during_dispatching
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/handle_redirection_during_dispatching by this push:
new 4e46d302784 dispatch locally if the node is in the replica set
4e46d302784 is described below
commit 4e46d302784934956f1c0b6b8d740d1a07a3efff
Author: Tian Jiang <jt...@163.com>
AuthorDate: Tue May 16 10:35:25 2023 +0800
dispatch locally if the node is in the replica set
---
.../iotdb/commons/partition/StorageExecutor.java | 15 +------
.../apache/iotdb/commons/utils/ThriftUtils.java | 51 ++++++++++++++++++++++
.../scheduler/FragmentInstanceDispatcherImpl.java | 38 +++++++++++-----
3 files changed, 81 insertions(+), 23 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java
index 5e9774db33a..d6af675d5d0 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import javax.annotation.Nonnull;
import java.util.Objects;
+import org.apache.iotdb.commons.utils.ThriftUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,19 +88,7 @@ public class StorageExecutor implements ExecutorType {
int i = 0;
for (; i < dataNodeLocations.size(); i++) {
TDataNodeLocation dataNodeLocation = dataNodeLocations.get(i);
- if (Objects.equals(dataNodeLocation.getClientRpcEndPoint(), endPoint)) {
- break;
- }
- if (Objects.equals(dataNodeLocation.getDataRegionConsensusEndPoint(), endPoint)) {
- break;
- }
- if (Objects.equals(dataNodeLocation.getSchemaRegionConsensusEndPoint(), endPoint)) {
- break;
- }
- if (Objects.equals(dataNodeLocation.getMPPDataExchangeEndPoint(), endPoint)) {
- break;
- }
- if (Objects.equals(dataNodeLocation.getInternalEndPoint(), endPoint)) {
+ if (ThriftUtils.endPointInLocation(dataNodeLocation, endPoint)) {
break;
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftUtils.java b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftUtils.java
new file mode 100644
index 00000000000..f65ace4ec49
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.utils;
+
+import java.util.Objects;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+
+/**
+ * Utils that extend thrift generated objects.
+ */
+public class ThriftUtils {
+ private ThriftUtils() {
+ // Empty constructor
+ }
+
+ public static boolean endPointInLocation(TDataNodeLocation location, TEndPoint endPoint) {
+ if (Objects.equals(location.getClientRpcEndPoint(), endPoint)) {
+ return true;
+ }
+ if (Objects.equals(location.getDataRegionConsensusEndPoint(), endPoint)) {
+ return true;
+ }
+ if (Objects.equals(location.getSchemaRegionConsensusEndPoint(), endPoint)) {
+ return true;
+ }
+ if (Objects.equals(location.getMPPDataExchangeEndPoint(), endPoint)) {
+ return true;
+ }
+ if (Objects.equals(location.getInternalEndPoint(), endPoint)) {
+ return true;
+ }
+ return false;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index cb64acaf6bd..d362d1fbfc2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -19,14 +19,19 @@
package org.apache.iotdb.db.mpp.plan.scheduler;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.partition.QueryExecutor;
+import org.apache.iotdb.commons.partition.StorageExecutor;
import org.apache.iotdb.commons.service.metric.enums.PerformanceOverviewMetrics;
+import org.apache.iotdb.commons.utils.ThriftUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
@@ -69,8 +74,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
private final ExecutorService writeOperationExecutor;
private final QueryType type;
private final MPPQueryContext queryContext;
- private final String localhostIpAddr;
- private final int localhostInternalPort;
+ private final TEndPoint localEndPoint;
private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
syncInternalServiceClientManager;
private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
@@ -94,8 +98,9 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
this.writeOperationExecutor = writeOperationExecutor;
this.syncInternalServiceClientManager = syncInternalServiceClientManager;
this.asyncInternalServiceClientManager = asyncInternalServiceClientManager;
- this.localhostIpAddr = IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
- this.localhostInternalPort = IoTDBDescriptor.getInstance().getConfig().getInternalPort();
+ this.localEndPoint = new TEndPoint(
+ IoTDBDescriptor.getInstance().getConfig().getInternalAddress(),
+ IoTDBDescriptor.getInstance().getConfig().getInternalPort());
}
@Override
@@ -170,8 +175,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
List<FragmentInstance> localInstances = new ArrayList<>();
List<FragmentInstance> remoteInstances = new ArrayList<>();
for (FragmentInstance instance : instances) {
- TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint();
- if (isDispatchedToLocal(endPoint)) {
+ if (isDispatchedToLocal(instance)) {
localInstances.add(instance);
} else {
remoteInstances.add(instance);
@@ -237,15 +241,28 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
private void dispatchOneInstance(FragmentInstance instance)
throws FragmentInstanceDispatchException {
TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint();
- if (isDispatchedToLocal(endPoint)) {
+ if (isDispatchedToLocal(instance)) {
dispatchLocally(instance);
} else {
dispatchRemote(instance, endPoint);
}
}
- private boolean isDispatchedToLocal(TEndPoint endPoint) {
- return this.localhostIpAddr.equals(endPoint.getIp()) && localhostInternalPort == endPoint.port;
+ private boolean isDispatchedToLocal(FragmentInstance instance) {
+ if (instance.getExecutorType().isStorageExecutor()) {
+ // For write requests, if the local node is in the replica set, dispatch locally.
+ // Because the consensus layer knows more accurate about which is the preferred location
+ // (leader/master/primary) than the PartitionCache.
+ TRegionReplicaSet regionReplicaSet = instance.getExecutorType().getRegionReplicaSet();
+ for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
+ if (ThriftUtils.endPointInLocation(dataNodeLocation, localEndPoint)) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ return localEndPoint.equals(instance.getHostDataNode().getInternalEndPoint());
+ }
}
private void dispatchRemote(FragmentInstance instance, TEndPoint endPoint)
@@ -381,5 +398,6 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
}
@Override
- public void abort() {}
+ public void abort() {
+ }
}