You are viewing a plain text version of this content. The canonical link for it was a hyperlink on the original page and is not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/16 08:25:55 UTC
[iotdb] 01/02: add redirect available node policy when retry
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/query_retry_redirect
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 204a445aed4a602dcf315f1fb73d383e0eb5b12f
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Aug 16 16:25:00 2022 +0800
add redirect available node policy when retry
---
.../iotdb/db/mpp/common/MPPQueryContext.java | 22 ++++++++---
.../SimpleFragmentParallelPlanner.java | 46 +++++++++++++++++++---
.../db/mpp/plan/scheduler/ClusterScheduler.java | 6 ++-
.../scheduler/FragmentInstanceDispatcherImpl.java | 5 +++
4 files changed, 67 insertions(+), 12 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index 81e1330eca..08da8f11e5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.db.mpp.common;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import java.util.LinkedList;
+import java.util.List;
+
/**
* This class is used to record the context of a query including QueryId, query statement, session
* info and so on
@@ -37,8 +40,11 @@ public class MPPQueryContext {
private TEndPoint localInternalEndpoint;
private ResultNodeContext resultNodeContext;
+ private final List<TEndPoint> endPointBlackList; // endpoints recorded via addFailedEndPoint for this query
+
public MPPQueryContext(QueryId queryId) {
this.queryId = queryId;
+ this.endPointBlackList = new LinkedList<>(); // starts empty; the other constructors chain to this one
}
public MPPQueryContext(
@@ -47,8 +53,8 @@ public class MPPQueryContext {
SessionInfo session,
TEndPoint localDataBlockEndpoint,
TEndPoint localInternalEndpoint) {
+ this(queryId); // sets queryId and initializes the empty endpoint blacklist
this.sql = sql;
- this.queryId = queryId;
this.session = session;
this.localDataBlockEndpoint = localDataBlockEndpoint;
this.localInternalEndpoint = localInternalEndpoint;
@@ -63,11 +69,7 @@ public class MPPQueryContext {
TEndPoint localInternalEndpoint,
long timeOut,
long startTime) {
- this.sql = sql;
- this.queryId = queryId;
- this.session = session;
- this.localDataBlockEndpoint = localDataBlockEndpoint;
- this.localInternalEndpoint = localInternalEndpoint;
+ this(sql, queryId, session, localDataBlockEndpoint, localInternalEndpoint); // reuses the 5-arg constructor, which also initializes the blacklist
this.resultNodeContext = new ResultNodeContext(queryId);
this.timeOut = timeOut;
this.startTime = startTime;
@@ -116,4 +118,12 @@ public class MPPQueryContext {
public void setStartTime(long startTime) {
this.startTime = startTime;
}
+
+ public void addFailedEndPoint(TEndPoint endPoint) { // appends to the blacklist; duplicates are not filtered
+ this.endPointBlackList.add(endPoint);
+ }
+
+ public List<TEndPoint> getEndPointBlackList() { // returns the live internal list, not a copy
+ return endPointBlackList;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 854bb15467..7f00ffd9e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.plan.planner.distribution;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
@@ -35,8 +36,12 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -45,6 +50,7 @@ import java.util.Map;
* into only one FragmentInstance.
*/
public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
+ private static final Logger logger = LoggerFactory.getLogger(SimpleFragmentParallelPlanner.class);
private SubPlan subPlan;
private Analysis analysis;
@@ -122,16 +128,46 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
// TODO: (Chen Rongzhao) need to make the values of ReadConsistencyLevel as static variable or
// enums
boolean selectRandomDataNode = "weak".equals(readConsistencyLevel);
+ // Skip replicas whose internal endpoint is in this query's failed-endpoint blacklist.
+ List<TDataNodeLocation> availableDataNodes =
+ filterAvailableTDataNode(regionReplicaSet.getDataNodeLocations());
+ if (availableDataNodes.isEmpty()) {
+ String errorMsg =
+ String.format(
+ "all replicas for region[%s] are not available in these DataNodes[%s]",
+ regionReplicaSet.getRegionId(), regionReplicaSet.getDataNodeLocations());
+ throw new IllegalArgumentException(errorMsg);
+ }
+ if (regionReplicaSet.getDataNodeLocationsSize() != availableDataNodes.size()) {
+ logger.info("Available replicas: {}", availableDataNodes);
+ }
int targetIndex;
if (!selectRandomDataNode || queryContext.getSession() == null) {
targetIndex = 0;
} else {
- targetIndex =
- (int)
- (queryContext.getSession().getSessionId()
- % regionReplicaSet.getDataNodeLocationsSize());
+ targetIndex = (int) (queryContext.getSession().getSessionId() % availableDataNodes.size());
+ }
+ return availableDataNodes.get(targetIndex);
+ }
+
+ private List<TDataNodeLocation> filterAvailableTDataNode(
+ List<TDataNodeLocation> originalDataNodeList) {
+ List<TDataNodeLocation> result = new LinkedList<>();
+ for (TDataNodeLocation dataNodeLocation : originalDataNodeList) {
+ if (isAvailableDataNode(dataNodeLocation)) {
+ result.add(dataNodeLocation);
+ }
+ }
+ return result;
+ }
+
+ private boolean isAvailableDataNode(TDataNodeLocation dataNodeLocation) {
+ for (TEndPoint endPoint : queryContext.getEndPointBlackList()) {
+ if (endPoint.equals(dataNodeLocation.getInternalEndPoint())) { // compare ip AND port; an ip-only match would also drop other DataNodes on the same host
+ return false;
+ }
}
- return regionReplicaSet.getDataNodeLocations().get(targetIndex);
+ return true;
}
private void calculateNodeTopologyBetweenInstance() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index 417b6b5e18..10c3139071 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -76,7 +76,11 @@ public class ClusterScheduler implements IScheduler {
this.queryType = queryType;
this.dispatcher =
new FragmentInstanceDispatcherImpl(
- queryType, executor, writeOperationExecutor, internalServiceClientManager);
+ queryType,
+ queryContext, // lets the dispatcher record endpoints it fails to connect to
+ executor,
+ writeOperationExecutor,
+ internalServiceClientManager);
if (queryType == QueryType.READ) {
this.stateTracker =
new FixedRateFragInsStateTracker(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index f4a756d522..7849cc4201 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
@@ -66,6 +67,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
private final ExecutorService executor;
private final ExecutorService writeOperationExecutor;
private final QueryType type;
+ private final MPPQueryContext queryContext; // receives endpoints that fail to connect during dispatch
private final String localhostIpAddr;
private final int localhostInternalPort;
private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
@@ -73,10 +75,12 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
public FragmentInstanceDispatcherImpl(
QueryType type,
+ MPPQueryContext queryContext,
ExecutorService executor,
ExecutorService writeOperationExecutor,
IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
this.type = type;
+ this.queryContext = queryContext;
this.executor = executor;
this.writeOperationExecutor = writeOperationExecutor;
this.internalServiceClientManager = internalServiceClientManager;
@@ -187,6 +191,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
TSStatus status = new TSStatus();
status.setCode(TSStatusCode.SYNC_CONNECTION_EXCEPTION.getStatusCode());
- status.setMessage("can't connect to node {}" + endPoint);
+ status.setMessage("can't connect to node " + endPoint); // plain concatenation: a "{}" placeholder is never substituted here
+ queryContext.addFailedEndPoint(endPoint); // blacklisted so replica selection on retry skips this endpoint
throw new FragmentInstanceDispatchException(status);
}
}