You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/01/29 03:13:38 UTC
[iotdb] 01/01: [IOTDB-5434] Fix occasional timeout error in CI
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch IOTDB-5434
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 388f5637708ad17d3eb5bdcf508750f507c28ae8
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Sun Jan 29 11:13:24 2023 +0800
[IOTDB-5434] Fix occasional timeout error in CI
---
.../execution/exchange/MPPDataExchangeManager.java | 92 ++++++++--------------
1 file changed, 31 insertions(+), 61 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index b3d89c0340..f51c4b77cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -43,9 +43,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
@@ -170,18 +167,15 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
e.getTargetPlanNodeId(),
e.getTargetFragmentInstanceId(),
e.getSourceFragmentInstanceId());
- if (!sourceHandles.containsKey(e.getTargetFragmentInstanceId())
- || !sourceHandles
- .get(e.getTargetFragmentInstanceId())
- .containsKey(e.getTargetPlanNodeId())
- || sourceHandles
- .get(e.getTargetFragmentInstanceId())
- .get(e.getTargetPlanNodeId())
- .isAborted()
- || sourceHandles
- .get(e.getTargetFragmentInstanceId())
- .get(e.getTargetPlanNodeId())
- .isFinished()) {
+
+ Map<String, ISourceHandle> sourceHandleMap =
+ sourceHandles.get(e.getTargetFragmentInstanceId());
+ SourceHandle sourceHandle =
+ sourceHandleMap == null
+ ? null
+ : (SourceHandle) sourceHandleMap.get(e.getTargetPlanNodeId());
+
+ if (sourceHandle == null || sourceHandle.isAborted() || sourceHandle.isFinished()) {
// In some scenario, when the SourceHandle sends the data block ACK event, its upstream
// may
// have already been stopped. For example, in the query whit LimitOperator, the downstream
@@ -192,9 +186,6 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
return;
}
- SourceHandle sourceHandle =
- (SourceHandle)
- sourceHandles.get(e.getTargetFragmentInstanceId()).get(e.getTargetPlanNodeId());
sourceHandle.updatePendingDataBlockInfo(e.getStartSequenceId(), e.getBlockSizes());
} finally {
QUERY_METRICS.recordDataExchangeCost(
@@ -212,28 +203,21 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
e.getTargetPlanNodeId(),
e.getTargetFragmentInstanceId(),
e.getSourceFragmentInstanceId());
- if (!sourceHandles.containsKey(e.getTargetFragmentInstanceId())
- || !sourceHandles
- .get(e.getTargetFragmentInstanceId())
- .containsKey(e.getTargetPlanNodeId())
- || sourceHandles
- .get(e.getTargetFragmentInstanceId())
- .get(e.getTargetPlanNodeId())
- .isAborted()
- || sourceHandles
- .get(e.getTargetFragmentInstanceId())
- .get(e.getTargetPlanNodeId())
- .isFinished()) {
+
+ Map<String, ISourceHandle> sourceHandleMap =
+ sourceHandles.get(e.getTargetFragmentInstanceId());
+ SourceHandle sourceHandle =
+ sourceHandleMap == null
+ ? null
+ : (SourceHandle) sourceHandleMap.get(e.getTargetPlanNodeId());
+
+ if (sourceHandle == null || sourceHandle.isAborted() || sourceHandle.isFinished()) {
logger.debug(
"received onEndOfDataBlockEvent but the downstream FragmentInstance[{}] is not found",
e.getTargetFragmentInstanceId());
return;
}
- SourceHandle sourceHandle =
- (SourceHandle)
- sourceHandles
- .getOrDefault(e.getTargetFragmentInstanceId(), Collections.emptyMap())
- .get(e.getTargetPlanNodeId());
+
sourceHandle.setNoMoreTsBlocks(e.getLastSequenceId());
}
}
@@ -251,18 +235,14 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
@Override
public void onFinished(ISourceHandle sourceHandle) {
logger.debug("[ScHListenerOnFinish]");
- if (!sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId())
- || !sourceHandles
- .get(sourceHandle.getLocalFragmentInstanceId())
- .containsKey(sourceHandle.getLocalPlanNodeId())) {
+ Map<String, ISourceHandle> sourceHandleMap =
+ sourceHandles.get(sourceHandle.getLocalFragmentInstanceId());
+ if (sourceHandleMap == null
+ || sourceHandleMap.remove(sourceHandle.getLocalPlanNodeId()) == null) {
logger.debug("[ScHListenerAlreadyReleased]");
- } else {
- sourceHandles
- .get(sourceHandle.getLocalFragmentInstanceId())
- .remove(sourceHandle.getLocalPlanNodeId());
}
- if (sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId())
- && sourceHandles.get(sourceHandle.getLocalFragmentInstanceId()).isEmpty()) {
+
+ if (sourceHandleMap != null && sourceHandleMap.isEmpty()) {
sourceHandles.remove(sourceHandle.getLocalFragmentInstanceId());
}
}
@@ -459,8 +439,10 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
localFragmentInstanceId);
SharedTsBlockQueue queue;
- if (sourceHandles.containsKey(remoteFragmentInstanceId)
- && sourceHandles.get(remoteFragmentInstanceId).containsKey(remotePlanNodeId)) {
+ Map<String, ISourceHandle> sourceHandleMap = sourceHandles.get(remoteFragmentInstanceId);
+ LocalSourceHandle localSourceHandle =
+ sourceHandleMap == null ? null : (LocalSourceHandle) sourceHandleMap.get(remotePlanNodeId);
+ if (localSourceHandle != null) {
logger.debug("Get shared tsblock queue from local source handle");
queue =
((LocalSourceHandle) sourceHandles.get(remoteFragmentInstanceId).get(remotePlanNodeId))
@@ -595,8 +577,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
TEndPoint remoteEndpoint,
TFragmentInstanceId remoteFragmentInstanceId,
IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
- if (sourceHandles.containsKey(localFragmentInstanceId)
- && sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) {
+ Map<String, ISourceHandle> sourceHandleMap = sourceHandles.get(localFragmentInstanceId);
+ if (sourceHandleMap != null && sourceHandleMap.containsKey(localPlanNodeId)) {
throw new IllegalStateException(
"Source handle for plan node "
+ localPlanNodeId
@@ -661,16 +643,4 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
+ "."
+ suffix;
}
-
- public ISinkHandle getISinkHandle(TFragmentInstanceId fragmentInstanceId) {
- return sinkHandles.get(fragmentInstanceId);
- }
-
- public List<ISourceHandle> getISourceHandle(TFragmentInstanceId fragmentInstanceId) {
- if (sourceHandles.containsKey(fragmentInstanceId)) {
- return new ArrayList<>(sourceHandles.get(fragmentInstanceId).values());
- } else {
- return new ArrayList<>();
- }
- }
}