You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/04 07:52:35 UTC
[incubator-inlong] branch master updated: [INLONG-2894][Agent] Adapt the interface and field modification of the manager module (#2897)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d918248 [INLONG-2894][Agent] Adapt the interface and field modification of the manager module (#2897)
d918248 is described below
commit d918248eb9a89c25b0e31b32a456f4dec2168807
Author: healchow <he...@gmail.com>
AuthorDate: Fri Mar 4 15:52:28 2022 +0800
[INLONG-2894][Agent] Adapt the interface and field modification of the manager module (#2897)
---
.../apache/inlong/agent/constant/FetcherConstants.java | 5 ++++-
.../java/org/apache/inlong/agent/pojo/JobProfileDto.java | 4 ++--
.../agent/plugin/fetcher/ManagerResultFormatter.java | 5 ++---
.../org/apache/inlong/common/pojo/agent/DataConfig.java | 2 +-
.../manager/service/core/impl/AgentServiceImpl.java | 15 ++++++++-------
5 files changed, 17 insertions(+), 14 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
index 639cd13..216a7ff 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
@@ -17,6 +17,9 @@
package org.apache.inlong.agent.constant;
+/**
+ * Constants of job fetcher.
+ */
public class FetcherConstants {
public static final String AGENT_FETCHER_INTERVAL = "agent.fetcher.interval";
@@ -37,7 +40,7 @@ public class FetcherConstants {
public static final String DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH = "/api/inlong/manager/openapi";
public static final String AGENT_MANAGER_TASK_HTTP_PATH = "agent.manager.task.http.path";
- public static final String DEFAULT_AGENT_MANAGER_TASK_HTTP_PATH = "/agent/getTask";
+ public static final String DEFAULT_AGENT_MANAGER_TASK_HTTP_PATH = "/agent/reportAndGetTask";
public static final String AGENT_MANAGER_IP_CHECK_HTTP_PATH = "agent.manager.vip.http.checkIP.path";
public static final String DEFAULT_AGENT_TDM_IP_CHECK_HTTP_PATH = "/fileAgent/confirmAgentIp";
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index 9148a3a..1441c1a 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -169,8 +169,8 @@ public class JobProfileDto {
profileDto.setProxy(proxy);
Job job = new Job();
- //common Attribu
- job.setId(String.valueOf(dataConfigs.getJobId()));
+ // common attribute
+ job.setId(String.valueOf(dataConfigs.getTaskId()));
job.setChannel(DEFAULT_CHANNEL);
job.setIp(dataConfigs.getIp());
job.setOp(dataConfigs.getOp());
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
index d52aaf2..acb03ae 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
@@ -46,11 +46,10 @@ public class ManagerResultFormatter {
*/
public static JsonObject getResultData(String jsonStr) {
JsonObject object = GSON.fromJson(jsonStr, JsonObject.class);
- if (object == null || !object.has(RESULT_CODE)
- || !object.has(RESULT_DATA)
+ if (object == null || !object.has(RESULT_CODE) || !object.has(RESULT_DATA)
|| !SUCCESS_CODE.equals(object.get(RESULT_CODE).getAsString())) {
throw new IllegalArgumentException("cannot get result data,"
- + " please check manager status, return str is" + jsonStr);
+ + " please check manager status, return str is " + jsonStr);
}
return object;
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
index b481987..4275f41 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
@@ -33,7 +33,7 @@ public class DataConfig {
private String inlongGroupId;
private String inlongStreamId;
private String op;
- private Integer jobId;
+ private Integer taskId;
private Integer taskType;
private String taskName;
private String snapshot;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index b757433..fd2b8ae 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -83,7 +83,7 @@ public class AgentServiceImpl implements AgentService {
@Autowired
private InlongStreamFieldEntityMapper streamFieldMapper;
@Autowired
- private InlongStreamEntityMapper inlongStreamMapper;
+ private InlongStreamEntityMapper streamMapper;
/**
* If the reported task time and the modification time in the database exceed this value,
@@ -121,20 +121,21 @@ public class AgentServiceImpl implements AgentService {
List<StreamSourceEntity> entityList = sourceMapper.selectByIpAndUuid(agentIp, uuid);
for (StreamSourceEntity entity : entityList) {
DataConfig dataConfig = new DataConfig();
- dataConfig.setJobId(entity.getId());
+ dataConfig.setTaskId(entity.getId());
SourceType sourceType = SourceType.forType(entity.getSourceType());
dataConfig.setTaskType(sourceType.getTaskType().getType());
dataConfig.setTaskName(entity.getSourceName());
dataConfig.setOp(String.valueOf(entity.getStatus() % 100));
- String inlongGroupId = entity.getInlongGroupId();
- String inlongStreamId = entity.getInlongStreamId();
- dataConfig.setInlongGroupId(inlongGroupId);
- dataConfig.setInlongStreamId(inlongStreamId);
dataConfig.setIp(entity.getAgentIp());
dataConfig.setUuid(entity.getUuid());
dataConfig.setExtParams(entity.getExtParams());
dataConfig.setSnapshot(entity.getSnapshot());
- InlongStreamEntity inlongStreamEntity = inlongStreamMapper.selectByIdentifier(inlongGroupId,inlongStreamId);
+
+ String groupId = entity.getInlongGroupId();
+ String streamId = entity.getInlongStreamId();
+ dataConfig.setInlongGroupId(groupId);
+ dataConfig.setInlongStreamId(streamId);
+ InlongStreamEntity inlongStreamEntity = streamMapper.selectByIdentifier(groupId, streamId);
dataConfig.setSyncSend(inlongStreamEntity.getSyncSend());
dataConfigs.add(dataConfig);
}