You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/25 03:06:53 UTC
[iotdb] branch master updated: [IOTDB-4220] Try to fix GetTsBlock NPE (#7110)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 25131906c6 [IOTDB-4220] Try to fix GetTsBlock NPE (#7110)
25131906c6 is described below
commit 25131906c64146e69507c3d9cfb52cd751125f0b
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Thu Aug 25 11:06:47 2022 +0800
[IOTDB-4220] Try to fix GetTsBlock NPE (#7110)
---
.../db/mpp/execution/exchange/SinkHandle.java | 28 +++++++++++++++-------
.../org/apache/iotdb/db/mpp/plan/Coordinator.java | 2 +-
2 files changed, 20 insertions(+), 10 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
index 644c9985bf..4cc0539a01 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
@@ -230,7 +230,7 @@ public class SinkHandle implements ISinkHandle {
}
@Override
- public void close() {
+ public synchronized void close() {
logger.info("SinkHandle is being closed.");
sequenceIdToTsBlock.clear();
closed = true;
@@ -246,17 +246,17 @@ public class SinkHandle implements ISinkHandle {
}
@Override
- public boolean isAborted() {
+ public synchronized boolean isAborted() {
return aborted;
}
@Override
- public boolean isFinished() {
+ public synchronized boolean isFinished() {
return noMoreTsBlocks && sequenceIdToTsBlock.isEmpty();
}
@Override
- public long getBufferRetainedSizeInBytes() {
+ public synchronized long getBufferRetainedSizeInBytes() {
return bufferRetainedSizeInBytes;
}
@@ -268,13 +268,23 @@ public class SinkHandle implements ISinkHandle {
throw new UnsupportedOperationException();
}
- ByteBuffer getSerializedTsBlock(int sequenceId) throws IOException {
- TsBlock tsBlock;
- tsBlock = sequenceIdToTsBlock.get(sequenceId).left;
- if (tsBlock == null) {
+ synchronized ByteBuffer getSerializedTsBlock(int sequenceId) throws IOException {
+ if (aborted || closed) {
+ logger.warn(
+ "SinkHandle still receive getting TsBlock request after being aborted={} or closed={}",
+ aborted,
+ closed);
+ throw new IllegalStateException("Sink handle is aborted or closed. ");
+ }
+ Pair<TsBlock, Long> pair = sequenceIdToTsBlock.get(sequenceId);
+ if (pair == null || pair.left == null) {
+ logger.error(
+ "The data block doesn't exist. Sequence ID is {}, remaining map is {}",
+ sequenceId,
+ sequenceIdToTsBlock.entrySet());
throw new IllegalStateException("The data block doesn't exist. Sequence ID: " + sequenceId);
}
- return serde.serialize(tsBlock);
+ return serde.serialize(pair.left);
}
void acknowledgeTsBlock(int startSequenceId, int endSequenceId) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index e406eadd97..d4f8ac8e9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -120,7 +120,7 @@ public class Coordinator {
QueryId globalQueryId = queryIdGenerator.createNextQueryId();
try (SetThreadName queryName = new SetThreadName(globalQueryId.getId())) {
if (sql != null && sql.length() > 0) {
- LOGGER.debug("start executing sql: {}", sql);
+ LOGGER.info("start executing sql: {}", sql);
}
MPPQueryContext queryContext =
new MPPQueryContext(