You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/12/17 11:38:14 UTC
[incubator-inlong] branch master updated: [INLONG-2023][Bug] Agent stream id is not passed to proxy (#2025)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e8892dd [INLONG-2023][Bug] Agent stream id is not passed to proxy (#2025)
e8892dd is described below
commit e8892ddb8fea2d88e691aedebf363128455eadb4
Author: ziruipeng <zp...@connect.ust.hk>
AuthorDate: Fri Dec 17 19:38:01 2021 +0800
[INLONG-2023][Bug] Agent stream id is not passed to proxy (#2025)
---
.../java/org/apache/inlong/agent/plugin/sinks/ProxySink.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index 9f94523..6ec9906 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -108,7 +108,8 @@ public class ProxySink extends AbstractSink {
@Override
public void write(Message message) {
if (message != null) {
- message.getHeader().put(CommonConstants.PROXY_KEY_GROUP_ID, inlongStreamId);
+ message.getHeader().put(CommonConstants.PROXY_KEY_GROUP_ID, inlongGroupId);
+ message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID, inlongStreamId);
extractStreamFromMessage(message, fieldSplitter);
if (!(message instanceof EndMessage)) {
ProxyMessage proxyMessage = ProxyMessage.parse(message);
@@ -160,16 +161,16 @@ public class ProxySink extends AbstractSink {
*/
private Runnable flushCache() {
return () -> {
- LOGGER.info("start flush cache thread for {} ProxySink", inlongStreamId);
+ LOGGER.info("start flush cache thread for {} ProxySink", inlongGroupId);
while (!shutdown) {
try {
cache.forEach((s, packProxyMessage) -> {
Pair<String, List<byte[]>> result = packProxyMessage.fetchBatch();
if (result != null) {
- senderManager.sendBatch(jobInstanceId, inlongStreamId, result.getKey(),
+ senderManager.sendBatch(jobInstanceId, inlongGroupId, result.getKey(),
result.getValue(), 0, dataTime);
LOGGER.info("send group id {} with message size {}, the job id is {}, read file is {}"
- + "dataTime is {}", inlongStreamId, result.getRight().size(),
+ + "dataTime is {}", inlongGroupId, result.getRight().size(),
jobInstanceId, sourceFile, dataTime);
}
@@ -193,7 +194,6 @@ public class ProxySink extends AbstractSink {
batchFlushInterval = jobConf.getInt(PROXY_BATCH_FLUSH_INTERVAL,
DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);
cache = new ConcurrentHashMap<>(10);
- inlongStreamId = jobConf.get(PROXY_INLONG_GROUP_ID);
dataTime = AgentUtils.timeStrConvertToMillSec(jobConf.get(JOB_DATA_TIME, ""),
jobConf.get(JOB_CYCLE_UNIT, ""));
inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID);