You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/06/08 01:40:49 UTC
[rocketmq-connect] branch master updated: fix(runtime) fix build error
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 826786f fix(runtime) fix build error
826786f is described below
commit 826786fd59490978a7db686234004405e4eb5013
Author: 翊名 <du...@alibaba-inc.com>
AuthorDate: Wed Jun 8 09:40:34 2022 +0800
fix(runtime) fix build error
---
.../runtime/connectorwrapper/TransformChain.java | 3 +--
.../runtime/connectorwrapper/WorkerSinkTask.java | 8 ++++----
.../runtime/connectorwrapper/WorkerSourceTask.java | 21 ++++++++++++---------
3 files changed, 17 insertions(+), 15 deletions(-)
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
index 485297a..86ce305 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
@@ -23,8 +23,6 @@ import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.Transform;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.internal.DefaultKeyValue;
-
-import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@@ -127,6 +125,7 @@ public class TransformChain<R extends ConnectRecord> implements AutoCloseable {
/**
* close transforms
+ *
* @throws Exception if this resource cannot be closed
*/
@Override
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
index 40db678..8f9952c 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
@@ -80,9 +80,9 @@ public class WorkerSinkTask implements WorkerTask {
public static final String QUEUENAMES_CONFIG = "topicNames";
/**
- * The configuration key that provide the list of topicQueues that are inputs for this SinkTask;
- * The config value format is topicName1,brokerName1,queueId1;topicName2,brokerName2,queueId2,
- * use topicName1, brokerName1, queueId1 can construct {@link MessageQueue}
+ * The configuration key that provide the list of topicQueues that are inputs for this SinkTask; The config value
+ * format is topicName1,brokerName1,queueId1;topicName2,brokerName2,queueId2, use topicName1, brokerName1, queueId1
+ * can construct {@link MessageQueue}
*/
public static final String TOPIC_QUEUES_CONFIG = "topicQueues";
@@ -505,7 +505,7 @@ public class WorkerSinkTask implements WorkerTask {
try {
transformChain.close();
} catch (Exception exception) {
- log.error("Transform close failed,{}", exception);
+ log.error("Transform close failed, {}", exception);
}
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 2743481..da9891e 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -88,8 +88,6 @@ public class WorkerSourceTask implements WorkerTask {
*/
private AtomicReference<WorkerTaskState> state;
-
-
/**
* Used to read the position of source data source.
*/
@@ -166,15 +164,18 @@ public class WorkerSourceTask implements WorkerTask {
sourceTask.init(taskConfig);
sourceTask.start(new SourceTaskContext() {
- @Override public OffsetStorageReader offsetStorageReader() {
+ @Override
+ public OffsetStorageReader offsetStorageReader() {
return offsetStorageReader;
}
- @Override public String getConnectorName() {
+ @Override
+ public String getConnectorName() {
return taskConfig.getString(RuntimeConfigDefine.CONNECTOR_ID);
}
- @Override public String getTaskName() {
+ @Override
+ public String getTaskName() {
return taskConfig.getString(RuntimeConfigDefine.TASK_ID);
}
});
@@ -212,7 +213,7 @@ public class WorkerSourceTask implements WorkerTask {
state.compareAndSet(WorkerTaskState.STOPPING, WorkerTaskState.STOPPED);
log.info("Source task stop, config:{}", JSON.toJSONString(taskConfig));
} catch (Exception e) {
- log.error("Run task failed., task config: " + JSON.toJSONString(taskConfig), e);
+ log.error("Run task failed., task config: " + JSON.toJSONString(taskConfig), e);
state.set(WorkerTaskState.ERROR);
} finally {
if (producer != null) {
@@ -249,7 +250,7 @@ public class WorkerSourceTask implements WorkerTask {
try {
transformChain.close();
} catch (Exception exception) {
- log.error("Transform close failed,{}", exception);
+ log.error("Transform close failed, {}", exception);
}
log.warn("Stop a task success.");
}
@@ -324,7 +325,8 @@ public class WorkerSourceTask implements WorkerTask {
}
try {
producer.send(sourceMessage, new SendCallback() {
- @Override public void onSuccess(org.apache.rocketmq.client.producer.SendResult result) {
+ @Override
+ public void onSuccess(org.apache.rocketmq.client.producer.SendResult result) {
log.info("Successful send message to RocketMQ:{}, Topic {}", result.getMsgId(), result.getMessageQueue().getTopic());
connectStatsManager.incSourceRecordWriteTotalNums();
connectStatsManager.incSourceRecordWriteNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
@@ -341,7 +343,8 @@ public class WorkerSourceTask implements WorkerTask {
}
}
- @Override public void onException(Throwable throwable) {
+ @Override
+ public void onException(Throwable throwable) {
log.error("Source task send record failed ,error msg {}. message {}", throwable.getMessage(), JSON.toJSONString(sourceMessage), throwable);
connectStatsManager.incSourceRecordWriteTotalFailNums();
connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));