You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/25 03:06:53 UTC

[iotdb] branch master updated: [IOTDB-4220] Try to fix GetTsBlock NPE (#7110)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 25131906c6 [IOTDB-4220] Try to fix GetTsBlock NPE (#7110)
25131906c6 is described below

commit 25131906c64146e69507c3d9cfb52cd751125f0b
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Thu Aug 25 11:06:47 2022 +0800

    [IOTDB-4220] Try to fix GetTsBlock NPE (#7110)
---
 .../db/mpp/execution/exchange/SinkHandle.java      | 28 +++++++++++++++-------
 .../org/apache/iotdb/db/mpp/plan/Coordinator.java  |  2 +-
 2 files changed, 20 insertions(+), 10 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
index 644c9985bf..4cc0539a01 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
@@ -230,7 +230,7 @@ public class SinkHandle implements ISinkHandle {
   }
 
   @Override
-  public void close() {
+  public synchronized void close() {
     logger.info("SinkHandle is being closed.");
     sequenceIdToTsBlock.clear();
     closed = true;
@@ -246,17 +246,17 @@ public class SinkHandle implements ISinkHandle {
   }
 
   @Override
-  public boolean isAborted() {
+  public synchronized boolean isAborted() {
     return aborted;
   }
 
   @Override
-  public boolean isFinished() {
+  public synchronized boolean isFinished() {
     return noMoreTsBlocks && sequenceIdToTsBlock.isEmpty();
   }
 
   @Override
-  public long getBufferRetainedSizeInBytes() {
+  public synchronized long getBufferRetainedSizeInBytes() {
     return bufferRetainedSizeInBytes;
   }
 
@@ -268,13 +268,23 @@ public class SinkHandle implements ISinkHandle {
     throw new UnsupportedOperationException();
   }
 
-  ByteBuffer getSerializedTsBlock(int sequenceId) throws IOException {
-    TsBlock tsBlock;
-    tsBlock = sequenceIdToTsBlock.get(sequenceId).left;
-    if (tsBlock == null) {
+  synchronized ByteBuffer getSerializedTsBlock(int sequenceId) throws IOException {
+    if (aborted || closed) {
+      logger.warn(
+          "SinkHandle still receive getting TsBlock request after being aborted={} or closed={}",
+          aborted,
+          closed);
+      throw new IllegalStateException("Sink handle is aborted or closed. ");
+    }
+    Pair<TsBlock, Long> pair = sequenceIdToTsBlock.get(sequenceId);
+    if (pair == null || pair.left == null) {
+      logger.error(
+          "The data block doesn't exist. Sequence ID is {}, remaining map is {}",
+          sequenceId,
+          sequenceIdToTsBlock.entrySet());
       throw new IllegalStateException("The data block doesn't exist. Sequence ID: " + sequenceId);
     }
-    return serde.serialize(tsBlock);
+    return serde.serialize(pair.left);
   }
 
   void acknowledgeTsBlock(int startSequenceId, int endSequenceId) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index e406eadd97..d4f8ac8e9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -120,7 +120,7 @@ public class Coordinator {
     QueryId globalQueryId = queryIdGenerator.createNextQueryId();
     try (SetThreadName queryName = new SetThreadName(globalQueryId.getId())) {
       if (sql != null && sql.length() > 0) {
-        LOGGER.debug("start executing sql: {}", sql);
+        LOGGER.info("start executing sql: {}", sql);
       }
       MPPQueryContext queryContext =
           new MPPQueryContext(