You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/07/07 11:01:10 UTC

[rocketmq-connect] branch master updated: fix compile error

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 4758256  fix compile error
4758256 is described below

commit 475825677fb0da3e06013587fad8f3e2a7c8c7cc
Author: yuntian.zb <yu...@alibaba-inc.com>
AuthorDate: Thu Jul 7 19:01:02 2022 +0800

    fix compile error
---
 .../org/apache/rocketmq/connect/runtime/ConnectController.java   | 0
 .../connect/runtime/connectorwrapper/WorkerDirectTask.java       | 6 ------
 .../connect/runtime/errors/WorkerErrorRecordReporter.java        | 9 ++++-----
 3 files changed, 4 insertions(+), 11 deletions(-)

diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectController.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectController.java
deleted file mode 100644
index e69de29..0000000
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
index de3d25b..3387a80 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
@@ -179,9 +179,6 @@ public class WorkerDirectTask implements WorkerTask {
                 return taskConfig;
             }
 
-            @Override public KeyValue configs() {
-                return taskConfig;
-            }
 
             @Override
             public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) {
@@ -230,9 +227,6 @@ public class WorkerDirectTask implements WorkerTask {
                 return taskConfig.getString(RuntimeConfigDefine.TASK_ID);
             }
 
-            @Override public KeyValue configs() {
-                return taskConfig;
-            }
             /**
              * Get the configurations of current task.
              *
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/WorkerErrorRecordReporter.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/WorkerErrorRecordReporter.java
index d37ee7c..18323a6 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/WorkerErrorRecordReporter.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/WorkerErrorRecordReporter.java
@@ -25,7 +25,6 @@ import io.openmessaging.connector.api.data.RecordConverter;
 import io.openmessaging.connector.api.data.RecordPartition;
 import org.apache.rocketmq.common.message.MessageExt;
 
-
 /**
  * worker error record reporter
  */
@@ -35,7 +34,7 @@ public class WorkerErrorRecordReporter implements ErrorRecordReporter {
     private RecordConverter converter;
 
     public WorkerErrorRecordReporter(RetryWithToleranceOperator retryWithToleranceOperator,
-                                     RecordConverter converter) {
+        RecordConverter converter) {
         this.retryWithToleranceOperator = retryWithToleranceOperator;
         this.converter = converter;
     }
@@ -56,18 +55,18 @@ public class WorkerErrorRecordReporter implements ErrorRecordReporter {
         String brokerName = partition.getPartition().containsKey("brokerName") ? String.valueOf(partition.getPartition().get("topic")) : null;
 
         MessageExt consumerRecord = new MessageExt();
-        if (converter != null && converter instanceof RecordConverter){
+        if (converter != null && converter instanceof RecordConverter) {
             byte[] value = converter.fromConnectData(topic, record.getSchema(), record.getData());
             consumerRecord.setBody(value);
             consumerRecord.setBrokerName(brokerName);
             consumerRecord.setQueueId(queueId);
             consumerRecord.setQueueOffset(queueOffset);
-        }else {
+        } else {
             byte[] messageBody = JSON.toJSONString(record).getBytes();
             consumerRecord.setBody(messageBody);
         }
         // add extensions
-        record.getExtensions().keySet().forEach(key->{
+        record.getExtensions().keySet().forEach(key -> {
             consumerRecord.putUserProperty(key, record.getExtensions().getString(key));
         });
         retryWithToleranceOperator.executeFailed(ErrorReporter.Stage.TASK_PUT, SinkTask.class, consumerRecord, error);