You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2024/01/26 10:30:32 UTC
(iotdb) branch master updated: Pipe: fix connector subtasks can not be stopped and restarted after exception reporting by connector subtasks (#11979)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 40fc15b23bc Pipe: fix connector subtasks can not be stopped and restarted after exception reporting by connector subtasks (#11979)
40fc15b23bc is described below
commit 40fc15b23bcc6fca83412f1d9796763f03ee7eca
Author: Caideyipi <87...@users.noreply.github.com>
AuthorDate: Fri Jan 26 18:30:25 2024 +0800
Pipe: fix connector subtasks can not be stopped and restarted after exception reporting by connector subtasks (#11979)
---
.../subtask/connector/PipeConnectorSubtask.java | 10 ++++--
.../connector/PipeConnectorSubtaskLifeCycle.java | 42 +++++++++++-----------
2 files changed, 28 insertions(+), 24 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index 967ca2d6b86..f472a07fa5f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -207,14 +207,18 @@ public class PipeConnectorSubtask extends PipeDataNodeSubtask {
if (throwable instanceof PipeConnectionException) {
// Retry to connect to the target system if the connection is broken
+ // We should reconstruct the client before re-submit the subtask
if (onPipeConnectionException(throwable)) {
// return if the pipe task should be stopped
return;
}
}
- // Handle other exceptions as usual
- super.onFailure(throwable);
+ // Handle exceptions if any available clients exist
+ // Notice that the PipeRuntimeConnectorCriticalException must be thrown here
+ // because the upper layer relies on this to stop all the related pipe tasks
+ // Other exceptions may cause the subtask to stop forever and can not be restarted
+ super.onFailure(new PipeRuntimeConnectorCriticalException(throwable.getMessage()));
}
/** @return true if the pipe task should be stopped, false otherwise */
@@ -252,7 +256,7 @@ public class PipeConnectorSubtask extends PipeDataNodeSubtask {
}
}
- // Stop current pipe task if failed to reconnect to
+ // Stop current pipe task directly if failed to reconnect to
// the target system after MAX_RETRY_TIMES times
if (retry == MAX_RETRY_TIMES && lastEvent instanceof EnrichedEvent) {
((EnrichedEvent) lastEvent)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
index 6759dbdbaa2..6600fae8bc3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
@@ -35,7 +35,7 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
private final BoundedBlockingPendingQueue<Event> pendingQueue;
private int runningTaskCount;
- private int aliveTaskCount;
+ private int registeredTaskCount;
public PipeConnectorSubtaskLifeCycle(
PipeConnectorSubtaskExecutor executor,
@@ -46,7 +46,7 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
this.pendingQueue = pendingQueue;
runningTaskCount = 0;
- aliveTaskCount = 0;
+ registeredTaskCount = 0;
}
public PipeConnectorSubtask getSubtask() {
@@ -58,44 +58,44 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
}
public synchronized void register() {
- if (aliveTaskCount < 0) {
- throw new IllegalStateException("aliveTaskCount < 0");
+ if (registeredTaskCount < 0) {
+ throw new IllegalStateException("registeredTaskCount < 0");
}
- if (aliveTaskCount == 0) {
+ if (registeredTaskCount == 0) {
executor.register(subtask);
runningTaskCount = 0;
}
- aliveTaskCount++;
+ registeredTaskCount++;
LOGGER.info(
- "Register subtask {}. runningTaskCount: {}, aliveTaskCount: {}",
+ "Register subtask {}. runningTaskCount: {}, registeredTaskCount: {}",
subtask,
runningTaskCount,
- aliveTaskCount);
+ registeredTaskCount);
}
/**
* Deregister the subtask. If the subtask is the last one, close the subtask.
*
* <p>Note that this method should be called after the subtask is stopped. Otherwise, the
- * runningTaskCount might be inconsistent with the aliveTaskCount because of parallel connector
- * scheduling.
+ * runningTaskCount might be inconsistent with the registeredTaskCount because of parallel
+ * connector scheduling.
*
* @param pipeNameToDeregister pipe name
* @return true if the subtask is out of life cycle, indicating that the subtask should never be
* used again
- * @throws IllegalStateException if aliveTaskCount <= 0
+ * @throws IllegalStateException if registeredTaskCount <= 0
*/
public synchronized boolean deregister(String pipeNameToDeregister) {
- if (aliveTaskCount <= 0) {
- throw new IllegalStateException("aliveTaskCount <= 0");
+ if (registeredTaskCount <= 0) {
+ throw new IllegalStateException("registeredTaskCount <= 0");
}
subtask.discardEventsOfPipe(pipeNameToDeregister);
try {
- if (aliveTaskCount > 1) {
+ if (registeredTaskCount > 1) {
return false;
}
@@ -103,12 +103,12 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
// This subtask is out of life cycle, should never be used again
return true;
} finally {
- aliveTaskCount--;
+ registeredTaskCount--;
LOGGER.info(
- "Deregister subtask {}. runningTaskCount: {}, aliveTaskCount: {}",
+ "Deregister subtask {}. runningTaskCount: {}, registeredTaskCount: {}",
subtask,
runningTaskCount,
- aliveTaskCount);
+ registeredTaskCount);
}
}
@@ -123,10 +123,10 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
runningTaskCount++;
LOGGER.info(
- "Start subtask {}. runningTaskCount: {}, aliveTaskCount: {}",
+ "Start subtask {}. runningTaskCount: {}, registeredTaskCount: {}",
subtask,
runningTaskCount,
- aliveTaskCount);
+ registeredTaskCount);
}
public synchronized void stop() {
@@ -140,10 +140,10 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
runningTaskCount--;
LOGGER.info(
- "Stop subtask {}. runningTaskCount: {}, aliveTaskCount: {}",
+ "Stop subtask {}. runningTaskCount: {}, registeredTaskCount: {}",
subtask,
runningTaskCount,
- aliveTaskCount);
+ registeredTaskCount);
}
@Override