You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2022/11/08 13:08:46 UTC

[iotdb] branch master updated: Rollback modification of FrangmentInstanceDispatherImpl.dispatchWriteSync (#7933)

This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e16003793 Rollback modification of FrangmentInstanceDispatherImpl.dispatchWriteSync (#7933)
7e16003793 is described below

commit 7e16003793bdd05bb3cfd9993b57040622cbc2ed
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Tue Nov 8 21:08:40 2022 +0800

    Rollback modification of FrangmentInstanceDispatherImpl.dispatchWriteSync (#7933)
---
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 58 +++-------------------
 1 file changed, 8 insertions(+), 50 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 1a2d6e7991..1128b97341 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -48,9 +48,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
@@ -116,60 +114,20 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
   }
 
   private Future<FragInstanceDispatchResult> dispatchWriteSync(List<FragmentInstance> instances) {
-    List<Future<Throwable>> futureList = new ArrayList<>();
     for (FragmentInstance instance : instances) {
-      futureList.add(
-          writeOperationExecutor.submit(
-              () -> {
-                try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) {
-                  dispatchOneInstance(instance);
-                  return null;
-                } catch (Throwable t) {
-                  return t;
-                }
-              }));
-    }
-
-    List<Throwable> throwableList = new ArrayList<>();
-
-    for (Future<Throwable> future : futureList) {
-      try {
-        Throwable t = future.get();
-        if (t != null) {
-          throwableList.add(t);
-        }
-      } catch (InterruptedException | ExecutionException e) {
-        throwableList.add(e);
-        logger.error("[DispatchFailed]", e);
+      try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) {
+        dispatchOneInstance(instance);
+      } catch (FragmentInstanceDispatchException e) {
+        return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus()));
+      } catch (Throwable t) {
+        logger.error("[DispatchFailed]", t);
         return immediateFuture(
             new FragInstanceDispatchResult(
                 RpcUtils.getStatus(
-                    TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + e.getMessage())));
-      }
-    }
-
-    if (throwableList.isEmpty()) {
-      return immediateFuture(new FragInstanceDispatchResult(true));
-    } else {
-      List<TSStatus> failureStatusList = new ArrayList<>(throwableList.size());
-      for (Throwable t : throwableList) {
-        if (t instanceof FragmentInstanceDispatchException) {
-          failureStatusList.add(((FragmentInstanceDispatchException) t).getFailureStatus());
-        } else {
-          logger.error("[DispatchFailed]", t);
-          return immediateFuture(
-              new FragInstanceDispatchResult(
-                  RpcUtils.getStatus(
-                      TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage())));
-        }
-      }
-      if (failureStatusList.size() == 1) {
-        return immediateFuture(new FragInstanceDispatchResult(failureStatusList.get(0)));
-      } else {
-        return immediateFuture(
-            new FragInstanceDispatchResult(RpcUtils.getStatus(failureStatusList)));
+                    TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage())));
       }
     }
+    return immediateFuture(new FragInstanceDispatchResult(true));
   }
 
   private void dispatchOneInstance(FragmentInstance instance)