You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/08/12 01:42:02 UTC

[iotdb] branch master updated: Fix SinkHandle bug (#6971)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3741dda9a5 Fix SinkHandle bug (#6971)
3741dda9a5 is described below

commit 3741dda9a5d5a5bc6dddf46bd19e82216ab6e377
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Fri Aug 12 09:41:57 2022 +0800

    Fix SinkHandle bug (#6971)
---
 .../org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java | 4 ++--
 .../iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java       | 4 ++++
 .../java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java   | 4 ++--
 .../java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java | 2 +-
 4 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
index 3bc455592e..35c7cda823 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
@@ -142,7 +142,7 @@ public class LocalSourceHandle implements ISourceHandle {
       logger.info("Source handle is being aborted.");
       synchronized (queue) {
         synchronized (this) {
-          if (aborted) {
+          if (aborted || closed) {
             return;
           }
           queue.abort();
@@ -163,7 +163,7 @@ public class LocalSourceHandle implements ISourceHandle {
       logger.info("Source handle is being closed.");
       synchronized (queue) {
         synchronized (this) {
-          if (aborted) {
+          if (aborted || closed) {
             return;
           }
           queue.close();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 7f087e2a9a..b2b32222d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -121,6 +121,10 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
         }
         ((SinkHandle) sinkHandles.get(e.getSourceFragmentInstanceId()))
             .acknowledgeTsBlock(e.getStartSequenceId(), e.getEndSequenceId());
+      } catch (Throwable t) {
+        logger.error(
+            "ack TsBlock [{}, {}) failed.", e.getStartSequenceId(), e.getEndSequenceId(), t);
+        throw t;
       }
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
index 872720fb7e..6ccddaf44e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
@@ -197,7 +197,7 @@ public class SinkHandle implements ISinkHandle {
   @Override
   public synchronized void setNoMoreTsBlocks() {
     logger.info("start to set no-more-tsblocks");
-    if (aborted) {
+    if (aborted || closed) {
       return;
     }
     try {
@@ -283,7 +283,7 @@ public class SinkHandle implements ISinkHandle {
   void acknowledgeTsBlock(int startSequenceId, int endSequenceId) {
     long freedBytes = 0L;
     synchronized (this) {
-      if (aborted) {
+      if (aborted || closed) {
         return;
       }
       Iterator<Entry<Integer, Pair<TsBlock, Long>>> iterator =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
index 3d4016dff4..1dc68c27a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
@@ -384,7 +384,7 @@ public class SourceHandle implements ISourceHandle {
             executorService.submit(
                 new SendAcknowledgeDataBlockEventTask(startSequenceId, endSequenceId));
             synchronized (SourceHandle.this) {
-              if (aborted) {
+              if (aborted || closed) {
                 return;
               }
               for (int i = startSequenceId; i < endSequenceId; i++) {