You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/07/29 08:27:41 UTC

[rocketmq-connect] branch master updated: [ISSUE #218]The same class conversion fails when the rocketmq client is included in the connector implementation

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 920717d  [ISSUE #218]The same class conversion fails when the rocketmq client is included in the connector implementation
920717d is described below

commit 920717ddff6aba686fca1103ab8044980cedb445
Author: zhoubo <87...@qq.com>
AuthorDate: Fri Jul 29 16:27:36 2022 +0800

    [ISSUE #218]The same class conversion fails when the rocketmq client is included in the connector implementation
---
 .../connect/runtime/connectorwrapper/Worker.java   |  50 +++++-----
 .../runtime/connectorwrapper/WorkerSourceTask.java | 109 ++++++++++-----------
 .../runtime/controller/isolation/PluginUtils.java  |   1 +
 3 files changed, 75 insertions(+), 85 deletions(-)

diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index cbfd271..ca34ca7 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -120,9 +120,9 @@ public class Worker {
     private final ConnectStatsService connectStatsService;
 
     public Worker(ConnectConfig workerConfig,
-                  PositionManagementService positionManagementService,
-                  ConfigManagementService configManagementService,
-                  Plugin plugin, AbstractConnectController connectController) {
+        PositionManagementService positionManagementService,
+        ConfigManagementService configManagementService,
+        Plugin plugin, AbstractConnectController connectController) {
         this.workerConfig = workerConfig;
         this.taskExecutor = Executors.newCachedThreadPool(new DefaultThreadFactory("task-Worker-Executor-"));
         this.positionManagementService = positionManagementService;
@@ -146,7 +146,7 @@ public class Worker {
      * @throws Exception
      */
     public synchronized void startConnectors(Map<String, ConnectKeyValue> connectorConfigs,
-                                             AbstractConnectController connectController) throws Exception {
+        AbstractConnectController connectController) throws Exception {
         Set<WorkerConnector> stoppedConnector = new HashSet<>();
         for (WorkerConnector workerConnector : workingConnectors) {
             try {
@@ -258,13 +258,12 @@ public class Worker {
 
     }
 
-
     public Set<WorkerConnector> getWorkingConnectors() {
         return workingConnectors;
     }
 
     public void setWorkingConnectors(
-            Set<WorkerConnector> workingConnectors) {
+        Set<WorkerConnector> workingConnectors) {
         this.workingConnectors = workingConnectors;
     }
 
@@ -378,7 +377,7 @@ public class Worker {
                     break;
                 default:
                     log.error("[BUG] Illegal State in when checking running tasks, {} is in {} state",
-                            ((WorkerTask) runnable).id().connector(), state);
+                        ((WorkerTask) runnable).id().connector(), state);
                     break;
             }
         }
@@ -499,7 +498,7 @@ public class Worker {
                     break;
                 default:
                     log.error("[BUG] Illegal State in when checking stopping tasks, {} is in {} state",
-                            ((WorkerTask) runnable).id().connector(), state.toString());
+                        ((WorkerTask) runnable).id().connector(), state.toString());
             }
         }
     }
@@ -531,7 +530,7 @@ public class Worker {
                     break;
                 default:
                     log.error("[BUG] Illegal State in when checking pending tasks, {} is in {} state",
-                            ((WorkerTask) runnable).id().connector(), state.toString());
+                        ((WorkerTask) runnable).id().connector(), state.toString());
                     break;
             }
         }
@@ -554,7 +553,6 @@ public class Worker {
                     continue;
                 }
 
-
                 ClassLoader savedLoader = plugin.currentThreadLoader();
                 try {
 
@@ -573,7 +571,7 @@ public class Worker {
                     RecordConverter keyConverter = Class.forName(keyConverterClazzName).asSubclass(RecordConverter.class).getDeclaredConstructor().newInstance();
 
                     //value config
-                    Map<String, String> valueConverterConfig = keyValue.originalsWithPrefix(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER,true);
+                    Map<String, String> valueConverterConfig = keyValue.originalsWithPrefix(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, true);
                     valueConverterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName());
                     valueConverter.configure(valueConverterConfig);
 
@@ -593,7 +591,7 @@ public class Worker {
                         retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters(connectorName, keyValue));
 
                         WorkerSourceTask workerSourceTask = new WorkerSourceTask(workerConfig, id,
-                                (SourceTask) task, savedLoader, keyValue, positionManagementService, keyConverter, valueConverter, producer, workerState, connectStatsManager, connectStatsService, transformChain, retryWithToleranceOperator);
+                            (SourceTask) task, savedLoader, keyValue, positionManagementService, keyConverter, valueConverter, producer, workerState, connectStatsManager, connectStatsService, transformChain, retryWithToleranceOperator);
 
                         Future future = taskExecutor.submit(workerSourceTask);
                         // schedule offset committer
@@ -615,8 +613,8 @@ public class Worker {
                         retryWithToleranceOperator.reporters(ReporterManagerUtil.sinkTaskReporters(connectorName, keyValue, workerConfig));
 
                         WorkerSinkTask workerSinkTask = new WorkerSinkTask(workerConfig, id,
-                                (SinkTask) task, savedLoader, keyValue, keyConverter, valueConverter, consumer, workerState, connectStatsManager, connectStatsService, transformChain,
-                                retryWithToleranceOperator, ReporterManagerUtil.createWorkerErrorRecordReporter(keyValue, retryWithToleranceOperator, valueConverter));
+                            (SinkTask) task, savedLoader, keyValue, keyConverter, valueConverter, consumer, workerState, connectStatsManager, connectStatsService, transformChain,
+                            retryWithToleranceOperator, ReporterManagerUtil.createWorkerErrorRecordReporter(keyValue, retryWithToleranceOperator, valueConverter));
                         Future future = taskExecutor.submit(workerSinkTask);
                         taskToFutureMap.put(workerSinkTask, future);
                         this.pendingTasks.put(workerSinkTask, System.currentTimeMillis());
@@ -662,18 +660,18 @@ public class Worker {
         retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters(id.connector(), keyValue));
 
         WorkerDirectTask workerDirectTask = new WorkerDirectTask(
-                workerConfig,
-                id,
-                (SourceTask) sourceTask,
-                null,
-                (SinkTask) sinkTask,
-                keyValue,
-                positionManagementService,
-                workerState,
-                connectStatsManager,
-                connectStatsService,
-                transformChain,
-                retryWithToleranceOperator);
+            workerConfig,
+            id,
+            (SourceTask) sourceTask,
+            null,
+            (SinkTask) sinkTask,
+            keyValue,
+            positionManagementService,
+            workerState,
+            connectStatsManager,
+            connectStatsService,
+            transformChain,
+            retryWithToleranceOperator);
 
         Future future = taskExecutor.submit(workerDirectTask);
 
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 0a26faa..40c7eac 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -117,7 +117,6 @@ public class WorkerSourceTask extends WorkerTask {
     private final AtomicReference<Throwable> producerSendException;
     private List<ConnectRecord> toSendRecord;
 
-
     private volatile RecordOffsetManagement.CommittableOffsets committableOffsets;
     private final RecordOffsetManagement offsetManagement;
     /**
@@ -131,19 +130,19 @@ public class WorkerSourceTask extends WorkerTask {
     }
 
     public WorkerSourceTask(ConnectConfig workerConfig,
-                            ConnectorTaskId id,
-                            SourceTask sourceTask,
-                            ClassLoader classLoader,
-                            ConnectKeyValue taskConfig,
-                            PositionManagementService positionManagementService,
-                            RecordConverter keyConverter,
-                            RecordConverter valueConverter,
-                            DefaultMQProducer producer,
-                            AtomicReference<WorkerState> workerState,
-                            ConnectStatsManager connectStatsManager,
-                            ConnectStatsService connectStatsService,
-                            TransformChain<ConnectRecord> transformChain,
-                            RetryWithToleranceOperator retryWithToleranceOperator) {
+        ConnectorTaskId id,
+        SourceTask sourceTask,
+        ClassLoader classLoader,
+        ConnectKeyValue taskConfig,
+        PositionManagementService positionManagementService,
+        RecordConverter keyConverter,
+        RecordConverter valueConverter,
+        DefaultMQProducer producer,
+        AtomicReference<WorkerState> workerState,
+        ConnectStatsManager connectStatsManager,
+        ConnectStatsService connectStatsService,
+        TransformChain<ConnectRecord> transformChain,
+        RetryWithToleranceOperator retryWithToleranceOperator) {
         super(workerConfig, id, classLoader, taskConfig, retryWithToleranceOperator, transformChain, workerState);
 
         this.sourceTask = sourceTask;
@@ -174,7 +173,6 @@ public class WorkerSourceTask extends WorkerTask {
         }
     }
 
-
     @Override
     public void close() {
         producer.shutdown();
@@ -191,15 +189,13 @@ public class WorkerSourceTask extends WorkerTask {
         }
     }
 
-
     protected Optional<RecordOffsetManagement.SubmittedPosition> prepareToSendRecord(
-            ConnectRecord record
+        ConnectRecord record
     ) {
         maybeThrowProducerSendException();
         return Optional.of(this.offsetManagement.submitRecord(record.getPosition()));
     }
 
-
     /**
      * Send list of sourceDataEntries to MQ.
      */
@@ -242,7 +238,7 @@ public class WorkerSourceTask extends WorkerTask {
                 });
             } catch (RetriableException e) {
                 log.warn("{} Failed to send record to topic '{}'. Backing off before retrying: ",
-                        this, sourceMessage.getTopic(), e);
+                    this, sourceMessage.getTopic(), e);
                 // Intercepted as successfully sent, used to continue sending next time
                 toSendRecord = toSendRecord.subList(processed, toSendRecord.size());
                 // remove pre submit position, for retry
