You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/01/29 03:24:13 UTC

[iotdb] branch IOTDB-5434-1.0 created (now 9e2859466b)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch IOTDB-5434-1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 9e2859466b [IOTDB-5434] Fix occasional timeout error in CI

This branch includes the following new commits:

     new 9e2859466b [IOTDB-5434] Fix occasional timeout error in CI

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [IOTDB-5434] Fix occasional timeout error in CI

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch IOTDB-5434-1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9e2859466b661f86b03cf2a4452f61b3eb9c0e97
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Sun Jan 29 11:13:24 2023 +0800

    [IOTDB-5434] Fix occasional timeout error in CI
---
 .../execution/exchange/MPPDataExchangeManager.java | 92 ++++++++--------------
 1 file changed, 31 insertions(+), 61 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 28146749a0..ea9d62e815 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -41,9 +41,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
@@ -147,18 +144,15 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
             e.getTargetPlanNodeId(),
             e.getTargetFragmentInstanceId(),
             e.getSourceFragmentInstanceId());
-        if (!sourceHandles.containsKey(e.getTargetFragmentInstanceId())
-            || !sourceHandles
-                .get(e.getTargetFragmentInstanceId())
-                .containsKey(e.getTargetPlanNodeId())
-            || sourceHandles
-                .get(e.getTargetFragmentInstanceId())
-                .get(e.getTargetPlanNodeId())
-                .isAborted()
-            || sourceHandles
-                .get(e.getTargetFragmentInstanceId())
-                .get(e.getTargetPlanNodeId())
-                .isFinished()) {
+
+        Map<String, ISourceHandle> sourceHandleMap =
+            sourceHandles.get(e.getTargetFragmentInstanceId());
+        SourceHandle sourceHandle =
+            sourceHandleMap == null
+                ? null
+                : (SourceHandle) sourceHandleMap.get(e.getTargetPlanNodeId());
+
+        if (sourceHandle == null || sourceHandle.isAborted() || sourceHandle.isFinished()) {
           // In some scenario, when the SourceHandle sends the data block ACK event, its upstream
           // may
           // have already been stopped. For example, in the query whit LimitOperator, the downstream
@@ -169,9 +163,6 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
           return;
         }
 
-        SourceHandle sourceHandle =
-            (SourceHandle)
-                sourceHandles.get(e.getTargetFragmentInstanceId()).get(e.getTargetPlanNodeId());
         sourceHandle.updatePendingDataBlockInfo(e.getStartSequenceId(), e.getBlockSizes());
       }
     }
@@ -185,28 +176,21 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
             e.getTargetPlanNodeId(),
             e.getTargetFragmentInstanceId(),
             e.getSourceFragmentInstanceId());
-        if (!sourceHandles.containsKey(e.getTargetFragmentInstanceId())
-            || !sourceHandles
-                .get(e.getTargetFragmentInstanceId())
-                .containsKey(e.getTargetPlanNodeId())
-            || sourceHandles
-                .get(e.getTargetFragmentInstanceId())
-                .get(e.getTargetPlanNodeId())
-                .isAborted()
-            || sourceHandles
-                .get(e.getTargetFragmentInstanceId())
-                .get(e.getTargetPlanNodeId())
-                .isFinished()) {
+
+        Map<String, ISourceHandle> sourceHandleMap =
+            sourceHandles.get(e.getTargetFragmentInstanceId());
+        SourceHandle sourceHandle =
+            sourceHandleMap == null
+                ? null
+                : (SourceHandle) sourceHandleMap.get(e.getTargetPlanNodeId());
+
+        if (sourceHandle == null || sourceHandle.isAborted() || sourceHandle.isFinished()) {
           logger.debug(
               "received onEndOfDataBlockEvent but the downstream FragmentInstance[{}] is not found",
               e.getTargetFragmentInstanceId());
           return;
         }
-        SourceHandle sourceHandle =
-            (SourceHandle)
-                sourceHandles
-                    .getOrDefault(e.getTargetFragmentInstanceId(), Collections.emptyMap())
-                    .get(e.getTargetPlanNodeId());
+
         sourceHandle.setNoMoreTsBlocks(e.getLastSequenceId());
       }
     }
@@ -224,18 +208,14 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
     @Override
     public void onFinished(ISourceHandle sourceHandle) {
       logger.debug("[ScHListenerOnFinish]");
-      if (!sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId())
-          || !sourceHandles
-              .get(sourceHandle.getLocalFragmentInstanceId())
-              .containsKey(sourceHandle.getLocalPlanNodeId())) {
+      Map<String, ISourceHandle> sourceHandleMap =
+          sourceHandles.get(sourceHandle.getLocalFragmentInstanceId());
+      if (sourceHandleMap == null
+          || sourceHandleMap.remove(sourceHandle.getLocalPlanNodeId()) == null) {
         logger.debug("[ScHListenerAlreadyReleased]");
-      } else {
-        sourceHandles
-            .get(sourceHandle.getLocalFragmentInstanceId())
-            .remove(sourceHandle.getLocalPlanNodeId());
       }
-      if (sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId())
-          && sourceHandles.get(sourceHandle.getLocalFragmentInstanceId()).isEmpty()) {
+
+      if (sourceHandleMap != null && sourceHandleMap.isEmpty()) {
         sourceHandles.remove(sourceHandle.getLocalFragmentInstanceId());
       }
     }
@@ -358,8 +338,10 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
         localFragmentInstanceId);
 
     SharedTsBlockQueue queue;
-    if (sourceHandles.containsKey(remoteFragmentInstanceId)
-        && sourceHandles.get(remoteFragmentInstanceId).containsKey(remotePlanNodeId)) {
+    Map<String, ISourceHandle> sourceHandleMap = sourceHandles.get(remoteFragmentInstanceId);
+    LocalSourceHandle localSourceHandle =
+        sourceHandleMap == null ? null : (LocalSourceHandle) sourceHandleMap.get(remotePlanNodeId);
+    if (localSourceHandle != null) {
       logger.debug("Get shared tsblock queue from local source handle");
       queue =
           ((LocalSourceHandle) sourceHandles.get(remoteFragmentInstanceId).get(remotePlanNodeId))
@@ -466,8 +448,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
       TEndPoint remoteEndpoint,
       TFragmentInstanceId remoteFragmentInstanceId,
       IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
-    if (sourceHandles.containsKey(localFragmentInstanceId)
-        && sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) {
+    Map<String, ISourceHandle> sourceHandleMap = sourceHandles.get(localFragmentInstanceId);
+    if (sourceHandleMap != null && sourceHandleMap.containsKey(localPlanNodeId)) {
       throw new IllegalStateException(
           "Source handle for plan node "
               + localPlanNodeId
@@ -532,16 +514,4 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
         + "."
         + suffix;
   }
-
-  public ISinkHandle getISinkHandle(TFragmentInstanceId fragmentInstanceId) {
-    return sinkHandles.get(fragmentInstanceId);
-  }
-
-  public List<ISourceHandle> getISourceHandle(TFragmentInstanceId fragmentInstanceId) {
-    if (sourceHandles.containsKey(fragmentInstanceId)) {
-      return new ArrayList<>(sourceHandles.get(fragmentInstanceId).values());
-    } else {
-      return new ArrayList<>();
-    }
-  }
 }