You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/06 12:02:10 UTC

[incubator-inlong] branch master updated: [INLONG-2946][Agent] Agent change async send param to boolean (#2947)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 11eda4a  [INLONG-2946][Agent] Agent change async send param to boolean (#2947)
11eda4a is described below

commit 11eda4a6e6bacdb2e3394301a6d1813e4e01e759
Author: ziruipeng <zp...@connect.ust.hk>
AuthorDate: Sun Mar 6 20:02:05 2022 +0800

    [INLONG-2946][Agent] Agent change async send param to boolean (#2947)
---
 .../main/java/org/apache/inlong/agent/constant/JobConstants.java    | 6 ++++++
 .../src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java   | 5 +++--
 .../main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java   | 4 ++--
 3 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
index 1cc2ca0..146dd06 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
@@ -129,4 +129,10 @@ public class JobConstants extends CommonConstants {
      */
     public static final String JOB_KAFKA_PARTITION_OFFSET_DELIMITER = "#";
 
+    /**
+     * sync send data when sending to DataProxy
+     */
+    public static final int SYNC_SEND_OPEN = 1;
+
+
 }
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index 2b02ab2..c53206f 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -20,6 +20,7 @@ package org.apache.inlong.agent.pojo;
 import static java.util.Objects.requireNonNull;
 import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
 import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
+import static org.apache.inlong.agent.constant.JobConstants.SYNC_SEND_OPEN;
 
 import com.google.gson.Gson;
 import lombok.Data;
@@ -152,7 +153,7 @@ public class JobProfileDto {
         proxy.setInlongStreamId(dataConfigs.getInlongStreamId());
         proxy.setManager(manager);
         if (null != dataConfigs.getSyncSend()) {
-            proxy.setSync(dataConfigs.getSyncSend());
+            proxy.setSync(dataConfigs.getSyncSend() == SYNC_SEND_OPEN);
         }
         return proxy;
     }
@@ -234,7 +235,7 @@ public class JobProfileDto {
         private String inlongGroupId;
         private String inlongStreamId;
         private Manager manager;
-        private Integer sync;
+        private Boolean sync;
     }
 
 }
\ No newline at end of file
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index fc8eb2f..f974957 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -167,8 +167,8 @@ public class ProxySink extends AbstractSink {
                                     result.getValue(), 0, sendTime);
                             }
                             LOGGER.info("send group id {} with message size {}, the job id is {}, read source is {}"
-                                            + "sendTime is {}", inlongGroupId, result.getRight().size(),
-                                    jobInstanceId, sourceName, sendTime);
+                                            + "sendTime is {} syncSend {}", inlongGroupId, result.getRight().size(),
+                                    jobInstanceId, sourceName, sendTime, syncSend);
                         }
 
                     });