You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/08/12 01:42:02 UTC
[iotdb] branch master updated: Fix SinkHandle bug (#6971)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3741dda9a5 Fix SinkHandle bug (#6971)
3741dda9a5 is described below
commit 3741dda9a5d5a5bc6dddf46bd19e82216ab6e377
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Fri Aug 12 09:41:57 2022 +0800
Fix SinkHandle bug (#6971)
---
.../org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java | 4 ++--
.../iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java | 4 ++++
.../java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java | 4 ++--
.../java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java | 2 +-
4 files changed, 9 insertions(+), 5 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
index 3bc455592e..35c7cda823 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
@@ -142,7 +142,7 @@ public class LocalSourceHandle implements ISourceHandle {
logger.info("Source handle is being aborted.");
synchronized (queue) {
synchronized (this) {
- if (aborted) {
+ if (aborted || closed) {
return;
}
queue.abort();
@@ -163,7 +163,7 @@ public class LocalSourceHandle implements ISourceHandle {
logger.info("Source handle is being closed.");
synchronized (queue) {
synchronized (this) {
- if (aborted) {
+ if (aborted || closed) {
return;
}
queue.close();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 7f087e2a9a..b2b32222d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -121,6 +121,10 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
}
((SinkHandle) sinkHandles.get(e.getSourceFragmentInstanceId()))
.acknowledgeTsBlock(e.getStartSequenceId(), e.getEndSequenceId());
+ } catch (Throwable t) {
+ logger.error(
+ "ack TsBlock [{}, {}) failed.", e.getStartSequenceId(), e.getEndSequenceId(), t);
+ throw t;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
index 872720fb7e..6ccddaf44e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
@@ -197,7 +197,7 @@ public class SinkHandle implements ISinkHandle {
@Override
public synchronized void setNoMoreTsBlocks() {
logger.info("start to set no-more-tsblocks");
- if (aborted) {
+ if (aborted || closed) {
return;
}
try {
@@ -283,7 +283,7 @@ public class SinkHandle implements ISinkHandle {
void acknowledgeTsBlock(int startSequenceId, int endSequenceId) {
long freedBytes = 0L;
synchronized (this) {
- if (aborted) {
+ if (aborted || closed) {
return;
}
Iterator<Entry<Integer, Pair<TsBlock, Long>>> iterator =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
index 3d4016dff4..1dc68c27a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
@@ -384,7 +384,7 @@ public class SourceHandle implements ISourceHandle {
executorService.submit(
new SendAcknowledgeDataBlockEventTask(startSequenceId, endSequenceId));
synchronized (SourceHandle.this) {
- if (aborted) {
+ if (aborted || closed) {
return;
}
for (int i = startSequenceId; i < endSequenceId; i++) {