You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/19 07:03:55 UTC
[iotdb] 02/02: fix an issue that SourceHandle may become Blocked
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/mpp_issues
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fb7bf77ccc67feb8e4734e8a28f123716e23d91c
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Apr 19 15:03:17 2022 +0800
fix an issue that SourceHandle may become Blocked
---
.../iotdb/db/mpp/buffer/DataBlockManager.java | 11 ++++++-----
.../org/apache/iotdb/db/mpp/buffer/SinkHandle.java | 14 +++++---------
.../apache/iotdb/db/mpp/buffer/SourceHandle.java | 16 +++++++++-------
.../iotdb/db/mpp/execution/IQueryExecution.java | 2 ++
.../iotdb/db/mpp/execution/QueryExecution.java | 22 ++++++++++++++++++++--
.../db/mpp/execution/config/ConfigExecution.java | 3 +++
.../iotdb/db/query/control/SessionManager.java | 7 ++++++-
.../thrift/impl/DataNodeTSIServiceImpl.java | 11 ++++++++++-
.../apache/iotdb/db/utils/QueryDataSetUtils.java | 6 +++---
9 files changed, 64 insertions(+), 28 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index c6c642b1ef..3177888ba8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@ -138,7 +138,7 @@ public class DataBlockManager implements IDataBlockManager {
@Override
public void onEndOfDataBlockEvent(EndOfDataBlockEvent e) throws TException {
- logger.debug(
+ logger.info(
"End of data block event received, for plan node {} of {} from {}.",
e.getTargetPlanNodeId(),
e.getTargetFragmentInstanceId(),
@@ -175,11 +175,12 @@ public class DataBlockManager implements IDataBlockManager {
.containsKey(sourceHandle.getLocalPlanNodeId())) {
logger.info(
"Resources of finished source handle {} has already been released", sourceHandle);
+ } else {
+ sourceHandles
+ .get(sourceHandle.getLocalFragmentInstanceId())
+ .remove(sourceHandle.getLocalPlanNodeId());
}
- sourceHandles
- .get(sourceHandle.getLocalFragmentInstanceId())
- .remove(sourceHandle.getLocalPlanNodeId());
- if (sourceHandles.get(sourceHandle.getLocalFragmentInstanceId()).isEmpty()) {
+ if (sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId()) && sourceHandles.get(sourceHandle.getLocalFragmentInstanceId()).isEmpty()) {
sourceHandles.remove(sourceHandle.getLocalFragmentInstanceId());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index b0204bd80e..fc3cd825fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -160,11 +160,7 @@ public class SinkHandle implements ISinkHandle {
}
private void sendEndOfDataBlockEvent() throws TException {
- logger.info(
- "Send end of data block event to plan node {} of {}. {}",
- remotePlanNodeId,
- remoteFragmentInstanceId,
- Thread.currentThread().getName());
+ logger.info("[SinkHandle {}]: send end of data block event.", this.getRemotePlanNodeId());
int attempt = 0;
EndOfDataBlockEvent endOfDataBlockEvent =
new EndOfDataBlockEvent(
@@ -194,7 +190,7 @@ public class SinkHandle implements ISinkHandle {
@Override
public void close() throws IOException {
- logger.info("Sink handle {} is being closed.", this);
+ logger.info("[SinkHandle {}]: is being closed.", this.getRemotePlanNodeId());
if (throwable != null) {
throw new IOException(throwable);
}
@@ -211,12 +207,12 @@ public class SinkHandle implements ISinkHandle {
} catch (TException e) {
throw new IOException(e);
}
- logger.info("Sink handle {} is closed.", this);
+ logger.info("[SinkHandle {}] is closed.", this.getRemotePlanNodeId());
}
@Override
public void abort() {
- logger.info("Sink handle {} is being aborted.", this);
+ logger.info("[SinkHandle {}]: is being aborted.", this.getRemotePlanNodeId());
synchronized (this) {
sequenceIdToTsBlock.clear();
closed = true;
@@ -228,7 +224,7 @@ public class SinkHandle implements ISinkHandle {
}
}
sinkHandleListener.onAborted(this);
- logger.info("Sink handle {} is aborted", this);
+ logger.info("[SinkHandle {}]: is aborted.", this.getRemotePlanNodeId());
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index d1f1df86d8..01dc2f0fb1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -211,9 +211,14 @@ public class SourceHandle implements ISourceHandle {
}
synchronized void setNoMoreTsBlocks(int lastSequenceId) {
- logger.info("[SourceHandle {}-{}]: No more TsBlock. {} ", localFragmentInstanceId, localPlanNodeId, remoteFragmentInstanceId);
+ logger.info("[SourceHandle {}]: No more TsBlock. {} ", localPlanNodeId, remoteFragmentInstanceId);
this.lastSequenceId = lastSequenceId;
- noMoreTsBlocks = true;
+ if (!blocked.isDone() && currSequenceId - 1 == lastSequenceId) {
+ logger.info("[SourceHandle {}]: all blocks are consumed. set blocked to null.", localPlanNodeId);
+ blocked.set(null);
+ } else {
+ logger.info("[SourceHandle {}]: No need to set blocked. Blocked: {}, Consumed: {} ", localPlanNodeId, blocked.isDone(), currSequenceId - 1 == lastSequenceId);
+ }
}
synchronized void updatePendingDataBlockInfo(int startSequenceId, List<Long> dataBlockSizes) {
@@ -225,7 +230,7 @@ public class SourceHandle implements ISourceHandle {
@Override
public synchronized void close() {
- logger.info("[SourceHandle {}-{}]: closed ", localFragmentInstanceId, localPlanNodeId);
+ logger.info("[SourceHandle {}]: closed ", localPlanNodeId);
if (closed) {
return;
}
@@ -247,10 +252,7 @@ public class SourceHandle implements ISourceHandle {
@Override
public boolean isFinished() {
return throwable == null
- && noMoreTsBlocks
- && numActiveGetDataBlocksTask == 0
- && currSequenceId - 1 == lastSequenceId
- && sequenceIdToTsBlock.isEmpty();
+ && currSequenceId - 1 == lastSequenceId;
}
String getRemoteHostname() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
index 2e0bd9c76d..af9633ee89 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/IQueryExecution.java
@@ -28,6 +28,8 @@ public interface IQueryExecution {
void stop();
+ void stopAndCleanup();
+
ExecutionResult getStatus();
TsBlock getBatchResult();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 9dcb3e5fc3..380361c3fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -110,6 +110,10 @@ public class QueryExecution implements IQueryExecution {
return;
}
this.stop();
+ // TODO: (xingtanzjr) If the query is in an abnormal state, releaseResource() should be invoked
+ if (state == QueryState.FAILED || state == QueryState.ABORTED || state == QueryState.CANCELED) {
+ releaseResource();
+ }
});
}
@@ -160,18 +164,28 @@ public class QueryExecution implements IQueryExecution {
this.distributedPlan = planner.planFragments();
}
- /** Abort the query and do cleanup work including QuerySchedule aborting and resource releasing */
+ // Stop the workers for this query
public void stop() {
if (this.scheduler != null) {
this.scheduler.stop();
}
+ }
+
+ // Stop the query and clean up all the resources this query occupied
+ public void stopAndCleanup() {
+ stop();
releaseResource();
}
/** Release the resources that current QueryExecution hold. */
private void releaseResource() {
// close ResultHandle to unblock client's getResult request
- if (resultHandle != null && !resultHandle.isClosed()) {
+ // Actually, we should not close the ResultHandle when the QueryExecution is Finished.
+ // There are only two scenarios where the ResultHandle should be closed:
+ // 1. The client has fetched all the results and the ResultHandle is finished.
+ // 2. The client's connection is closed, so all QueryExecutions it owns should be cleaned up.
+ if (resultHandle != null && resultHandle.isFinished()) {
+ LOG.info("[QueryExecution {}]: result handle is closed", context.getQueryId());
resultHandle.close();
}
}
@@ -186,12 +200,16 @@ public class QueryExecution implements IQueryExecution {
@Override
public TsBlock getBatchResult() {
try {
+ if (resultHandle.isClosed() || resultHandle.isFinished()) {
+ return null;
+ }
LOG.info("[QueryExecution {}]: try to get result.", context.getQueryId());
ListenableFuture<Void> blocked = resultHandle.isBlocked();
blocked.get();
LOG.info("[QueryExecution {}]: unblock. Cancelled: {}, Done: {}", context.getQueryId(), blocked.isCancelled(), blocked.isDone());
if (resultHandle.isFinished()) {
LOG.info("[QueryExecution {}]: result is null", context.getQueryId());
+ releaseResource();
return null;
}
return resultHandle.receive();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
index be1c95871f..b134d4896b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
@@ -103,6 +103,9 @@ public class ConfigExecution implements IQueryExecution {
@Override
public void stop() {}
+ @Override
+ public void stopAndCleanup() {}
+
@Override
public ExecutionResult getStatus() {
try {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index 20aa1d1b86..a8a0faca92 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -46,6 +46,7 @@ import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
@@ -215,6 +216,10 @@ public class SessionManager {
}
public boolean releaseSessionResource(long sessionId) {
+ return releaseSessionResource(sessionId, this::releaseQueryResourceNoExceptions);
+ }
+
+ public boolean releaseSessionResource(long sessionId, Consumer<Long> releaseQueryResource) {
sessionIdToZoneId.remove(sessionId);
sessionIdToClientVersion.remove(sessionId);
@@ -224,7 +229,7 @@ public class SessionManager {
Set<Long> queryIdSet = statementIdToQueryId.remove(statementId);
if (queryIdSet != null) {
for (Long queryId : queryIdSet) {
- releaseQueryResourceNoExceptions(queryId);
+ releaseQueryResource.accept(queryId);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index 2c13c68ad7..3de0a28cc2 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -562,7 +562,7 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
ExecutionResult result =
COORDINATOR.execute(
statement,
- new QueryId(String.valueOf(queryId)),
+ genQueryId(queryId),
SESSION_MANAGER.getSessionInfo(req.sessionId),
"",
PARTITION_FETCHER,
@@ -756,8 +756,17 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
public void handleClientExit() {
Long sessionId = SESSION_MANAGER.getCurrSessionId();
if (sessionId != null) {
+ SESSION_MANAGER.releaseSessionResource(sessionId, this::cleanupQueryExecution);
TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
closeSession(req);
}
}
+
+ private void cleanupQueryExecution(Long queryId) {
+ COORDINATOR.getQueryExecution(genQueryId(queryId)).stopAndCleanup();
+ }
+
+ private QueryId genQueryId(long id) {
+ return new QueryId(String.valueOf(id));
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index aafc3ffd00..aca0b37906 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -195,10 +195,10 @@ public class QueryDataSetUtils {
int rowCount = 0;
int[] valueOccupation = new int[columnNum];
- while (rowCount < fetchSize && queryExecution.hasNextResult()) {
- LOG.info("[ToTSDataSet {}] hasNext return true.", queryExecution);
+ while (rowCount < fetchSize) {
+ LOG.info("[ToTSDataSet {}] invoke queryExecution.getBatchResult.", queryExecution);
TsBlock tsBlock = queryExecution.getBatchResult();
- LOG.info("[ToTSDataSet {}] result got.", queryExecution);
+ LOG.info("[ToTSDataSet {}] result got. Empty: {}", queryExecution, tsBlock == null);
if (tsBlock == null) {
break;
}