You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/12/29 03:06:29 UTC
[iotdb] 01/01: [To rel/1.0] Change read dispatch from async to sync
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch DispatchRead
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 24ecb0cf224674eb8126a112b025ae8070315cbd
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu Dec 29 11:06:15 2022 +0800
[To rel/1.0] Change read dispatch from async to sync
---
.../scheduler/FragmentInstanceDispatcherImpl.java | 30 ++++++++++------------
1 file changed, 14 insertions(+), 16 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index db04e97f0b..34a75f88d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -95,22 +95,20 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
// unsafe for current FragmentInstance scheduler framework. We need to implement the
// topological dispatch according to dependency relations between FragmentInstances
private Future<FragInstanceDispatchResult> dispatchRead(List<FragmentInstance> instances) {
- return executor.submit(
- () -> {
- for (FragmentInstance instance : instances) {
- try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) {
- dispatchOneInstance(instance);
- } catch (FragmentInstanceDispatchException e) {
- return new FragInstanceDispatchResult(e.getFailureStatus());
- } catch (Throwable t) {
- logger.warn("[DispatchFailed]", t);
- return new FragInstanceDispatchResult(
- RpcUtils.getStatus(
- TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage()));
- }
- }
- return new FragInstanceDispatchResult(true);
- });
+ for (FragmentInstance instance : instances) {
+ try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) {
+ dispatchOneInstance(instance);
+ } catch (FragmentInstanceDispatchException e) {
+ return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus()));
+ } catch (Throwable t) {
+ logger.warn("[DispatchFailed]", t);
+ return immediateFuture(
+ (new FragInstanceDispatchResult(
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage()))));
+ }
+ }
+ return immediateFuture(new FragInstanceDispatchResult(true));
}
private Future<FragInstanceDispatchResult> dispatchWriteSync(List<FragmentInstance> instances) {