You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/08/11 13:19:49 UTC

[iotdb] branch AddMoreLog created (now 9aaee1755c)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch AddMoreLog
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 9aaee1755c Fix SinkHandle bug

This branch includes the following new commits:

     new 9aaee1755c Fix SinkHandle bug

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Fix SinkHandle bug

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch AddMoreLog
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9aaee1755c81d03592760a3ba7c2127f2377deed
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu Aug 11 21:19:27 2022 +0800

    Fix SinkHandle bug
---
 .../org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java | 4 ++--
 .../iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java       | 4 ++++
 .../java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java   | 4 ++--
 .../java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java | 2 +-
 4 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
index 3bc455592e..35c7cda823 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
@@ -142,7 +142,7 @@ public class LocalSourceHandle implements ISourceHandle {
       logger.info("Source handle is being aborted.");
       synchronized (queue) {
         synchronized (this) {
-          if (aborted) {
+          if (aborted || closed) {
             return;
           }
           queue.abort();
@@ -163,7 +163,7 @@ public class LocalSourceHandle implements ISourceHandle {
       logger.info("Source handle is being closed.");
       synchronized (queue) {
         synchronized (this) {
-          if (aborted) {
+          if (aborted || closed) {
             return;
           }
           queue.close();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 7f087e2a9a..b2b32222d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -121,6 +121,10 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
         }
         ((SinkHandle) sinkHandles.get(e.getSourceFragmentInstanceId()))
             .acknowledgeTsBlock(e.getStartSequenceId(), e.getEndSequenceId());
+      } catch (Throwable t) {
+        logger.error(
+            "ack TsBlock [{}, {}) failed.", e.getStartSequenceId(), e.getEndSequenceId(), t);
+        throw t;
       }
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
index 872720fb7e..6ccddaf44e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
@@ -197,7 +197,7 @@ public class SinkHandle implements ISinkHandle {
   @Override
   public synchronized void setNoMoreTsBlocks() {
     logger.info("start to set no-more-tsblocks");
-    if (aborted) {
+    if (aborted || closed) {
       return;
     }
     try {
@@ -283,7 +283,7 @@ public class SinkHandle implements ISinkHandle {
   void acknowledgeTsBlock(int startSequenceId, int endSequenceId) {
     long freedBytes = 0L;
     synchronized (this) {
-      if (aborted) {
+      if (aborted || closed) {
         return;
       }
       Iterator<Entry<Integer, Pair<TsBlock, Long>>> iterator =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
index 3d4016dff4..1dc68c27a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
@@ -384,7 +384,7 @@ public class SourceHandle implements ISourceHandle {
             executorService.submit(
                 new SendAcknowledgeDataBlockEventTask(startSequenceId, endSequenceId));
             synchronized (SourceHandle.this) {
-              if (aborted) {
+              if (aborted || closed) {
                 return;
               }
               for (int i = startSequenceId; i < endSequenceId; i++) {