@@ -270,17 +266,17 @@ public class WorkerSourceTask extends WorkerTask {
     private void maybeThrowProducerSendException() {
         if (producerSendException.get() != null) {
             throw new ConnectException(
-                    "Unrecoverable exception from producer send callback",
-                    producerSendException.get()
+                "Unrecoverable exception from producer send callback",
+                producerSendException.get()
             );
         }
     }
 
     private void recordSendFailed(
-            boolean synchronous,
-            Message sourceMessage,
-            ConnectRecord preTransformRecord,
-            Throwable e) {
+        boolean synchronous,
+        Message sourceMessage,
+        ConnectRecord preTransformRecord,
+        Throwable e) {
         if (synchronous) {
             throw new ConnectException("Unrecoverable exception trying to send", e);
         }
@@ -288,16 +284,16 @@ public class WorkerSourceTask extends WorkerTask {
         if (retryWithToleranceOperator.getErrorToleranceType() == ToleranceType.ALL) {
             // ignore all error
             log.trace(
-                    "Ignoring failed record send: {} failed to send record to {}: ",
-                    WorkerSourceTask.this,
-                    topic,
-                    e
+                "Ignoring failed record send: {} failed to send record to {}: ",
+                WorkerSourceTask.this,
+                topic,
+                e
             );
             retryWithToleranceOperator.executeFailed(
-                    ErrorReporter.Stage.ROCKETMQ_PRODUCE,
-                    WorkerSourceTask.class,
-                    preTransformRecord,
-                    e);
+                ErrorReporter.Stage.ROCKETMQ_PRODUCE,
+                WorkerSourceTask.class,
+                preTransformRecord,
+                e);
             commitTaskRecord(preTransformRecord, null);
         } else {
             log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
@@ -306,7 +302,6 @@ public class WorkerSourceTask extends WorkerTask {
         }
     }
 
-
     /**
      * failed send
      *
@@ -316,7 +311,6 @@ public class WorkerSourceTask extends WorkerTask {
         commitTaskRecord(record, null);
     }
 
-
     /**
      * send success record
      *
@@ -325,9 +319,9 @@ public class WorkerSourceTask extends WorkerTask {
      * @param result
      */
     private void recordSent(
-            ConnectRecord preTransformRecord,
-            Message sourceMessage,
-            SendResult result) {
+        ConnectRecord preTransformRecord,
+        Message sourceMessage,
+        SendResult result) {
         commitTaskRecord(preTransformRecord, result);
     }
 
@@ -348,7 +342,6 @@ public class WorkerSourceTask extends WorkerTask {
         sourceTask.commit(preTransformRecord, keyValue == null ? null : keyValue.getProperties());
     }
 
-
     /**
      * Convert the source record into a producer record.
      */
@@ -359,10 +352,10 @@ public class WorkerSourceTask extends WorkerTask {
         Message sourceMessage = new Message();
         sourceMessage.setTopic(topic);
         byte[] key = retryWithToleranceOperator.execute(() -> keyConverter.fromConnectData(topic, record.getKeySchema(), record.getKey()),
-                ErrorReporter.Stage.CONVERTER, keyConverter.getClass());
+            ErrorReporter.Stage.CONVERTER, keyConverter.getClass());
 
         byte[] value = retryWithToleranceOperator.execute(() -> valueConverter.fromConnectData(topic, record.getSchema(), record.getData()),
-                ErrorReporter.Stage.CONVERTER, valueConverter.getClass());
+            ErrorReporter.Stage.CONVERTER, valueConverter.getClass());
         if (value.length > RuntimeConfigDefine.MAX_MESSAGE_SIZE) {
             log.error("Send record, message size is greater than {} bytes, record: {}", RuntimeConfigDefine.MAX_MESSAGE_SIZE, JSON.toJSONString(record));
         }
@@ -449,10 +442,9 @@ public class WorkerSourceTask extends WorkerTask {
         log.info("{} Source task finished initialization and start", this);
     }
 
-
     protected void recordPollReturned(int numRecordsInBatch) {
         connectStatsManager.incSourceRecordPollTotalNums(numRecordsInBatch);
-        connectStatsManager.incSourceRecordPollNums(id().toString()+"", numRecordsInBatch);
+        connectStatsManager.incSourceRecordPollNums(id().toString() + "", numRecordsInBatch);
     }
 
     /**
@@ -502,8 +494,8 @@ public class WorkerSourceTask extends WorkerTask {
     protected void finalOffsetCommit(boolean b) {
 
         offsetManagement.awaitAllMessages(
-                workerConfig.getOffsetCommitTimeoutMsConfig(),
-                TimeUnit.MILLISECONDS
+            workerConfig.getOffsetCommitTimeoutMsConfig(),
+            TimeUnit.MILLISECONDS
         );
         updateCommittableOffsets();
         commitOffsets();
@@ -524,9 +516,9 @@ public class WorkerSourceTask extends WorkerTask {
 
         if (committableOffsets.isEmpty()) {
             log.debug("{} Either no records were produced by the task since the last offset commit, "
-                            + "or every record has been filtered out by a transformation "
-                            + "or dropped due to transformation or conversion errors.",
-                    this
+                    + "or every record has been filtered out by a transformation "
+                    + "or dropped due to transformation or conversion errors.",
+                this
             );
             // We continue with the offset commit process here instead of simply returning immediately
             // in order to invoke SourceTask::commit and record metrics for a successful offset commit
@@ -534,17 +526,17 @@ public class WorkerSourceTask extends WorkerTask {
             log.info("{} Committing offsets for {} acknowledged messages", this, committableOffsets.numCommittableMessages());
             if (committableOffsets.hasPending()) {
                 log.debug("{} There are currently {} pending messages spread across {} source partitions whose offsets will not be committed. "
-                                + "The source partition with the most pending messages is {}, with {} pending messages",
-                        this,
-                        committableOffsets.numUncommittableMessages(),
-                        committableOffsets.numDeques(),
-                        committableOffsets.largestDequePartition(),
-                        committableOffsets.largestDequeSize()
+                        + "The source partition with the most pending messages is {}, with {} pending messages",
+                    this,
+                    committableOffsets.numUncommittableMessages(),
+                    committableOffsets.numDeques(),
+                    committableOffsets.largestDequePartition(),
+                    committableOffsets.largestDequeSize()
                 );
             } else {
                 log.debug("{} There are currently no pending messages for this offset commit; "
-                                + "all messages dispatched to the task's producer since the last commit have been acknowledged",
-                        this
+                        + "all messages dispatched to the task's producer since the last commit have been acknowledged",
+                    this
                 );
             }
         }
@@ -557,7 +549,7 @@ public class WorkerSourceTask extends WorkerTask {
             // There was nothing in the offsets to process, but we still mark a successful offset commit.
             long durationMillis = System.currentTimeMillis() - started;
             log.debug("{} Finished offset commitOffsets successfully in {} ms",
-                    this, durationMillis);
+                this, durationMillis);
             commitSourceTask();
             return true;
         }
@@ -586,7 +578,7 @@ public class WorkerSourceTask extends WorkerTask {
         }
         long durationMillis = System.currentTimeMillis() - started;
         log.debug("{} Finished commitOffsets successfully in {} ms",
-                this, durationMillis);
+            this, durationMillis);
         commitSourceTask();
         return true;
     }
@@ -599,7 +591,6 @@ public class WorkerSourceTask extends WorkerTask {
         }
     }
 
-
     private void inWriteRecordFail() {
         connectStatsManager.incSourceRecordWriteTotalFailNums();
         connectStatsManager.incSourceRecordWriteFailNums(id().toString());
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java
index 4a6492a..d46a7e1 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java
@@ -116,6 +116,7 @@ public class PluginUtils {
         + "|org\\.xml\\.sax"
         + "|io\\.openmessaging\\.connector\\.api"
         + "|org\\.slf4j"
+        + "|org\\.apache\\.rocketmq"
         + ")\\..*$"
         + "|io\\.openmessaging\\.KeyValue");