You are viewing a plain-text version of this content. The link to the canonical version (originally labeled "here") was lost in this plain-text copy.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2024/01/26 10:30:32 UTC

(iotdb) branch master updated: Pipe: fix connector subtasks that could not be stopped and restarted after a connector subtask reported an exception (#11979)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 40fc15b23bc Pipe: fix connector subtasks can not be stopped and restarted after exception reporting by connector subtasks (#11979)
40fc15b23bc is described below

commit 40fc15b23bcc6fca83412f1d9796763f03ee7eca
Author: Caideyipi <87...@users.noreply.github.com>
AuthorDate: Fri Jan 26 18:30:25 2024 +0800

    Pipe: fix connector subtasks can not be stopped and restarted after exception reporting by connector subtasks (#11979)
---
 .../subtask/connector/PipeConnectorSubtask.java    | 10 ++++--
 .../connector/PipeConnectorSubtaskLifeCycle.java   | 42 +++++++++++-----------
 2 files changed, 28 insertions(+), 24 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index 967ca2d6b86..f472a07fa5f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -207,14 +207,18 @@ public class PipeConnectorSubtask extends PipeDataNodeSubtask {
 
     if (throwable instanceof PipeConnectionException) {
       // Retry to connect to the target system if the connection is broken
+      // We should reconstruct the client before re-submit the subtask
       if (onPipeConnectionException(throwable)) {
         // return if the pipe task should be stopped
         return;
       }
     }
 
-    // Handle other exceptions as usual
-    super.onFailure(throwable);
+    // Handle exceptions if any available clients exist
+    // Notice that the PipeRuntimeConnectorCriticalException must be thrown here
+    // because the upper layer relies on this to stop all the related pipe tasks
+    // Other exceptions may cause the subtask to stop forever and can not be restarted
+    super.onFailure(new PipeRuntimeConnectorCriticalException(throwable.getMessage()));
   }
 
   /** @return true if the pipe task should be stopped, false otherwise */
@@ -252,7 +256,7 @@ public class PipeConnectorSubtask extends PipeDataNodeSubtask {
       }
     }
 
-    // Stop current pipe task if failed to reconnect to
+    // Stop current pipe task directly if failed to reconnect to
     // the target system after MAX_RETRY_TIMES times
     if (retry == MAX_RETRY_TIMES && lastEvent instanceof EnrichedEvent) {
       ((EnrichedEvent) lastEvent)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
index 6759dbdbaa2..6600fae8bc3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
@@ -35,7 +35,7 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
   private final BoundedBlockingPendingQueue<Event> pendingQueue;
 
   private int runningTaskCount;
-  private int aliveTaskCount;
+  private int registeredTaskCount;
 
   public PipeConnectorSubtaskLifeCycle(
       PipeConnectorSubtaskExecutor executor,
@@ -46,7 +46,7 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
     this.pendingQueue = pendingQueue;
 
     runningTaskCount = 0;
-    aliveTaskCount = 0;
+    registeredTaskCount = 0;
   }
 
   public PipeConnectorSubtask getSubtask() {
@@ -58,44 +58,44 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
   }
 
   public synchronized void register() {
-    if (aliveTaskCount < 0) {
-      throw new IllegalStateException("aliveTaskCount < 0");
+    if (registeredTaskCount < 0) {
+      throw new IllegalStateException("registeredTaskCount < 0");
     }
 
-    if (aliveTaskCount == 0) {
+    if (registeredTaskCount == 0) {
       executor.register(subtask);
       runningTaskCount = 0;
     }
 
-    aliveTaskCount++;
+    registeredTaskCount++;
     LOGGER.info(
-        "Register subtask {}. runningTaskCount: {}, aliveTaskCount: {}",
+        "Register subtask {}. runningTaskCount: {}, registeredTaskCount: {}",
         subtask,
         runningTaskCount,
-        aliveTaskCount);
+        registeredTaskCount);
   }
 
   /**
    * Deregister the subtask. If the subtask is the last one, close the subtask.
    *
    * <p>Note that this method should be called after the subtask is stopped. Otherwise, the
-   * runningTaskCount might be inconsistent with the aliveTaskCount because of parallel connector
-   * scheduling.
+   * runningTaskCount might be inconsistent with the registeredTaskCount because of parallel
+   * connector scheduling.
    *
    * @param pipeNameToDeregister pipe name
    * @return true if the subtask is out of life cycle, indicating that the subtask should never be
    *     used again
-   * @throws IllegalStateException if aliveTaskCount <= 0
+   * @throws IllegalStateException if registeredTaskCount <= 0
    */
   public synchronized boolean deregister(String pipeNameToDeregister) {
-    if (aliveTaskCount <= 0) {
-      throw new IllegalStateException("aliveTaskCount <= 0");
+    if (registeredTaskCount <= 0) {
+      throw new IllegalStateException("registeredTaskCount <= 0");
     }
 
     subtask.discardEventsOfPipe(pipeNameToDeregister);
 
     try {
-      if (aliveTaskCount > 1) {
+      if (registeredTaskCount > 1) {
         return false;
       }
 
@@ -103,12 +103,12 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
       // This subtask is out of life cycle, should never be used again
       return true;
     } finally {
-      aliveTaskCount--;
+      registeredTaskCount--;
       LOGGER.info(
-          "Deregister subtask {}. runningTaskCount: {}, aliveTaskCount: {}",
+          "Deregister subtask {}. runningTaskCount: {}, registeredTaskCount: {}",
           subtask,
           runningTaskCount,
-          aliveTaskCount);
+          registeredTaskCount);
     }
   }
 
@@ -123,10 +123,10 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
 
     runningTaskCount++;
     LOGGER.info(
-        "Start subtask {}. runningTaskCount: {}, aliveTaskCount: {}",
+        "Start subtask {}. runningTaskCount: {}, registeredTaskCount: {}",
         subtask,
         runningTaskCount,
-        aliveTaskCount);
+        registeredTaskCount);
   }
 
   public synchronized void stop() {
@@ -140,10 +140,10 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
 
     runningTaskCount--;
     LOGGER.info(
-        "Stop subtask {}. runningTaskCount: {}, aliveTaskCount: {}",
+        "Stop subtask {}. runningTaskCount: {}, registeredTaskCount: {}",
         subtask,
         runningTaskCount,
-        aliveTaskCount);
+        registeredTaskCount);
   }
 
   @Override