You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/17 02:34:52 UTC

[iotdb] branch xingtanzjr/query_retry_redirect updated: fix the erroneous termination operation in QueryTerminator when a query is retried

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/query_retry_redirect
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/xingtanzjr/query_retry_redirect by this push:
     new 4caba8f3ad fix the erroneous termination operation in QueryTerminator when a query is retried
4caba8f3ad is described below

commit 4caba8f3ad54fb19cd9f4c0f687274426a9d024d
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Aug 17 10:34:40 2022 +0800

    fix the erroneous termination operation in QueryTerminator when a query is retried
---
 .../org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java  | 5 +----
 .../apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java | 8 ++++++--
 2 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index 10c3139071..1df6c79d61 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -87,10 +87,7 @@ public class ClusterScheduler implements IScheduler {
               stateMachine, scheduledExecutor, instances, internalServiceClientManager);
       this.queryTerminator =
           new SimpleQueryTerminator(
-              scheduledExecutor,
-              queryContext.getQueryId(),
-              instances,
-              internalServiceClientManager);
+              scheduledExecutor, queryContext, instances, internalServiceClientManager);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
index 32b329d640..f452600ad1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.plan.scheduler;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
@@ -45,6 +46,7 @@ public class SimpleQueryTerminator implements IQueryTerminator {
   private static final long TERMINATION_GRACE_PERIOD_IN_MS = 1000L;
   protected ScheduledExecutorService scheduledExecutor;
   private final QueryId queryId;
+  private final MPPQueryContext queryContext;
   private List<TEndPoint> relatedHost;
   private Map<TEndPoint, List<TFragmentInstanceId>> ownedFragmentInstance;
 
@@ -53,11 +55,12 @@ public class SimpleQueryTerminator implements IQueryTerminator {
 
   public SimpleQueryTerminator(
       ScheduledExecutorService scheduledExecutor,
-      QueryId queryId,
+      MPPQueryContext queryContext,
       List<FragmentInstance> fragmentInstances,
       IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
     this.scheduledExecutor = scheduledExecutor;
-    this.queryId = queryId;
+    this.queryId = queryContext.getQueryId();
+    this.queryContext = queryContext;
     this.internalServiceClientManager = internalServiceClientManager;
     calculateParameter(fragmentInstances);
   }
@@ -95,6 +98,7 @@ public class SimpleQueryTerminator implements IQueryTerminator {
   private List<TEndPoint> getRelatedHost(List<FragmentInstance> fragmentInstances) {
     return fragmentInstances.stream()
         .map(instance -> instance.getHostDataNode().internalEndPoint)
+        .filter(endPoint -> !queryContext.getEndPointBlackList().contains(endPoint))
         .distinct()
         .collect(Collectors.toList());
   }