Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/16 08:25:55 UTC

[iotdb] 01/02: add redirect available node policy when retry

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/query_retry_redirect
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 204a445aed4a602dcf315f1fb73d383e0eb5b12f
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Aug 16 16:25:00 2022 +0800

    add redirect available node policy when retry
---
 .../iotdb/db/mpp/common/MPPQueryContext.java       | 22 ++++++++---
 .../SimpleFragmentParallelPlanner.java             | 46 +++++++++++++++++++---
 .../db/mpp/plan/scheduler/ClusterScheduler.java    |  6 ++-
 .../scheduler/FragmentInstanceDispatcherImpl.java  |  5 +++
 4 files changed, 67 insertions(+), 12 deletions(-)
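
Not part of this commit, but the intended use: MPPQueryContext collects the endpoints that failed dispatch, SimpleFragmentParallelPlanner skips them on the next planning pass, and the caller drives the retry. A minimal sketch of such a retry driver, assuming a hypothetical planAndDispatch() step and retry bound:

    // Sketch only: planAndDispatch() and MAX_RETRY are hypothetical, not part of
    // the commit. It shows how the new blacklist is meant to be consumed.
    import org.apache.iotdb.db.mpp.common.MPPQueryContext;

    public class RetryRedirectSketch {

      private static final int MAX_RETRY = 3; // hypothetical bound

      public void execute(MPPQueryContext context) {
        for (int attempt = 0; attempt < MAX_RETRY; attempt++) {
          try {
            // Each attempt re-runs SimpleFragmentParallelPlanner, which reads
            // context.getEndPointBlackList() and avoids the DataNodes that
            // failed in earlier attempts.
            planAndDispatch(context);
            return;
          } catch (RuntimeException e) {
            // FragmentInstanceDispatcherImpl has already called
            // context.addFailedEndPoint(...) for the unreachable node, so the
            // next pass is redirected to another replica.
          }
        }
        throw new IllegalStateException("no reachable replica after " + MAX_RETRY + " attempts");
      }

      private void planAndDispatch(MPPQueryContext context) {
        // hypothetical: plan fragments for one attempt and dispatch them
        throw new UnsupportedOperationException("sketch only");
      }
    }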

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index 81e1330eca..08da8f11e5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.db.mpp.common;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
 
+import java.util.LinkedList;
+import java.util.List;
+
 /**
  * This class is used to record the context of a query including QueryId, query statement, session
  * info and so on
@@ -37,8 +40,11 @@ public class MPPQueryContext {
   private TEndPoint localInternalEndpoint;
   private ResultNodeContext resultNodeContext;
 
+  private final List<TEndPoint> endPointBlackList;
+
   public MPPQueryContext(QueryId queryId) {
     this.queryId = queryId;
+    this.endPointBlackList = new LinkedList<>();
   }
 
   public MPPQueryContext(
@@ -47,8 +53,8 @@ public class MPPQueryContext {
       SessionInfo session,
       TEndPoint localDataBlockEndpoint,
       TEndPoint localInternalEndpoint) {
+    this(queryId);
     this.sql = sql;
-    this.queryId = queryId;
     this.session = session;
     this.localDataBlockEndpoint = localDataBlockEndpoint;
     this.localInternalEndpoint = localInternalEndpoint;
@@ -63,11 +69,7 @@ public class MPPQueryContext {
       TEndPoint localInternalEndpoint,
       long timeOut,
       long startTime) {
-    this.sql = sql;
-    this.queryId = queryId;
-    this.session = session;
-    this.localDataBlockEndpoint = localDataBlockEndpoint;
-    this.localInternalEndpoint = localInternalEndpoint;
+    this(sql, queryId, session, localDataBlockEndpoint, localInternalEndpoint);
     this.resultNodeContext = new ResultNodeContext(queryId);
     this.timeOut = timeOut;
     this.startTime = startTime;
@@ -116,4 +118,12 @@ public class MPPQueryContext {
   public void setStartTime(long startTime) {
     this.startTime = startTime;
   }
+
+  public void addFailedEndPoint(TEndPoint endPoint) {
+    this.endPointBlackList.add(endPoint);
+  }
+
+  public List<TEndPoint> getEndPointBlackList() {
+    return endPointBlackList;
+  }
 }
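
The two new methods are the whole surface of the blacklist. A short standalone sketch of the API, assuming QueryId's single-String constructor and the Thrift-generated TEndPoint(ip, port) constructor:

    // Standalone sketch of the new blacklist accessors on MPPQueryContext.
    import org.apache.iotdb.common.rpc.thrift.TEndPoint;
    import org.apache.iotdb.db.mpp.common.MPPQueryContext;
    import org.apache.iotdb.db.mpp.common.QueryId;

    public class BlackListSketch {
      public static void main(String[] args) {
        MPPQueryContext context = new MPPQueryContext(new QueryId("q1"));

        // A dispatch to this DataNode failed, so remember its internal endpoint.
        context.addFailedEndPoint(new TEndPoint("10.0.0.2", 10730));

        // The planner reads the list back when picking a replica for the retry.
        System.out.println(context.getEndPointBlackList().size()); // 1
      }
    }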
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 854bb15467..7f00ffd9e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.mpp.plan.planner.distribution;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
@@ -35,8 +36,12 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -45,6 +50,7 @@ import java.util.Map;
  * into only one FragmentInstance.
  */
 public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
+  private static final Logger logger = LoggerFactory.getLogger(SimpleFragmentParallelPlanner.class);
 
   private SubPlan subPlan;
   private Analysis analysis;
@@ -122,16 +128,46 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
     // TODO: (Chen Rongzhao) need to make the values of ReadConsistencyLevel as static variable or
     // enums
     boolean selectRandomDataNode = "weak".equals(readConsistencyLevel);
+
+    List<TDataNodeLocation> availableDataNodes =
+        filterAvailableTDataNode(regionReplicaSet.getDataNodeLocations());
+    if (availableDataNodes.size() == 0) {
+      String errorMsg =
+          String.format(
+              "all replicas for region[%s] are not available in these DataNodes[%s]",
+              regionReplicaSet.getRegionId(), regionReplicaSet.getDataNodeLocations());
+      throw new IllegalArgumentException(errorMsg);
+    }
+    if (regionReplicaSet.getDataNodeLocationsSize() != availableDataNodes.size()) {
+      logger.info("Available replicas: " + availableDataNodes);
+    }
     int targetIndex;
     if (!selectRandomDataNode || queryContext.getSession() == null) {
       targetIndex = 0;
     } else {
-      targetIndex =
-          (int)
-              (queryContext.getSession().getSessionId()
-                  % regionReplicaSet.getDataNodeLocationsSize());
+      targetIndex = (int) (queryContext.getSession().getSessionId() % availableDataNodes.size());
+    }
+    return availableDataNodes.get(targetIndex);
+  }
+
+  private List<TDataNodeLocation> filterAvailableTDataNode(
+      List<TDataNodeLocation> originalDataNodeList) {
+    List<TDataNodeLocation> result = new LinkedList<>();
+    for (TDataNodeLocation dataNodeLocation : originalDataNodeList) {
+      if (isAvailableDataNode(dataNodeLocation)) {
+        result.add(dataNodeLocation);
+      }
+    }
+    return result;
+  }
+
+  private boolean isAvailableDataNode(TDataNodeLocation dataNodeLocation) {
+    for (TEndPoint endPoint : queryContext.getEndPointBlackList()) {
+      if (endPoint.getIp().equals(dataNodeLocation.internalEndPoint.getIp())) {
+        return false;
+      }
     }
-    return regionReplicaSet.getDataNodeLocations().get(targetIndex);
+    return true;
   }
 
   private void calculateNodeTopologyBetweenInstance() {
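
After filtering, selection keeps its old shape: index 0 under strong consistency, session-id modulo under weak consistency, but over the available list instead of the full replica set. A self-contained sketch of that step, assuming the Thrift-generated getters; selectReplica() is hypothetical, not the planner method itself:

    // Sketch of the filter-then-select step above, using the same Thrift structs.
    import java.util.List;
    import java.util.stream.Collectors;

    import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
    import org.apache.iotdb.common.rpc.thrift.TEndPoint;

    public class ReplicaSelectionSketch {

      static TDataNodeLocation selectReplica(
          List<TDataNodeLocation> replicas,
          List<TEndPoint> blackList,
          boolean weakConsistency,
          long sessionId) {
        // Drop every replica whose internal endpoint IP is on the blacklist.
        List<TDataNodeLocation> available =
            replicas.stream()
                .filter(
                    location ->
                        blackList.stream()
                            .noneMatch(
                                ep -> ep.getIp().equals(location.getInternalEndPoint().getIp())))
                .collect(Collectors.toList());
        if (available.isEmpty()) {
          throw new IllegalArgumentException("all replicas are blacklisted");
        }
        // Strong consistency always reads the first available replica; weak
        // consistency spreads sessions across the available replicas.
        int targetIndex = weakConsistency ? (int) (sessionId % available.size()) : 0;
        return available.get(targetIndex);
      }
    }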
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index 417b6b5e18..10c3139071 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -76,7 +76,11 @@ public class ClusterScheduler implements IScheduler {
     this.queryType = queryType;
     this.dispatcher =
         new FragmentInstanceDispatcherImpl(
-            queryType, executor, writeOperationExecutor, internalServiceClientManager);
+            queryType,
+            queryContext,
+            executor,
+            writeOperationExecutor,
+            internalServiceClientManager);
     if (queryType == QueryType.READ) {
       this.stateTracker =
           new FixedRateFragInsStateTracker(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index f4a756d522..7849cc4201 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
 import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
 import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
@@ -66,6 +67,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
   private final ExecutorService executor;
   private final ExecutorService writeOperationExecutor;
   private final QueryType type;
+  private final MPPQueryContext queryContext;
   private final String localhostIpAddr;
   private final int localhostInternalPort;
   private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
@@ -73,10 +75,12 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
 
   public FragmentInstanceDispatcherImpl(
       QueryType type,
+      MPPQueryContext queryContext,
       ExecutorService executor,
       ExecutorService writeOperationExecutor,
       IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
     this.type = type;
+    this.queryContext = queryContext;
     this.executor = executor;
     this.writeOperationExecutor = writeOperationExecutor;
     this.internalServiceClientManager = internalServiceClientManager;
@@ -187,6 +191,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
       TSStatus status = new TSStatus();
       status.setCode(TSStatusCode.SYNC_CONNECTION_EXCEPTION.getStatusCode());
       status.setMessage("can't connect to node {}" + endPoint);
+      queryContext.addFailedEndPoint(endPoint);
       throw new FragmentInstanceDispatchException(status);
     }
   }