You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/11/29 11:40:22 UTC
[iotdb] 04/04: change dispatch way
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty/tsbs
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7df7c57c69f00bb18e8db3ede75cd479946adbb1
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Nov 29 19:40:02 2022 +0800
change dispatch way
---
.../scheduler/FragmentInstanceDispatcherImpl.java | 36 ++++++++++------------
1 file changed, 17 insertions(+), 19 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 1ffa111a8e..15a5f8c1d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -100,25 +100,23 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
// unsafe for current FragmentInstance scheduler framework. We need to implement the
// topological dispatch according to dependency relations between FragmentInstances
private Future<FragInstanceDispatchResult> dispatchRead(List<FragmentInstance> instances) {
- return executor.submit(
- () -> {
- for (FragmentInstance instance : instances) {
- long startTime = System.nanoTime();
- try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) {
- dispatchOneInstance(instance);
- } catch (FragmentInstanceDispatchException e) {
- return new FragInstanceDispatchResult(e.getFailureStatus());
- } catch (Throwable t) {
- logger.error("[DispatchFailed]", t);
- return new FragInstanceDispatchResult(
- RpcUtils.getStatus(
- TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage()));
- } finally {
- QUERY_STATISTICS.addCost(DISPATCH_READ, System.nanoTime() - startTime);
- }
- }
- return new FragInstanceDispatchResult(true);
- });
+ for (FragmentInstance instance : instances) {
+ long startTime = System.nanoTime();
+ try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) {
+ dispatchOneInstance(instance);
+ } catch (FragmentInstanceDispatchException e) {
+ return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus()));
+ } catch (Throwable t) {
+ logger.error("[DispatchFailed]", t);
+ return immediateFuture(
+ new FragInstanceDispatchResult(
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage())));
+ } finally {
+ QUERY_STATISTICS.addCost(DISPATCH_READ, System.nanoTime() - startTime);
+ }
+ }
+ return immediateFuture(new FragInstanceDispatchResult(true));
}
private Future<FragInstanceDispatchResult> dispatchWriteSync(List<FragmentInstance> instances) {