You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/07/07 11:01:10 UTC
[rocketmq-connect] branch master updated: fix compile error
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 4758256 fix compile error
4758256 is described below
commit 475825677fb0da3e06013587fad8f3e2a7c8c7cc
Author: yuntian.zb <yu...@alibaba-inc.com>
AuthorDate: Thu Jul 7 19:01:02 2022 +0800
fix compile error
---
.../org/apache/rocketmq/connect/runtime/ConnectController.java | 0
.../connect/runtime/connectorwrapper/WorkerDirectTask.java | 6 ------
.../connect/runtime/errors/WorkerErrorRecordReporter.java | 9 ++++-----
3 files changed, 4 insertions(+), 11 deletions(-)
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectController.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectController.java
deleted file mode 100644
index e69de29..0000000
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
index de3d25b..3387a80 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
@@ -179,9 +179,6 @@ public class WorkerDirectTask implements WorkerTask {
return taskConfig;
}
- @Override public KeyValue configs() {
- return taskConfig;
- }
@Override
public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) {
@@ -230,9 +227,6 @@ public class WorkerDirectTask implements WorkerTask {
return taskConfig.getString(RuntimeConfigDefine.TASK_ID);
}
- @Override public KeyValue configs() {
- return taskConfig;
- }
/**
* Get the configurations of current task.
*
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/WorkerErrorRecordReporter.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/WorkerErrorRecordReporter.java
index d37ee7c..18323a6 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/WorkerErrorRecordReporter.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/WorkerErrorRecordReporter.java
@@ -25,7 +25,6 @@ import io.openmessaging.connector.api.data.RecordConverter;
import io.openmessaging.connector.api.data.RecordPartition;
import org.apache.rocketmq.common.message.MessageExt;
-
/**
* worker error record reporter
*/
@@ -35,7 +34,7 @@ public class WorkerErrorRecordReporter implements ErrorRecordReporter {
private RecordConverter converter;
public WorkerErrorRecordReporter(RetryWithToleranceOperator retryWithToleranceOperator,
- RecordConverter converter) {
+ RecordConverter converter) {
this.retryWithToleranceOperator = retryWithToleranceOperator;
this.converter = converter;
}
@@ -56,18 +55,18 @@ public class WorkerErrorRecordReporter implements ErrorRecordReporter {
String brokerName = partition.getPartition().containsKey("brokerName") ? String.valueOf(partition.getPartition().get("topic")) : null;
MessageExt consumerRecord = new MessageExt();
- if (converter != null && converter instanceof RecordConverter){
+ if (converter != null && converter instanceof RecordConverter) {
byte[] value = converter.fromConnectData(topic, record.getSchema(), record.getData());
consumerRecord.setBody(value);
consumerRecord.setBrokerName(brokerName);
consumerRecord.setQueueId(queueId);
consumerRecord.setQueueOffset(queueOffset);
- }else {
+ } else {
byte[] messageBody = JSON.toJSONString(record).getBytes();
consumerRecord.setBody(messageBody);
}
// add extensions
- record.getExtensions().keySet().forEach(key->{
+ record.getExtensions().keySet().forEach(key -> {
consumerRecord.putUserProperty(key, record.getExtensions().getString(key));
});
retryWithToleranceOperator.executeFailed(ErrorReporter.Stage.TASK_PUT, SinkTask.class, consumerRecord, error);