You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/06 12:02:10 UTC
[incubator-inlong] branch master updated: [INLONG-2946][Agent] Agent change async send param to boolean (#2947)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 11eda4a [INLONG-2946][Agent] Agent change async send param to boolean (#2947)
11eda4a is described below
commit 11eda4a6e6bacdb2e3394301a6d1813e4e01e759
Author: ziruipeng <zp...@connect.ust.hk>
AuthorDate: Sun Mar 6 20:02:05 2022 +0800
[INLONG-2946][Agent] Agent change async send param to boolean (#2947)
---
.../main/java/org/apache/inlong/agent/constant/JobConstants.java | 6 ++++++
.../src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java | 5 +++--
.../main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java | 4 ++--
3 files changed, 11 insertions(+), 4 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
index 1cc2ca0..146dd06 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
@@ -129,4 +129,10 @@ public class JobConstants extends CommonConstants {
*/
public static final String JOB_KAFKA_PARTITION_OFFSET_DELIMITER = "#";
+ /**
+ * value of the syncSend setting that turns on synchronous sending to DataProxy
+ */
+ public static final int SYNC_SEND_OPEN = 1;
+
+
}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index 2b02ab2..c53206f 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -20,6 +20,7 @@ package org.apache.inlong.agent.pojo;
import static java.util.Objects.requireNonNull;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
+import static org.apache.inlong.agent.constant.JobConstants.SYNC_SEND_OPEN;
import com.google.gson.Gson;
import lombok.Data;
@@ -152,7 +153,7 @@ public class JobProfileDto {
proxy.setInlongStreamId(dataConfigs.getInlongStreamId());
proxy.setManager(manager);
if (null != dataConfigs.getSyncSend()) {
- proxy.setSync(dataConfigs.getSyncSend());
+ proxy.setSync(dataConfigs.getSyncSend() == SYNC_SEND_OPEN);
}
return proxy;
}
@@ -234,7 +235,7 @@ public class JobProfileDto {
private String inlongGroupId;
private String inlongStreamId;
private Manager manager;
- private Integer sync;
+ private Boolean sync;
}
}
\ No newline at end of file
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index fc8eb2f..f974957 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -167,8 +167,8 @@ public class ProxySink extends AbstractSink {
result.getValue(), 0, sendTime);
}
LOGGER.info("send group id {} with message size {}, the job id is {}, read source is {}"
- + "sendTime is {}", inlongGroupId, result.getRight().size(),
- jobInstanceId, sourceName, sendTime);
+ + "sendTime is {} syncSend {}", inlongGroupId, result.getRight().size(),
+ jobInstanceId, sourceName, sendTime, syncSend);
}
});