You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/22 04:09:40 UTC

[iotdb] branch xingtanzjr/query_state created (now c193dad066)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch xingtanzjr/query_state
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at c193dad066 fix the bug in state tracker that the printRate is not correct

This branch includes the following new commits:

     new c193dad066 fix the bug in state tracker that the printRate is not correct

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: fix the bug in state tracker that the printRate is not correct

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/query_state
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c193dad0668ca4042ab9d546595115c640924f2b
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Aug 22 12:09:25 2022 +0800

    fix the bug in state tracker that the printRate is not correct
---
 .../db/mpp/plan/execution/QueryExecution.java      |  3 ++
 .../scheduler/FixedRateFragInsStateTracker.java    | 33 ++++++++++++++++++----
 2 files changed, 30 insertions(+), 6 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 4a6ac5fdc0..e9d6857ae1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -368,6 +368,9 @@ public class QueryExecution implements IQueryExecution {
         stateMachine.transitionToFailed(e);
         Thread.currentThread().interrupt();
         throw new IoTDBException(e, TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
+      } catch (Throwable t) {
+        stateMachine.transitionToFailed(t);
+        throw t;
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
index 42a98f6b38..fb91167a6d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
@@ -34,6 +35,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -47,8 +49,7 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
   // TODO: (xingtanzjr) consider how much Interval is OK for state tracker
   private static final long STATE_FETCH_INTERVAL_IN_MS = 500;
   private ScheduledFuture<?> trackTask;
-  private volatile FragmentInstanceState lastState;
-  private volatile long durationToLastPrintInMS;
+  private final ConcurrentHashMap<FragmentInstanceId, InstanceStateMetrics> lastState;
   private volatile boolean aborted;
 
   public FixedRateFragInsStateTracker(
@@ -58,6 +59,7 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
       IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
     super(stateMachine, scheduledExecutor, instances, internalServiceClientManager);
     this.aborted = false;
+    this.lastState = new ConcurrentHashMap<>();
   }
 
   @Override
@@ -93,12 +95,12 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
     for (FragmentInstance instance : instances) {
       try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) {
         FragmentInstanceState state = fetchState(instance);
-        if (needPrintState(lastState, state, durationToLastPrintInMS)) {
+        InstanceStateMetrics metrics = lastState.computeIfAbsent(instance.getId(), k -> new InstanceStateMetrics());
+        if (needPrintState(metrics.lastState, state, metrics.durationToLastPrintInMS)) {
           logger.info("State is {}", state);
-          lastState = state;
-          durationToLastPrintInMS = 0;
+          metrics.reset(state);
         } else {
-          durationToLastPrintInMS += STATE_FETCH_INTERVAL_IN_MS;
+          metrics.addDuration(STATE_FETCH_INTERVAL_IN_MS);
         }
 
         if (state != null) {
@@ -118,4 +120,23 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
     }
     return durationToLastPrintInMS >= SAME_STATE_PRINT_RATE_IN_MS;
   }
+
+  private static class InstanceStateMetrics {
+    private FragmentInstanceState lastState;
+    private long durationToLastPrintInMS;
+
+    private InstanceStateMetrics() {
+      this.lastState = null;
+      this.durationToLastPrintInMS = 0L;
+    }
+
+    private void reset(FragmentInstanceState newState) {
+      this.lastState = newState;
+      this.durationToLastPrintInMS = 0L;
+    }
+
+    private void addDuration(long duration) {
+      durationToLastPrintInMS += duration;
+    }
+  }
 }