You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/06/22 06:25:34 UTC
[rocketmq-connect] branch master updated: [ISSUE #24] sink, source retry mechanism, dead letter queue support (#148)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 0249c93 [ISSUE #24] sink, source retry mechanism, dead letter queue support (#148)
0249c93 is described below
commit 0249c9386aebf5a54ff26191877cf0722f4fd181
Author: xiaoyi <su...@163.com>
AuthorDate: Wed Jun 22 14:25:30 2022 +0800
[ISSUE #24] sink, source retry mechanism, dead letter queue support (#148)
* init standalone
* fixed
* complete standalone mode
* test standalone mode
* check code
* fixed
* Ensure the uniqueness of offset keys for different connectors #143
* add sink、source retry mechanism, dead letter queue support #24
* upgrade rocketmq connect JDBC plug-in #153
* upgrade rocketmq jdbc plugin and runtime api to 0.1.3 #153
* fixed
* add Decimal logical
* optimize transform api
* upgrade api to 0.1.3
* fixed
* fixed
Co-authored-by: “sunxiaojian” <“sunxiaojian926@163.com”>
---
connectors/rocketmq-connect-jdbc/pom.xml | 2 -
.../connect/jdbc/connector/JdbcSinkTask.java | 5 +-
.../runtime/connectorwrapper/TransformChain.java | 22 +-
.../connect/runtime/connectorwrapper/Worker.java | 21 +-
.../runtime/connectorwrapper/WorkerSinkTask.java | 25 +-
.../connectorwrapper/WorkerSinkTaskContext.java | 17 +-
.../runtime/connectorwrapper/WorkerSourceTask.java | 20 +-
.../runtime/errors/DeadLetterQueueConfig.java | 152 +++++++++++++
.../runtime/errors/DeadLetterQueueReporter.java | 234 +++++++++++++++++++
.../connect/runtime/errors/ErrorReporter.java | 76 +++++++
.../connect/runtime/errors/LogReporter.java | 66 ++++++
.../rocketmq/connect/runtime/errors/Operation.java | 28 +++
.../connect/runtime/errors/ProcessingContext.java | 243 ++++++++++++++++++++
.../runtime/errors/ReporterManagerUtil.java | 105 +++++++++
.../runtime/errors/RetryWithToleranceOperator.java | 253 +++++++++++++++++++++
.../connect/runtime/errors/ToleranceType.java | 41 ++++
.../runtime/errors/WorkerErrorRecordReporter.java | 54 +++++
.../connect/runtime/utils/ConnectUtil.java | 18 ++
.../rocketmq/connect/runtime/utils/Utils.java | 54 +++++
.../runtime/connectorwrapper/WorkerTest.java | 10 +-
.../connect/runtime/rest/RestHandlerTest.java | 15 +-
21 files changed, 1439 insertions(+), 22 deletions(-)
diff --git a/connectors/rocketmq-connect-jdbc/pom.xml b/connectors/rocketmq-connect-jdbc/pom.xml
index 59d7f4b..f525cd7 100644
--- a/connectors/rocketmq-connect-jdbc/pom.xml
+++ b/connectors/rocketmq-connect-jdbc/pom.xml
@@ -46,8 +46,6 @@
<junit.version>4.13.1</junit.version>
<assertj.version>2.6.0</assertj.version>
<mockito.version>2.6.3</mockito.version>
-
- <!--rocket connect api-->
<openmessaging-connector.version>0.1.3</openmessaging-connector.version>
<openmessaging-api.version>0.3.1-alpha</openmessaging-api.version>
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java
index 35656eb..e967849 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.connect.jdbc.connector;
import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.ErrorRecordReporter;
import io.openmessaging.connector.api.component.task.sink.SinkTask;
import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
import io.openmessaging.connector.api.data.ConnectRecord;
@@ -42,6 +43,7 @@ public class JdbcSinkTask extends SinkTask {
private static final Logger log = LoggerFactory.getLogger(JdbcSinkTask.class);
private SinkTaskContext context;
+ private ErrorRecordReporter errorRecordReporter;
private KeyValue originalConfig;
private JdbcSinkConfig config;
private DatabaseDialect dialect;
@@ -73,6 +75,7 @@ public class JdbcSinkTask extends SinkTask {
remainingRetries--;
throw new RetriableException(sqlAllMessagesException);
}
+
}
remainingRetries = config.getMaxRetries();
}
@@ -86,7 +89,7 @@ public class JdbcSinkTask extends SinkTask {
sqlAllMessagesException.setNextException(sqle);
return sqlAllMessagesException;
}
-
+
/**
* Start the component
*
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
index 63cf7d1..8308df7 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
@@ -23,13 +23,17 @@ import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.Transform;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.internal.DefaultKeyValue;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
+
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
+import org.apache.rocketmq.connect.runtime.errors.ErrorReporter;
+import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.rocketmq.connect.runtime.utils.Plugin;
import org.apache.rocketmq.connect.runtime.utils.PluginClassLoader;
import org.slf4j.Logger;
@@ -49,6 +53,8 @@ public class TransformChain<R extends ConnectRecord> implements AutoCloseable {
private static final String PREFIX = RuntimeConfigDefine.TRANSFORMS + "-";
+ private RetryWithToleranceOperator retryWithToleranceOperator;
+
public TransformChain(KeyValue config, Plugin plugin) {
this.config = config;
this.plugin = plugin;
@@ -56,6 +62,13 @@ public class TransformChain<R extends ConnectRecord> implements AutoCloseable {
init();
}
+ /**
+ * set retryWithToleranceOperator
+ */
+ public void retryWithToleranceOperator(RetryWithToleranceOperator retryWithToleranceOperator) {
+ this.retryWithToleranceOperator = retryWithToleranceOperator;
+ }
+
private void init() {
String transformsStr = config.getString(RuntimeConfigDefine.TRANSFORMS);
if (StringUtils.isBlank(transformsStr)) {
@@ -94,7 +107,14 @@ public class TransformChain<R extends ConnectRecord> implements AutoCloseable {
}
for (final Transform<R> transform : transformList) {
final R currentRecord = connectRecord;
- connectRecord = transform.doTransform(currentRecord);
+ if (this.retryWithToleranceOperator == null) {
+ connectRecord = transform.doTransform(currentRecord);
+ } else {
+ connectRecord = this.retryWithToleranceOperator.execute(
+ () -> transform.doTransform(currentRecord), ErrorReporter.Stage.TRANSFORMATION, transform.getClass()
+ );
+ }
+
if (connectRecord == null) {
break;
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index 58c98ce..c1bedaf 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -49,6 +49,8 @@ import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
+import org.apache.rocketmq.connect.runtime.errors.ReporterManagerUtil;
+import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
import org.apache.rocketmq.connect.runtime.service.DefaultConnectorContext;
import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
@@ -393,6 +395,7 @@ public class Worker {
}
// STEP 2: try to create new tasks
+ int taskId = 0;
for (String connectorName : newTasks.keySet()) {
for (ConnectKeyValue keyValue : newTasks.get(connectorName)) {
String taskType = keyValue.getString(RuntimeConfigDefine.TASK_TYPE);
@@ -423,10 +426,15 @@ public class Worker {
Plugin.compareAndSwapLoaders(loader);
}
if (task instanceof SourceTask) {
+
DefaultMQProducer producer = ConnectUtil.initDefaultMQProducer(connectConfig);
TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
+ // create retry operator
+ RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(keyValue);
+ retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters(connectorName, keyValue));
+
WorkerSourceTask workerSourceTask = new WorkerSourceTask(connectorName,
- (SourceTask) task, keyValue, positionManagementService, recordConverter, producer, workerState, connectStatsManager, connectStatsService, transformChain);
+ (SourceTask) task, keyValue, positionManagementService, recordConverter, producer, workerState, connectStatsManager, connectStatsService, transformChain, retryWithToleranceOperator);
Plugin.compareAndSwapLoaders(currentThreadLoader);
Future future = taskExecutor.submit(workerSourceTask);
@@ -434,14 +442,21 @@ public class Worker {
this.pendingTasks.put(workerSourceTask, System.currentTimeMillis());
} else if (task instanceof SinkTask) {
log.info("sink task config keyValue is {}", keyValue.getProperties());
- DefaultMQPullConsumer consumer = ConnectUtil.initDefaultMQPullConsumer(connectConfig, connectorName, keyValue);
+ DefaultMQPullConsumer consumer = ConnectUtil.initDefaultMQPullConsumer(connectConfig, connectorName, keyValue, ++taskId);
Set<String> consumerGroupSet = ConnectUtil.fetchAllConsumerGroupList(connectConfig);
if (!consumerGroupSet.contains(consumer.getConsumerGroup())) {
ConnectUtil.createSubGroup(connectConfig, consumer.getConsumerGroup());
}
TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
+
+ // create retry operator
+ RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(keyValue);
+ retryWithToleranceOperator.reporters(ReporterManagerUtil.sinkTaskReporters(connectorName, keyValue, connectConfig));
+
+
WorkerSinkTask workerSinkTask = new WorkerSinkTask(connectorName,
- (SinkTask) task, keyValue, recordConverter, consumer, workerState, connectStatsManager, connectStatsService, transformChain);
+ (SinkTask) task, keyValue, recordConverter, consumer, workerState, connectStatsManager, connectStatsService, transformChain,
+ retryWithToleranceOperator, ReporterManagerUtil.createWorkerErrorRecordReporter(keyValue, retryWithToleranceOperator, recordConverter));
Plugin.compareAndSwapLoaders(currentThreadLoader);
Future future = taskExecutor.submit(workerSinkTask);
taskToFutureMap.put(workerSinkTask, future);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
index 62adc5b..125e269 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
@@ -60,6 +60,9 @@ import org.apache.rocketmq.connect.runtime.common.QueueState;
import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
import org.apache.rocketmq.connect.runtime.config.SinkConnectorConfig;
import org.apache.rocketmq.connect.runtime.converter.RocketMQConverter;
+import org.apache.rocketmq.connect.runtime.errors.ErrorReporter;
+import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.rocketmq.connect.runtime.errors.WorkerErrorRecordReporter;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
@@ -158,6 +161,10 @@ public class WorkerSinkTask implements WorkerTask {
private final TransformChain<ConnectRecord> transformChain;
+ private WorkerErrorRecordReporter errorRecordReporter;
+ private RetryWithToleranceOperator retryWithToleranceOperator;
+
+
public static final String BROKER_NAME = "brokerName";
public static final String QUEUE_ID = "queueId";
public static final String TOPIC = "topic";
@@ -183,7 +190,9 @@ public class WorkerSinkTask implements WorkerTask {
AtomicReference<WorkerState> workerState,
ConnectStatsManager connectStatsManager,
ConnectStatsService connectStatsService,
- TransformChain<ConnectRecord> transformChain) {
+ TransformChain<ConnectRecord> transformChain,
+ RetryWithToleranceOperator retryWithToleranceOperator,
+ WorkerErrorRecordReporter errorRecordReporter) {
this.connectorName = connectorName;
this.sinkTask = sinkTask;
this.taskConfig = taskConfig;
@@ -197,6 +206,9 @@ public class WorkerSinkTask implements WorkerTask {
this.connectStatsService = connectStatsService;
this.stopPullMsgLatch = new CountDownLatch(1);
this.transformChain = transformChain;
+ this.errorRecordReporter = errorRecordReporter;
+ this.retryWithToleranceOperator = retryWithToleranceOperator;
+ this.transformChain.retryWithToleranceOperator(retryWithToleranceOperator);
}
/**
@@ -538,7 +550,9 @@ public class WorkerSinkTask implements WorkerTask {
private void receiveMessages(List<MessageExt> messages) {
List<ConnectRecord> sinkDataEntries = new ArrayList<>(32);
for (MessageExt message : messages) {
- ConnectRecord sinkDataEntry = convertToSinkDataEntry(message);
+ this.retryWithToleranceOperator.consumerRecord(message);
+ ConnectRecord sinkDataEntry = this.retryWithToleranceOperator.execute(()->convertToSinkDataEntry(message), ErrorReporter.Stage.CONVERTER, WorkerSinkTask.class);
+ if (sinkDataEntry != null && !this.retryWithToleranceOperator.failed())
sinkDataEntries.add(sinkDataEntry);
String msgId = message.getMsgId();
log.info("Received one message success : msgId {}", msgId);
@@ -670,6 +684,13 @@ public class WorkerSinkTask implements WorkerTask {
this.sinkTaskContext.resetOffset(offsets);
}
+ /**
+ * error record reporter
+ * @return
+ */
+ public WorkerErrorRecordReporter errorRecordReporter() {
+ return errorRecordReporter;
+ }
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java
index b17cde3..e698d1d 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java
@@ -19,14 +19,17 @@
package org.apache.rocketmq.connect.runtime.connectorwrapper;
import com.alibaba.fastjson.JSON;
+import io.openmessaging.connector.api.component.task.sink.ErrorRecordReporter;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
+
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
@@ -65,6 +68,11 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
this.consumer = consumer;
}
+ @Override
+ public ErrorRecordReporter errorRecordReporter() {
+ return workerSinkTask.errorRecordReporter();
+ }
+
@Override
public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) {
if (null == recordPartition || null == recordPartition.getPartition() || null == recordOffset || null == recordOffset.getOffset()) {
@@ -172,15 +180,18 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
}
}
- @Override public Set<RecordPartition> assignment() {
+ @Override
+ public Set<RecordPartition> assignment() {
return this.workerSinkTask.getRecordPartitions();
}
- @Override public String getConnectorName() {
+ @Override
+ public String getConnectorName() {
return taskConfig.getString("connectorName");
}
- @Override public String getTaskName() {
+ @Override
+ public String getTaskName() {
return taskConfig.getString("taskId");
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 0de1f6f..2a2cb07 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -51,11 +51,13 @@ import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
import org.apache.rocketmq.connect.runtime.converter.RocketMQConverter;
+import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
import org.apache.rocketmq.connect.runtime.store.PositionStorageReaderImpl;
import org.apache.rocketmq.connect.runtime.store.PositionStorageWriter;
+import org.apache.rocketmq.connect.runtime.utils.Utils;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -119,6 +121,7 @@ public class WorkerSourceTask implements WorkerTask {
private TransformChain<ConnectRecord> transformChain;
+ private RetryWithToleranceOperator retryWithToleranceOperator;
/**
* The property of message in WHITE_KEY_SET don't need add a connect prefix
*/
@@ -138,7 +141,8 @@ public class WorkerSourceTask implements WorkerTask {
AtomicReference<WorkerState> workerState,
ConnectStatsManager connectStatsManager,
ConnectStatsService connectStatsService,
- TransformChain<ConnectRecord> transformChain) {
+ TransformChain<ConnectRecord> transformChain,
+ RetryWithToleranceOperator retryWithToleranceOperator) {
this.connectorName = connectorName;
this.sourceTask = sourceTask;
this.taskConfig = taskConfig;
@@ -151,6 +155,8 @@ public class WorkerSourceTask implements WorkerTask {
this.connectStatsManager = connectStatsManager;
this.connectStatsService = connectStatsService;
this.transformChain = transformChain;
+ this.retryWithToleranceOperator = retryWithToleranceOperator;
+ this.transformChain.retryWithToleranceOperator(this.retryWithToleranceOperator);
}
/**
@@ -243,8 +249,11 @@ public class WorkerSourceTask implements WorkerTask {
}
List<ConnectRecord> connectRecordList1 = new ArrayList<>(32);
for (ConnectRecord connectRecord : connectRecordList) {
+
+ retryWithToleranceOperator.sourceRecord(connectRecord);
+
ConnectRecord connectRecord1 = this.transformChain.doTransforms(connectRecord);
- if (null != connectRecord1) {
+ if (null != connectRecord1 && !retryWithToleranceOperator.failed()) {
connectRecordList1.add(connectRecord1);
}
}
@@ -258,11 +267,8 @@ public class WorkerSourceTask implements WorkerTask {
@Override
public void stop() {
state.compareAndSet(WorkerTaskState.RUNNING, WorkerTaskState.STOPPING);
- try {
- transformChain.close();
- } catch (Exception exception) {
- log.error("Transform close failed, {}", exception);
- }
+ Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
+ Utils.closeQuietly(transformChain, "transform chain");
log.warn("Stop a task success.");
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueConfig.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueConfig.java
new file mode 100644
index 0000000..0a27494
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueConfig.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.errors;
+
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+
+/**
+ * Accessors for the error-handling options of a connector configuration: dead letter queue, error log, retry and tolerance.
+ */
+public class DeadLetterQueueConfig {
+    /**
+     * Configuration keys and their default values.
+     */
+    public static final String DLQ_PREFIX = "errors.deadletterqueue.";
+    public static final String DLQ_TOPIC_NAME_CONFIG = DLQ_PREFIX + "topic.name";
+
+
+    public static final String DLQ_TOPIC_READ_QUEUE_NUMS = DLQ_PREFIX + "read.queue.nums";
+    public static final short DLQ_TOPIC_READ_QUEUE_NUMS_DEFAULT = 16;
+
+    public static final String DLQ_TOPIC_WRITE_QUEUE_NUMS = DLQ_PREFIX + "write.queue.nums";
+    public static final short DLQ_TOPIC_WRITE_QUEUE_NUMS_DEFAULT = 16;
+
+    public static final String DLQ_CONTEXT_HEADERS_ENABLE_CONFIG = DLQ_PREFIX + "context.headers.enable";
+    public static final boolean DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT = false;
+
+    public static final String ERRORS_LOG_INCLUDE_MESSAGES_CONFIG = "errors.log.include.messages";
+    public static final boolean ERRORS_LOG_INCLUDE_MESSAGES_DEFAULT = false;
+
+    public static final String ERRORS_LOG_ENABLE_CONFIG = "errors.log.enable";
+    public static final boolean ERRORS_LOG_ENABLE_DEFAULT = false;
+
+
+    public static final String ERRORS_RETRY_TIMEOUT_CONFIG = "errors.retry.timeout";
+    public static final int ERRORS_RETRY_TIMEOUT_DEFAULT = 0;
+
+    public static final String ERRORS_RETRY_MAX_DELAY_CONFIG = "errors.retry.delay.max.ms";
+    public static final String ERRORS_RETRY_MAX_DELAY_DISPLAY = "Maximum Delay Between Retries for Errors";
+    public static final int ERRORS_RETRY_MAX_DELAY_DEFAULT = 60000;
+
+    public static final String ERRORS_TOLERANCE_CONFIG = "errors.tolerance";
+    public static final ToleranceType ERRORS_TOLERANCE_DEFAULT = ToleranceType.NONE;
+
+    private final ConnectKeyValue config;
+
+    /**
+     * Wraps the given connector configuration.
+     *
+     * @param config connector key/value configuration that every accessor of this class reads from
+     */
+    public DeadLetterQueueConfig(ConnectKeyValue config) {
+        this.config = config;
+    }
+
+    /**
+     * Returns the configured dead letter queue topic name.
+     *
+     * @return the value of {@code errors.deadletterqueue.topic.name}, requested with an empty string as default
+     */
+    public String dlqTopicName() {
+        return config.getString(DLQ_TOPIC_NAME_CONFIG, "");
+    }
+
+    /**
+     * Tells whether error context headers should be added to dead letter queue messages.
+     *
+     * @return the parsed flag, or {@code DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT} when the key is not configured
+     */
+    public Boolean isDlqContextHeadersEnabled() {
+        // Boolean.parseBoolean parses the configured text; Boolean.getBoolean would look up a JVM system property instead.
+        return config.getProperties().containsKey(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG) ?
+            Boolean.parseBoolean(config.getProperties().get(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG)) : DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT;
+    }
+
+    /**
+     * Returns the read queue count used when the dead letter queue topic is created.
+     *
+     * @return the configured value, requested with {@code DLQ_TOPIC_READ_QUEUE_NUMS_DEFAULT} as default
+     */
+    public Integer dlqTopicReadQueueNums() {
+        return config.getInt(DLQ_TOPIC_READ_QUEUE_NUMS, DLQ_TOPIC_READ_QUEUE_NUMS_DEFAULT);
+    }
+
+    /**
+     * Returns the write queue count used when the dead letter queue topic is created.
+     *
+     * @return the configured value, requested with {@code DLQ_TOPIC_WRITE_QUEUE_NUMS_DEFAULT} as default
+     */
+    public Integer dlqTopicWriteQueueNums() {
+        return config.getInt(DLQ_TOPIC_WRITE_QUEUE_NUMS, DLQ_TOPIC_WRITE_QUEUE_NUMS_DEFAULT);
+    }
+
+    /**
+     * Reads the {@code errors.log.include.messages} flag.
+     *
+     * @return the parsed flag, or {@code ERRORS_LOG_INCLUDE_MESSAGES_DEFAULT} when the key is not configured
+     */
+    public boolean includeRecordDetailsInErrorLog() {
+        return config.getProperties().containsKey(ERRORS_LOG_INCLUDE_MESSAGES_CONFIG) ?
+            Boolean.parseBoolean(config.getProperties().get(ERRORS_LOG_INCLUDE_MESSAGES_CONFIG)) : ERRORS_LOG_INCLUDE_MESSAGES_DEFAULT;
+    }
+
+    /** Returns true when the error log is enabled or a dead letter queue topic name is configured. */
+    public boolean enableErrantRecordReporter() {
+        String dlqTopic = dlqTopicName();
+        boolean enableErrorLog = enableErrorLog();
+        boolean enableDlqTopic = !dlqTopic.isEmpty();
+        return enableErrorLog || enableDlqTopic;
+    }
+
+    /** Reads the {@code errors.log.enable} flag, falling back to {@code ERRORS_LOG_ENABLE_DEFAULT} when the key is absent. */
+    private boolean enableErrorLog() {
+        return config.getProperties().containsKey(ERRORS_LOG_ENABLE_CONFIG) ?
+            Boolean.parseBoolean(config.getProperties().get(ERRORS_LOG_ENABLE_CONFIG)) : ERRORS_LOG_ENABLE_DEFAULT;
+    }
+
+    /** Returns the {@code errors.retry.timeout} value, requested with {@code ERRORS_RETRY_TIMEOUT_DEFAULT} as default. */
+    public long errorRetryTimeout() {
+        return config.getLong(ERRORS_RETRY_TIMEOUT_CONFIG, ERRORS_RETRY_TIMEOUT_DEFAULT);
+    }
+
+    /** Returns the {@code errors.retry.delay.max.ms} value, requested with {@code ERRORS_RETRY_MAX_DELAY_DEFAULT} as default. */
+    public long errorMaxDelayInMillis() {
+        return config.getLong(ERRORS_RETRY_MAX_DELAY_CONFIG, ERRORS_RETRY_MAX_DELAY_DEFAULT);
+    }
+
+    /** Matches {@code errors.tolerance} case-insensitively against {@code ToleranceType}; unmatched or missing values yield the default. */
+    public ToleranceType errorToleranceType() {
+        String tolerance = config.getString(ERRORS_TOLERANCE_CONFIG);
+        for (ToleranceType type : ToleranceType.values()) {
+            if (type.name().equalsIgnoreCase(tolerance)) {
+                return type;
+            }
+        }
+        return ERRORS_TOLERANCE_DEFAULT;
+    }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueReporter.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueReporter.java
new file mode 100644
index 0000000..cf8967b
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.errors;
+
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Map;
+import java.util.Objects;
+
+
+/**
+ * Write the original consumed record into a dead letter queue
+ */
+public class DeadLetterQueueReporter implements ErrorReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(DeadLetterQueueReporter.class);
+
+    public static final String HEADER_PREFIX = "__connect.errors.";
+    public static final String ERROR_HEADER_ORIG_TOPIC = HEADER_PREFIX + "topic";
+    public static final String ERROR_HEADER_ORIG_PARTITION = HEADER_PREFIX + "partition";
+    public static final String ERROR_HEADER_ORIG_OFFSET = HEADER_PREFIX + "offset";
+    public static final String ERROR_HEADER_CONNECTOR_NAME = HEADER_PREFIX + "connector.name";
+    public static final String ERROR_HEADER_CLUSTER_ID = HEADER_PREFIX + "cluster.id";
+    public static final String ERROR_HEADER_STAGE = HEADER_PREFIX + "stage";
+    public static final String ERROR_HEADER_EXECUTING_CLASS = HEADER_PREFIX + "class.name";
+    public static final String ERROR_HEADER_EXCEPTION = HEADER_PREFIX + "exception.class.name";
+    public static final String ERROR_HEADER_EXCEPTION_MESSAGE = HEADER_PREFIX + "exception.message";
+    public static final String ERROR_HEADER_EXCEPTION_STACK_TRACE = HEADER_PREFIX + "exception.stacktrace";
+
+    /**
+     * Connector configuration this reporter was created with.
+     */
+    private final ConnectKeyValue config;
+
+    /**
+     * Producer used to send records to the dead letter queue topic.
+     */
+    private final DefaultMQProducer producer;
+
+    /**
+     * worker id
+     */
+    private String workerId;
+
+    /**
+     * Dead letter queue settings read from the connector configuration.
+     */
+    private final DeadLetterQueueConfig deadLetterQueueConfig;
+
+    /** Name of the connector, written to the connector name header. */
+    private final String connectorName;
+
+    /** Sets the worker id that is written to the cluster id header; no such header is added while it is null. */
+    public void workerId(String workerId) {
+        this.workerId = workerId;
+    }
+
+    /**
+     * Builds a reporter for the given connector, creating the dead letter queue topic first when it does not exist.
+     *
+     * @param connectorName name of the connector the reporter belongs to
+     * @param sinkConfig    connector configuration holding the dead letter queue settings
+     * @param workerConfig  worker configuration used for the topic check, topic creation and producer setup
+     * @return the reporter, or {@code null} when no dead letter queue topic name is configured
+     */
+    public static DeadLetterQueueReporter build(String connectorName,
+                                                ConnectKeyValue sinkConfig,
+                                                ConnectConfig workerConfig) {
+
+        DeadLetterQueueConfig deadLetterQueueConfig = new DeadLetterQueueConfig(sinkConfig);
+        String dlqTopic = deadLetterQueueConfig.dlqTopicName();
+        if (dlqTopic.isEmpty()) {
+            return null;
+        }
+        if (!ConnectUtil.isTopicExist(workerConfig, dlqTopic)) {
+            TopicConfig topicConfig = new TopicConfig(dlqTopic);
+            topicConfig.setReadQueueNums(deadLetterQueueConfig.dlqTopicReadQueueNums());
+            topicConfig.setWriteQueueNums(deadLetterQueueConfig.dlqTopicWriteQueueNums());
+            ConnectUtil.createTopic(workerConfig, topicConfig);
+        }
+        DefaultMQProducer dlqProducer = ConnectUtil.initDefaultMQProducer(workerConfig);
+        return new DeadLetterQueueReporter(dlqProducer, sinkConfig, connectorName);
+    }
+
+    /**
+     * Initialize the dead letter queue reporter with a producer
+     *
+     * @param producer      producer used to send dead letter queue records; must not be null
+     * @param connConfig    connector configuration; must not be null
+     * @param connectorName name of the connector; must not be null
+     */
+    DeadLetterQueueReporter(DefaultMQProducer producer,
+                            ConnectKeyValue connConfig,
+                            String connectorName) {
+        Objects.requireNonNull(producer);
+        Objects.requireNonNull(connConfig);
+        Objects.requireNonNull(connectorName);
+        this.producer = producer;
+        this.config = connConfig;
+        this.connectorName = connectorName;
+        this.deadLetterQueueConfig = new DeadLetterQueueConfig(connConfig);
+    }
+
+    /**
+     * Sends the body of the originally consumed message to the dead letter queue topic; does nothing without a topic or a consumed message.
+     */
+    @Override
+    public void report(ProcessingContext context) {
+        if (this.deadLetterQueueConfig.dlqTopicName().trim().isEmpty()) {
+            return;
+        }
+        MessageExt originalMessage = context.consumerRecord();
+        if (originalMessage == null) {
+            return;
+        }
+
+        Message producerRecord = new Message();
+        producerRecord.setTopic(deadLetterQueueConfig.dlqTopicName());
+        producerRecord.setBody(originalMessage.getBody());
+
+        if (deadLetterQueueConfig.isDlqContextHeadersEnabled()) {
+            populateContextHeaders(producerRecord, context);
+        }
+        // Send the record built for the DLQ topic; sending originalMessage would publish it to its own topic again.
+        try {
+            producer.send(producerRecord, new SendCallback() {
+                @Override
+                public void onSuccess(SendResult result) {
+                    log.info("Successful send error message to RocketMQ:{}, Topic {}", result.getMsgId(), result.getMessageQueue().getTopic());
+                }
+
+                @Override
+                public void onException(Throwable throwable) {
+                    log.error("Source task send record failed ,error msg {}. message {}", throwable.getMessage(), JSON.toJSONString(originalMessage), throwable);
+                }
+            });
+        } catch (MQClientException e) {
+            log.error("Send message MQClientException. message: {}, error info: {}.", producerRecord, e);
+        } catch (RemotingException e) {
+            log.error("Send message RemotingException. message: {}, error info: {}.", producerRecord, e);
+        } catch (InterruptedException e) {
+            log.error("Send message InterruptedException. message: {}, error info: {}.", producerRecord, e);
+            Thread.currentThread().interrupt();
+            throw new ConnectException(e);
+        }
+    }
+
+    /**
+     * Adds error context (origin, worker, stage, executing class, connector, exception details) as properties of the given message.
+     * @param producerRecord message that receives the context properties
+     * @param context        processing context describing the failed operation
+     */
+    void populateContextHeaders(Message producerRecord, ProcessingContext context) {
+        if (context.consumerRecord() != null) {
+            producerRecord.putUserProperty(ERROR_HEADER_ORIG_TOPIC, context.consumerRecord().getTopic());
+            producerRecord.putUserProperty(ERROR_HEADER_ORIG_PARTITION, String.valueOf(context.consumerRecord().getQueueId()));
+            producerRecord.putUserProperty(ERROR_HEADER_ORIG_OFFSET, String.valueOf(context.consumerRecord().getQueueOffset()));
+        }
+        if (workerId != null) {
+            producerRecord.putUserProperty(ERROR_HEADER_CLUSTER_ID, workerId);
+        }
+        producerRecord.putUserProperty(ERROR_HEADER_STAGE, context.stage().name());
+        producerRecord.putUserProperty(ERROR_HEADER_EXECUTING_CLASS, context.executingClass().getName());
+        producerRecord.putUserProperty(ERROR_HEADER_CONNECTOR_NAME, connectorName);
+        if (context.error() != null) {
+            Throwable error = context.error();
+            // Read the property map only after the putUserProperty calls above, so the message already carries properties.
+            Map<String, String> headers = producerRecord.getProperties();
+            headers.put(ERROR_HEADER_EXCEPTION, error.getClass().getName());
+            headers.put(ERROR_HEADER_EXCEPTION_MESSAGE, error.getMessage());
+            byte[] trace = stacktrace(error);
+            if (trace != null) {
+                headers.put(ERROR_HEADER_EXCEPTION_STACK_TRACE, new String(trace, java.nio.charset.StandardCharsets.UTF_8));
+            }
+        }
+    }
+
+    /** Renders the stack trace of the error as UTF-8 bytes; returns null for a null error or when serialization fails. */
+    private byte[] stacktrace(Throwable error) {
+        if (error == null) {
+            return null;
+        }
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        try {
+            PrintStream stream = new PrintStream(bos, true, "UTF-8");
+            error.printStackTrace(stream);
+            bos.close();
+            return bos.toByteArray();
+        } catch (IOException e) {
+            log.error("Could not serialize stacktrace.", e);
+        }
+        return null;
+    }
+
+    /** Shuts down the dead letter queue producer. */
+    @Override
+    public void close() {
+        if (producer != null) {
+            producer.shutdown();
+        }
+    }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ErrorReporter.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ErrorReporter.java
new file mode 100644
index 0000000..2125fc6
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ErrorReporter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.errors;
+
+
+/**
+ * Receives the {@link ProcessingContext} of a failed operation so the failure can be recorded
+ * somewhere (implementations decide where). Reporters are {@link AutoCloseable}; closing is a
+ * no-op unless an implementation overrides {@link #close()}.
+ */
+public interface ErrorReporter extends AutoCloseable {
+
+    /**
+     * Reports the failure described by the given context.
+     *
+     * @param context the processing context holding the error and the state in which it occurred
+     */
+    void report(ProcessingContext context);
+
+    /**
+     * Releases any resources held by this reporter. The default implementation does nothing and,
+     * unlike {@link AutoCloseable#close()}, declares no checked exception.
+     */
+    @Override
+    default void close() {
+    }
+
+
+    /**
+     * A logical stage in a Connect pipeline.
+     */
+    enum Stage {
+
+        /**
+         * Polling records from a source task.
+         */
+        TASK_POLL,
+
+        /**
+         * Putting records into a sink task.
+         */
+        TASK_PUT,
+
+        /**
+         * Applying a transformation to a record.
+         */
+        TRANSFORMATION,
+
+        /**
+         * Converting a record with a converter.
+         */
+        CONVERTER,
+
+        /**
+         * Producing to a RocketMQ topic.
+         */
+        PRODUCE,
+
+        /**
+         * Consuming from a RocketMQ topic.
+         */
+        CONSUME
+    }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/LogReporter.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/LogReporter.java
new file mode 100644
index 0000000..2f4113b
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/LogReporter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.errors;
+
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+
+/**
+ * Writes errors and their context to application logs.
+ */
+public class LogReporter implements ErrorReporter {
+
+ private static final Logger log = LoggerFactory.getLogger(LogReporter.class);
+
+ private final String connectName;
+ private final DeadLetterQueueConfig deadLetterQueueConfig;
+
+ public LogReporter(String connectorName,
+ ConnectKeyValue sinkConfig) {
+ Objects.requireNonNull(connectorName);
+ Objects.requireNonNull(sinkConfig);
+
+ this.connectName = connectorName;
+ this.deadLetterQueueConfig = new DeadLetterQueueConfig(sinkConfig);
+ }
+
+ /**
+ * Log error context.
+ *
+ * @param context the processing context.
+ */
+ @Override
+ public void report(ProcessingContext context) {
+ log.error(message(context), context.error());
+ }
+
+ /**
+ * format error message
+ *
+ * @param context
+ * @return
+ */
+ String message(ProcessingContext context) {
+ return String.format("Error encountered in task %s. %s", String.valueOf(connectName),
+ context.toString(deadLetterQueueConfig.includeRecordDetailsInErrorLog()));
+ }
+
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/Operation.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/Operation.java
new file mode 100644
index 0000000..ac28b3a
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/Operation.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.errors;
+
+import java.util.concurrent.Callable;
+
+/**
+ * A recoverable operation evaluated in the connector pipeline. It adds no members of its own; it
+ * is a {@link Callable} under a pipeline-specific name.
+ *
+ * @param <V> return type of the result of the operation.
+ */
+public interface Operation<V> extends Callable<V> {
+
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ProcessingContext.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ProcessingContext.java
new file mode 100644
index 0000000..644f024
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ProcessingContext.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.errors;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+
+/**
+ * Mutable snapshot of what the connect pipeline is currently doing: the record being processed,
+ * the executing stage and class, the attempt count and the error (if any) of the current
+ * operation. It also forwards failures to the configured {@link ErrorReporter}s.
+ */
+class ProcessingContext implements AutoCloseable {
+
+    /**
+     * Reporters notified by {@link #report()}; empty until {@link #reporters(Collection)} is called.
+     */
+    private Collection<ErrorReporter> reporters = Collections.emptyList();
+
+    /**
+     * Message consumed from RocketMQ (sink side); may be null.
+     */
+    private MessageExt consumedMessage;
+
+    /**
+     * Original source record (source side); may be null.
+     */
+    private ConnectRecord sourceRecord;
+
+    /**
+     * Stage currently executing; null after a reset.
+     */
+    private ErrorReporter.Stage stage;
+
+    /**
+     * Class executing the current stage; null after a reset.
+     */
+    private Class<?> klass;
+
+    /**
+     * Number of attempts made for the current operation.
+     */
+    private int attempt;
+
+    /**
+     * Error raised by the current operation; null while the operation has not failed.
+     */
+    private Throwable error;
+
+    /**
+     * Clears the per-record state (attempt, stage, executing class, error). Called whenever a new
+     * record is set.
+     */
+    private void reset() {
+        attempt = 0;
+        stage = null;
+        klass = null;
+        error = null;
+    }
+
+    /**
+     * Set the message consumed from RocketMQ in a sink connector and reset the per-record state.
+     *
+     * @param consumedMessage the message
+     */
+    public void consumerRecord(MessageExt consumedMessage) {
+        this.consumedMessage = consumedMessage;
+        reset();
+    }
+
+    /**
+     * @return the message consumed from RocketMQ. could be null
+     */
+    public MessageExt consumerRecord() {
+        return consumedMessage;
+    }
+
+    /**
+     * @return the source record being processed. could be null
+     */
+    public ConnectRecord sourceRecord() {
+        return sourceRecord;
+    }
+
+    /**
+     * Set the source record being processed in the connect pipeline and reset the per-record state.
+     *
+     * @param record the source record
+     */
+    public void sourceRecord(ConnectRecord record) {
+        this.sourceRecord = record;
+        reset();
+    }
+
+    /**
+     * Set the stage in the connector pipeline which is currently executing.
+     *
+     * @param stage the stage
+     */
+    public void stage(ErrorReporter.Stage stage) {
+        this.stage = stage;
+    }
+
+    /**
+     * @return the stage in the connector pipeline which is currently executing.
+     */
+    public ErrorReporter.Stage stage() {
+        return stage;
+    }
+
+    /**
+     * @return the class which is going to execute the current operation.
+     */
+    public Class<?> executingClass() {
+        return klass;
+    }
+
+    /**
+     * @param klass set the class which is currently executing.
+     */
+    public void executingClass(Class<?> klass) {
+        this.klass = klass;
+    }
+
+    /**
+     * A helper method to set both the stage and the class.
+     *
+     * @param stage the stage
+     * @param klass the class which will execute the operation in this stage.
+     */
+    public void currentContext(ErrorReporter.Stage stage, Class<?> klass) {
+        stage(stage);
+        executingClass(klass);
+    }
+
+
+    /**
+     * Reports this context to every configured reporter, in iteration order, exactly once each.
+     */
+    public void report() {
+        // A single loop covers the one-reporter case too; special-casing it without returning
+        // would deliver the same failure to that reporter twice.
+        for (ErrorReporter reporter : reporters) {
+            reporter.report(this);
+        }
+    }
+
+
+    /**
+     * @param attempt the number of attempts made to execute the current operation.
+     */
+    public void attempt(int attempt) {
+        this.attempt = attempt;
+    }
+
+    /**
+     * @return the number of attempts made to execute the current operation.
+     */
+    public int attempt() {
+        return attempt;
+    }
+
+
+    /**
+     * @return the error of the current operation, or null when it has not failed.
+     */
+    public Throwable error() {
+        return error;
+    }
+
+    /**
+     * Record the error raised by the current operation.
+     *
+     * @param error the error
+     */
+    public void error(Throwable error) {
+        this.error = error;
+    }
+
+
+    /**
+     * @return true when an error has been recorded for the current operation.
+     */
+    public boolean failed() {
+        return error() != null;
+    }
+
+
+    /**
+     * Set the reporters notified by {@link #report()} and closed by {@link #close()}.
+     *
+     * @param reporters the reporters; must not be null
+     */
+    public void reporters(Collection<ErrorReporter> reporters) {
+        Objects.requireNonNull(reporters);
+        this.reporters = reporters;
+    }
+
+    /**
+     * Closes every reporter. A failure to close one reporter does not stop the others from being
+     * closed; all failures are attached as suppressed exceptions to a single
+     * {@link ConnectException} thrown at the end.
+     */
+    @Override
+    public void close() {
+        ConnectException e = null;
+        for (ErrorReporter reporter : reporters) {
+            try {
+                reporter.close();
+            } catch (Throwable t) {
+                e = e != null ? e : new ConnectException("Failed to close all reporters");
+                e.addSuppressed(t);
+            }
+        }
+        if (e != null) {
+            throw e;
+        }
+    }
+
+    /**
+     * Describes the current stage and executing class, optionally followed by the record.
+     *
+     * @param includeMessage whether to append the source record or, when there is none, the
+     *                       coordinates of the consumed message
+     * @return the description; an unset stage or class is printed as {@code null}
+     */
+    public String toString(boolean includeMessage) {
+        StringBuilder builder = new StringBuilder();
+        builder.append("Executing stage '");
+        // the stage is null right after a reset, so it gets the same guard as the executing class
+        builder.append(stage() == null ? "null" : stage().name());
+        builder.append("' with class '");
+        builder.append(executingClass() == null ? "null" : executingClass().getName());
+        builder.append('\'');
+        if (includeMessage && sourceRecord() != null) {
+            builder.append(", where source record is = ");
+            builder.append(sourceRecord());
+        } else if (includeMessage && consumerRecord() != null) {
+            MessageExt msg = consumerRecord();
+            builder.append(", where consumed record is ");
+            builder.append("{topic='").append(msg.getTopic()).append('\'');
+            builder.append(", partition=").append(msg.getQueueId());
+            builder.append(", offset=").append(msg.getQueueOffset());
+            builder.append(", bornTimestamp=").append(msg.getBornTimestamp());
+            builder.append(", storeTimestamp=").append(msg.getStoreTimestamp());
+            builder.append("}");
+        }
+        builder.append('.');
+        return builder.toString();
+    }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ReporterManagerUtil.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ReporterManagerUtil.java
new file mode 100644
index 0000000..6c4e30a
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ReporterManagerUtil.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.errors;
+
+import io.openmessaging.connector.api.data.Converter;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Static factory helpers that assemble the retry operator and the error reporters of a task from
+ * its connector configuration.
+ */
+public class ReporterManagerUtil {
+
+    /**
+     * Builds a retry operator from the retry timeout, maximum delay and tolerance type found in the
+     * connector configuration.
+     *
+     * @param connConfig the connector configuration
+     * @return a new retry operator
+     */
+    public static RetryWithToleranceOperator createRetryWithToleranceOperator(ConnectKeyValue connConfig) {
+        final DeadLetterQueueConfig dlqConfig = new DeadLetterQueueConfig(connConfig);
+        return new RetryWithToleranceOperator(
+            dlqConfig.errorRetryTimeout(),
+            dlqConfig.errorMaxDelayInMillis(),
+            dlqConfig.errorToleranceType()
+        );
+    }
+
+    /**
+     * Builds the errant record reporter handed to sink tasks.
+     *
+     * @param connConfig                 the connector configuration
+     * @param retryWithToleranceOperator operator that receives the reported failures
+     * @param converter                  converter used by the reporter
+     * @return the reporter, or {@code null} when the errant record reporter is not enabled in the
+     *         configuration
+     */
+    public static WorkerErrorRecordReporter createWorkerErrorRecordReporter(
+        ConnectKeyValue connConfig,
+        RetryWithToleranceOperator retryWithToleranceOperator,
+        Converter converter) {
+        final DeadLetterQueueConfig dlqConfig = new DeadLetterQueueConfig(connConfig);
+        if (!dlqConfig.enableErrantRecordReporter()) {
+            return null;
+        }
+        return new WorkerErrorRecordReporter(retryWithToleranceOperator, converter);
+    }
+
+    /**
+     * Builds the reporters of a sink task: always a {@link LogReporter}, followed by a
+     * {@link DeadLetterQueueReporter} when {@code DeadLetterQueueReporter.build} returns one.
+     *
+     * @param connectName  the connector name
+     * @param connConfig   the connector configuration
+     * @param workerConfig the worker configuration
+     * @return the reporters, log reporter first
+     */
+    public static List<ErrorReporter> sinkTaskReporters(String connectName,
+                                                        ConnectKeyValue connConfig,
+                                                        ConnectConfig workerConfig) {
+        // the list keeps insertion order, so the log reporter is always first
+        final ArrayList<ErrorReporter> result = new ArrayList<>();
+        result.add(new LogReporter(connectName, connConfig));
+
+        final DeadLetterQueueReporter dlqReporter =
+            DeadLetterQueueReporter.build(connectName, connConfig, workerConfig);
+        if (dlqReporter != null) {
+            result.add(dlqReporter);
+        }
+        return result;
+    }
+
+    /**
+     * Builds the reporters of a source task, which is a single {@link LogReporter}.
+     *
+     * @param connectName the connector name
+     * @param connConfig  the connector configuration
+     * @return a mutable list holding the log reporter
+     */
+    public static List<ErrorReporter> sourceTaskReporters(String connectName,
+                                                          ConnectKeyValue connConfig) {
+        final List<ErrorReporter> result = new ArrayList<>();
+        result.add(new LogReporter(connectName, connConfig));
+        return result;
+    }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/RetryWithToleranceOperator.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/RetryWithToleranceOperator.java
new file mode 100644
index 0000000..b0fa283
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/RetryWithToleranceOperator.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.errors;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import io.openmessaging.connector.api.errors.RetriableException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.connect.runtime.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * retry operator
+ */
+public class RetryWithToleranceOperator implements AutoCloseable {
+
+ private static final Logger log = LoggerFactory.getLogger(RetryWithToleranceOperator.class);
+
+ public static final long RETRIES_DELAY_MIN_MS = 300;
+
+ private static final Map<ErrorReporter.Stage, Class<? extends Exception>> TOLERABLE_EXCEPTIONS = new HashMap<>();
+
+ static {
+ TOLERABLE_EXCEPTIONS.put(ErrorReporter.Stage.TRANSFORMATION, Exception.class);
+ TOLERABLE_EXCEPTIONS.put(ErrorReporter.Stage.CONVERTER, Exception.class);
+ }
+
+ private final long retryTimeout;
+ private final long maxDelayInMillis;
+ private final ToleranceType toleranceType;
+
+ private long totalFailures = 0;
+ protected ProcessingContext context = new ProcessingContext();
+
+ public RetryWithToleranceOperator(long errorRetryTimeout,
+ long maxDelayInMillis,
+ ToleranceType toleranceType) {
+ this.retryTimeout = errorRetryTimeout;
+ this.maxDelayInMillis = maxDelayInMillis;
+ this.toleranceType = toleranceType;
+ }
+
+ public void executeFailed(ErrorReporter.Stage stage,
+ Class<?> executingClass,
+ MessageExt consumerRecord,
+ Throwable error) {
+
+ markAsFailed();
+ context.consumerRecord(consumerRecord);
+ context.currentContext(stage, executingClass);
+ context.error(error);
+ context.report();
+ if (!withinToleranceLimits()) {
+ throw new ConnectException("Tolerance exceeded in error handler", error);
+ }
+ }
+
+ /**
+ * Execute the recoverable operation. If the operation is already in a failed state, then simply return
+ * with the existing failure.
+ */
+ public <V> V execute(Operation<V> operation, ErrorReporter.Stage stage, Class<?> executingClass) {
+ context.currentContext(stage, executingClass);
+ if (context.failed()) {
+ log.debug("ProcessingContext is already in failed state. Ignoring requested operation.");
+ return null;
+ }
+
+ try {
+ Class<? extends Exception> ex = TOLERABLE_EXCEPTIONS.getOrDefault(context.stage(), RetriableException.class);
+ return execAndHandleError(operation, ex);
+ } finally {
+ if (context.failed()) {
+ context.report();
+ }
+ }
+ }
+
+ /**
+ * Attempt to execute an operation.
+ */
+ protected <V> V execAndRetry(Operation<V> operation) throws Exception {
+ int attempt = 0;
+ long startTime = System.currentTimeMillis();
+ long deadline = startTime + retryTimeout;
+ do {
+ try {
+ attempt++;
+ return operation.call();
+ } catch (RetriableException e) {
+ log.trace("Caught a retriable exception while executing {} operation with {}", context.stage(), context.executingClass());
+ if (checkRetry(startTime)) {
+ backoff(attempt, deadline);
+ if (Thread.currentThread().isInterrupted()) {
+ log.trace("Thread was interrupted. Marking operation as failed.");
+ context.error(e);
+ return null;
+ }
+ } else {
+ log.trace("Can't retry. start={}, attempt={}, deadline={}", startTime, attempt, deadline);
+ context.error(e);
+ return null;
+ }
+ } finally {
+ context.attempt(attempt);
+ }
+ } while (true);
+ }
+
+ /**
+ * Execute a given operation multiple times (if needed), and tolerate certain exceptions.
+ */
+ protected <V> V execAndHandleError(Operation<V> operation, Class<? extends Exception> tolerated) {
+ try {
+ V result = execAndRetry(operation);
+ if (context.failed()) {
+ markAsFailed();
+ }
+ return result;
+ } catch (Exception e) {
+ markAsFailed();
+ context.error(e);
+
+ if (!tolerated.isAssignableFrom(e.getClass())) {
+ throw new ConnectException("Unhandled exception in error handler", e);
+ }
+
+ if (!withinToleranceLimits()) {
+ throw new ConnectException("Tolerance exceeded in error handler", e);
+ }
+ return null;
+ }
+ }
+
+ private void markAsFailed() {
+ totalFailures++;
+ }
+
+ public boolean withinToleranceLimits() {
+ switch (toleranceType) {
+ case NONE:
+ if (totalFailures > 0) {
+ return false;
+ }
+ case ALL:
+ return true;
+ default:
+ throw new ConnectException("Unknown tolerance type: " + toleranceType);
+ }
+ }
+
+ boolean checkRetry(long startTime) {
+ return (System.currentTimeMillis() - startTime) < retryTimeout;
+ }
+
+ void backoff(int attempt, long deadline) {
+ int numRetry = attempt - 1;
+ long delay = RETRIES_DELAY_MIN_MS << numRetry;
+ if (delay > maxDelayInMillis) {
+ delay = ThreadLocalRandom.current().nextLong(maxDelayInMillis);
+ }
+ if (delay + System.currentTimeMillis() > deadline) {
+ delay = deadline - System.currentTimeMillis();
+ }
+ log.debug("Sleeping for {} millis", delay);
+ Utils.sleep(delay);
+ }
+
+ /**
+ * Set the error reporters for this connector.
+ *
+ * @param reporters the error reporters (should not be null).
+ */
+ public void reporters(List<ErrorReporter> reporters) {
+ this.context.reporters(reporters);
+ }
+
+ /**
+ * Set the source record being processed in the connect pipeline.
+ *
+ * @param preTransformRecord the source record
+ */
+ public void sourceRecord(ConnectRecord preTransformRecord) {
+ this.context.sourceRecord(preTransformRecord);
+ }
+
+
+ /**
+ * Set the record consumed rocketmq in a sink
+ *
+ * @param messageExt
+ */
+ public void consumerRecord(MessageExt messageExt) {
+ this.context.consumerRecord(messageExt);
+ }
+
+ /**
+ * failed
+ *
+ * @return
+ */
+ public boolean failed() {
+ return this.context.failed();
+ }
+
+ /**
+ * error
+ *
+ * @return
+ */
+ public Throwable error() {
+ return this.context.error();
+ }
+
+
+ @Override
+ public void close() {
+ this.context.close();
+ }
+
+
+ @Override
+ public String toString() {
+ return "RetryWithToleranceOperator{" +
+ "retryTimeout=" + retryTimeout +
+ ", errorMaxDelayInMillis=" + maxDelayInMillis +
+ ", errorToleranceType=" + toleranceType +
+ ", totalFailures=" + totalFailures +
+ ", context=" + context +
+ '}';
+ }
+
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ToleranceType.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ToleranceType.java
new file mode 100644
index 0000000..53c3d65
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ToleranceType.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.errors;
+
+import java.util.Locale;
+
+/**
+ * How many processing errors a connector tolerates before it gives up.
+ */
+public enum ToleranceType {
+
+    /**
+     * No error is tolerated.
+     */
+    NONE,
+
+    /**
+     * Every error is tolerated.
+     */
+    ALL;
+
+    /**
+     * @return the constant name in lower case, converted with {@link Locale#ROOT} so the result
+     *         does not depend on the default locale
+     */
+    public String value() {
+        final String constantName = name();
+        return constantName.toLowerCase(Locale.ROOT);
+    }
+
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/WorkerErrorRecordReporter.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/WorkerErrorRecordReporter.java
new file mode 100644
index 0000000..59533aa
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/WorkerErrorRecordReporter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.errors;
+
+import io.openmessaging.connector.api.component.task.sink.ErrorRecordReporter;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Converter;
+import org.apache.rocketmq.common.message.MessageExt;
+
+/**
+ * worker error record reporter
+ */
+public class WorkerErrorRecordReporter implements ErrorRecordReporter {
+
+ private RetryWithToleranceOperator retryWithToleranceOperator;
+ private Converter converter;
+
+ public WorkerErrorRecordReporter(RetryWithToleranceOperator retryWithToleranceOperator,
+ Converter converter) {
+ this.retryWithToleranceOperator = retryWithToleranceOperator;
+ this.converter = converter;
+ }
+
+ /**
+ * report record
+ *
+ * @param record
+ * @param error
+ * @return
+ */
+ @Override
+ public void report(ConnectRecord record, Throwable error) {
+ byte[] value = converter.objectToByte(record.getData());
+ MessageExt consumerRecord = new MessageExt();
+ consumerRecord.setBody(value);
+ retryWithToleranceOperator.executeFailed(ErrorReporter.Stage.TASK_PUT, SinkTask.class, consumerRecord, error);
+ }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
index b7f9c5f..5f7ad28 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
@@ -288,6 +288,24 @@ public class ConnectUtil {
return recordPartition;
}
+    /**
+     * Creates and configures, without starting it, a pull consumer for a single task of a connector.
+     * The instance name is derived from the connector name and the task id. The consumer group is
+     * the {@code task-group-id} entry of {@code keyValue} when it is not blank, otherwise
+     * {@code SYS_TASK_CG_PREFIX} followed by the connector name.
+     *
+     * @param connectConfig worker configuration providing the ACL settings and name server address
+     * @param connectorName the connector name
+     * @param keyValue      the task configuration
+     * @param taskId        the task id; must not be {@code null}
+     * @return the configured consumer
+     */
+    public static DefaultMQPullConsumer initDefaultMQPullConsumer(ConnectConfig connectConfig, String connectorName, ConnectKeyValue keyValue, Integer taskId) {
+        // an ACL hook is only attached when ACL is enabled in the worker configuration
+        final RPCHook aclHook = connectConfig.getAclEnable()
+            ? new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()))
+            : null;
+        final DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer(aclHook);
+
+        final String instanceKey = connectorName.concat("-").concat(taskId.toString());
+        pullConsumer.setInstanceName(createInstance(instanceKey));
+
+        final String configuredGroup = keyValue.getString("task-group-id");
+        pullConsumer.setConsumerGroup(
+            StringUtils.isNotBlank(configuredGroup) ? configuredGroup : SYS_TASK_CG_PREFIX + connectorName);
+
+        final String namesrvAddr = connectConfig.getNamesrvAddr();
+        if (StringUtils.isNotBlank(namesrvAddr)) {
+            pullConsumer.setNamesrvAddr(namesrvAddr);
+        }
+        return pullConsumer;
+    }
public static DefaultMQPullConsumer initDefaultMQPullConsumer(ConnectConfig connectConfig, String connectorName, ConnectKeyValue keyValue) {
RPCHook rpcHook = null;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Utils.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Utils.java
new file mode 100644
index 0000000..60f1b5b
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Utils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Small static helpers shared across the runtime.
+ */
+public class Utils {
+
+    private static final Logger log = LoggerFactory.getLogger(Utils.class);
+
+    /**
+     * Sleeps for the given duration. When the thread is interrupted the sleep ends early and the
+     * interrupt flag is set again so that callers can observe the interruption.
+     *
+     * @param ms The duration of the sleep
+     */
+    public static void sleep(long ms) {
+        try {
+            Thread.sleep(ms);
+        } catch (InterruptedException interrupted) {
+            // waking up early is acceptable; restore the flag for the caller
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Closes {@code closeable} and if an exception is thrown, it is logged at the WARN level.
+     * A {@code null} argument is ignored.
+     *
+     * @param closeable the resource to close; may be {@code null}
+     * @param name      the name used for the resource in the log message
+     */
+    public static void closeQuietly(AutoCloseable closeable, String name) {
+        if (closeable == null) {
+            return;
+        }
+        try {
+            closeable.close();
+        } catch (Throwable t) {
+            log.warn("Failed to close {} with type {}", name, closeable.getClass().getName(), t);
+        }
+    }
+}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
index 3a04098..0c98f12 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
@@ -38,6 +38,8 @@ import org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestConnect
import org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestConverter;
import org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestPositionManageServiceImpl;
import org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestSourceTask;
+import org.apache.rocketmq.connect.runtime.errors.ReporterManagerUtil;
+import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
@@ -110,6 +112,11 @@ public class WorkerTest {
ConnectKeyValue connectKeyValue = new ConnectKeyValue();
connectKeyValue.getProperties().put("key1", "TEST-TASK-" + i + "1");
connectKeyValue.getProperties().put("key2", "TEST-TASK-" + i + "2");
+
+ // create retry operator
+ RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(connectKeyValue);
+ retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters("TEST-CONN-" + i, connectKeyValue));
+
runnables.add(new WorkerSourceTask("TEST-CONN-" + i,
new TestSourceTask(),
connectKeyValue,
@@ -118,7 +125,8 @@ public class WorkerTest {
producer,
new AtomicReference(WorkerState.STARTED),
connectStatsManager, connectStatsService,
- transformChain));
+ transformChain,
+ retryWithToleranceOperator));
}
worker.setWorkingTasks(runnables);
assertThat(worker.getWorkingTasks().size()).isEqualTo(3);
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
index b452e6c..509f609 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
@@ -49,6 +49,8 @@ import org.apache.rocketmq.connect.runtime.connectorwrapper.Worker;
import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerConnector;
import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSourceTask;
import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerState;
+import org.apache.rocketmq.connect.runtime.errors.ReporterManagerUtil;
+import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
import org.apache.rocketmq.connect.runtime.service.DefaultConnectorContext;
@@ -206,8 +208,17 @@ public class RestHandlerTest {
}
};
TransformChain<ConnectRecord> transformChain = new TransformChain<ConnectRecord>(new DefaultKeyValue(), new Plugin(new ArrayList<>()));
- WorkerSourceTask workerSourceTask1 = new WorkerSourceTask("testConnectorName1", sourceTask, connectKeyValue, positionManagementServiceImpl, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain);
- WorkerSourceTask workerSourceTask2 = new WorkerSourceTask("testConnectorName2", sourceTask, connectKeyValue1, positionManagementServiceImpl, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain);
+ // create the retry operator for testConnectorName1 (built from connectKeyValue, the same config the task uses)
+ RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(connectKeyValue);
+ retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters("testConnectorName1", connectKeyValue));
+
+ WorkerSourceTask workerSourceTask1 = new WorkerSourceTask("testConnectorName1", sourceTask, connectKeyValue, positionManagementServiceImpl, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain,retryWithToleranceOperator);
+
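+ // create the retry operator for testConnectorName2 -- NOTE(review): built from connectKeyValue, but the task below uses connectKeyValue1; confirm this is intended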
+ RetryWithToleranceOperator retryWithToleranceOperator02 = ReporterManagerUtil.createRetryWithToleranceOperator(connectKeyValue);
+ retryWithToleranceOperator02.reporters(ReporterManagerUtil.sourceTaskReporters("testConnectorName2", connectKeyValue));
+
+ WorkerSourceTask workerSourceTask2 = new WorkerSourceTask("testConnectorName2", sourceTask, connectKeyValue1, positionManagementServiceImpl, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain,retryWithToleranceOperator02);
workerTasks = new HashSet<Runnable>() {
{
add(workerSourceTask1);