You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/12/29 03:06:29 UTC

[iotdb] 01/01: [To rel/1.0] Change read dispatch from async to sync

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch DispatchRead
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 24ecb0cf224674eb8126a112b025ae8070315cbd
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu Dec 29 11:06:15 2022 +0800

    [To rel/1.0] Change read dispatch from async to sync
---
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 30 ++++++++++------------
 1 file changed, 14 insertions(+), 16 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index db04e97f0b..34a75f88d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -95,22 +95,20 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
   //  unsafe for current FragmentInstance scheduler framework. We need to implement the
   //  topological dispatch according to dependency relations between FragmentInstances
   private Future<FragInstanceDispatchResult> dispatchRead(List<FragmentInstance> instances) {
-    return executor.submit(
-        () -> {
-          for (FragmentInstance instance : instances) {
-            try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) {
-              dispatchOneInstance(instance);
-            } catch (FragmentInstanceDispatchException e) {
-              return new FragInstanceDispatchResult(e.getFailureStatus());
-            } catch (Throwable t) {
-              logger.warn("[DispatchFailed]", t);
-              return new FragInstanceDispatchResult(
-                  RpcUtils.getStatus(
-                      TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage()));
-            }
-          }
-          return new FragInstanceDispatchResult(true);
-        });
+    for (FragmentInstance instance : instances) {
+      try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) {
+        dispatchOneInstance(instance);
+      } catch (FragmentInstanceDispatchException e) {
+        return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus()));
+      } catch (Throwable t) {
+        logger.warn("[DispatchFailed]", t);
+        return immediateFuture(
+            (new FragInstanceDispatchResult(
+                RpcUtils.getStatus(
+                    TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage()))));
+      }
+    }
+    return immediateFuture(new FragInstanceDispatchResult(true));
   }
 
   private Future<FragInstanceDispatchResult> dispatchWriteSync(List<FragmentInstance> instances) {