You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/31 18:44:51 UTC
[iotdb] branch master updated: [IOTDB-5956] Pipe: Fix bugs in IoTDBThriftReceiverV1, PipeMetaSync (#10007)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 81f541d22f2 [IOTDB-5956] Pipe: Fix bugs in IoTDBThriftReceiverV1, PipeMetaSync (#10007)
81f541d22f2 is described below
commit 81f541d22f2b4e642a8a330643709a067173cc32
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Thu Jun 1 02:44:44 2023 +0800
[IOTDB-5956] Pipe: Fix bugs in IoTDBThriftReceiverV1, PipeMetaSync (#10007)
* Fix bugs in pipeReceiveFileDir handling
* Fix bug where ConnectorSubtask reports exceptions to the wrong PipeTaskMeta
---
.../response/pipe/task/PipeTableResp.java | 4 ++
.../runtime/PipeHandleMetaChangeProcedure.java | 48 +++++++++++--
.../PipeRuntimeConnectorCriticalException.java | 73 +++++++++++++++++++
.../pipe}/PipeRuntimeCriticalException.java | 36 +++++++++-
.../exception/pipe}/PipeRuntimeException.java | 11 ++-
.../exception/pipe/PipeRuntimeExceptionType.java | 82 ++++++++++++++++++++++
.../pipe}/PipeRuntimeNonCriticalException.java | 36 +++++++++-
.../iotdb/commons/pipe/task/meta/PipeTaskMeta.java | 35 ++++-----
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 1 +
.../db/pipe/agent/runtime/PipeRuntimeAgent.java | 2 +-
.../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 4 +-
.../PipeRealtimeDataRegionHybridCollector.java | 2 +-
.../PipeRealtimeDataRegionLogCollector.java | 2 +-
.../PipeRealtimeDataRegionTsFileCollector.java | 2 +-
.../impl/iotdb/v1/IoTDBThriftConnectorV1.java | 10 ++-
.../manager/PipeConnectorSubtaskManager.java | 7 +-
.../iotdb/db/pipe/core/event/EnrichedEvent.java | 6 ++
.../event/realtime/PipeRealtimeCollectEvent.java | 18 ++++-
.../db/pipe/resource/wal/PipeWALResource.java | 4 +-
.../apache/iotdb/db/pipe/task/PipeTaskBuilder.java | 3 +-
.../db/pipe/task/stage/PipeTaskConnectorStage.java | 6 +-
.../db/pipe/task/stage/PipeTaskProcessorStage.java | 4 --
.../db/pipe/task/subtask/PipeConnectorSubtask.java | 19 ++---
.../db/pipe/task/subtask/PipeProcessorSubtask.java | 4 +-
.../iotdb/db/pipe/task/subtask/PipeSubtask.java | 23 +++---
.../executor/PipeConnectorSubtaskExecutorTest.java | 2 -
.../executor/PipeProcessorSubtaskExecutorTest.java | 2 -
27 files changed, 364 insertions(+), 82 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
index 2d6f401eefb..3153739629f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
@@ -44,6 +44,10 @@ public class PipeTableResp implements DataSet {
this.allPipeMeta = allPipeMeta;
}
+ public List<PipeMeta> getAllPipeMeta() {
+ return allPipeMeta;
+ }
+
public PipeTableResp filter(Boolean whereClause, String pipeName) {
if (whereClause == null) {
if (pipeName == null) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index b63e157f096..8f9f9207ebc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -20,19 +20,21 @@
package org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
+import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
import org.apache.iotdb.confignode.persistence.pipe.PipeTaskOperation;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.impl.pipe.task.AbstractOperatePipeProcedureV2;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.pipe.api.exception.PipeManagementException;
-import org.apache.iotdb.pipe.api.exception.PipeRuntimeCriticalException;
-import org.apache.iotdb.pipe.api.exception.PipeRuntimeException;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -148,10 +150,46 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV
for (final PipeRuntimeException exception :
runtimeMetaFromDataNode.getExceptionMessages()) {
pipeTaskMetaOnConfigNode.trackExceptionMessage(exception);
+
if (exception instanceof PipeRuntimeCriticalException) {
- pipeMetaOnConfigNode.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
- needWriteConsensusOnConfigNodes = true;
- needPushPipeMetaToDataNodes = true;
+ final String pipeName = pipeMetaOnConfigNode.getStaticMeta().getPipeName();
+ if (!pipeMetaOnConfigNode
+ .getRuntimeMeta()
+ .getStatus()
+ .get()
+ .equals(PipeStatus.STOPPED)) {
+ pipeMetaOnConfigNode.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
+ needWriteConsensusOnConfigNodes = true;
+ needPushPipeMetaToDataNodes = true;
+
+ LOGGER.warn(
+ String.format(
+ "Detect PipeRuntimeCriticalException %s from DataNode, stop pipe %s.",
+ exception, pipeName));
+ }
+
+ if (exception instanceof PipeRuntimeConnectorCriticalException) {
+ ((PipeTableResp)
+ env.getConfigManager()
+ .getPipeManager()
+ .getPipeTaskCoordinator()
+ .getPipeTaskInfo()
+ .showPipes())
+ .filter(true, pipeName).getAllPipeMeta().stream()
+ .map(pipeMeta -> pipeMeta.getRuntimeMeta().getStatus())
+ .filter(status -> !status.get().equals(PipeStatus.STOPPED))
+ .forEach(
+ status -> {
+ status.set(PipeStatus.STOPPED);
+ needWriteConsensusOnConfigNodes = true;
+ needPushPipeMetaToDataNodes = true;
+
+ LOGGER.warn(
+ String.format(
+ "Detect PipeRuntimeConnectorCriticalException %s from DataNode, stop pipe %s.",
+ exception, pipeName));
+ });
+ }
}
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorCriticalException.java b/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorCriticalException.java
new file mode 100644
index 00000000000..615f3725457
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorCriticalException.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.exception.pipe;
+
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class PipeRuntimeConnectorCriticalException extends PipeRuntimeCriticalException {
+ public PipeRuntimeConnectorCriticalException(String message) {
+ super(message);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof PipeRuntimeConnectorCriticalException
+ && Objects.equals(getMessage(), ((PipeRuntimeConnectorCriticalException) obj).getMessage());
+ }
+
+ @Override
+ public int hashCode() {
+ return getMessage().hashCode();
+ }
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+ PipeRuntimeExceptionType.CONNECTOR_CRITICAL_EXCEPTION.serialize(byteBuffer);
+ ReadWriteIOUtils.write(getMessage(), byteBuffer);
+ }
+
+ @Override
+ public void serialize(OutputStream stream) throws IOException {
+ PipeRuntimeExceptionType.CONNECTOR_CRITICAL_EXCEPTION.serialize(stream);
+ ReadWriteIOUtils.write(getMessage(), stream);
+ }
+
+ public static PipeRuntimeConnectorCriticalException deserializeFrom(ByteBuffer byteBuffer) {
+ final String message = ReadWriteIOUtils.readString(byteBuffer);
+ return new PipeRuntimeConnectorCriticalException(message);
+ }
+
+ public static PipeRuntimeConnectorCriticalException deserializeFrom(InputStream stream)
+ throws IOException {
+ final String message = ReadWriteIOUtils.readString(stream);
+ return new PipeRuntimeConnectorCriticalException(message);
+ }
+
+ @Override
+ public String toString() {
+ return "PipeRuntimeConnectorCriticalException{ message: " + getMessage() + " }";
+ }
+}
diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeRuntimeCriticalException.java b/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeCriticalException.java
similarity index 52%
rename from iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeRuntimeCriticalException.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeCriticalException.java
index fbe73118385..b6bd3c11a1c 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeRuntimeCriticalException.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeCriticalException.java
@@ -17,8 +17,14 @@
* under the License.
*/
-package org.apache.iotdb.pipe.api.exception;
+package org.apache.iotdb.commons.exception.pipe;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.util.Objects;
public class PipeRuntimeCriticalException extends PipeRuntimeException {
@@ -37,4 +43,32 @@ public class PipeRuntimeCriticalException extends PipeRuntimeException {
public int hashCode() {
return getMessage().hashCode();
}
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+ PipeRuntimeExceptionType.CRITICAL_EXCEPTION.serialize(byteBuffer);
+ ReadWriteIOUtils.write(getMessage(), byteBuffer);
+ }
+
+ @Override
+ public void serialize(OutputStream stream) throws IOException {
+ PipeRuntimeExceptionType.CRITICAL_EXCEPTION.serialize(stream);
+ ReadWriteIOUtils.write(getMessage(), stream);
+ }
+
+ public static PipeRuntimeCriticalException deserializeFrom(ByteBuffer byteBuffer) {
+ final String message = ReadWriteIOUtils.readString(byteBuffer);
+ return new PipeRuntimeCriticalException(message);
+ }
+
+ public static PipeRuntimeCriticalException deserializeFrom(InputStream stream)
+ throws IOException {
+ final String message = ReadWriteIOUtils.readString(stream);
+ return new PipeRuntimeCriticalException(message);
+ }
+
+ @Override
+ public String toString() {
+ return "PipeRuntimeCriticalException{ message: " + getMessage() + " }";
+ }
}
diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeRuntimeException.java b/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeException.java
similarity index 79%
rename from iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeRuntimeException.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeException.java
index 9b20dff7bff..9dd4dec5bac 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeRuntimeException.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeException.java
@@ -17,8 +17,13 @@
* under the License.
*/
-package org.apache.iotdb.pipe.api.exception;
+package org.apache.iotdb.commons.exception.pipe;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.util.Objects;
public abstract class PipeRuntimeException extends PipeException {
@@ -37,4 +42,8 @@ public abstract class PipeRuntimeException extends PipeException {
public int hashCode() {
return Objects.hash(getMessage());
}
+
+ public abstract void serialize(ByteBuffer byteBuffer);
+
+ public abstract void serialize(OutputStream stream) throws IOException;
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeExceptionType.java b/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeExceptionType.java
new file mode 100644
index 00000000000..17d6ec37827
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeExceptionType.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.exception.pipe;
+
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public enum PipeRuntimeExceptionType {
+ NON_CRITICAL_EXCEPTION((short) 1),
+ CRITICAL_EXCEPTION((short) 2),
+ CONNECTOR_CRITICAL_EXCEPTION((short) 3),
+ ;
+
+ private final short type;
+
+ PipeRuntimeExceptionType(short type) {
+ this.type = type;
+ }
+
+ public short getType() {
+ return type;
+ }
+
+ public void serialize(ByteBuffer byteBuffer) {
+ ReadWriteIOUtils.write(type, byteBuffer);
+ }
+
+ public void serialize(OutputStream stream) throws IOException {
+ ReadWriteIOUtils.write(type, stream);
+ }
+
+ public static PipeRuntimeException deserializeFrom(ByteBuffer byteBuffer) {
+ final short type = ReadWriteIOUtils.readShort(byteBuffer);
+ switch (type) {
+ case 1:
+ return PipeRuntimeNonCriticalException.deserializeFrom(byteBuffer);
+ case 2:
+ return PipeRuntimeCriticalException.deserializeFrom(byteBuffer);
+ case 3:
+ return PipeRuntimeConnectorCriticalException.deserializeFrom(byteBuffer);
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Unsupported PipeRuntimeException type %s.", type));
+ }
+ }
+
+ public static PipeRuntimeException deserializeFrom(InputStream stream) throws IOException {
+ final short type = ReadWriteIOUtils.readShort(stream);
+ switch (type) {
+ case 1:
+ return PipeRuntimeNonCriticalException.deserializeFrom(stream);
+ case 2:
+ return PipeRuntimeCriticalException.deserializeFrom(stream);
+ case 3:
+ return PipeRuntimeConnectorCriticalException.deserializeFrom(stream);
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Unsupported PipeRuntimeException type %s.", type));
+ }
+ }
+}
diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeRuntimeNonCriticalException.java b/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeNonCriticalException.java
similarity index 51%
rename from iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeRuntimeNonCriticalException.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeNonCriticalException.java
index 6fa63b93cc1..4bc7345924f 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeRuntimeNonCriticalException.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeNonCriticalException.java
@@ -17,8 +17,14 @@
* under the License.
*/
-package org.apache.iotdb.pipe.api.exception;
+package org.apache.iotdb.commons.exception.pipe;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.util.Objects;
public class PipeRuntimeNonCriticalException extends PipeRuntimeException {
@@ -37,4 +43,32 @@ public class PipeRuntimeNonCriticalException extends PipeRuntimeException {
public int hashCode() {
return getMessage().hashCode();
}
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+ PipeRuntimeExceptionType.NON_CRITICAL_EXCEPTION.serialize(byteBuffer);
+ ReadWriteIOUtils.write(getMessage(), byteBuffer);
+ }
+
+ @Override
+ public void serialize(OutputStream stream) throws IOException {
+ PipeRuntimeExceptionType.NON_CRITICAL_EXCEPTION.serialize(stream);
+ ReadWriteIOUtils.write(getMessage(), stream);
+ }
+
+ public static PipeRuntimeNonCriticalException deserializeFrom(ByteBuffer byteBuffer) {
+ final String message = ReadWriteIOUtils.readString(byteBuffer);
+ return new PipeRuntimeNonCriticalException(message);
+ }
+
+ public static PipeRuntimeNonCriticalException deserializeFrom(InputStream stream)
+ throws IOException {
+ final String message = ReadWriteIOUtils.readString(stream);
+ return new PipeRuntimeNonCriticalException(message);
+ }
+
+ @Override
+ public String toString() {
+ return "PipeRuntimeNonCriticalException{ message: " + getMessage() + " }";
+ }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
index d4f43d15475..46547704d08 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
@@ -21,9 +21,8 @@ package org.apache.iotdb.commons.pipe.task.meta;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
-import org.apache.iotdb.pipe.api.exception.PipeRuntimeCriticalException;
-import org.apache.iotdb.pipe.api.exception.PipeRuntimeException;
-import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeExceptionType;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
@@ -81,10 +80,8 @@ public class PipeTaskMeta {
progressIndex.get().serialize(outputStream);
ReadWriteIOUtils.write(leaderDataNodeId.get(), outputStream);
ReadWriteIOUtils.write(exceptionMessages.size(), outputStream);
- for (final PipeRuntimeException exceptionMessage : exceptionMessages) {
- ReadWriteIOUtils.write(
- exceptionMessage instanceof PipeRuntimeCriticalException, outputStream);
- ReadWriteIOUtils.write(exceptionMessage.getMessage(), outputStream);
+ for (final PipeRuntimeException pipeRuntimeException : exceptionMessages) {
+ pipeRuntimeException.serialize(outputStream);
}
}
@@ -92,10 +89,8 @@ public class PipeTaskMeta {
progressIndex.get().serialize(outputStream);
ReadWriteIOUtils.write(leaderDataNodeId.get(), outputStream);
ReadWriteIOUtils.write(exceptionMessages.size(), outputStream);
- for (final PipeRuntimeException exceptionMessage : exceptionMessages) {
- ReadWriteIOUtils.write(
- exceptionMessage instanceof PipeRuntimeCriticalException, outputStream);
- ReadWriteIOUtils.write(exceptionMessage.getMessage(), outputStream);
+ for (final PipeRuntimeException pipeRuntimeException : exceptionMessages) {
+ pipeRuntimeException.serialize(outputStream);
}
}
@@ -105,12 +100,9 @@ public class PipeTaskMeta {
final PipeTaskMeta PipeTaskMeta = new PipeTaskMeta(progressIndex, leaderDataNodeId);
final int size = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0; i < size; ++i) {
- final boolean critical = ReadWriteIOUtils.readBool(byteBuffer);
- final String message = ReadWriteIOUtils.readString(byteBuffer);
- PipeTaskMeta.exceptionMessages.add(
- critical
- ? new PipeRuntimeCriticalException(message)
- : new PipeRuntimeNonCriticalException(message));
+ final PipeRuntimeException pipeRuntimeException =
+ PipeRuntimeExceptionType.deserializeFrom(byteBuffer);
+ PipeTaskMeta.exceptionMessages.add(pipeRuntimeException);
}
return PipeTaskMeta;
}
@@ -121,12 +113,9 @@ public class PipeTaskMeta {
final PipeTaskMeta PipeTaskMeta = new PipeTaskMeta(progressIndex, leaderDataNodeId);
final int size = ReadWriteIOUtils.readInt(inputStream);
for (int i = 0; i < size; ++i) {
- final boolean critical = ReadWriteIOUtils.readBool(inputStream);
- final String message = ReadWriteIOUtils.readString(inputStream);
- PipeTaskMeta.exceptionMessages.add(
- critical
- ? new PipeRuntimeCriticalException(message)
- : new PipeRuntimeNonCriticalException(message));
+ final PipeRuntimeException pipeRuntimeException =
+ PipeRuntimeExceptionType.deserializeFrom(inputStream);
+ PipeTaskMeta.exceptionMessages.add(pipeRuntimeException);
}
return PipeTaskMeta;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index b4ae69a80cf..cdd894b5d47 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1261,6 +1261,7 @@ public class IoTDBConfig {
triggerTemporaryLibDir = addDataHomeDir(triggerTemporaryLibDir);
pipeDir = addDataHomeDir(pipeDir);
pipeTemporaryLibDir = addDataHomeDir(pipeTemporaryLibDir);
+ pipeReceiveFileDir = addDataHomeDir(pipeReceiveFileDir);
mqttDir = addDataHomeDir(mqttDir);
extPipeDir = addDataHomeDir(extPipeDir);
queryDir = addDataHomeDir(queryDir);
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 884cecf4a74..b4e4077ea5c 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.agent.runtime;
import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.service.IService;
@@ -28,7 +29,6 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.resource.file.PipeHardlinkFileDirStartupCleaner;
import org.apache.iotdb.db.service.ResourcesInformationHolder;
-import org.apache.iotdb.pipe.api.exception.PipeRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index d0596127340..d086e6e71ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -518,9 +518,7 @@ public class PipeTaskAgent {
.getPipeMeta(pipeStaticMeta.getPipeName())
.getRuntimeMeta()
.getConsensusGroupIdToTaskMetaMap()
- .put(
- consensusGroupId,
- new PipeTaskMeta(pipeTaskMeta.getProgressIndex(), pipeTaskMeta.getLeaderDataNodeId()));
+ .put(consensusGroupId, pipeTaskMeta);
}
private void dropPipeTask(TConsensusGroupId dataRegionGroupId, PipeStaticMeta pipeStaticMeta) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index 493240455aa..cf01a1ea15a 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.core.collector.realtime;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
@@ -28,7 +29,6 @@ import org.apache.iotdb.db.pipe.task.queue.UnboundedBlockingPendingQueue;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
-import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
index e6835187cfc..3263e52c1e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.core.collector.realtime;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
@@ -26,7 +27,6 @@ import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
import org.apache.iotdb.db.pipe.task.queue.UnboundedBlockingPendingQueue;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
index 1b00fdd974e..7b287b10545 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.core.collector.realtime;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
@@ -26,7 +27,6 @@ import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
import org.apache.iotdb.db.pipe.task.queue.UnboundedBlockingPendingQueue;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
-import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
index d320366552e..6f7db5bbf97 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
@@ -126,7 +126,10 @@ public class IoTDBThriftConnectorV1 implements PipeConnector {
LOGGER.error(
"Network error when transfer tablet insertion event: {}.", tabletInsertionEvent, e);
// the connection may be broken, try to reconnect by catching PipeConnectionException
- throw new PipeConnectionException("Network error when transfer tablet insertion event.", e);
+ throw new PipeConnectionException(
+ String.format(
+ "Network error when transfer tablet insertion event, because %s.", e.getMessage()),
+ e);
}
}
@@ -159,7 +162,10 @@ public class IoTDBThriftConnectorV1 implements PipeConnector {
LOGGER.error(
"Network error when transfer tsfile insertion event: {}.", tsFileInsertionEvent, e);
// the connection may be broken, try to reconnect by catching PipeConnectionException
- throw new PipeConnectionException("Network error when transfer tsfile insertion event.", e);
+ throw new PipeConnectionException(
+ String.format(
+ "Network error when transfer tsfile insertion event, because %s.", e.getMessage()),
+ e);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java
index 94b2975358c..548a48fdc0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.core.connector.manager;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.config.PipeConnectorConstant;
import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.IoTDBThriftConnectorV1;
@@ -46,9 +45,7 @@ public class PipeConnectorSubtaskManager {
attributeSortedString2SubtaskLifeCycleMap = new HashMap<>();
public synchronized String register(
- PipeConnectorSubtaskExecutor executor,
- PipeParameters pipeConnectorParameters,
- PipeTaskMeta taskMeta) {
+ PipeConnectorSubtaskExecutor executor, PipeParameters pipeConnectorParameters) {
final String attributeSortedString =
new TreeMap<>(pipeConnectorParameters.getAttribute()).toString();
@@ -81,7 +78,7 @@ public class PipeConnectorSubtaskManager {
new BoundedBlockingPendingQueue<>(
PipeConfig.getInstance().getPipeConnectorPendingQueueSize());
final PipeConnectorSubtask pipeConnectorSubtask =
- new PipeConnectorSubtask(attributeSortedString, taskMeta, pendingQueue, pipeConnector);
+ new PipeConnectorSubtask(attributeSortedString, pendingQueue, pipeConnector);
final PipeConnectorSubtaskLifeCycle pipeConnectorSubtaskLifeCycle =
new PipeConnectorSubtaskLifeCycle(executor, pipeConnectorSubtask, pendingQueue);
attributeSortedString2SubtaskLifeCycleMap.put(
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
index 28e611bd301..9fcc5fd1d90 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
@@ -20,7 +20,9 @@
package org.apache.iotdb.db.pipe.core.event;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.pipe.api.event.Event;
import java.util.concurrent.atomic.AtomicInteger;
@@ -114,4 +116,8 @@ public abstract class EnrichedEvent implements Event {
public abstract EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
PipeTaskMeta pipeTaskMeta);
+
+ public void reportException(PipeRuntimeException pipeRuntimeException) {
+ PipeAgent.runtime().report(this.pipeTaskMeta, pipeRuntimeException);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
index 15479bc3788..f15ef605617 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
@@ -49,6 +49,21 @@ public class PipeRealtimeCollectEvent extends EnrichedEvent {
this.device2Measurements = device2Measurements;
}
+ public PipeRealtimeCollectEvent(
+ EnrichedEvent event,
+ TsFileEpoch tsFileEpoch,
+ Map<String, String[]> device2Measurements,
+ PipeTaskMeta pipeTaskMeta) {
+ // pipeTaskMeta is used to report the progress and the runtime exceptions of the event.
+ // This constructor binds the given pipeTaskMeta to the PipeRealtimeCollectEvent itself, so
+ // that reports made through this wrapper event reach the pipe task it was bound to.
+ super(pipeTaskMeta);
+
+ this.event = event;
+ this.tsFileEpoch = tsFileEpoch;
+ this.device2Measurements = device2Measurements;
+ }
+
public Event getEvent() {
return event;
}
@@ -104,7 +119,8 @@ public class PipeRealtimeCollectEvent extends EnrichedEvent {
return new PipeRealtimeCollectEvent(
event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(pipeTaskMeta),
this.tsFileEpoch,
- this.device2Measurements);
+ this.device2Measurements,
+ pipeTaskMeta);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
index 9ccb946ae45..b4a01a7828b 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
@@ -19,10 +19,10 @@
package org.apache.iotdb.db.pipe.resource.wal;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
import org.apache.iotdb.db.wal.exception.MemTablePinException;
import org.apache.iotdb.db.wal.utils.WALEntryHandler;
-import org.apache.iotdb.pipe.api.exception.PipeRuntimeCriticalException;
-import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
index 1886f489c41..715816cfbcc 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
@@ -50,14 +50,13 @@ public class PipeTaskBuilder {
pipeStaticMeta.getCreationTime(),
pipeStaticMeta.getCollectorParameters());
final PipeTaskConnectorStage connectorStage =
- new PipeTaskConnectorStage(pipeStaticMeta.getConnectorParameters(), pipeTaskMeta);
+ new PipeTaskConnectorStage(pipeStaticMeta.getConnectorParameters());
// the processor connects the collector and connector.
final PipeTaskProcessorStage processorStage =
new PipeTaskProcessorStage(
pipeStaticMeta.getPipeName(),
dataRegionId,
- pipeTaskMeta,
collectorStage.getEventSupplier(),
collectorStage.getCollectorPendingQueue(),
pipeStaticMeta.getProcessorParameters(),
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
index 5e43b1a2d18..5615c36f560 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.pipe.task.stage;
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.core.connector.manager.PipeConnectorSubtaskManager;
import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
import org.apache.iotdb.db.pipe.task.queue.BoundedBlockingPendingQueue;
@@ -33,14 +32,13 @@ public class PipeTaskConnectorStage extends PipeTaskStage {
protected String connectorSubtaskId;
- public PipeTaskConnectorStage(PipeParameters pipeConnectorParameters, PipeTaskMeta taskMeta) {
+ public PipeTaskConnectorStage(PipeParameters pipeConnectorParameters) {
this.pipeConnectorParameters = pipeConnectorParameters;
connectorSubtaskId =
PipeConnectorSubtaskManager.instance()
.register(
PipeSubtaskExecutorManager.getInstance().getConnectorSubtaskExecutor(),
- pipeConnectorParameters,
- taskMeta);
+ pipeConnectorParameters);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index c0bad32a3f0..7af0df4ecff 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.pipe.task.stage;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.core.event.view.collector.PipeEventCollector;
import org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
@@ -53,7 +52,6 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
/**
* @param pipeName pipe name
* @param dataRegionId data region id
- * @param taskMeta pipe task meta
* @param pipeCollectorInputEventSupplier used to input events from pipe collector
* @param pipeCollectorInputPendingQueue used to listen whether pipe collector event queue is from
* empty to not empty or from not empty to empty, null means no need to listen
@@ -63,7 +61,6 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
public PipeTaskProcessorStage(
String pipeName,
TConsensusGroupId dataRegionId,
- PipeTaskMeta taskMeta,
EventSupplier pipeCollectorInputEventSupplier,
@Nullable BlockingPendingQueue<Event> pipeCollectorInputPendingQueue,
PipeParameters pipeProcessorParameters,
@@ -78,7 +75,6 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
this.pipeProcessorSubtask =
new PipeProcessorSubtask(
taskId,
- taskMeta,
pipeCollectorInputEventSupplier,
pipeProcessor,
pipeConnectorOutputEventCollector);
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index 12802d88e5f..f34b2b3f961 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -19,9 +19,9 @@
package org.apache.iotdb.db.pipe.task.subtask;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
-import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.task.queue.BoundedBlockingPendingQueue;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
@@ -29,7 +29,6 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.iotdb.pipe.api.exception.PipeRuntimeCriticalException;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
@@ -48,10 +47,9 @@ public class PipeConnectorSubtask extends PipeSubtask {
/** @param taskID connectorAttributeSortedString */
public PipeConnectorSubtask(
String taskID,
- PipeTaskMeta taskMeta,
BoundedBlockingPendingQueue<Event> inputPendingQueue,
PipeConnector outputPipeConnector) {
- super(taskID, taskMeta);
+ super(taskID);
this.inputPendingQueue = inputPendingQueue;
this.outputPipeConnector = outputPipeConnector;
executeOnceInvokedTimes = 0;
@@ -88,7 +86,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
} catch (PipeConnectionException e) {
throw e;
} catch (Exception e) {
- e.printStackTrace();
+ LOGGER.warn("Execute Connector subtask once error.", e);
throw new PipeException(
"Error occurred during executing PipeConnector#transfer, perhaps need to check whether the implementation of PipeConnector is correct according to the pipe-api description.",
e);
@@ -126,10 +124,13 @@ public class PipeConnectorSubtask extends PipeSubtask {
String.format(
"Failed to reconnect to the target system after %d times, stopping current pipe task %s...",
MAX_RETRY_TIMES, taskID);
- LOGGER.warn(errorMessage);
+ LOGGER.warn(errorMessage, throwable);
lastFailedCause = throwable;
- PipeAgent.runtime().report(taskMeta, new PipeRuntimeCriticalException(errorMessage));
+ if (lastEvent instanceof EnrichedEvent) {
+ ((EnrichedEvent) lastEvent)
+ .reportException(new PipeRuntimeConnectorCriticalException(throwable.getMessage()));
+ }
// although the pipe task will be stopped, we still don't release the last event here
// because we need to keep it for the next retry. if user wants to restart the task,
@@ -140,7 +141,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
}
// handle other exceptions as usual
- super.onFailure(throwable);
+ super.onFailure(new PipeRuntimeConnectorCriticalException(throwable.getMessage()));
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
index 94bcb248911..6a76beb02b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.pipe.task.subtask;
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.collector.EventCollector;
@@ -41,11 +40,10 @@ public class PipeProcessorSubtask extends PipeSubtask {
public PipeProcessorSubtask(
String taskID,
- PipeTaskMeta taskMeta,
EventSupplier inputEventSupplier,
PipeProcessor pipeProcessor,
EventCollector outputEventCollector) {
- super(taskID, taskMeta);
+ super(taskID);
this.inputEventSupplier = inputEventSupplier;
this.pipeProcessor = pipeProcessor;
this.outputEventCollector = outputEventCollector;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
index b1331f4df48..524e9abbcfa 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
@@ -19,12 +19,11 @@
package org.apache.iotdb.db.pipe.task.subtask;
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
-import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.execution.scheduler.PipeSubtaskScheduler;
import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.exception.PipeRuntimeCriticalException;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@@ -44,7 +43,6 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
private static final Logger LOGGER = LoggerFactory.getLogger(PipeSubtask.class);
protected final String taskID;
- protected final PipeTaskMeta taskMeta;
private ListeningExecutorService subtaskWorkerThreadPoolExecutor;
private ExecutorService subtaskCallbackListeningExecutor;
@@ -59,10 +57,9 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
protected Event lastEvent;
- protected PipeSubtask(String taskID, PipeTaskMeta taskMeta) {
+ protected PipeSubtask(String taskID) {
super();
this.taskID = taskID;
- this.taskMeta = taskMeta;
}
public void bindExecutors(
@@ -112,16 +109,26 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
public void onFailure(@NotNull Throwable throwable) {
if (retryCount.get() < MAX_RETRY_TIMES) {
retryCount.incrementAndGet();
+ LOGGER.warn(
+ String.format(
+ "Retry subtask %s, retry count [%s/%s]",
+ this.getClass().getSimpleName(), retryCount.get(), MAX_RETRY_TIMES));
submitSelf();
} else {
final String errorMessage =
String.format(
"Subtask %s failed, has been retried for %d times, last failed because of %s",
taskID, retryCount.get(), throwable);
- LOGGER.warn(errorMessage);
+ LOGGER.warn(errorMessage, throwable);
lastFailedCause = throwable;
- PipeAgent.runtime().report(taskMeta, new PipeRuntimeCriticalException(errorMessage));
+ if (lastEvent instanceof EnrichedEvent) {
+ ((EnrichedEvent) lastEvent)
+ .reportException(
+ throwable instanceof PipeRuntimeException
+ ? (PipeRuntimeException) throwable
+ : new PipeRuntimeCriticalException(errorMessage));
+ }
// although the pipe task will be stopped, we still don't release the last event here
// because we need to keep it for the next retry. if user wants to restart the task,
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
index c01d6f51b39..802afcf16d9 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.pipe.execution.executor;
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.task.queue.BoundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
import org.apache.iotdb.pipe.api.PipeConnector;
@@ -39,7 +38,6 @@ public class PipeConnectorSubtaskExecutorTest extends PipeSubtaskExecutorTest {
Mockito.spy(
new PipeConnectorSubtask(
"PipeConnectorSubtaskExecutorTest",
- mock(PipeTaskMeta.class),
mock(BoundedBlockingPendingQueue.class),
mock(PipeConnector.class)));
}
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
index c20eab04cbd..d0a5208d537 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.pipe.execution.executor;
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
import org.apache.iotdb.pipe.api.PipeProcessor;
@@ -40,7 +39,6 @@ public class PipeProcessorSubtaskExecutorTest extends PipeSubtaskExecutorTest {
Mockito.spy(
new PipeProcessorSubtask(
"PipeProcessorSubtaskExecutorTest",
- mock(PipeTaskMeta.class),
mock(EventSupplier.class),
mock(PipeProcessor.class),
mock(EventCollector.class)));