You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/05 12:53:30 UTC

[iotdb] 01/01: fix the issue that some FI stuck in FLUSHING states

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/fix_instance_flushing
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit cbc42948a0ed01f5db8e1f5de7e301b2d4d02dbf
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu May 5 20:53:16 2022 +0800

    fix the issue that some FI stuck in FLUSHING states
---
 .../db/mpp/execution/datatransfer/DataBlockManager.java | 15 +++++++++------
 .../db/mpp/execution/datatransfer/SourceHandle.java     |  4 ++--
 .../iotdb/db/mpp/plan/execution/QueryExecution.java     |  4 +++-
 .../db/mpp/plan/scheduler/SimpleQueryTerminator.java    | 17 +++++++++++++----
 .../db/service/thrift/impl/InternalServiceImpl.java     | 12 ++++++++----
 thrift/src/main/thrift/mpp.thrift                       |  1 +
 6 files changed, 36 insertions(+), 17 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/DataBlockManager.java
index c889f18f6d..b6674ee7f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/DataBlockManager.java
@@ -106,13 +106,16 @@ public class DataBlockManager implements IDataBlockManager {
           e.getEndSequenceId(),
           e.getSourceFragmentInstanceId());
       if (!sinkHandles.containsKey(e.getSourceFragmentInstanceId())) {
-        throw new TException(
-            "Source fragment instance not found. Fragment instance ID: "
-                + e.getSourceFragmentInstanceId()
-                + ".");
+        // In some scenarios, when the SourceHandle sends the data block ACK event, its upstream
+        // may have already been stopped. For example, in a query with a LimitOperator, the
+        // downstream FragmentInstance may be finished, although the upstream is still working.
+        logger.warn(
+            "received ACK event but target fragment[{}] instance not found.",
+            e.getSourceFragmentInstanceId());
+      } else {
+        ((SinkHandle) sinkHandles.get(e.getSourceFragmentInstanceId()))
+            .acknowledgeTsBlock(e.getStartSequenceId(), e.getEndSequenceId());
       }
-      ((SinkHandle) sinkHandles.get(e.getSourceFragmentInstanceId()))
-          .acknowledgeTsBlock(e.getStartSequenceId(), e.getEndSequenceId());
     }
 
     @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java
index 1c8bf70880..2102e98bfb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java
@@ -320,6 +320,8 @@ public class SourceHandle implements ISourceHandle {
             TsBlock tsBlock = serde.deserialize(byteBuffer);
             tsBlocks.add(tsBlock);
           }
+          executorService.submit(
+              new SendAcknowledgeDataBlockEventTask(startSequenceId, endSequenceId));
           synchronized (SourceHandle.this) {
             if (aborted) {
               return;
@@ -331,8 +333,6 @@ public class SourceHandle implements ISourceHandle {
               blocked.set(null);
             }
           }
-          executorService.submit(
-              new SendAcknowledgeDataBlockEventTask(startSequenceId, endSequenceId));
           break;
         } catch (Throwable e) {
           logger.error(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 763ca26a82..7f6078087f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -238,7 +238,9 @@ public class QueryExecution implements IQueryExecution {
       ListenableFuture<Void> blocked = resultHandle.isBlocked();
       blocked.get();
       if (resultHandle.isFinished()) {
-        releaseResource();
+        // Once the resultHandle is finished, we should transition the state of this query to
+        // FINISHED so that the corresponding cleanup work can be triggered.
+        stateMachine.transitionToFinished();
         return Optional.empty();
       }
       return Optional.of(resultHandle.receive());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
index df6f08f7c2..14334733ed 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -58,7 +59,7 @@ public class SimpleQueryTerminator implements IQueryTerminator {
 
   @Override
   public Future<Boolean> terminate() {
-    List<TEndPoint> relatedHost = getRelatedHost(fragmentInstances);
+    List<TEndPoint> relatedHost = getRelatedHost();
 
     return executor.submit(
         () -> {
@@ -66,7 +67,8 @@ public class SimpleQueryTerminator implements IQueryTerminator {
             // TODO (jackie tien) change the port
             try (SyncDataNodeInternalServiceClient client =
                 internalServiceClientManager.borrowClient(endPoint)) {
-              client.cancelQuery(new TCancelQueryReq(queryId.getId()));
+              client.cancelQuery(
+                  new TCancelQueryReq(queryId.getId(), getRelatedFragmentInstances(endPoint)));
             } catch (IOException e) {
               LOGGER.error("can't connect to node {}", endPoint, e);
               return false;
@@ -78,10 +80,17 @@ public class SimpleQueryTerminator implements IQueryTerminator {
         });
   }
 
-  private List<TEndPoint> getRelatedHost(List<FragmentInstance> instances) {
-    return instances.stream()
+  private List<TEndPoint> getRelatedHost() {
+    return fragmentInstances.stream()
         .map(instance -> instance.getHostDataNode().internalEndPoint)
         .distinct()
         .collect(Collectors.toList());
   }
+
+  private List<TFragmentInstanceId> getRelatedFragmentInstances(TEndPoint endPoint) {
+    return fragmentInstances.stream()
+        .filter(instance -> instance.getHostDataNode().internalEndPoint.equals(endPoint))
+        .map(instance -> instance.getId().toThrift())
+        .collect(Collectors.toList());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index f8b5778611..e2a1fff0c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -72,6 +72,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class InternalServiceImpl implements InternalService.Iface {
 
@@ -132,11 +133,14 @@ public class InternalServiceImpl implements InternalService.Iface {
 
   @Override
   public TCancelResp cancelQuery(TCancelQueryReq req) throws TException {
-
-    // TODO need to be implemented and currently in order not to print NotImplementedException log,
-    // we simply return null
+    List<FragmentInstanceId> taskIds =
+        req.getFragmentInstanceIds().stream()
+            .map(FragmentInstanceId::fromThrift)
+            .collect(Collectors.toList());
+    for (FragmentInstanceId taskId : taskIds) {
+      FragmentInstanceManager.getInstance().cancelTask(taskId);
+    }
     return new TCancelResp(true);
-    //    throw new NotImplementedException();
   }
 
   @Override
diff --git a/thrift/src/main/thrift/mpp.thrift b/thrift/src/main/thrift/mpp.thrift
index 0c2acf6cb2..1d33d58cc4 100644
--- a/thrift/src/main/thrift/mpp.thrift
+++ b/thrift/src/main/thrift/mpp.thrift
@@ -105,6 +105,7 @@ struct TFragmentInstanceStateResp {
 
 struct TCancelQueryReq {
   1: required string queryId
+  2: required list<TFragmentInstanceId> fragmentInstanceIds
 }
 
 struct TCancelPlanFragmentReq {