You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/04/02 08:37:45 UTC
[rocketmq-connect] branch master updated: Fix chinese character issue. (#46)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new d4a8157 Fix chinese character issue. (#46)
d4a8157 is described below
commit d4a8157a893ef2adddcd3975ca70e52800c1f957
Author: sunxi92 <su...@163.com>
AuthorDate: Sat Apr 2 16:37:41 2022 +0800
Fix chinese character issue. (#46)
---
.../connect/runtime/connectorwrapper/WorkerSinkTask.java | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
index 7b1721b..dd6b8bc 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
@@ -230,7 +230,7 @@ public class WorkerSinkTask implements WorkerTask {
throw e;
} catch (Throwable e) {
state.set(WorkerTaskState.ERROR);
- log.error(" sink task {}，pull message MQClientException, Error {} ", this, e.getMessage(), e);
+ log.error(" sink task {},pull message MQClientException, Error {} ", this, e.getMessage(), e);
connectStatsManager.incSinkRecordPutTotalFailNums();
connectStatsManager.incSinkRecordPutFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
}
@@ -373,7 +373,7 @@ public class WorkerSinkTask implements WorkerTask {
pullMsgErrorCount = 0;
} catch (MQClientException e) {
pullMsgErrorCount++;
- log.error(" sink task message queue {}, offset {}, taskconfig {}，pull message MQClientException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
+ log.error(" sink task message queue {}, offset {}, taskconfig {},pull message MQClientException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
connectStatsManager.incSinkRecordReadTotalFailNums();
connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
@@ -381,7 +381,7 @@ public class WorkerSinkTask implements WorkerTask {
connectStatsManager.incSinkRecordReadFailRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), errorPullRT);
} catch (RemotingException e) {
pullMsgErrorCount++;
- log.error(" sink task message queue {}, offset {}, taskconfig {}，pull message RemotingException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
+ log.error(" sink task message queue {}, offset {}, taskconfig {},pull message RemotingException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
connectStatsManager.incSinkRecordReadTotalFailNums();
connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
@@ -389,7 +389,7 @@ public class WorkerSinkTask implements WorkerTask {
connectStatsManager.incSinkRecordReadFailRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), errorPullRT);
} catch (MQBrokerException e) {
pullMsgErrorCount++;
- log.error(" sink task message queue {}, offset {}, taskconfig {}，pull message MQBrokerException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
+ log.error(" sink task message queue {}, offset {}, taskconfig {},pull message MQBrokerException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
connectStatsManager.incSinkRecordReadTotalFailNums();
connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
@@ -397,7 +397,7 @@ public class WorkerSinkTask implements WorkerTask {
connectStatsManager.incSinkRecordReadFailRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), errorPullRT);
} catch (InterruptedException e) {
pullMsgErrorCount++;
- log.error(" sink task message queue {}, offset {}, taskconfig {}，pull message InterruptedException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
+ log.error(" sink task message queue {}, offset {}, taskconfig {},pull message InterruptedException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
connectStatsManager.incSinkRecordReadTotalFailNums();
connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
@@ -406,7 +406,7 @@ public class WorkerSinkTask implements WorkerTask {
throw e;
} catch (Throwable e) {
pullMsgErrorCount++;
- log.error(" sink task message queue {}, offset {}, taskconfig {}，pull message Throwable, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), e);
+ log.error(" sink task message queue {}, offset {}, taskconfig {},pull message Throwable, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), e);
connectStatsManager.incSinkRecordReadTotalFailNums();
connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
@@ -430,7 +430,7 @@ public class WorkerSinkTask implements WorkerTask {
if (messageQueuesOffsetMap.containsKey(entry.getKey())) {
messageQueuesOffsetMap.put(entry.getKey(), pullResult.getNextBeginOffset());
} else {
- log.warn("The consumer may have load balancing, and the current task does not process the message queue，messageQueuesOffsetMap {}, messageQueue {}", JSON.toJSONString(messageQueuesOffsetMap), JSON.toJSONString(entry.getKey()));
+ log.warn("The consumer may have load balancing, and the current task does not process the message queue,messageQueuesOffsetMap {}, messageQueue {}", JSON.toJSONString(messageQueuesOffsetMap), JSON.toJSONString(entry.getKey()));
}
try {
consumer.updateConsumeOffset(entry.getKey(), pullResult.getNextBeginOffset());
@@ -570,7 +570,7 @@ public class WorkerSinkTask implements WorkerTask {
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (MQ_SYS_KEYS.contains(entry.getKey())) {
keyValue.put("MQ-SYS-" + entry.getKey(), entry.getValue());
- } else if (entry.getKey().startsWith("connect-ext-")){
+ } else if (entry.getKey().startsWith("connect-ext-")) {
keyValue.put(entry.getKey().replaceAll("connect-ext-", ""), entry.getValue());
} else {
keyValue.put(entry.getKey(), entry.getValue());