You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/04/02 08:37:45 UTC

[rocketmq-connect] branch master updated: Fix Chinese character issue. (#46)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new d4a8157  Fix chinese character issue. (#46)
d4a8157 is described below

commit d4a8157a893ef2adddcd3975ca70e52800c1f957
Author: sunxi92 <su...@163.com>
AuthorDate: Sat Apr 2 16:37:41 2022 +0800

    Fix chinese character issue. (#46)
---
 .../connect/runtime/connectorwrapper/WorkerSinkTask.java | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
index 7b1721b..dd6b8bc 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
@@ -230,7 +230,7 @@ public class WorkerSinkTask implements WorkerTask {
                     throw e;
                 } catch (Throwable e) {
                     state.set(WorkerTaskState.ERROR);
-                    log.error(" sink task {},pull message MQClientException, Error {} ", this, e.getMessage(), e);
+                    log.error(" sink task {},pull message MQClientException, Error {} ", this, e.getMessage(), e);
                     connectStatsManager.incSinkRecordPutTotalFailNums();
                     connectStatsManager.incSinkRecordPutFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
                 }
@@ -373,7 +373,7 @@ public class WorkerSinkTask implements WorkerTask {
                 pullMsgErrorCount = 0;
             } catch (MQClientException e) {
                 pullMsgErrorCount++;
-                log.error(" sink task message queue {}, offset {}, taskconfig {},pull message MQClientException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
+                log.error(" sink task message queue {}, offset {}, taskconfig {},pull message MQClientException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
                 connectStatsManager.incSinkRecordReadTotalFailNums();
                 connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
                 long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
@@ -381,7 +381,7 @@ public class WorkerSinkTask implements WorkerTask {
                 connectStatsManager.incSinkRecordReadFailRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), errorPullRT);
             } catch (RemotingException e) {
                 pullMsgErrorCount++;
-                log.error(" sink task message queue {}, offset {}, taskconfig {},pull message RemotingException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
+                log.error(" sink task message queue {}, offset {}, taskconfig {},pull message RemotingException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
                 connectStatsManager.incSinkRecordReadTotalFailNums();
                 connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
                 long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
@@ -389,7 +389,7 @@ public class WorkerSinkTask implements WorkerTask {
                 connectStatsManager.incSinkRecordReadFailRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), errorPullRT);
             } catch (MQBrokerException e) {
                 pullMsgErrorCount++;
-                log.error(" sink task message queue {}, offset {}, taskconfig {},pull message MQBrokerException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
+                log.error(" sink task message queue {}, offset {}, taskconfig {},pull message MQBrokerException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
                 connectStatsManager.incSinkRecordReadTotalFailNums();
                 connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
                 long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
@@ -397,7 +397,7 @@ public class WorkerSinkTask implements WorkerTask {
                 connectStatsManager.incSinkRecordReadFailRT(taskConfig.getString(RuntimeConfigDefine.TASK_ID), errorPullRT);
             } catch (InterruptedException e) {
                 pullMsgErrorCount++;
-                log.error(" sink task message queue {}, offset {}, taskconfig {},pull message InterruptedException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
+                log.error(" sink task message queue {}, offset {}, taskconfig {},pull message InterruptedException, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), this.state.get(), e);
                 connectStatsManager.incSinkRecordReadTotalFailNums();
                 connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
                 long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
@@ -406,7 +406,7 @@ public class WorkerSinkTask implements WorkerTask {
                 throw e;
             } catch (Throwable e) {
                 pullMsgErrorCount++;
-                log.error(" sink task message queue {}, offset {}, taskconfig {},pull message Throwable, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), e);
+                log.error(" sink task message queue {}, offset {}, taskconfig {},pull message Throwable, Error {}, taskState {}", JSON.toJSONString(entry.getKey()), JSON.toJSONString(entry.getValue()), JSON.toJSONString(taskConfig), e.getMessage(), e);
                 connectStatsManager.incSinkRecordReadTotalFailNums();
                 connectStatsManager.incSinkRecordReadFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
                 long errorPullRT = System.currentTimeMillis() - beginPullMsgTimestamp;
@@ -430,7 +430,7 @@ public class WorkerSinkTask implements WorkerTask {
                 if (messageQueuesOffsetMap.containsKey(entry.getKey())) {
                     messageQueuesOffsetMap.put(entry.getKey(), pullResult.getNextBeginOffset());
                 } else {
-                    log.warn("The consumer may have load balancing, and the current task does not process the message queue,messageQueuesOffsetMap {}, messageQueue {}", JSON.toJSONString(messageQueuesOffsetMap), JSON.toJSONString(entry.getKey()));
+                    log.warn("The consumer may have load balancing, and the current task does not process the message queue,messageQueuesOffsetMap {}, messageQueue {}", JSON.toJSONString(messageQueuesOffsetMap), JSON.toJSONString(entry.getKey()));
                 }
                 try {
                     consumer.updateConsumeOffset(entry.getKey(), pullResult.getNextBeginOffset());
@@ -570,7 +570,7 @@ public class WorkerSinkTask implements WorkerTask {
                 for (Map.Entry<String, String> entry : properties.entrySet()) {
                     if (MQ_SYS_KEYS.contains(entry.getKey())) {
                         keyValue.put("MQ-SYS-" + entry.getKey(), entry.getValue());
-                    } else if (entry.getKey().startsWith("connect-ext-")){
+                    } else if (entry.getKey().startsWith("connect-ext-")) {
                         keyValue.put(entry.getKey().replaceAll("connect-ext-", ""), entry.getValue());
                     } else {
                         keyValue.put(entry.getKey(), entry.getValue());