You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/06/08 01:40:49 UTC

[rocketmq-connect] branch master updated: fix(runtime) fix build error

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 826786f  fix(runtime) fix build error
826786f is described below

commit 826786fd59490978a7db686234004405e4eb5013
Author: 翊名 <du...@alibaba-inc.com>
AuthorDate: Wed Jun 8 09:40:34 2022 +0800

    fix(runtime) fix build error
---
 .../runtime/connectorwrapper/TransformChain.java    |  3 +--
 .../runtime/connectorwrapper/WorkerSinkTask.java    |  8 ++++----
 .../runtime/connectorwrapper/WorkerSourceTask.java  | 21 ++++++++++++---------
 3 files changed, 17 insertions(+), 15 deletions(-)

diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
index 485297a..86ce305 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
@@ -23,8 +23,6 @@ import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.Transform;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.internal.DefaultKeyValue;
-
-import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
@@ -127,6 +125,7 @@ public class TransformChain<R extends ConnectRecord> implements AutoCloseable {
 
     /**
      * close transforms
+     *
      * @throws Exception if this resource cannot be closed
      */
     @Override
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
index 40db678..8f9952c 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
@@ -80,9 +80,9 @@ public class WorkerSinkTask implements WorkerTask {
     public static final String QUEUENAMES_CONFIG = "topicNames";
 
     /**
-     * The configuration key that provide the list of topicQueues that are inputs for this SinkTask;
-     * The config value format is topicName1,brokerName1,queueId1;topicName2,brokerName2,queueId2,
-     * use topicName1, brokerName1, queueId1 can construct {@link MessageQueue}
+     * The configuration key that provides the list of topicQueues that are inputs for this SinkTask. The config value
+     * format is topicName1,brokerName1,queueId1;topicName2,brokerName2,queueId2; each topicName, brokerName, queueId
+     * triple can be used to construct a {@link MessageQueue}.
      */
     public static final String TOPIC_QUEUES_CONFIG = "topicQueues";
 
@@ -505,7 +505,7 @@ public class WorkerSinkTask implements WorkerTask {
         try {
             transformChain.close();
         } catch (Exception exception) {
-            log.error("Transform close failed,{}", exception);
+            log.error("Transform close failed, {}", exception);
         }
     }
 
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 2743481..da9891e 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -88,8 +88,6 @@ public class WorkerSourceTask implements WorkerTask {
      */
     private AtomicReference<WorkerTaskState> state;
 
-
-
     /**
      * Used to read the position of source data source.
      */
@@ -166,15 +164,18 @@ public class WorkerSourceTask implements WorkerTask {
             sourceTask.init(taskConfig);
             sourceTask.start(new SourceTaskContext() {
 
-                @Override public OffsetStorageReader offsetStorageReader() {
+                @Override
+                public OffsetStorageReader offsetStorageReader() {
                     return offsetStorageReader;
                 }
 
-                @Override public String getConnectorName() {
+                @Override
+                public String getConnectorName() {
                     return taskConfig.getString(RuntimeConfigDefine.CONNECTOR_ID);
                 }
 
-                @Override public String getTaskName() {
+                @Override
+                public String getTaskName() {
                     return taskConfig.getString(RuntimeConfigDefine.TASK_ID);
                 }
             });
@@ -212,7 +213,7 @@ public class WorkerSourceTask implements WorkerTask {
             state.compareAndSet(WorkerTaskState.STOPPING, WorkerTaskState.STOPPED);
             log.info("Source task stop, config:{}", JSON.toJSONString(taskConfig));
         } catch (Exception e) {
-            log.error("Run task failed., task config: " +  JSON.toJSONString(taskConfig), e);
+            log.error("Run task failed., task config: " + JSON.toJSONString(taskConfig), e);
             state.set(WorkerTaskState.ERROR);
         } finally {
             if (producer != null) {
@@ -249,7 +250,7 @@ public class WorkerSourceTask implements WorkerTask {
         try {
             transformChain.close();
         } catch (Exception exception) {
-            log.error("Transform close failed,{}", exception);
+            log.error("Transform close failed, {}", exception);
         }
         log.warn("Stop a task success.");
     }
@@ -324,7 +325,8 @@ public class WorkerSourceTask implements WorkerTask {
             }
             try {
                 producer.send(sourceMessage, new SendCallback() {
-                    @Override public void onSuccess(org.apache.rocketmq.client.producer.SendResult result) {
+                    @Override
+                    public void onSuccess(org.apache.rocketmq.client.producer.SendResult result) {
                         log.info("Successful send message to RocketMQ:{}, Topic {}", result.getMsgId(), result.getMessageQueue().getTopic());
                         connectStatsManager.incSourceRecordWriteTotalNums();
                         connectStatsManager.incSourceRecordWriteNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
@@ -341,7 +343,8 @@ public class WorkerSourceTask implements WorkerTask {
                         }
                     }
 
-                    @Override public void onException(Throwable throwable) {
+                    @Override
+                    public void onException(Throwable throwable) {
                         log.error("Source task send record failed ,error msg {}. message {}", throwable.getMessage(), JSON.toJSONString(sourceMessage), throwable);
                         connectStatsManager.incSourceRecordWriteTotalFailNums();
                         connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));