You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/07/29 08:27:41 UTC
[rocketmq-connect] branch master updated: [ISSUE #218]The same class conversion fails when the rocketmq client is included in the connector implementation
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 920717d [ISSUE #218]The same class conversion fails when the rocketmq client is included in the connector implementation
920717d is described below
commit 920717ddff6aba686fca1103ab8044980cedb445
Author: zhoubo <87...@qq.com>
AuthorDate: Fri Jul 29 16:27:36 2022 +0800
[ISSUE #218]The same class conversion fails when the rocketmq client is included in the connector implementation
---
.../connect/runtime/connectorwrapper/Worker.java | 50 +++++-----
.../runtime/connectorwrapper/WorkerSourceTask.java | 109 ++++++++++-----------
.../runtime/controller/isolation/PluginUtils.java | 1 +
3 files changed, 75 insertions(+), 85 deletions(-)
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index cbfd271..ca34ca7 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -120,9 +120,9 @@ public class Worker {
private final ConnectStatsService connectStatsService;
public Worker(ConnectConfig workerConfig,
- PositionManagementService positionManagementService,
- ConfigManagementService configManagementService,
- Plugin plugin, AbstractConnectController connectController) {
+ PositionManagementService positionManagementService,
+ ConfigManagementService configManagementService,
+ Plugin plugin, AbstractConnectController connectController) {
this.workerConfig = workerConfig;
this.taskExecutor = Executors.newCachedThreadPool(new DefaultThreadFactory("task-Worker-Executor-"));
this.positionManagementService = positionManagementService;
@@ -146,7 +146,7 @@ public class Worker {
* @throws Exception
*/
public synchronized void startConnectors(Map<String, ConnectKeyValue> connectorConfigs,
- AbstractConnectController connectController) throws Exception {
+ AbstractConnectController connectController) throws Exception {
Set<WorkerConnector> stoppedConnector = new HashSet<>();
for (WorkerConnector workerConnector : workingConnectors) {
try {
@@ -258,13 +258,12 @@ public class Worker {
}
-
public Set<WorkerConnector> getWorkingConnectors() {
return workingConnectors;
}
public void setWorkingConnectors(
- Set<WorkerConnector> workingConnectors) {
+ Set<WorkerConnector> workingConnectors) {
this.workingConnectors = workingConnectors;
}
@@ -378,7 +377,7 @@ public class Worker {
break;
default:
log.error("[BUG] Illegal State in when checking running tasks, {} is in {} state",
- ((WorkerTask) runnable).id().connector(), state);
+ ((WorkerTask) runnable).id().connector(), state);
break;
}
}
@@ -499,7 +498,7 @@ public class Worker {
break;
default:
log.error("[BUG] Illegal State in when checking stopping tasks, {} is in {} state",
- ((WorkerTask) runnable).id().connector(), state.toString());
+ ((WorkerTask) runnable).id().connector(), state.toString());
}
}
}
@@ -531,7 +530,7 @@ public class Worker {
break;
default:
log.error("[BUG] Illegal State in when checking pending tasks, {} is in {} state",
- ((WorkerTask) runnable).id().connector(), state.toString());
+ ((WorkerTask) runnable).id().connector(), state.toString());
break;
}
}
@@ -554,7 +553,6 @@ public class Worker {
continue;
}
-
ClassLoader savedLoader = plugin.currentThreadLoader();
try {
@@ -573,7 +571,7 @@ public class Worker {
RecordConverter keyConverter = Class.forName(keyConverterClazzName).asSubclass(RecordConverter.class).getDeclaredConstructor().newInstance();
//value config
- Map<String, String> valueConverterConfig = keyValue.originalsWithPrefix(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER,true);
+ Map<String, String> valueConverterConfig = keyValue.originalsWithPrefix(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, true);
valueConverterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName());
valueConverter.configure(valueConverterConfig);
@@ -593,7 +591,7 @@ public class Worker {
retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters(connectorName, keyValue));
WorkerSourceTask workerSourceTask = new WorkerSourceTask(workerConfig, id,
- (SourceTask) task, savedLoader, keyValue, positionManagementService, keyConverter, valueConverter, producer, workerState, connectStatsManager, connectStatsService, transformChain, retryWithToleranceOperator);
+ (SourceTask) task, savedLoader, keyValue, positionManagementService, keyConverter, valueConverter, producer, workerState, connectStatsManager, connectStatsService, transformChain, retryWithToleranceOperator);
Future future = taskExecutor.submit(workerSourceTask);
// schedule offset committer
@@ -615,8 +613,8 @@ public class Worker {
retryWithToleranceOperator.reporters(ReporterManagerUtil.sinkTaskReporters(connectorName, keyValue, workerConfig));
WorkerSinkTask workerSinkTask = new WorkerSinkTask(workerConfig, id,
- (SinkTask) task, savedLoader, keyValue, keyConverter, valueConverter, consumer, workerState, connectStatsManager, connectStatsService, transformChain,
- retryWithToleranceOperator, ReporterManagerUtil.createWorkerErrorRecordReporter(keyValue, retryWithToleranceOperator, valueConverter));
+ (SinkTask) task, savedLoader, keyValue, keyConverter, valueConverter, consumer, workerState, connectStatsManager, connectStatsService, transformChain,
+ retryWithToleranceOperator, ReporterManagerUtil.createWorkerErrorRecordReporter(keyValue, retryWithToleranceOperator, valueConverter));
Future future = taskExecutor.submit(workerSinkTask);
taskToFutureMap.put(workerSinkTask, future);
this.pendingTasks.put(workerSinkTask, System.currentTimeMillis());
@@ -662,18 +660,18 @@ public class Worker {
retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters(id.connector(), keyValue));
WorkerDirectTask workerDirectTask = new WorkerDirectTask(
- workerConfig,
- id,
- (SourceTask) sourceTask,
- null,
- (SinkTask) sinkTask,
- keyValue,
- positionManagementService,
- workerState,
- connectStatsManager,
- connectStatsService,
- transformChain,
- retryWithToleranceOperator);
+ workerConfig,
+ id,
+ (SourceTask) sourceTask,
+ null,
+ (SinkTask) sinkTask,
+ keyValue,
+ positionManagementService,
+ workerState,
+ connectStatsManager,
+ connectStatsService,
+ transformChain,
+ retryWithToleranceOperator);
Future future = taskExecutor.submit(workerDirectTask);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 0a26faa..40c7eac 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -117,7 +117,6 @@ public class WorkerSourceTask extends WorkerTask {
private final AtomicReference<Throwable> producerSendException;
private List<ConnectRecord> toSendRecord;
-
private volatile RecordOffsetManagement.CommittableOffsets committableOffsets;
private final RecordOffsetManagement offsetManagement;
/**
@@ -131,19 +130,19 @@ public class WorkerSourceTask extends WorkerTask {
}
public WorkerSourceTask(ConnectConfig workerConfig,
- ConnectorTaskId id,
- SourceTask sourceTask,
- ClassLoader classLoader,
- ConnectKeyValue taskConfig,
- PositionManagementService positionManagementService,
- RecordConverter keyConverter,
- RecordConverter valueConverter,
- DefaultMQProducer producer,
- AtomicReference<WorkerState> workerState,
- ConnectStatsManager connectStatsManager,
- ConnectStatsService connectStatsService,
- TransformChain<ConnectRecord> transformChain,
- RetryWithToleranceOperator retryWithToleranceOperator) {
+ ConnectorTaskId id,
+ SourceTask sourceTask,
+ ClassLoader classLoader,
+ ConnectKeyValue taskConfig,
+ PositionManagementService positionManagementService,
+ RecordConverter keyConverter,
+ RecordConverter valueConverter,
+ DefaultMQProducer producer,
+ AtomicReference<WorkerState> workerState,
+ ConnectStatsManager connectStatsManager,
+ ConnectStatsService connectStatsService,
+ TransformChain<ConnectRecord> transformChain,
+ RetryWithToleranceOperator retryWithToleranceOperator) {
super(workerConfig, id, classLoader, taskConfig, retryWithToleranceOperator, transformChain, workerState);
this.sourceTask = sourceTask;
@@ -174,7 +173,6 @@ public class WorkerSourceTask extends WorkerTask {
}
}
-
@Override
public void close() {
producer.shutdown();
@@ -191,15 +189,13 @@ public class WorkerSourceTask extends WorkerTask {
}
}
-
protected Optional<RecordOffsetManagement.SubmittedPosition> prepareToSendRecord(
- ConnectRecord record
+ ConnectRecord record
) {
maybeThrowProducerSendException();
return Optional.of(this.offsetManagement.submitRecord(record.getPosition()));
}
-
/**
* Send list of sourceDataEntries to MQ.
*/
@@ -242,7 +238,7 @@ public class WorkerSourceTask extends WorkerTask {
});
} catch (RetriableException e) {
log.warn("{} Failed to send record to topic '{}'. Backing off before retrying: ",
- this, sourceMessage.getTopic(), e);
+ this, sourceMessage.getTopic(), e);
// Intercepted as successfully sent, used to continue sending next time
toSendRecord = toSendRecord.subList(processed, toSendRecord.size());
// remove pre submit position, for retry
@@ -270,17 +266,17 @@ public class WorkerSourceTask extends WorkerTask {
private void maybeThrowProducerSendException() {
if (producerSendException.get() != null) {
throw new ConnectException(
- "Unrecoverable exception from producer send callback",
- producerSendException.get()
+ "Unrecoverable exception from producer send callback",
+ producerSendException.get()
);
}
}
private void recordSendFailed(
- boolean synchronous,
- Message sourceMessage,
- ConnectRecord preTransformRecord,
- Throwable e) {
+ boolean synchronous,
+ Message sourceMessage,
+ ConnectRecord preTransformRecord,
+ Throwable e) {
if (synchronous) {
throw new ConnectException("Unrecoverable exception trying to send", e);
}
@@ -288,16 +284,16 @@ public class WorkerSourceTask extends WorkerTask {
if (retryWithToleranceOperator.getErrorToleranceType() == ToleranceType.ALL) {
// ignore all error
log.trace(
- "Ignoring failed record send: {} failed to send record to {}: ",
- WorkerSourceTask.this,
- topic,
- e
+ "Ignoring failed record send: {} failed to send record to {}: ",
+ WorkerSourceTask.this,
+ topic,
+ e
);
retryWithToleranceOperator.executeFailed(
- ErrorReporter.Stage.ROCKETMQ_PRODUCE,
- WorkerSourceTask.class,
- preTransformRecord,
- e);
+ ErrorReporter.Stage.ROCKETMQ_PRODUCE,
+ WorkerSourceTask.class,
+ preTransformRecord,
+ e);
commitTaskRecord(preTransformRecord, null);
} else {
log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
@@ -306,7 +302,6 @@ public class WorkerSourceTask extends WorkerTask {
}
}
-
/**
* failed send
*
@@ -316,7 +311,6 @@ public class WorkerSourceTask extends WorkerTask {
commitTaskRecord(record, null);
}
-
/**
* send success record
*
@@ -325,9 +319,9 @@ public class WorkerSourceTask extends WorkerTask {
* @param result
*/
private void recordSent(
- ConnectRecord preTransformRecord,
- Message sourceMessage,
- SendResult result) {
+ ConnectRecord preTransformRecord,
+ Message sourceMessage,
+ SendResult result) {
commitTaskRecord(preTransformRecord, result);
}
@@ -348,7 +342,6 @@ public class WorkerSourceTask extends WorkerTask {
sourceTask.commit(preTransformRecord, keyValue == null ? null : keyValue.getProperties());
}
-
/**
* Convert the source record into a producer record.
*/
@@ -359,10 +352,10 @@ public class WorkerSourceTask extends WorkerTask {
Message sourceMessage = new Message();
sourceMessage.setTopic(topic);
byte[] key = retryWithToleranceOperator.execute(() -> keyConverter.fromConnectData(topic, record.getKeySchema(), record.getKey()),
- ErrorReporter.Stage.CONVERTER, keyConverter.getClass());
+ ErrorReporter.Stage.CONVERTER, keyConverter.getClass());
byte[] value = retryWithToleranceOperator.execute(() -> valueConverter.fromConnectData(topic, record.getSchema(), record.getData()),
- ErrorReporter.Stage.CONVERTER, valueConverter.getClass());
+ ErrorReporter.Stage.CONVERTER, valueConverter.getClass());
if (value.length > RuntimeConfigDefine.MAX_MESSAGE_SIZE) {
log.error("Send record, message size is greater than {} bytes, record: {}", RuntimeConfigDefine.MAX_MESSAGE_SIZE, JSON.toJSONString(record));
}
@@ -449,10 +442,9 @@ public class WorkerSourceTask extends WorkerTask {
log.info("{} Source task finished initialization and start", this);
}
-
protected void recordPollReturned(int numRecordsInBatch) {
connectStatsManager.incSourceRecordPollTotalNums(numRecordsInBatch);
- connectStatsManager.incSourceRecordPollNums(id().toString()+"", numRecordsInBatch);
+ connectStatsManager.incSourceRecordPollNums(id().toString() + "", numRecordsInBatch);
}
/**
@@ -502,8 +494,8 @@ public class WorkerSourceTask extends WorkerTask {
protected void finalOffsetCommit(boolean b) {
offsetManagement.awaitAllMessages(
- workerConfig.getOffsetCommitTimeoutMsConfig(),
- TimeUnit.MILLISECONDS
+ workerConfig.getOffsetCommitTimeoutMsConfig(),
+ TimeUnit.MILLISECONDS
);
updateCommittableOffsets();
commitOffsets();
@@ -524,9 +516,9 @@ public class WorkerSourceTask extends WorkerTask {
if (committableOffsets.isEmpty()) {
log.debug("{} Either no records were produced by the task since the last offset commit, "
- + "or every record has been filtered out by a transformation "
- + "or dropped due to transformation or conversion errors.",
- this
+ + "or every record has been filtered out by a transformation "
+ + "or dropped due to transformation or conversion errors.",
+ this
);
// We continue with the offset commit process here instead of simply returning immediately
// in order to invoke SourceTask::commit and record metrics for a successful offset commit
@@ -534,17 +526,17 @@ public class WorkerSourceTask extends WorkerTask {
log.info("{} Committing offsets for {} acknowledged messages", this, committableOffsets.numCommittableMessages());
if (committableOffsets.hasPending()) {
log.debug("{} There are currently {} pending messages spread across {} source partitions whose offsets will not be committed. "
- + "The source partition with the most pending messages is {}, with {} pending messages",
- this,
- committableOffsets.numUncommittableMessages(),
- committableOffsets.numDeques(),
- committableOffsets.largestDequePartition(),
- committableOffsets.largestDequeSize()
+ + "The source partition with the most pending messages is {}, with {} pending messages",
+ this,
+ committableOffsets.numUncommittableMessages(),
+ committableOffsets.numDeques(),
+ committableOffsets.largestDequePartition(),
+ committableOffsets.largestDequeSize()
);
} else {
log.debug("{} There are currently no pending messages for this offset commit; "
- + "all messages dispatched to the task's producer since the last commit have been acknowledged",
- this
+ + "all messages dispatched to the task's producer since the last commit have been acknowledged",
+ this
);
}
}
@@ -557,7 +549,7 @@ public class WorkerSourceTask extends WorkerTask {
// There was nothing in the offsets to process, but we still mark a successful offset commit.
long durationMillis = System.currentTimeMillis() - started;
log.debug("{} Finished offset commitOffsets successfully in {} ms",
- this, durationMillis);
+ this, durationMillis);
commitSourceTask();
return true;
}
@@ -586,7 +578,7 @@ public class WorkerSourceTask extends WorkerTask {
}
long durationMillis = System.currentTimeMillis() - started;
log.debug("{} Finished commitOffsets successfully in {} ms",
- this, durationMillis);
+ this, durationMillis);
commitSourceTask();
return true;
}
@@ -599,7 +591,6 @@ public class WorkerSourceTask extends WorkerTask {
}
}
-
private void inWriteRecordFail() {
connectStatsManager.incSourceRecordWriteTotalFailNums();
connectStatsManager.incSourceRecordWriteFailNums(id().toString());
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java
index 4a6492a..d46a7e1 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java
@@ -116,6 +116,7 @@ public class PluginUtils {
+ "|org\\.xml\\.sax"
+ "|io\\.openmessaging\\.connector\\.api"
+ "|org\\.slf4j"
+ + "|org\\.apache\\.rocketmq"
+ ")\\..*$"
+ "|io\\.openmessaging\\.KeyValue");