You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/08/31 09:30:28 UTC

[iotdb] branch ty/ChangeLog created (now 5f9d1cc495)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch ty/ChangeLog
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 5f9d1cc495 Add flushing state timeout detection in FragmentInstanceManager

This branch includes the following new commits:

     new 5f9d1cc495 Add flushing state timeout detection in FragmentInstanceManager

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Add flushing state timeout detection in FragmentInstanceManager

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/ChangeLog
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5f9d1cc4957a64279ab85ab84edc2975530ceace
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Wed Aug 31 17:30:07 2022 +0800

    Add flushing state timeout detection in FragmentInstanceManager
---
 .../execution/exchange/MPPDataExchangeManager.java |  2 +-
 .../fragment/FragmentInstanceContext.java          |  4 ++++
 .../fragment/FragmentInstanceManager.java          | 25 +++++++++++++++++++++-
 3 files changed, 29 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index b2b32222d9..2742816e0f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -155,7 +155,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
           // have already been stopped. For example, in the query whit LimitOperator, the downstream
           // FragmentInstance may be finished, although the upstream is still working.
           logger.warn(
-              "received NewDataBlockEvent but the upstream FragmentInstance[{}] is not found",
+              "received NewDataBlockEvent but the downstream FragmentInstance[{}] is not found",
               e.getTargetFragmentInstanceId());
           return;
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 593947d080..62ab803edb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -188,6 +188,10 @@ public class FragmentInstanceContext extends QueryContext {
     return executionEndTime.get();
   }
 
+  public long getStartTime() {
+    return executionStartTime.get();
+  }
+
   public FragmentInstanceInfo getInstanceInfo() {
     return new FragmentInstanceInfo(stateMachine.getState(), getEndTime(), getFailedCause());
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index f6ebb1b42e..f27aedb217 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.execution.fragment;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
@@ -41,6 +42,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static java.util.Objects.requireNonNull;
 import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
@@ -63,6 +65,9 @@ public class FragmentInstanceManager {
   // record failed instances count
   private final CounterStat failedInstances = new CounterStat();
 
+  private static final long QUERY_TIMEOUT_MS =
+      IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold();
+
   public static FragmentInstanceManager getInstance() {
     return FragmentInstanceManager.InstanceHolder.INSTANCE;
   }
@@ -79,6 +84,12 @@ public class FragmentInstanceManager {
 
     ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
         instanceManagementExecutor, this::removeOldInstances, 200, 200, TimeUnit.MILLISECONDS);
+    ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+        instanceManagementExecutor,
+        this::cancelTimeoutFlushingInstances,
+        200,
+        200,
+        TimeUnit.MILLISECONDS);
   }
 
   public FragmentInstanceInfo execDataQueryFragmentInstance(
@@ -175,7 +186,7 @@ public class FragmentInstanceManager {
 
   /** Cancels a FragmentInstance. */
   public FragmentInstanceInfo cancelTask(FragmentInstanceId instanceId) {
-    logger.error("cancelTask");
+    logger.debug("cancelTask");
     requireNonNull(instanceId, "taskId is null");
 
     FragmentInstanceContext context = instanceContext.remove(instanceId);
@@ -223,6 +234,18 @@ public class FragmentInstanceManager {
             });
   }
 
+  private void cancelTimeoutFlushingInstances() {
+    long now = System.currentTimeMillis();
+    instanceContext.entrySet().stream()
+        .filter(
+            entry -> {
+              FragmentInstanceContext context = entry.getValue();
+              return context.getStateMachine().getState() == FragmentInstanceState.FLUSHING
+                  && (now - context.getStartTime()) > QUERY_TIMEOUT_MS;
+            })
+        .forEach(entry -> entry.getValue().failed(new TimeoutException()));
+  }
+
   private static class InstanceHolder {
 
     private InstanceHolder() {}