You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") on the original page and is not preserved in this text.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/19 07:03:55 UTC

[iotdb] 02/02: fix an issue where SourceHandle may become blocked

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/mpp_issues
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fb7bf77ccc67feb8e4734e8a28f123716e23d91c
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Apr 19 15:03:17 2022 +0800

    fix an issue where SourceHandle may become blocked
---
 .../iotdb/db/mpp/buffer/DataBlockManager.java      | 11 ++++++-----
 .../org/apache/iotdb/db/mpp/buffer/SinkHandle.java | 14 +++++---------
 .../apache/iotdb/db/mpp/buffer/SourceHandle.java   | 16 +++++++++-------
 .../iotdb/db/mpp/execution/IQueryExecution.java    |  2 ++
 .../iotdb/db/mpp/execution/QueryExecution.java     | 22 ++++++++++++++++++++--
 .../db/mpp/execution/config/ConfigExecution.java   |  3 +++
 .../iotdb/db/query/control/SessionManager.java     |  7 ++++++-
 .../thrift/impl/DataNodeTSIServiceImpl.java        | 11 ++++++++++-
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   |  6 +++---
 9 files changed, 64 insertions(+), 28 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index c6c642b1ef..3177888ba8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@ -138,7 +138,7 @@ public class DataBlockManager implements IDataBlockManager {
 
     @Override
     public void onEndOfDataBlockEvent(EndOfDataBlockEvent e) throws TException {
-      logger.debug(
+      logger.info(
           "End of data block event received, for plan node {} of {} from {}.",
           e.getTargetPlanNodeId(),
           e.getTargetFragmentInstanceId(),
@@ -175,11 +175,12 @@ public class DataBlockManager implements IDataBlockManager {
               .containsKey(sourceHandle.getLocalPlanNodeId())) {
         logger.info(
             "Resources of finished source handle {} has already been released", sourceHandle);
+      } else {
+        sourceHandles
+            .get(sourceHandle.getLocalFragmentInstanceId())
+            .remove(sourceHandle.getLocalPlanNodeId());
       }
-      sourceHandles
-          .get(sourceHandle.getLocalFragmentInstanceId())
-          .remove(sourceHandle.getLocalPlanNodeId());
-      if (sourceHandles.get(sourceHandle.getLocalFragmentInstanceId()).isEmpty()) {
+      if (sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId()) && sourceHandles.get(sourceHandle.getLocalFragmentInstanceId()).isEmpty()) {
         sourceHandles.remove(sourceHandle.getLocalFragmentInstanceId());
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index b0204bd80e..fc3cd825fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -160,11 +160,7 @@ public class SinkHandle implements ISinkHandle {
   }
 
   private void sendEndOfDataBlockEvent() throws TException {
-    logger.info(
-        "Send end of data block event to plan node {} of {}. {}",
-        remotePlanNodeId,
-        remoteFragmentInstanceId,
-        Thread.currentThread().getName());
+    logger.info("[SinkHandle {}]: send end of data block event.", this.getRemotePlanNodeId());
     int attempt = 0;
     EndOfDataBlockEvent endOfDataBlockEvent =
         new EndOfDataBlockEvent(
@@ -194,7 +190,7 @@ public class SinkHandle implements ISinkHandle {
 
   @Override
   public void close() throws IOException {
-    logger.info("Sink handle {} is being closed.", this);
+    logger.info("[SinkHandle {}]: is being closed.", this.getRemotePlanNodeId());
     if (throwable != null) {
       throw new IOException(throwable);
     }
@@ -211,12 +207,12 @@ public class SinkHandle implements ISinkHandle {
     } catch (TException e) {
       throw new IOException(e);
     }
-    logger.info("Sink handle {} is closed.", this);
+    logger.info("[SinkHandle {}] is closed.", this.getRemotePlanNodeId());
   }
 
   @Override
   public void abort() {
-    logger.info("Sink handle {} is being aborted.", this);
+    logger.info("[SinkHandle {}]: is being aborted.", this.getRemotePlanNodeId());
     synchronized (this) {
       sequenceIdToTsBlock.clear();
       closed = true;
@@ -228,7 +224,7 @@ public class SinkHandle implements ISinkHandle {
       }
     }
     sinkHandleListener.onAborted(this);
-    logger.info("Sink handle {} is aborted", this);
+    logger.info("[SinkHandle {}]: is aborted.", this.getRemotePlanNodeId());
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index d1f1df86d8..01dc2f0fb1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -211,9 +211,14 @@ public class SourceHandle implements ISourceHandle {
   }
 
   synchronized void setNoMoreTsBlocks(int lastSequenceId) {
-    logger.info("[SourceHandle {}-{}]: No more TsBlock. {} ", localFragmentInstanceId, localPlanNodeId, remoteFragmentInstanceId);
+    logger.info("[SourceHandle {}]: No more TsBlock. {} ", localPlanNodeId, remoteFragmentInstanceId);
     this.lastSequenceId = lastSequenceId;
-    noMoreTsBlocks = true;
+    if (!blocked.isDone() && currSequenceId - 1 == lastSequenceId) {
+      logger.info("[SourceHandle {}]: all blocks are consumed. set blocked to null.", localPlanNodeId);
+      blocked.set(null);
+    } else {
+      logger.info("[SourceHandle {}]: No need to set blocked. Blocked: {}, Consumed: {} ", localPlanNodeId, blocked.isDone(), currSequenceId - 1 == lastSequenceId);
+    }
   }
 
   synchronized void updatePendingDataBlockInfo(int startSequenceId, List<Long> dataBlockSizes) {
@@ -225,7 +230,7 @@ public class SourceHandle implements ISourceHandle {
 
   @Override
   public synchronized void close() {
-    logger.info("[SourceHandle {}-{}]: closed ", localFragmentInstanceId, localPlanNodeId);
+    logger.info("[SourceHandle {}]: closed ", localPlanNodeId);
     if (closed) {
       return;
     }
@@ -247,10 +252,7 @@ public class SourceHandle implements ISourceHandle {
   @Override
   public boolean isFinished() {
     return throwable == null
-        && noMoreTsBlocks
-        && numActiveGetDataBlocksTask == 0
-        && currSequenceId - 1 == lastSequenceId
-        && sequenceIdToTsBlock.isEmpty();
+        && currSequenceId - 1 == lastSequenceId;
   }
 
   String getRemoteHostname() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
index 2e0bd9c76d..af9633ee89 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
@@ -28,6 +28,8 @@ public interface IQueryExecution {
 
   void stop();
 
+  void stopAndCleanup();
+
   ExecutionResult getStatus();
 
   TsBlock getBatchResult();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 9dcb3e5fc3..380361c3fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -110,6 +110,10 @@ public class QueryExecution implements IQueryExecution {
             return;
           }
           this.stop();
+          // TODO: (xingtanzjr) If the query is in abnormal state, the releaseResource() should be invoked
+          if (state == QueryState.FAILED || state == QueryState.ABORTED || state == QueryState.CANCELED) {
+            releaseResource();
+          }
         });
   }
 
@@ -160,18 +164,28 @@ public class QueryExecution implements IQueryExecution {
     this.distributedPlan = planner.planFragments();
   }
 
-  /** Abort the query and do cleanup work including QuerySchedule aborting and resource releasing */
+  // Stop the workers for this query
   public void stop() {
     if (this.scheduler != null) {
       this.scheduler.stop();
     }
+  }
+
+  // Stop the query and clean up all the resources this query occupied
+  public void stopAndCleanup() {
+    stop();
     releaseResource();
   }
 
   /** Release the resources that current QueryExecution hold. */
   private void releaseResource() {
     // close ResultHandle to unblock client's getResult request
-    if (resultHandle != null && !resultHandle.isClosed()) {
+    // Actually, we should not close the ResultHandle when the QueryExecution is Finished.
+    // There are only two scenarios where the ResultHandle should be closed:
+    //   1. The client fetch all the result and the ResultHandle is finished.
+    //   2. The client's connection is closed that all owned QueryExecution should be cleaned up
+    if (resultHandle != null && resultHandle.isFinished()) {
+      LOG.info("[QueryExecution {}]:  result handle is closed", context.getQueryId());
       resultHandle.close();
     }
   }
@@ -186,12 +200,16 @@ public class QueryExecution implements IQueryExecution {
   @Override
   public TsBlock getBatchResult() {
     try {
+      if (resultHandle.isClosed() || resultHandle.isFinished()) {
+        return null;
+      }
       LOG.info("[QueryExecution {}]: try to get result.", context.getQueryId());
       ListenableFuture<Void> blocked = resultHandle.isBlocked();
       blocked.get();
       LOG.info("[QueryExecution {}]:  unblock. Cancelled: {}, Done: {}", context.getQueryId(), blocked.isCancelled(), blocked.isDone());
       if (resultHandle.isFinished()) {
         LOG.info("[QueryExecution {}]:  result is null", context.getQueryId());
+        releaseResource();
         return null;
       }
       return resultHandle.receive();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
index be1c95871f..b134d4896b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
@@ -103,6 +103,9 @@ public class ConfigExecution implements IQueryExecution {
   @Override
   public void stop() {}
 
+  @Override
+  public void stopAndCleanup() {}
+
   @Override
   public ExecutionResult getStatus() {
     try {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index 20aa1d1b86..a8a0faca92 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -46,6 +46,7 @@ import java.util.TimeZone;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
 
@@ -215,6 +216,10 @@ public class SessionManager {
   }
 
   public boolean releaseSessionResource(long sessionId) {
+    return releaseSessionResource(sessionId, this::releaseQueryResourceNoExceptions);
+  }
+
+  public boolean releaseSessionResource(long sessionId, Consumer<Long> releaseQueryResource) {
     sessionIdToZoneId.remove(sessionId);
     sessionIdToClientVersion.remove(sessionId);
 
@@ -224,7 +229,7 @@ public class SessionManager {
         Set<Long> queryIdSet = statementIdToQueryId.remove(statementId);
         if (queryIdSet != null) {
           for (Long queryId : queryIdSet) {
-            releaseQueryResourceNoExceptions(queryId);
+            releaseQueryResource.accept(queryId);
           }
         }
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index 2c13c68ad7..3de0a28cc2 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -562,7 +562,7 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
       ExecutionResult result =
           COORDINATOR.execute(
               statement,
-              new QueryId(String.valueOf(queryId)),
+              genQueryId(queryId),
               SESSION_MANAGER.getSessionInfo(req.sessionId),
               "",
               PARTITION_FETCHER,
@@ -756,8 +756,17 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
   public void handleClientExit() {
     Long sessionId = SESSION_MANAGER.getCurrSessionId();
     if (sessionId != null) {
+      SESSION_MANAGER.releaseSessionResource(sessionId, this::cleanupQueryExecution);
       TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
       closeSession(req);
     }
   }
+
+  private void cleanupQueryExecution(Long queryId) {
+    COORDINATOR.getQueryExecution(genQueryId(queryId)).stopAndCleanup();
+  }
+
+  private QueryId genQueryId(long id) {
+    return new QueryId(String.valueOf(id));
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index aafc3ffd00..aca0b37906 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -195,10 +195,10 @@ public class QueryDataSetUtils {
 
     int rowCount = 0;
     int[] valueOccupation = new int[columnNum];
-    while (rowCount < fetchSize && queryExecution.hasNextResult()) {
-      LOG.info("[ToTSDataSet {}] hasNext return true.", queryExecution);
+    while (rowCount < fetchSize) {
+      LOG.info("[ToTSDataSet {}] invoke queryExecution.getBatchResult.", queryExecution);
       TsBlock tsBlock = queryExecution.getBatchResult();
-      LOG.info("[ToTSDataSet {}] result got.", queryExecution);
+      LOG.info("[ToTSDataSet {}] result got. Empty: {}", queryExecution, tsBlock == null);
       if (tsBlock == null) {
         break;
       }