You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/04 07:52:35 UTC

[incubator-inlong] branch master updated: [INLONG-2894][Agent] Adapt the interface and field modification of the manager module (#2897)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d918248  [INLONG-2894][Agent] Adapt the interface and field modification of the manager module (#2897)
d918248 is described below

commit d918248eb9a89c25b0e31b32a456f4dec2168807
Author: healchow <he...@gmail.com>
AuthorDate: Fri Mar 4 15:52:28 2022 +0800

    [INLONG-2894][Agent] Adapt the interface and field modification of the manager module (#2897)
---
 .../apache/inlong/agent/constant/FetcherConstants.java    |  5 ++++-
 .../java/org/apache/inlong/agent/pojo/JobProfileDto.java  |  4 ++--
 .../agent/plugin/fetcher/ManagerResultFormatter.java      |  5 ++---
 .../org/apache/inlong/common/pojo/agent/DataConfig.java   |  2 +-
 .../manager/service/core/impl/AgentServiceImpl.java       | 15 ++++++++-------
 5 files changed, 17 insertions(+), 14 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
index 639cd13..216a7ff 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
@@ -17,6 +17,9 @@
 
 package org.apache.inlong.agent.constant;
 
+/**
+ * Constants of job fetcher.
+ */
 public class FetcherConstants {
 
     public static final String AGENT_FETCHER_INTERVAL = "agent.fetcher.interval";
@@ -37,7 +40,7 @@ public class FetcherConstants {
     public static final String DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH = "/api/inlong/manager/openapi";
 
     public static final String AGENT_MANAGER_TASK_HTTP_PATH = "agent.manager.task.http.path";
-    public static final String DEFAULT_AGENT_MANAGER_TASK_HTTP_PATH = "/agent/getTask";
+    public static final String DEFAULT_AGENT_MANAGER_TASK_HTTP_PATH = "/agent/reportAndGetTask";
 
     public static final String AGENT_MANAGER_IP_CHECK_HTTP_PATH = "agent.manager.vip.http.checkIP.path";
     public static final String DEFAULT_AGENT_TDM_IP_CHECK_HTTP_PATH = "/fileAgent/confirmAgentIp";
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index 9148a3a..1441c1a 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -169,8 +169,8 @@ public class JobProfileDto {
         profileDto.setProxy(proxy);
         Job job = new Job();
 
-        //common Attribu
-        job.setId(String.valueOf(dataConfigs.getJobId()));
+        // common attribute
+        job.setId(String.valueOf(dataConfigs.getTaskId()));
         job.setChannel(DEFAULT_CHANNEL);
         job.setIp(dataConfigs.getIp());
         job.setOp(dataConfigs.getOp());
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
index d52aaf2..acb03ae 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
@@ -46,11 +46,10 @@ public class ManagerResultFormatter {
      */
     public static JsonObject getResultData(String jsonStr) {
         JsonObject object = GSON.fromJson(jsonStr, JsonObject.class);
-        if (object == null || !object.has(RESULT_CODE)
-                || !object.has(RESULT_DATA)
+        if (object == null || !object.has(RESULT_CODE) || !object.has(RESULT_DATA)
                 || !SUCCESS_CODE.equals(object.get(RESULT_CODE).getAsString())) {
             throw new IllegalArgumentException("cannot get result data,"
-                    + " please check manager status, return str is" + jsonStr);
+                    + " please check manager status, return str is " + jsonStr);
 
         }
         return object;
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
index b481987..4275f41 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
@@ -33,7 +33,7 @@ public class DataConfig {
     private String inlongGroupId;
     private String inlongStreamId;
     private String op;
-    private Integer jobId;
+    private Integer taskId;
     private Integer taskType;
     private String taskName;
     private String snapshot;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index b757433..fd2b8ae 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -83,7 +83,7 @@ public class AgentServiceImpl implements AgentService {
     @Autowired
     private InlongStreamFieldEntityMapper streamFieldMapper;
     @Autowired
-    private InlongStreamEntityMapper inlongStreamMapper;
+    private InlongStreamEntityMapper streamMapper;
 
     /**
      * If the reported task time and the modification time in the database exceed this value,
@@ -121,20 +121,21 @@ public class AgentServiceImpl implements AgentService {
         List<StreamSourceEntity> entityList = sourceMapper.selectByIpAndUuid(agentIp, uuid);
         for (StreamSourceEntity entity : entityList) {
             DataConfig dataConfig = new DataConfig();
-            dataConfig.setJobId(entity.getId());
+            dataConfig.setTaskId(entity.getId());
             SourceType sourceType = SourceType.forType(entity.getSourceType());
             dataConfig.setTaskType(sourceType.getTaskType().getType());
             dataConfig.setTaskName(entity.getSourceName());
             dataConfig.setOp(String.valueOf(entity.getStatus() % 100));
-            String inlongGroupId = entity.getInlongGroupId();
-            String inlongStreamId = entity.getInlongStreamId();
-            dataConfig.setInlongGroupId(inlongGroupId);
-            dataConfig.setInlongStreamId(inlongStreamId);
             dataConfig.setIp(entity.getAgentIp());
             dataConfig.setUuid(entity.getUuid());
             dataConfig.setExtParams(entity.getExtParams());
             dataConfig.setSnapshot(entity.getSnapshot());
-            InlongStreamEntity inlongStreamEntity = inlongStreamMapper.selectByIdentifier(inlongGroupId,inlongStreamId);
+
+            String groupId = entity.getInlongGroupId();
+            String streamId = entity.getInlongStreamId();
+            dataConfig.setInlongGroupId(groupId);
+            dataConfig.setInlongStreamId(streamId);
+            InlongStreamEntity inlongStreamEntity = streamMapper.selectByIdentifier(groupId, streamId);
             dataConfig.setSyncSend(inlongStreamEntity.getSyncSend());
             dataConfigs.add(dataConfig);
         }