You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/11/29 11:40:22 UTC

[iotdb] 04/04: change dispatch way

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/tsbs
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7df7c57c69f00bb18e8db3ede75cd479946adbb1
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Nov 29 19:40:02 2022 +0800

    change dispatch way
---
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 36 ++++++++++------------
 1 file changed, 17 insertions(+), 19 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 1ffa111a8e..15a5f8c1d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -100,25 +100,23 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
   //  unsafe for current FragmentInstance scheduler framework. We need to implement the
   //  topological dispatch according to dependency relations between FragmentInstances
   private Future<FragInstanceDispatchResult> dispatchRead(List<FragmentInstance> instances) {
-    return executor.submit(
-        () -> {
-          for (FragmentInstance instance : instances) {
-            long startTime = System.nanoTime();
-            try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) {
-              dispatchOneInstance(instance);
-            } catch (FragmentInstanceDispatchException e) {
-              return new FragInstanceDispatchResult(e.getFailureStatus());
-            } catch (Throwable t) {
-              logger.error("[DispatchFailed]", t);
-              return new FragInstanceDispatchResult(
-                  RpcUtils.getStatus(
-                      TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage()));
-            } finally {
-              QUERY_STATISTICS.addCost(DISPATCH_READ, System.nanoTime() - startTime);
-            }
-          }
-          return new FragInstanceDispatchResult(true);
-        });
+    for (FragmentInstance instance : instances) {
+      long startTime = System.nanoTime();
+      try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) {
+        dispatchOneInstance(instance);
+      } catch (FragmentInstanceDispatchException e) {
+        return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus()));
+      } catch (Throwable t) {
+        logger.error("[DispatchFailed]", t);
+        return immediateFuture(
+            new FragInstanceDispatchResult(
+                RpcUtils.getStatus(
+                    TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage())));
+      } finally {
+        QUERY_STATISTICS.addCost(DISPATCH_READ, System.nanoTime() - startTime);
+      }
+    }
+    return immediateFuture(new FragInstanceDispatchResult(true));
   }
 
   private Future<FragInstanceDispatchResult> dispatchWriteSync(List<FragmentInstance> instances) {