You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/01/29 11:21:58 UTC

[iotdb] branch master updated: [IOTDB-5434] Fix occasional timeout error in CI

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new eef5c5c981 [IOTDB-5434] Fix occasional timeout error in CI
eef5c5c981 is described below

commit eef5c5c981c6130f44a6bcc670ee8550a9820b0e
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Sun Jan 29 19:21:52 2023 +0800

    [IOTDB-5434] Fix occasional timeout error in CI
---
 .../execution/exchange/MPPDataExchangeManager.java | 92 ++++++++--------------
 1 file changed, 31 insertions(+), 61 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index b3d89c0340..f51c4b77cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -43,9 +43,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
@@ -170,18 +167,15 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
             e.getTargetPlanNodeId(),
             e.getTargetFragmentInstanceId(),
             e.getSourceFragmentInstanceId());
-        if (!sourceHandles.containsKey(e.getTargetFragmentInstanceId())
-            || !sourceHandles
-                .get(e.getTargetFragmentInstanceId())
-                .containsKey(e.getTargetPlanNodeId())
-            || sourceHandles
-                .get(e.getTargetFragmentInstanceId())
-                .get(e.getTargetPlanNodeId())
-                .isAborted()
-            || sourceHandles
-                .get(e.getTargetFragmentInstanceId())
-                .get(e.getTargetPlanNodeId())
-                .isFinished()) {
+
+        Map<String, ISourceHandle> sourceHandleMap =
+            sourceHandles.get(e.getTargetFragmentInstanceId());
+        SourceHandle sourceHandle =
+            sourceHandleMap == null
+                ? null
+                : (SourceHandle) sourceHandleMap.get(e.getTargetPlanNodeId());
+
+        if (sourceHandle == null || sourceHandle.isAborted() || sourceHandle.isFinished()) {
           // In some scenarios, when the SourceHandle sends the data block ACK event, its upstream
           // may
           // have already been stopped. For example, in the query with LimitOperator, the downstream
@@ -192,9 +186,6 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
           return;
         }
 
-        SourceHandle sourceHandle =
-            (SourceHandle)
-                sourceHandles.get(e.getTargetFragmentInstanceId()).get(e.getTargetPlanNodeId());
         sourceHandle.updatePendingDataBlockInfo(e.getStartSequenceId(), e.getBlockSizes());
       } finally {
         QUERY_METRICS.recordDataExchangeCost(
@@ -212,28 +203,21 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
             e.getTargetPlanNodeId(),
             e.getTargetFragmentInstanceId(),
             e.getSourceFragmentInstanceId());
-        if (!sourceHandles.containsKey(e.getTargetFragmentInstanceId())
-            || !sourceHandles
-                .get(e.getTargetFragmentInstanceId())
-                .containsKey(e.getTargetPlanNodeId())
-            || sourceHandles
-                .get(e.getTargetFragmentInstanceId())
-                .get(e.getTargetPlanNodeId())
-                .isAborted()
-            || sourceHandles
-                .get(e.getTargetFragmentInstanceId())
-                .get(e.getTargetPlanNodeId())
-                .isFinished()) {
+
+        Map<String, ISourceHandle> sourceHandleMap =
+            sourceHandles.get(e.getTargetFragmentInstanceId());
+        SourceHandle sourceHandle =
+            sourceHandleMap == null
+                ? null
+                : (SourceHandle) sourceHandleMap.get(e.getTargetPlanNodeId());
+
+        if (sourceHandle == null || sourceHandle.isAborted() || sourceHandle.isFinished()) {
           logger.debug(
               "received onEndOfDataBlockEvent but the downstream FragmentInstance[{}] is not found",
               e.getTargetFragmentInstanceId());
           return;
         }
-        SourceHandle sourceHandle =
-            (SourceHandle)
-                sourceHandles
-                    .getOrDefault(e.getTargetFragmentInstanceId(), Collections.emptyMap())
-                    .get(e.getTargetPlanNodeId());
+
         sourceHandle.setNoMoreTsBlocks(e.getLastSequenceId());
       }
     }
@@ -251,18 +235,14 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
     @Override
     public void onFinished(ISourceHandle sourceHandle) {
       logger.debug("[ScHListenerOnFinish]");
-      if (!sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId())
-          || !sourceHandles
-              .get(sourceHandle.getLocalFragmentInstanceId())
-              .containsKey(sourceHandle.getLocalPlanNodeId())) {
+      Map<String, ISourceHandle> sourceHandleMap =
+          sourceHandles.get(sourceHandle.getLocalFragmentInstanceId());
+      if (sourceHandleMap == null
+          || sourceHandleMap.remove(sourceHandle.getLocalPlanNodeId()) == null) {
         logger.debug("[ScHListenerAlreadyReleased]");
-      } else {
-        sourceHandles
-            .get(sourceHandle.getLocalFragmentInstanceId())
-            .remove(sourceHandle.getLocalPlanNodeId());
       }
-      if (sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId())
-          && sourceHandles.get(sourceHandle.getLocalFragmentInstanceId()).isEmpty()) {
+
+      if (sourceHandleMap != null && sourceHandleMap.isEmpty()) {
         sourceHandles.remove(sourceHandle.getLocalFragmentInstanceId());
       }
     }
@@ -459,8 +439,10 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
         localFragmentInstanceId);
 
     SharedTsBlockQueue queue;
-    if (sourceHandles.containsKey(remoteFragmentInstanceId)
-        && sourceHandles.get(remoteFragmentInstanceId).containsKey(remotePlanNodeId)) {
+    Map<String, ISourceHandle> sourceHandleMap = sourceHandles.get(remoteFragmentInstanceId);
+    LocalSourceHandle localSourceHandle =
+        sourceHandleMap == null ? null : (LocalSourceHandle) sourceHandleMap.get(remotePlanNodeId);
+    if (localSourceHandle != null) {
       logger.debug("Get shared tsblock queue from local source handle");
       queue =
           ((LocalSourceHandle) sourceHandles.get(remoteFragmentInstanceId).get(remotePlanNodeId))
@@ -595,8 +577,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
       TEndPoint remoteEndpoint,
       TFragmentInstanceId remoteFragmentInstanceId,
       IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
-    if (sourceHandles.containsKey(localFragmentInstanceId)
-        && sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) {
+    Map<String, ISourceHandle> sourceHandleMap = sourceHandles.get(localFragmentInstanceId);
+    if (sourceHandleMap != null && sourceHandleMap.containsKey(localPlanNodeId)) {
       throw new IllegalStateException(
           "Source handle for plan node "
               + localPlanNodeId
@@ -661,16 +643,4 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
         + "."
         + suffix;
   }
-
-  public ISinkHandle getISinkHandle(TFragmentInstanceId fragmentInstanceId) {
-    return sinkHandles.get(fragmentInstanceId);
-  }
-
-  public List<ISourceHandle> getISourceHandle(TFragmentInstanceId fragmentInstanceId) {
-    if (sourceHandles.containsKey(fragmentInstanceId)) {
-      return new ArrayList<>(sourceHandles.get(fragmentInstanceId).values());
-    } else {
-      return new ArrayList<>();
-    }
-  }
 }