You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2022/11/08 13:08:46 UTC
[iotdb] branch master updated: Rollback modification of FragmentInstanceDispatcherImpl.dispatchWriteSync (#7933)
This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7e16003793 Rollback modification of FragmentInstanceDispatcherImpl.dispatchWriteSync (#7933)
7e16003793 is described below
commit 7e16003793bdd05bb3cfd9993b57040622cbc2ed
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Tue Nov 8 21:08:40 2022 +0800
Rollback modification of FragmentInstanceDispatcherImpl.dispatchWriteSync (#7933)
---
.../scheduler/FragmentInstanceDispatcherImpl.java | 58 +++-------------------
1 file changed, 8 insertions(+), 50 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 1a2d6e7991..1128b97341 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -48,9 +48,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -116,60 +114,20 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
}
private Future<FragInstanceDispatchResult> dispatchWriteSync(List<FragmentInstance> instances) {
- List<Future<Throwable>> futureList = new ArrayList<>();
for (FragmentInstance instance : instances) {
- futureList.add(
- writeOperationExecutor.submit(
- () -> {
- try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) {
- dispatchOneInstance(instance);
- return null;
- } catch (Throwable t) {
- return t;
- }
- }));
- }
-
- List<Throwable> throwableList = new ArrayList<>();
-
- for (Future<Throwable> future : futureList) {
- try {
- Throwable t = future.get();
- if (t != null) {
- throwableList.add(t);
- }
- } catch (InterruptedException | ExecutionException e) {
- throwableList.add(e);
- logger.error("[DispatchFailed]", e);
+ try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) {
+ dispatchOneInstance(instance);
+ } catch (FragmentInstanceDispatchException e) {
+ return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus()));
+ } catch (Throwable t) {
+ logger.error("[DispatchFailed]", t);
return immediateFuture(
new FragInstanceDispatchResult(
RpcUtils.getStatus(
- TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + e.getMessage())));
- }
- }
-
- if (throwableList.isEmpty()) {
- return immediateFuture(new FragInstanceDispatchResult(true));
- } else {
- List<TSStatus> failureStatusList = new ArrayList<>(throwableList.size());
- for (Throwable t : throwableList) {
- if (t instanceof FragmentInstanceDispatchException) {
- failureStatusList.add(((FragmentInstanceDispatchException) t).getFailureStatus());
- } else {
- logger.error("[DispatchFailed]", t);
- return immediateFuture(
- new FragInstanceDispatchResult(
- RpcUtils.getStatus(
- TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage())));
- }
- }
- if (failureStatusList.size() == 1) {
- return immediateFuture(new FragInstanceDispatchResult(failureStatusList.get(0)));
- } else {
- return immediateFuture(
- new FragInstanceDispatchResult(RpcUtils.getStatus(failureStatusList)));
+ TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage())));
}
}
+ return immediateFuture(new FragInstanceDispatchResult(true));
}
private void dispatchOneInstance(FragmentInstance instance)