You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/12/17 11:38:14 UTC

[incubator-inlong] branch master updated: [INLONG-2023][Bug] Agent stream id is not passed to proxy (#2025)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e8892dd  [INLONG-2023][Bug]  Agent stream id is not passed to proxy (#2025)
e8892dd is described below

commit e8892ddb8fea2d88e691aedebf363128455eadb4
Author: ziruipeng <zp...@connect.ust.hk>
AuthorDate: Fri Dec 17 19:38:01 2021 +0800

    [INLONG-2023][Bug]  Agent stream id is not passed to proxy (#2025)
---
 .../java/org/apache/inlong/agent/plugin/sinks/ProxySink.java   | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index 9f94523..6ec9906 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -108,7 +108,8 @@ public class ProxySink extends AbstractSink {
     @Override
     public void write(Message message) {
         if (message != null) {
-            message.getHeader().put(CommonConstants.PROXY_KEY_GROUP_ID, inlongStreamId);
+            message.getHeader().put(CommonConstants.PROXY_KEY_GROUP_ID, inlongGroupId);
+            message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID, inlongStreamId);
             extractStreamFromMessage(message, fieldSplitter);
             if (!(message instanceof EndMessage)) {
                 ProxyMessage proxyMessage = ProxyMessage.parse(message);
@@ -160,16 +161,16 @@ public class ProxySink extends AbstractSink {
      */
     private Runnable flushCache() {
         return () -> {
-            LOGGER.info("start flush cache thread for {} ProxySink", inlongStreamId);
+            LOGGER.info("start flush cache thread for {} ProxySink", inlongGroupId);
             while (!shutdown) {
                 try {
                     cache.forEach((s, packProxyMessage) -> {
                         Pair<String, List<byte[]>> result = packProxyMessage.fetchBatch();
                         if (result != null) {
-                            senderManager.sendBatch(jobInstanceId, inlongStreamId, result.getKey(),
+                            senderManager.sendBatch(jobInstanceId, inlongGroupId, result.getKey(),
                                     result.getValue(), 0, dataTime);
                             LOGGER.info("send group id {} with message size {}, the job id is {}, read file is {}"
-                                    + "dataTime is {}", inlongStreamId, result.getRight().size(),
+                                    + "dataTime is {}", inlongGroupId, result.getRight().size(),
                                 jobInstanceId, sourceFile, dataTime);
                         }
 
@@ -193,7 +194,6 @@ public class ProxySink extends AbstractSink {
         batchFlushInterval = jobConf.getInt(PROXY_BATCH_FLUSH_INTERVAL,
             DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);
         cache = new ConcurrentHashMap<>(10);
-        inlongStreamId = jobConf.get(PROXY_INLONG_GROUP_ID);
         dataTime = AgentUtils.timeStrConvertToMillSec(jobConf.get(JOB_DATA_TIME, ""),
             jobConf.get(JOB_CYCLE_UNIT, ""));
         inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID);