You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/03 13:09:45 UTC
[incubator-inlong] branch master updated: [INLONG-2867][Manager] Support report the task result and get tasks for the agent (#2868)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 859480a [INLONG-2867][Manager] Support report the task result and get tasks for the agent (#2868)
859480a is described below
commit 859480a8b97e15b9f376415ed5719350dbb2632c
Author: healchow <he...@gmail.com>
AuthorDate: Thu Mar 3 21:09:41 2022 +0800
[INLONG-2867][Manager] Support report the task result and get tasks for the agent (#2868)
---
.../apache/inlong/agent/conf/TriggerProfile.java | 15 ++-
.../java/org/apache/inlong/agent/db/CommandDb.java | 25 ++--
.../apache/inlong/agent/pojo/JobProfileDto.java | 8 +-
.../apache/inlong/agent/db/TestBerkeleyDBImp.java | 9 +-
.../CmdConfig.java => constant/Constants.java} | 17 +--
.../org/apache/inlong/common/db/CommandEntity.java | 10 +-
.../apache/inlong/common/pojo/agent/CmdConfig.java | 5 +
.../inlong/common/pojo/agent/DataConfig.java | 13 +-
.../inlong/common/pojo/agent/TaskRequest.java | 5 +
.../inlong/common/pojo/agent/TaskResult.java | 3 +
.../api/impl/DefaultInlongStreamBuilder.java | 2 +-
.../client/api/source/MySQLBinlogSource.java | 2 +-
.../manager/client/api/source/MySQLSource.java | 4 +-
.../manager/client/api/util/InlongParser.java | 23 ++--
.../api/util/InlongStreamSourceTransfer.java | 4 +-
.../inlong/manager/common/enums/Constant.java | 4 +-
.../inlong/manager/common/enums/SourceType.java | 7 +-
.../common/pojo/agent/FileAgentCommandInfo.java | 8 +-
.../pojo/source/binlog/BinlogSourceRequest.java | 4 +-
.../pojo/source/binlog/BinlogSourceResponse.java | 2 +-
.../dao/mapper/StreamSourceEntityMapper.java | 18 +--
.../resources/mappers/StreamSourceEntityMapper.xml | 57 +++-----
.../{AgentTaskService.java => AgentService.java} | 25 +++-
...tTaskServiceImpl.java => AgentServiceImpl.java} | 150 +++++++++++++++------
.../service/source/SourceSnapshotOperation.java | 2 +-
.../service/source/StreamSourceService.java | 9 --
.../service/source/StreamSourceServiceImpl.java | 16 +--
.../source/binlog/BinlogStreamSourceOperation.java | 4 +-
.../listener/AbstractSourceOperateListener.java | 2 +-
.../thirdparty/sort/util/SerializationUtils.java | 2 +-
.../inlong/manager/service/ServiceBaseTest.java | 4 +
.../AgentServiceTest.java} | 55 ++------
.../core/source/StreamSourceServiceTest.java | 40 +-----
.../source/listener/DataSourceListenerTest.java | 4 +-
.../web/controller/openapi/AgentController.java | 32 +++--
35 files changed, 306 insertions(+), 284 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TriggerProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TriggerProfile.java
index 6d1aa79..82bc73b 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TriggerProfile.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TriggerProfile.java
@@ -21,11 +21,19 @@ import org.apache.inlong.agent.constant.JobConstants;
import org.apache.inlong.agent.pojo.JobProfileDto;
import org.apache.inlong.common.pojo.agent.DataConfig;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Date;
+
/**
* profile used in trigger. Trigger profile is a special job profile
*/
public class TriggerProfile extends JobProfile {
+ private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
/**
* Parse a given json string and get a TriggerProfile
*/
@@ -67,8 +75,11 @@ public class TriggerProfile extends JobProfile {
return getInt(JobConstants.JOB_OP);
}
- public String getDeliveryTime() {
- return get(JobConstants.JOB_DELIVERY_TIME);
+ public Date getDeliveryTime() {
+ String dateStr = get(JobConstants.JOB_DELIVERY_TIME);
+ LocalDateTime localDateTime = LocalDateTime.parse(dateStr, TIME_FORMATTER);
+ Instant instant = localDateTime.atZone(ZoneId.systemDefault()).toInstant();
+ return Date.from(instant);
}
}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandDb.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandDb.java
index fedeeb5..66d0697 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandDb.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandDb.java
@@ -17,17 +17,17 @@
package org.apache.inlong.agent.db;
-import java.util.List;
import org.apache.inlong.agent.conf.TriggerProfile;
+import org.apache.inlong.common.constant.Constants;
import org.apache.inlong.common.db.CommandEntity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.List;
+
+/**
+ * Command for database
+ */
public class CommandDb {
- private static final Logger LOGGER = LoggerFactory.getLogger(CommandDb.class);
- public static final int MANAGER_SUCCESS_CODE = 0;
- public static final int MANAGER_FAIL_CODE = 1;
private final Db db;
public CommandDb(Db db) {
@@ -36,7 +36,6 @@ public class CommandDb {
/**
* store manager command to db
- * @param commandEntity
*/
public void storeCommand(CommandEntity commandEntity) {
db.putCommand(commandEntity);
@@ -44,7 +43,6 @@ public class CommandDb {
/**
* get those commands not ack to manager
- * @return
*/
public List<CommandEntity> getUnackedCommands() {
return db.searchCommands(false);
@@ -52,31 +50,26 @@ public class CommandDb {
/**
* save normal command result for trigger
- * @param profile
- * @param success
*/
public void saveNormalCmds(TriggerProfile profile, boolean success) {
CommandEntity entity = new CommandEntity();
entity.setId(CommandEntity.generateCommandId(profile.getTriggerId(), profile.getOpType()));
- entity.setTaskId(Integer.valueOf(profile.getTriggerId()));
+ entity.setTaskId(Integer.parseInt(profile.getTriggerId()));
entity.setDeliveryTime(profile.getDeliveryTime());
- entity.setCommandResult(success ? MANAGER_SUCCESS_CODE : MANAGER_FAIL_CODE);
+ entity.setCommandResult(success ? Constants.RESULT_SUCCESS : Constants.RESULT_FAIL);
entity.setAcked(false);
storeCommand(entity);
}
/**
* save special command result for trigger (retry\makeup\check)
- * @param id
- * @param taskId
- * @param success
*/
public void saveSpecialCmds(Integer id, Integer taskId, boolean success) {
CommandEntity entity = new CommandEntity();
entity.setId(String.valueOf(id));
entity.setTaskId(taskId);
entity.setAcked(false);
- entity.setCommandResult(success ? MANAGER_SUCCESS_CODE : MANAGER_FAIL_CODE);
+ entity.setCommandResult(success ? Constants.RESULT_SUCCESS : Constants.RESULT_FAIL);
storeCommand(entity);
}
}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index 6393732..4ce2d90 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -17,13 +17,16 @@
package org.apache.inlong.agent.pojo;
+import com.fasterxml.jackson.annotation.JsonFormat;
import com.google.gson.Gson;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.TriggerProfile;
-import org.apache.inlong.common.pojo.agent.DataConfig;
import org.apache.inlong.common.enums.TaskTypeEnum;
+import org.apache.inlong.common.pojo.agent.DataConfig;
+
+import java.util.Date;
import static java.util.Objects.requireNonNull;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
@@ -213,7 +216,8 @@ public class JobProfileDto {
private String name;
private String op;
private String retryTime;
- private String deliveryTime;
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date deliveryTime;
private String uuid;
private FileJob fileJob;
diff --git a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java
index 7d72113..ebecf00 100755
--- a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java
+++ b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java
@@ -17,8 +17,6 @@
package org.apache.inlong.agent.db;
-import java.util.List;
-
import org.apache.inlong.agent.AgentBaseTestsHelper;
import org.apache.inlong.common.db.CommandEntity;
import org.junit.AfterClass;
@@ -26,6 +24,9 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.util.Date;
+import java.util.List;
+
public class TestBerkeleyDBImp {
private static BerkeleyDbImp db;
@@ -38,7 +39,7 @@ public class TestBerkeleyDBImp {
}
@AfterClass
- public static void teardown() throws Exception {
+ public static void teardown() {
db.close();
helper.teardownAgentHome();
}
@@ -80,7 +81,7 @@ public class TestBerkeleyDBImp {
@Test
public void testCommandDb() {
- CommandEntity commandEntity = new CommandEntity("1", 0, false, 1, "");
+ CommandEntity commandEntity = new CommandEntity("1", 0, false, 1, new Date());
db.putCommand(commandEntity);
CommandEntity command = db.getCommand("1");
Assert.assertEquals("1", command.getId());
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/CmdConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/constant/Constants.java
similarity index 79%
copy from inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/CmdConfig.java
copy to inlong-common/src/main/java/org/apache/inlong/common/constant/Constants.java
index 4f2d0d3..0c2714a 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/CmdConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/constant/Constants.java
@@ -15,14 +15,15 @@
* limitations under the License.
*/
-package org.apache.inlong.common.pojo.agent;
+package org.apache.inlong.common.constant;
-import lombok.Data;
+/**
+ * The constants class
+ */
+public class Constants {
+
+ public static final int RESULT_SUCCESS = 0;
+
+ public static final int RESULT_FAIL = 1;
-@Data
-public class CmdConfig {
- private String dataTime;
- private Integer id;
- private Integer op;
- private Integer taskId;
}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/db/CommandEntity.java b/inlong-common/src/main/java/org/apache/inlong/common/db/CommandEntity.java
index 2ee676a..f9cf516 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/db/CommandEntity.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/db/CommandEntity.java
@@ -17,6 +17,7 @@
package org.apache.inlong.common.db;
+import com.fasterxml.jackson.annotation.JsonFormat;
import com.sleepycat.persist.model.Entity;
import com.sleepycat.persist.model.PrimaryKey;
import com.sleepycat.persist.model.Relationship;
@@ -25,6 +26,8 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
+import java.util.Date;
+
@Entity(version = 1)
@Data
@AllArgsConstructor
@@ -37,7 +40,12 @@ public class CommandEntity {
@SecondaryKey(relate = Relationship.MANY_TO_ONE)
private boolean isAcked;
private Integer taskId;
- private String deliveryTime;
+
+ /**
+ * The task delivery time
+ */
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date deliveryTime;
public static String generateCommandId(String taskId, int opType) {
return taskId + opType;
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/CmdConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/CmdConfig.java
index 4f2d0d3..d29b350 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/CmdConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/CmdConfig.java
@@ -19,10 +19,15 @@ package org.apache.inlong.common.pojo.agent;
import lombok.Data;
+/**
+ * The specifically command config for agent.
+ */
@Data
public class CmdConfig {
+
private String dataTime;
private Integer id;
private Integer op;
private Integer taskId;
+
}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
index aed1252..928e8ca 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
@@ -17,22 +17,29 @@
package org.apache.inlong.common.pojo.agent;
+import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
+import java.util.Date;
+
+/**
+ * The task config for agent.
+ */
@Data
public class DataConfig {
+ private String ip;
+ private String uuid;
private String inlongGroupId;
private String inlongStreamId;
- private String deliveryTime;
- private String uuid;
- private String ip;
private String op;
private Integer jobId;
private Integer taskType;
private String snapshot;
private String syncSend;
private String extParams;
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date deliveryTime;
public boolean isValid() {
return true;
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskRequest.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskRequest.java
index cc86605..fdc994a 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskRequest.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskRequest.java
@@ -23,11 +23,16 @@ import org.apache.inlong.common.db.CommandEntity;
import java.util.ArrayList;
import java.util.List;
+/**
+ * Request task from agent to manager.
+ */
@Data
public class TaskRequest {
private String agentIp;
+
private String uuid;
+
private List<CommandEntity> commandInfo = new ArrayList<>();
}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java
index a4b1e35..bb746a2 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java
@@ -22,6 +22,9 @@ import lombok.Data;
import java.util.List;
+/**
+ * The task result pulled by the agent from the manager.
+ */
@Data
@Builder
public class TaskResult {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
index 98e9229..b84538a 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
@@ -137,7 +137,7 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
private int initOrUpdateSource(SourceRequest sourceRequest) {
String sourceType = sourceRequest.getSourceType();
- if (SourceType.KAFKA.name().equals(sourceType) || SourceType.DB_BINLOG.name().equals(sourceType)) {
+ if (SourceType.KAFKA.name().equals(sourceType) || SourceType.BINLOG.name().equals(sourceType)) {
List<SourceListResponse> responses = managerClient.listSources(sourceRequest.getInlongGroupId(),
sourceRequest.getInlongStreamId(), sourceRequest.getSourceType());
if (CollectionUtils.isEmpty(responses)) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
index 0b3743a..96944e2 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
@@ -34,7 +34,7 @@ import org.apache.inlong.manager.common.enums.SourceType;
public class MySQLBinlogSource extends StreamSource {
@ApiModelProperty(value = "DataSource type", required = true)
- private SourceType sourceType = SourceType.DB_BINLOG;
+ private SourceType sourceType = SourceType.BINLOG;
@ApiModelProperty("SyncType for MySQL")
private SyncType syncType;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java
index 2846154..d012a8f 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java
@@ -21,19 +21,21 @@ import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
+import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.client.api.DataFormat;
import org.apache.inlong.manager.client.api.StreamSource;
import org.apache.inlong.manager.common.enums.SourceType;
@Data
+@EqualsAndHashCode(callSuper = true)
@AllArgsConstructor
@NoArgsConstructor
@ApiModel("Base configuration for MySQL collection")
public class MySQLSource extends StreamSource {
@ApiModelProperty(value = "DataSource type", required = true)
- private SourceType sourceType = SourceType.DB_SQL;
+ private SourceType sourceType = SourceType.SQL;
@ApiModelProperty("SyncType for MySQL")
private SyncType syncType;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
index 6ccfdf4..c11264b 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
@@ -41,7 +41,7 @@ import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
import java.util.List;
-import static org.apache.inlong.manager.common.enums.SourceType.DB_BINLOG;
+import static org.apache.inlong.manager.common.enums.SourceType.BINLOG;
import static org.apache.inlong.manager.common.enums.SourceType.KAFKA;
/**
@@ -57,8 +57,7 @@ public class InlongParser {
public static WorkflowResult parseWorkflowResult(Response response) {
Object data = response.getData();
String resultData = GsonUtil.toJson(data);
- WorkflowResult workflowResult = GsonUtil.fromJson(resultData, WorkflowResult.class);
- return workflowResult;
+ return GsonUtil.fromJson(resultData, WorkflowResult.class);
}
public static InlongGroupResponse parseGroupInfo(Response response) {
@@ -69,25 +68,22 @@ public class InlongParser {
public static PageInfo<InlongGroupListResponse> parseGroupList(Response response) {
Object data = response.getData();
String pageInfoJson = GsonUtil.toJson(data);
- PageInfo<InlongGroupListResponse> pageInfo = GsonUtil.fromJson(pageInfoJson,
+ return GsonUtil.fromJson(pageInfoJson,
new TypeToken<PageInfo<InlongGroupListResponse>>() {
}.getType());
- return pageInfo;
}
public static InlongStreamInfo parseStreamInfo(Response response) {
Object data = response.getData();
- InlongStreamInfo streamInfo = GsonUtil.fromJson(GsonUtil.toJson(data), InlongStreamInfo.class);
- return streamInfo;
+ return GsonUtil.fromJson(GsonUtil.toJson(data), InlongStreamInfo.class);
}
public static PageInfo<FullStreamResponse> parseStreamList(Response response) {
Object data = response.getData();
String pageInfoJson = GsonUtil.toJson(data);
- PageInfo<FullStreamResponse> pageInfo = GsonUtil.fromJson(pageInfoJson,
+ return GsonUtil.fromJson(pageInfoJson,
new TypeToken<PageInfo<FullStreamResponse>>() {
}.getType());
- return pageInfo;
}
public static PageInfo<SourceListResponse> parseSourceList(Response response) {
@@ -99,7 +95,7 @@ public class InlongParser {
if (pageInfo.getList() != null && !pageInfo.getList().isEmpty()) {
SourceListResponse sourceListResponse = pageInfo.getList().get(0);
SourceType sourceType = SourceType.forType(sourceListResponse.getSourceType());
- if (sourceType == DB_BINLOG) {
+ if (sourceType == BINLOG) {
return GsonUtil.fromJson(pageInfoJson,
new TypeToken<PageInfo<BinlogSourceListResponse>>() {
}.getType());
@@ -119,10 +115,9 @@ public class InlongParser {
public static PageInfo<SinkListResponse> parseSinkList(Response response) {
Object data = response.getData();
String pageInfoJson = GsonUtil.toJson(data);
- PageInfo<SinkListResponse> pageInfo = GsonUtil.fromJson(pageInfoJson,
+ return GsonUtil.fromJson(pageInfoJson,
new TypeToken<PageInfo<SinkListResponse>>() {
}.getType());
- return pageInfo;
}
public static Pair<InlongGroupApproveRequest, List<InlongStreamApproveRequest>> parseGroupForm(String formJson) {
@@ -156,9 +151,9 @@ public class InlongParser {
public static PageInfo<EventLogView> parseEventLogViewList(Response response) {
Object data = response.getData();
String pageInfoJson = GsonUtil.toJson(data);
- PageInfo<EventLogView> pageInfo = GsonUtil.fromJson(pageInfoJson,
+ return GsonUtil.fromJson(pageInfoJson,
new TypeToken<PageInfo<EventLogView>>() {
}.getType());
- return pageInfo;
}
+
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index f03309c..fda2efb 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -41,7 +41,7 @@ public class InlongStreamSourceTransfer {
switch (sourceType) {
case KAFKA:
return createKafkaSourceRequest((KafkaSource) streamSource, streamInfo);
- case DB_BINLOG:
+ case BINLOG:
return createBinlogSourceRequest((MySQLBinlogSource) streamSource, streamInfo);
default:
throw new RuntimeException(String.format("Unsupport source=%s for Inlong", sourceType));
@@ -54,7 +54,7 @@ public class InlongStreamSourceTransfer {
if (sourceType == SourceType.KAFKA && sourceListResponse instanceof KafkaSourceListResponse) {
return parseKafkaSource((KafkaSourceListResponse) sourceListResponse);
}
- if (sourceType == SourceType.DB_BINLOG && sourceListResponse instanceof BinlogSourceListResponse) {
+ if (sourceType == SourceType.BINLOG && sourceListResponse instanceof BinlogSourceListResponse) {
return parseMySQLBinlogSource((BinlogSourceListResponse) sourceListResponse);
}
throw new IllegalArgumentException(String.format("Unsupport source type : %s for Inlong", sourceType));
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
index c08ef0f..f950a93 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
@@ -26,9 +26,9 @@ public class Constant {
public static final String SOURCE_FILE = "FILE";
- public static final String SOURCE_DB_SQL = "DB_SQL";
+ public static final String SOURCE_SQL = "SQL";
- public static final String SOURCE_DB_BINLOG = "DB_BINLOG";
+ public static final String SOURCE_BINLOG = "BINLOG";
public static final String SOURCE_KAFKA = "KAFKA";
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
index 99ac6e2..93998e7 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
@@ -17,15 +17,16 @@
package org.apache.inlong.manager.common.enums;
-import java.util.Locale;
import lombok.Getter;
import org.apache.inlong.common.enums.TaskTypeEnum;
+import java.util.Locale;
+
public enum SourceType {
DATABASE_MIGRATION("DATABASE_MIGRATION",TaskTypeEnum.DATABASE_MIGRATION),
FILE("FILE", TaskTypeEnum.FILE),
- DB_SQL("DB_SQL", TaskTypeEnum.SQL),
- DB_BINLOG("DB_BINLOG", TaskTypeEnum.BINLOG),
+ SQL("SQL", TaskTypeEnum.SQL),
+ BINLOG("BINLOG", TaskTypeEnum.BINLOG),
KAFKA("KAFKA", TaskTypeEnum.KAFKA);
@Getter
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/agent/FileAgentCommandInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/agent/FileAgentCommandInfo.java
index f406ad3..815e842 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/agent/FileAgentCommandInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/agent/FileAgentCommandInfo.java
@@ -17,11 +17,14 @@
package org.apache.inlong.manager.common.pojo.agent;
+import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
-import java.util.List;
import lombok.Data;
+import java.util.Date;
+import java.util.List;
+
/**
* File agent operation result
*/
@@ -43,7 +46,8 @@ public class FileAgentCommandInfo {
private int commandResult;
@ApiModelProperty(value = "Command issuance time")
- private long deliveryTime;
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date deliveryTime;
@ApiModelProperty(value = "task id")
private int taskId;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
index 8c853f0..da9b968 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
@@ -34,11 +34,11 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@ApiModel(value = "Request of the binlog source info")
-@JsonTypeDefine(value = Constant.SOURCE_DB_BINLOG)
+@JsonTypeDefine(value = Constant.SOURCE_BINLOG)
public class BinlogSourceRequest extends SourceRequest {
public BinlogSourceRequest() {
- this.setSourceType(SourceType.DB_BINLOG.toString());
+ this.setSourceType(SourceType.BINLOG.toString());
}
@ApiModelProperty("Username of the DB server")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
index a6e2fa5..49388fe 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
@@ -34,7 +34,7 @@ import org.apache.inlong.manager.common.pojo.source.SourceResponse;
@ApiModel(value = "Response of the binlog source")
public class BinlogSourceResponse extends SourceResponse {
- private String sourceType = Constant.SOURCE_DB_BINLOG;
+ private String sourceType = Constant.SOURCE_BINLOG;
@ApiModelProperty("Username of the DB server")
private String user;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 725dd2a..cb49a05 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -18,7 +18,6 @@
package org.apache.inlong.manager.dao.mapper;
import org.apache.ibatis.annotations.Param;
-import org.apache.inlong.common.pojo.agent.TaskRequest;
import org.apache.inlong.manager.common.pojo.source.SourcePageRequest;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.springframework.stereotype.Repository;
@@ -52,13 +51,13 @@ public interface StreamSourceEntityMapper {
List<StreamSourceEntity> selectByCondition(@Param("request") SourcePageRequest request);
/**
- * According to the inlong group id and inlong stream id, query valid source information
+ * Query valid source list by the given group id and stream id.
*
- * @param groupId inlong group id
- * @param streamId inlong stream id
- * @return Source entity list
+ * @param groupId Inlong group id.
+ * @param streamId Inlong stream id.
+ * @return Source entity list.
*/
- List<StreamSourceEntity> selectByIdentifier(@Param("groupId") String groupId, @Param("streamId") String streamId);
+ List<StreamSourceEntity> selectByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId);
/**
* According to the group id, stream id and source type, query valid source entity list.
@@ -72,6 +71,11 @@ public interface StreamSourceEntityMapper {
@Param("sourceType") String sourceType);
/**
+ * Query the source list by the given agent ip and agent uuid.
+ */
+ List<StreamSourceEntity> selectByIpAndUuid(@Param("agentIp") String agentIp, @Param("uuid") String uuid);
+
+ /**
* Get the distinct source type from the given groupId and streamId
*/
List<String> selectSourceType(@Param("groupId") String groupId, @Param("streamId") String streamId);
@@ -101,6 +105,4 @@ public interface StreamSourceEntityMapper {
int deleteByPrimaryKey(Integer id);
- List<StreamSourceEntity> selectAgentTaskDataConfig(TaskRequest taskRequest);
-
}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 6e58818..80b206f 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -42,18 +42,6 @@
<result column="create_time" jdbcType="TIMESTAMP" property="createTime"/>
<result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/>
</resultMap>
- <resultMap id="DataConfig" type="org.apache.inlong.common.pojo.agent.DataConfig">
- <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/>
- <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId"/>
- <result column="modify_time" jdbcType="VARCHAR" property="deliveryTime"/>
- <result column="uuid" jdbcType="VARCHAR" property="uuid"/>
- <result column="agent_ip" jdbcType="VARCHAR" property="ip"/>
- <result column="op" jdbcType="VARCHAR" property="op"/>
- <result column="id" jdbcType="VARCHAR" property="taskId"/>
- <result column="source_type" jdbcType="INTEGER" property="taskType"/>
- <result column="snapshot" jdbcType="LONGVARCHAR" property="snapshot"/>
- <result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/>
- </resultMap>
<sql id="Base_Column_List">
id, inlong_group_id, inlong_stream_id, source_type, agent_ip, uuid, server_id, server_name,
cluster_id, cluster_name, snapshot, report_time, ext_params, status, previous_status,
@@ -232,13 +220,13 @@
from stream_source
<where>
is_deleted = 0
- <if test="request.sourceType != null and request.sourceType != ''">
- and source_type = #{request.sourceType, jdbcType=VARCHAR}
- </if>
and inlong_group_id = #{request.inlongGroupId, jdbcType=VARCHAR}
<if test="request.inlongStreamId != null and request.inlongStreamId != ''">
and inlong_stream_id = #{request.inlongStreamId, jdbcType=VARCHAR}
</if>
+ <if test="request.sourceType != null and request.sourceType != ''">
+ and source_type = #{request.sourceType, jdbcType=VARCHAR}
+ </if>
<if test="request.keyWord != null and request.keyWord != ''">
and (
inlong_group_id like CONCAT('%', #{request.keyWord}, '%')
@@ -251,7 +239,7 @@
order by modify_time desc
</where>
</select>
- <select id="selectByIdentifier" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+ <select id="selectByRelatedId" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
select
<include refid="Base_Column_List"/>
from stream_source
@@ -278,6 +266,18 @@
</if>
</where>
</select>
+ <select id="selectByIpAndUuid" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from stream_source
+ <where>
+ is_deleted = 0
+ and agent_ip = #{agentIp, jdbcType=VARCHAR}
+ <if test="uuid != null and uuid != ''">
+ and uuid = #{uuid, jdbcType=VARCHAR}
+ </if>
+ </where>
+ </select>
<select id="selectSourceType" resultType="java.lang.String">
select distinct (source_type)
from stream_source
@@ -410,29 +410,4 @@
from stream_source
where id = #{id,jdbcType=INTEGER}
</delete>
-
- <select id="selectAgentTaskDataConfig" parameterType="org.apache.inlong.common.pojo.agent.TaskRequest"
- resultMap="DataConfig">
- select
- detail.inlong_group_id,
- detail.inlong_stream_id,
- detail.modify_time,
- detail.uuid,
- detail.agent_ip,
- detail.status,
- detail.id,
- detail.source_type,
- detail.snapshot,
- detail.ext_params
- from stream_source detail
- where detail.is_deleted = 0
- <if test="agentIp != null and agentIp != ''">
- and detail.agent_ip = #{agentIp, jdbcType=VARCHAR}
- </if>
- <if test="uuid != null and uuid != ''">
- and detail.uuid = #{uuid, jdbcType=VARCHAR}
- </if>
- and (floor(detail.STATUS / 100) = 2)
- </select>
-
</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentTaskService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java
similarity index 75%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentTaskService.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java
index dbdf36f..35130a6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentTaskService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java
@@ -17,9 +17,9 @@
package org.apache.inlong.manager.service.core;
-import java.util.List;
import org.apache.inlong.common.pojo.agent.TaskRequest;
import org.apache.inlong.common.pojo.agent.TaskResult;
+import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
import org.apache.inlong.manager.common.pojo.agent.AgentStatusReportRequest;
import org.apache.inlong.manager.common.pojo.agent.CheckAgentTaskConfRequest;
import org.apache.inlong.manager.common.pojo.agent.ConfirmAgentIpRequest;
@@ -27,9 +27,28 @@ import org.apache.inlong.manager.common.pojo.agent.FileAgentCommandInfo;
import org.apache.inlong.manager.common.pojo.agent.FileAgentTaskConfig;
import org.apache.inlong.manager.common.pojo.agent.FileAgentTaskInfo;
-public interface AgentTaskService {
+import java.util.List;
- TaskResult getAgentTask(TaskRequest taskRequest);
+/**
+ * The service interface for agent
+ */
+public interface AgentService {
+
+ /**
+ * Report the heartbeat for given source.
+ *
+ * @param request Heartbeat request.
+ * @return Whether succeed.
+ */
+ Boolean reportSnapshot(TaskSnapshotRequest request);
+
+ /**
+ * Agent report the task result, and pull task config to operate.
+ *
+ * @param request Request of the task result.
+ * @return Task result.
+ */
+ TaskResult reportAndGetTask(TaskRequest request);
@Deprecated
FileAgentTaskInfo getFileAgentTask(FileAgentCommandInfo info);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentTaskServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
similarity index 77%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentTaskServiceImpl.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 02740a3..5b1993c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentTaskServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -19,20 +19,17 @@ package org.apache.inlong.manager.service.core.impl;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.common.constant.Constants;
+import org.apache.inlong.common.db.CommandEntity;
import org.apache.inlong.common.pojo.agent.CmdConfig;
import org.apache.inlong.common.pojo.agent.DataConfig;
import org.apache.inlong.common.pojo.agent.TaskRequest;
import org.apache.inlong.common.pojo.agent.TaskResult;
+import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.common.enums.FileAgentDataGenerateRule;
+import org.apache.inlong.manager.common.enums.SourceState;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.agent.AgentStatusReportRequest;
import org.apache.inlong.manager.common.pojo.agent.CheckAgentTaskConfRequest;
@@ -50,64 +47,135 @@ import org.apache.inlong.manager.dao.mapper.DataSourceCmdConfigEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper;
import org.apache.inlong.manager.dao.mapper.SourceFileDetailEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
-import org.apache.inlong.manager.service.core.AgentTaskService;
+import org.apache.inlong.manager.service.core.AgentService;
+import org.apache.inlong.manager.service.source.SourceSnapshotOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
@Service
-@Slf4j
-public class AgentTaskServiceImpl implements AgentTaskService {
+public class AgentServiceImpl implements AgentService {
- private static final Logger LOGGER = LoggerFactory.getLogger(AgentTaskServiceImpl.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(AgentServiceImpl.class);
+ private static final int UNISSUED_STATUS = 2;
+ private static final int ISSUED_STATUS = 3;
@Autowired
+ private StreamSourceEntityMapper sourceMapper;
+ @Autowired
+ private SourceSnapshotOperation snapshotOperation;
+ @Autowired
private SourceFileDetailEntityMapper fileDetailMapper;
-
@Autowired
private DataSourceCmdConfigEntityMapper sourceCmdConfigMapper;
-
- @Autowired
- private StreamSourceEntityMapper streamSourceMapper;
-
@Autowired
private InlongStreamFieldEntityMapper streamFieldMapper;
+ /**
+ * If the reported task time and the modification time in the database exceed this value,
+ * it will be considered that the user has modified the task, and the result of this report will be ignored.
+ */
+ @Value("${stream.source.maxModifyTime:5000}")
+ private Integer maxModifyTime;
+
+ @Override
+ public Boolean reportSnapshot(TaskSnapshotRequest request) {
+ return snapshotOperation.snapshot(request);
+ }
+
@Override
- public TaskResult getAgentTask(TaskRequest taskRequest) {
- LOGGER.debug("begin to get agent task by taskRequestDto={}", taskRequest);
- if (taskRequest == null || taskRequest.getAgentIp() == null) {
- LOGGER.error("agent command taskRequestDto cannot be empty");
+ public TaskResult reportAndGetTask(TaskRequest request) {
+ LOGGER.debug("begin to get agent task: {}", request);
+ if (request == null || request.getAgentIp() == null) {
+ LOGGER.warn("agent request was empty, just return");
return null;
}
- // Query pending tasks by agentIp
- List<DataConfig> dataConfigs = getAgentDataConfigs(taskRequest);
- // Query pending special commands
- List<CmdConfig> cmdConfigs = getAgentCmdConfigs(taskRequest);
+ this.updateTaskStatus(request);
- return TaskResult.builder().dataConfigs(dataConfigs).cmdConfigs(cmdConfigs).build();
+ return this.getTaskResult(request);
}
- private List<DataConfig> getAgentDataConfigs(TaskRequest taskRequest) {
- List<StreamSourceEntity> sourceEntities = streamSourceMapper.selectAgentTaskDataConfig(taskRequest);
- List<DataConfig> dataConfigs = sourceEntities.stream().map(sourceEntity -> {
+ /**
+ * Get task result by the request
+ */
+ private TaskResult getTaskResult(TaskRequest request) {
+ // Query all tasks with status in 20x
+ String agentIp = request.getAgentIp();
+ String uuid = request.getUuid();
+ List<StreamSourceEntity> entityList = sourceMapper.selectByIpAndUuid(agentIp, uuid);
+
+ List<DataConfig> dataConfigs = entityList.stream().map(entity -> {
DataConfig dataConfig = new DataConfig();
- dataConfig.setOp(String.valueOf(sourceEntity.getStatus() % 100));
- dataConfig.setJobId(sourceEntity.getId());
- SourceType sourceType = SourceType.forType(sourceEntity.getSourceType());
+ dataConfig.setJobId(entity.getId());
+ SourceType sourceType = SourceType.forType(entity.getSourceType());
dataConfig.setTaskType(sourceType.getTaskType().getType());
- dataConfig.setInlongGroupId(sourceEntity.getInlongGroupId());
- dataConfig.setInlongStreamId(sourceEntity.getInlongStreamId());
- dataConfig.setIp(sourceEntity.getAgentIp());
- dataConfig.setUuid(sourceEntity.getUuid());
- dataConfig.setExtParams(sourceEntity.getExtParams());
- dataConfig.setSnapshot(sourceEntity.getSnapshot());
+ dataConfig.setOp(String.valueOf(entity.getStatus() % 100));
+ dataConfig.setInlongGroupId(entity.getInlongGroupId());
+ dataConfig.setInlongStreamId(entity.getInlongStreamId());
+ dataConfig.setIp(entity.getAgentIp());
+ dataConfig.setUuid(entity.getUuid());
+ dataConfig.setExtParams(entity.getExtParams());
+ dataConfig.setSnapshot(entity.getSnapshot());
+
return dataConfig;
}).collect(Collectors.toList());
- //Forward Compatible File task type
- return dataConfigs;
+
+ // Query pending special commands
+ List<CmdConfig> cmdConfigs = getAgentCmdConfigs(request);
+
+ return TaskResult.builder().dataConfigs(dataConfigs).cmdConfigs(cmdConfigs).build();
+ }
+
+ /**
+ * Update the task status by the request
+ */
+ private void updateTaskStatus(TaskRequest request) {
+ if (CollectionUtils.isEmpty(request.getCommandInfo())) {
+ LOGGER.warn("task result was empty, just return");
+ return;
+ }
+
+ for (CommandEntity command : request.getCommandInfo()) {
+ Integer taskId = command.getTaskId();
+ StreamSourceEntity current = sourceMapper.selectByPrimaryKey(taskId);
+ if (current == null) {
+ continue;
+ }
+
+ if (current.getModifyTime().getTime() - command.getDeliveryTime().getTime() > maxModifyTime) {
+ LOGGER.warn("task {} receive result delay more than {} ms, skip it", taskId, maxModifyTime);
+ continue;
+ }
+
+ int result = command.getCommandResult();
+ int previousStatus = current.getStatus();
+ int nextStatus = SourceState.SOURCE_NORMAL.getCode();
+ if (previousStatus / 100 == UNISSUED_STATUS) {
+ if (Constants.RESULT_SUCCESS == result) {
+ if (SourceState.TEMP_TO_NORMAL.contains(previousStatus)) {
+ nextStatus = SourceState.SOURCE_NORMAL.getCode();
+ } else if (SourceState.BEEN_ISSUED_DELETE.getCode() == previousStatus) {
+ nextStatus = SourceState.SOURCE_DISABLE.getCode();
+ } else if (SourceState.BEEN_ISSUED_FROZEN.getCode() == previousStatus) {
+ nextStatus = SourceState.SOURCE_FROZEN.getCode();
+ }
+ } else if (Constants.RESULT_FAIL == result) {
+ nextStatus = SourceState.SOURCE_FAILED.getCode();
+ }
+
+ sourceMapper.updateStatus(taskId, nextStatus);
+ }
+ }
}
private List<CmdConfig> getAgentCmdConfigs(TaskRequest taskRequest) {
@@ -214,8 +282,8 @@ public class AgentTaskServiceImpl implements AgentTaskService {
}
} else { // Modify the result status of the data collection task
- if (current.getModifyTime().getTime() - command.getDeliveryTime() > 1000 * 5) {
- log.warn(" task id {} receive heartbeat time delay more than 5's, skip it!",
+ if (current.getModifyTime().getTime() - command.getDeliveryTime().getTime() > 1000 * 5) {
+ LOGGER.warn(" task id {} receive heartbeat time delay more than 5's, skip it!",
command.getTaskId());
continue;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java
index 39c6892..e9fadfe 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java
@@ -170,7 +170,7 @@ public class SourceSnapshotOperation implements AutoCloseable {
* @see org.apache.inlong.manager.common.enums.SourceState
*/
private ConcurrentHashMap<String, ConcurrentHashMap<Integer, Integer>> getTaskIpAndStatusMap() {
- ConcurrentHashMap<String, ConcurrentHashMap<Integer, Integer>> ipTaskMap = new ConcurrentHashMap<>();
+ ConcurrentHashMap<String, ConcurrentHashMap<Integer, Integer>> ipTaskMap = new ConcurrentHashMap<>(16);
List<StreamSourceEntity> sourceList = sourceMapper.selectTempStatusSource();
for (StreamSourceEntity entity : sourceList) {
String ip = entity.getAgentIp();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
index 190b049..5f876b3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
@@ -18,7 +18,6 @@
package org.apache.inlong.manager.service.source;
import com.github.pagehelper.PageInfo;
-import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.SourcePageRequest;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
@@ -174,12 +173,4 @@ public interface StreamSourceService {
return true;
}
- /**
- * Report the heartbeat for given source.
- *
- * @param request Heartbeat request.
- * @return Whether succeed.
- */
- Boolean reportSnapshot(TaskSnapshotRequest request);
-
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index c32c0f5..c326274 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -23,7 +23,6 @@ import com.github.pagehelper.PageInfo;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupState;
@@ -66,8 +65,6 @@ public class StreamSourceServiceImpl implements StreamSourceService {
private StreamSourceEntityMapper sourceMapper;
@Autowired
private CommonOperateService commonOperateService;
- @Autowired
- private SourceSnapshotOperation heartbeatOperation;
@Override
@Transactional(rollbackFor = Throwable.class)
@@ -111,7 +108,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
LOGGER.debug("begin to list source by groupId={}, streamId={}", groupId, streamId);
Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
- List<StreamSourceEntity> entityList = sourceMapper.selectByIdentifier(groupId, streamId);
+ List<StreamSourceEntity> entityList = sourceMapper.selectByRelatedId(groupId, streamId);
if (CollectionUtils.isEmpty(entityList)) {
return Collections.emptyList();
}
@@ -138,7 +135,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
SourceType sourceType = SourceType.forType(entity.getSourceType());
StreamSourceOperation operation = operationFactory.getInstance(sourceType);
switch (sourceType) {
- case DB_BINLOG:
+ case BINLOG:
BinlogSourceListResponse binlogSourceListResponse = operation.getFromEntity(entity,
BinlogSourceListResponse::new);
responses.add(binlogSourceListResponse);
@@ -263,7 +260,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
nextStatus = SourceState.SOURCE_DISABLE.getCode();
}
Date now = new Date();
- List<StreamSourceEntity> entityList = sourceMapper.selectByIdentifier(groupId, streamId);
+ List<StreamSourceEntity> entityList = sourceMapper.selectByRelatedId(groupId, streamId);
if (CollectionUtils.isNotEmpty(entityList)) {
for (StreamSourceEntity entity : entityList) {
Integer id = entity.getId();
@@ -291,7 +288,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
// Check if it can be deleted
commonOperateService.checkGroupStatus(groupId, operator);
- List<StreamSourceEntity> entityList = sourceMapper.selectByIdentifier(groupId, streamId);
+ List<StreamSourceEntity> entityList = sourceMapper.selectByRelatedId(groupId, streamId);
if (CollectionUtils.isNotEmpty(entityList)) {
entityList.forEach(entity -> sourceMapper.deleteByPrimaryKey(entity.getId()));
}
@@ -312,11 +309,6 @@ public class StreamSourceServiceImpl implements StreamSourceService {
return resultList;
}
- @Override
- public Boolean reportSnapshot(TaskSnapshotRequest request) {
- return heartbeatOperation.snapshot(request);
- }
-
private void checkParams(SourceRequest request) {
Preconditions.checkNotNull(request, Constant.REQUEST_IS_EMPTY);
String groupId = request.getInlongGroupId();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogStreamSourceOperation.java
index d0966e3..6531900 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogStreamSourceOperation.java
@@ -52,12 +52,12 @@ public class BinlogStreamSourceOperation extends AbstractStreamSourceOperation {
@Override
public Boolean accept(SourceType sourceType) {
- return SourceType.DB_BINLOG == sourceType;
+ return SourceType.BINLOG == sourceType;
}
@Override
protected String getSourceType() {
- return Constant.SOURCE_DB_BINLOG;
+ return Constant.SOURCE_BINLOG;
}
@Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/AbstractSourceOperateListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/AbstractSourceOperateListener.java
index 610cb0c..d5f3e4b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/AbstractSourceOperateListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/AbstractSourceOperateListener.java
@@ -81,7 +81,7 @@ public abstract class AbstractSourceOperateListener implements DataSourceOperate
String sourceType = sourceResponse.getSourceType();
SourceType type = SourceType.valueOf(sourceType);
switch (type) {
- case DB_BINLOG:
+ case BINLOG:
return CommonBeanUtils.copyProperties((BinlogSourceResponse) sourceResponse, BinlogSourceRequest::new);
case KAFKA:
return CommonBeanUtils.copyProperties((KafkaSourceResponse) sourceResponse, KafkaSourceRequest::new);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
index 124fb62..345d5b4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
@@ -44,7 +44,7 @@ public class SerializationUtils {
InlongStreamInfo streamInfo) {
SourceType sourceType = SourceType.forType(sourceResponse.getSourceType());
switch (sourceType) {
- case DB_BINLOG:
+ case BINLOG:
return forBinlog((BinlogSourceResponse) sourceResponse, streamInfo);
case KAFKA:
return forKafka((KafkaSourceResponse) sourceResponse, streamInfo);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
index 7f6cb24..5022d48 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
@@ -25,4 +25,8 @@ import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest(classes = ServiceBaseTest.class)
public class ServiceBaseTest extends BaseTest {
+ public final String globalGroupId = "b_group1";
+ public final String globalStreamId = "stream1";
+ public final String globalOperator = "admin";
+
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
similarity index 54%
copy from inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
copy to inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
index e4cb65b..003936b 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
@@ -15,17 +15,14 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.core.source;
+package org.apache.inlong.manager.service.core.impl;
import org.apache.inlong.common.pojo.agent.TaskSnapshotMessage;
import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
import org.apache.inlong.manager.common.enums.Constant;
-import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceRequest;
-import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.service.ServiceBaseTest;
-import org.apache.inlong.manager.service.core.impl.InlongStreamServiceTest;
+import org.apache.inlong.manager.service.core.AgentService;
import org.apache.inlong.manager.service.source.StreamSourceService;
import org.junit.Assert;
import org.junit.Test;
@@ -37,15 +34,13 @@ import java.util.Date;
/**
* Stream source service test
*/
-public class StreamSourceServiceTest extends ServiceBaseTest {
-
- private final String globalGroupId = "b_group1";
- private final String globalStreamId = "stream1";
- private final String globalOperator = "test_user";
+public class AgentServiceTest extends ServiceBaseTest {
@Autowired
private StreamSourceService sourceService;
@Autowired
+ private AgentService agentService;
+ @Autowired
private InlongStreamServiceTest streamServiceTest;
public Integer saveSource() {
@@ -54,46 +49,12 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
BinlogSourceRequest sourceInfo = new BinlogSourceRequest();
sourceInfo.setInlongGroupId(globalGroupId);
sourceInfo.setInlongStreamId(globalStreamId);
- sourceInfo.setSourceType(Constant.SOURCE_DB_BINLOG);
+ sourceInfo.setSourceType(Constant.SOURCE_BINLOG);
return sourceService.save(sourceInfo, globalOperator);
}
@Test
- public void testSaveAndDelete() {
- Integer id = this.saveSource();
- Assert.assertNotNull(id);
-
- boolean result = sourceService.delete(id, Constant.SOURCE_DB_BINLOG, globalOperator);
- Assert.assertTrue(result);
- }
-
- @Test
- public void testListByIdentifier() {
- Integer id = this.saveSource();
-
- SourceResponse source = sourceService.get(id, Constant.SOURCE_DB_BINLOG);
- Assert.assertEquals(globalGroupId, source.getInlongGroupId());
-
- sourceService.delete(id, Constant.SOURCE_DB_BINLOG, globalOperator);
- }
-
- @Test
- public void testGetAndUpdate() {
- Integer id = this.saveSource();
- SourceResponse response = sourceService.get(id, Constant.SOURCE_DB_BINLOG);
- Assert.assertEquals(globalGroupId, response.getInlongGroupId());
-
- BinlogSourceResponse binlogResponse = (BinlogSourceResponse) response;
-
- BinlogSourceRequest request = CommonBeanUtils.copyProperties(binlogResponse, BinlogSourceRequest::new);
- boolean result = sourceService.update(request, globalOperator);
- Assert.assertTrue(result);
-
- sourceService.delete(id, Constant.SOURCE_DB_BINLOG, globalOperator);
- }
-
- @Test
public void testReportSnapshot() {
Integer id = this.saveSource();
@@ -106,10 +67,10 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
message.setSnapshot("{\"offset\": 100}");
request.setSnapshotList(Collections.singletonList(message));
- Boolean result = sourceService.reportSnapshot(request);
+ Boolean result = agentService.reportSnapshot(request);
Assert.assertTrue(result);
- sourceService.delete(id, Constant.SOURCE_DB_BINLOG, globalOperator);
+ sourceService.delete(id, Constant.SOURCE_BINLOG, globalOperator);
}
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
index e4cb65b..1f6588a 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
@@ -17,8 +17,6 @@
package org.apache.inlong.manager.service.core.source;
-import org.apache.inlong.common.pojo.agent.TaskSnapshotMessage;
-import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceRequest;
@@ -31,18 +29,11 @@ import org.junit.Assert;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
-import java.util.Collections;
-import java.util.Date;
-
/**
* Stream source service test
*/
public class StreamSourceServiceTest extends ServiceBaseTest {
- private final String globalGroupId = "b_group1";
- private final String globalStreamId = "stream1";
- private final String globalOperator = "test_user";
-
@Autowired
private StreamSourceService sourceService;
@Autowired
@@ -54,7 +45,7 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
BinlogSourceRequest sourceInfo = new BinlogSourceRequest();
sourceInfo.setInlongGroupId(globalGroupId);
sourceInfo.setInlongStreamId(globalStreamId);
- sourceInfo.setSourceType(Constant.SOURCE_DB_BINLOG);
+ sourceInfo.setSourceType(Constant.SOURCE_BINLOG);
return sourceService.save(sourceInfo, globalOperator);
}
@@ -64,7 +55,7 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
Integer id = this.saveSource();
Assert.assertNotNull(id);
- boolean result = sourceService.delete(id, Constant.SOURCE_DB_BINLOG, globalOperator);
+ boolean result = sourceService.delete(id, Constant.SOURCE_BINLOG, globalOperator);
Assert.assertTrue(result);
}
@@ -72,16 +63,16 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
public void testListByIdentifier() {
Integer id = this.saveSource();
- SourceResponse source = sourceService.get(id, Constant.SOURCE_DB_BINLOG);
+ SourceResponse source = sourceService.get(id, Constant.SOURCE_BINLOG);
Assert.assertEquals(globalGroupId, source.getInlongGroupId());
- sourceService.delete(id, Constant.SOURCE_DB_BINLOG, globalOperator);
+ sourceService.delete(id, Constant.SOURCE_BINLOG, globalOperator);
}
@Test
public void testGetAndUpdate() {
Integer id = this.saveSource();
- SourceResponse response = sourceService.get(id, Constant.SOURCE_DB_BINLOG);
+ SourceResponse response = sourceService.get(id, Constant.SOURCE_BINLOG);
Assert.assertEquals(globalGroupId, response.getInlongGroupId());
BinlogSourceResponse binlogResponse = (BinlogSourceResponse) response;
@@ -90,26 +81,7 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
boolean result = sourceService.update(request, globalOperator);
Assert.assertTrue(result);
- sourceService.delete(id, Constant.SOURCE_DB_BINLOG, globalOperator);
- }
-
- @Test
- public void testReportSnapshot() {
- Integer id = this.saveSource();
-
- TaskSnapshotRequest request = new TaskSnapshotRequest();
- request.setAgentIp("127.0.0.1");
- request.setReportTime(new Date());
-
- TaskSnapshotMessage message = new TaskSnapshotMessage();
- message.setJobId(id);
- message.setSnapshot("{\"offset\": 100}");
- request.setSnapshotList(Collections.singletonList(message));
-
- Boolean result = sourceService.reportSnapshot(request);
- Assert.assertTrue(result);
-
- sourceService.delete(id, Constant.SOURCE_DB_BINLOG, globalOperator);
+ sourceService.delete(id, Constant.SOURCE_BINLOG, globalOperator);
}
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
index 2f49ce4..7d85a03 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
@@ -80,7 +80,7 @@ public class DataSourceListenerTest extends WorkflowServiceImplTest {
WorkflowProcess process = context.getProcess();
WorkflowTask task = process.getTaskByName("stopSource");
Assert.assertTrue(task instanceof ServiceTask);
- SourceResponse sourceResponse = streamSourceService.get(sourceId, SourceType.DB_BINLOG.toString());
+ SourceResponse sourceResponse = streamSourceService.get(sourceId, SourceType.BINLOG.toString());
Assert.assertSame(SourceState.forCode(sourceResponse.getStatus()), SourceState.TO_BE_ISSUED_FROZEN);
}
@@ -109,7 +109,7 @@ public class DataSourceListenerTest extends WorkflowServiceImplTest {
WorkflowProcess process = context.getProcess();
WorkflowTask task = process.getTaskByName("restartSource");
Assert.assertTrue(task instanceof ServiceTask);
- SourceResponse sourceResponse = streamSourceService.get(sourceId, SourceType.DB_BINLOG.toString());
+ SourceResponse sourceResponse = streamSourceService.get(sourceId, SourceType.BINLOG.toString());
Assert.assertSame(SourceState.forCode(sourceResponse.getStatus()), SourceState.TO_BE_ISSUED_ACTIVE);
}
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
index ccef5df..ceb2983 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
@@ -19,9 +19,9 @@ package org.apache.inlong.manager.web.controller.openapi;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
-import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
import org.apache.inlong.common.pojo.agent.TaskRequest;
import org.apache.inlong.common.pojo.agent.TaskResult;
+import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
import org.apache.inlong.manager.common.beans.Response;
import org.apache.inlong.manager.common.pojo.agent.AgentHeartbeatRequest;
import org.apache.inlong.manager.common.pojo.agent.AgentStatusReportRequest;
@@ -33,10 +33,9 @@ import org.apache.inlong.manager.common.pojo.agent.FileAgentCommandInfo;
import org.apache.inlong.manager.common.pojo.agent.FileAgentTaskConfig;
import org.apache.inlong.manager.common.pojo.agent.FileAgentTaskInfo;
import org.apache.inlong.manager.service.core.AgentHeartbeatService;
+import org.apache.inlong.manager.service.core.AgentService;
import org.apache.inlong.manager.service.core.AgentSysConfigService;
-import org.apache.inlong.manager.service.core.AgentTaskService;
import org.apache.inlong.manager.service.core.ThirdPartyClusterService;
-import org.apache.inlong.manager.service.source.StreamSourceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
@@ -52,9 +51,7 @@ import java.util.List;
public class AgentController {
@Autowired
- private StreamSourceService sourceService;
- @Autowired
- private AgentTaskService agentTaskService;
+ private AgentService agentService;
@Autowired
private AgentSysConfigService agentSysConfigService;
@Autowired
@@ -68,29 +65,29 @@ public class AgentController {
return Response.success(thirdPartyClusterService.listClusterIpByType("inlong-openapi"));
}
- @PostMapping("/getTask")
- @ApiOperation(value = "Get source task config")
- public Response<TaskResult> getTask(@RequestBody TaskRequest taskRequest) {
- return Response.success(agentTaskService.getAgentTask(taskRequest));
- }
-
@PostMapping("/reportSnapshot")
@ApiOperation(value = "Report source task snapshot")
public Response<Boolean> reportSnapshot(TaskSnapshotRequest request) {
- return Response.success(sourceService.reportSnapshot(request));
+ return Response.success(agentService.reportSnapshot(request));
+ }
+
+ @PostMapping("/reportAndGetTask")
+ @ApiOperation(value = "Report source task snapshot")
+ public Response<TaskResult> reportAndGetTask(@RequestBody TaskRequest request) {
+ return Response.success(agentService.reportAndGetTask(request));
}
@Deprecated
@PostMapping("/fileAgent/getTaskConf")
@ApiOperation(value = "Get file task")
public Response<FileAgentTaskInfo> getFileAgentTask(@RequestBody FileAgentCommandInfo info) {
- return Response.success(agentTaskService.getFileAgentTask(info));
+ return Response.success(agentService.getFileAgentTask(info));
}
@PostMapping("/fileAgent/confirmAgentIp")
@ApiOperation(value = "Confirm current agent ip")
public Response<String> confirmAgentIp(@RequestBody ConfirmAgentIpRequest info) {
- return Response.success(agentTaskService.confirmAgentIp(info));
+ return Response.success(agentService.confirmAgentIp(info));
}
@PostMapping("/fileAgent/getAgentSysConf")
@@ -108,12 +105,13 @@ public class AgentController {
@PostMapping("/fileAgent/checkAgentTaskConf")
@ApiOperation(value = "Check agent source config")
public Response<List<FileAgentTaskConfig>> checkAgentTaskConf(@RequestBody CheckAgentTaskConfRequest info) {
- return Response.success(agentTaskService.checkAgentTaskConf(info));
+ return Response.success(agentService.checkAgentTaskConf(info));
}
@PostMapping("/fileAgent/reportAgentStatus")
@ApiOperation(value = "Report agent status")
public Response<String> reportAgentStatus(@RequestBody AgentStatusReportRequest info) {
- return Response.success(agentTaskService.reportAgentStatus(info));
+ return Response.success(agentService.reportAgentStatus(info));
}
+
}