You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/02/24 13:17:59 UTC
[incubator-inlong] branch master updated: [INLONG-2689][Manager] Support report snapshot for source agent (#2706)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f52c336 [INLONG-2689][Manager] Support report snapshot for source agent (#2706)
f52c336 is described below
commit f52c336db35ff195e2d3b209272a758b08e991ce
Author: healchow <he...@gmail.com>
AuthorDate: Thu Feb 24 21:17:55 2022 +0800
[INLONG-2689][Manager] Support report snapshot for source agent (#2706)
* [INLONG-2689][Manager] Support report snapshot for source agent
* [INLONG-2689][Manager] Add and refactor unit tests
* [INLONG-2689][Manager] Add apache license
---
.../common/pojo/agent/TaskSnapshotMessage.java | 24 ++--
...rtbeatRequest.java => TaskSnapshotRequest.java} | 20 +--
.../manager/dao/entity/StreamSourceEntity.java | 1 +
.../dao/mapper/StreamSourceEntityMapper.java | 3 +-
.../resources/mappers/StreamSourceEntityMapper.xml | 44 ++++---
.../service/source/SourceSnapshotOperation.java | 139 +++++++++++++++++++++
.../service/source/StreamSourceService.java | 4 +-
.../service/source/StreamSourceServiceImpl.java | 14 +--
.../service/core/impl/AuditServiceTest.java | 2 -
.../service/core/impl/InlongGroupServiceTest.java | 3 +-
.../service/core/impl/InlongStreamServiceTest.java | 3 +-
...rviceTest.java => StreamSourceServiceTest.java} | 38 +++++-
.../main/resources/sql/apache_inlong_manager.sql | 3 +-
.../manager-web/sql/apache_inlong_manager.sql | 3 +-
.../web/controller/openapi/AgentController.java | 28 +++--
15 files changed, 257 insertions(+), 72 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceSnapshotRequest.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskSnapshotMessage.java
similarity index 66%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceSnapshotRequest.java
rename to inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskSnapshotMessage.java
index da6c95c..972176e 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceSnapshotRequest.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskSnapshotMessage.java
@@ -15,26 +15,24 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.pojo.source;
+package org.apache.inlong.common.pojo.agent;
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
-import javax.validation.constraints.NotNull;
-
/**
- * Request of the source heartbeat
+ * Snapshot message
*/
@Data
-@ApiModel("Request of the source heartbeat")
-public class SourceSnapshotRequest {
+public class TaskSnapshotMessage {
- @NotNull
- @ApiModelProperty("Id of the source")
- private Integer id;
+ /**
+ * The job id
+ */
+ private Integer jobId;
- @ApiModelProperty("snapshot of this source task")
+ /**
+ * Snapshot of this source task
+ */
private String snapshot;
-}
+}
\ No newline at end of file
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskHeartbeatRequest.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskSnapshotRequest.java
similarity index 78%
rename from inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskHeartbeatRequest.java
rename to inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskSnapshotRequest.java
index ed305e6..9874017 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskHeartbeatRequest.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskSnapshotRequest.java
@@ -20,25 +20,24 @@ package org.apache.inlong.common.pojo.agent;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
-import javax.annotation.Nonnull;
import java.util.Date;
+import java.util.List;
/**
- * Request of the agent task heartbeat
+ * Request of the agent task snapshot
*/
@Data
-public class TaskHeartbeatRequest {
+public class TaskSnapshotRequest {
/**
- * The source id
+ * The ip of agent
*/
- @Nonnull
- private Integer id;
+ private String agentIp;
/**
- * Heartbeat of this source task
+ * The mac UUID of agent
*/
- private String heartbeat;
+ private String uuid;
/**
* Report time
@@ -46,4 +45,9 @@ public class TaskHeartbeatRequest {
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date reportTime;
+ /**
+ * The snapshot message list
+ */
+ private List<TaskSnapshotMessage> snapshotList;
+
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
index c6238e3..70259d8 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
@@ -38,6 +38,7 @@ public class StreamSourceEntity implements Serializable {
private Integer clusterId;
private String clusterName;
private String snapshot;
+ private Date reportTime;
// extParams saved filePath, fileRollingType, dbName, tableName, etc.
private String extParams;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 1666527..eff8215 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -20,7 +20,6 @@ package org.apache.inlong.manager.dao.mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.inlong.common.pojo.agent.DataConfig;
import org.apache.inlong.common.pojo.agent.TaskRequest;
-import org.apache.inlong.manager.common.pojo.source.SourceSnapshotRequest;
import org.apache.inlong.manager.common.pojo.source.SourcePageRequest;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.springframework.stereotype.Repository;
@@ -84,7 +83,7 @@ public interface StreamSourceEntityMapper {
int updateStatus(StreamSourceEntity entity);
- int updateSnapshot(SourceSnapshotRequest request);
+ int updateSnapshot(StreamSourceEntity entity);
int deleteByPrimaryKey(Integer id);
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 10e6111..77da5cb 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -32,6 +32,7 @@
<result column="cluster_id" jdbcType="INTEGER" property="clusterId"/>
<result column="cluster_name" jdbcType="VARCHAR" property="clusterName"/>
<result column="snapshot" jdbcType="LONGVARCHAR" property="snapshot"/>
+ <result column="report_time" jdbcType="TIMESTAMP" property="reportTime"/>
<result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/>
<result column="status" jdbcType="INTEGER" property="status"/>
<result column="previous_status" jdbcType="INTEGER" property="previousStatus"/>
@@ -55,8 +56,8 @@
</resultMap>
<sql id="Base_Column_List">
id, inlong_group_id, inlong_stream_id, source_type, agent_ip, uuid, server_id, server_name,
- cluster_id, cluster_name, snapshot, ext_params, status, previous_status, is_deleted,
- creator, modifier, create_time, modify_time
+ cluster_id, cluster_name, snapshot, report_time, ext_params, status, previous_status,
+ is_deleted, creator, modifier, create_time, modify_time
</sql>
<insert id="insert" useGeneratedKeys="true" keyProperty="id"
@@ -64,15 +65,15 @@
insert into stream_source (id, inlong_group_id, inlong_stream_id,
source_type, agent_ip, uuid,
server_id, server_name, cluster_id,
- cluster_name, snapshot, ext_params,
- status, previous_status,
+ cluster_name, snapshot, report_time,
+ ext_params, status, previous_status,
is_deleted, creator, modifier,
create_time, modify_time)
values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
#{sourceType,jdbcType=VARCHAR}, #{agentIp,jdbcType=VARCHAR}, #{uuid,jdbcType=VARCHAR},
#{serverId,jdbcType=INTEGER}, #{serverName,jdbcType=VARCHAR}, #{clusterId,jdbcType=INTEGER},
- #{clusterName,jdbcType=VARCHAR}, #{snapshot,jdbcType=LONGVARCHAR}, #{extParams,jdbcType=LONGVARCHAR},
- #{status,jdbcType=INTEGER}, #{previousStatus,jdbcType=INTEGER},
+ #{clusterName,jdbcType=VARCHAR}, #{snapshot,jdbcType=LONGVARCHAR}, #{modifyTime,jdbcType=TIMESTAMP},
+ #{extParams,jdbcType=LONGVARCHAR}, #{status,jdbcType=INTEGER}, #{previousStatus,jdbcType=INTEGER},
#{isDeleted,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR},
#{createTime,jdbcType=TIMESTAMP}, #{modifyTime,jdbcType=TIMESTAMP})
</insert>
@@ -113,6 +114,12 @@
<if test="snapshot != null">
snapshot,
</if>
+ <if test="reportTime != null">
+ report_time,
+ </if>
+ <if test="extParams != null">
+ ext_params,
+ </if>
<if test="status != null">
status,
</if>
@@ -134,9 +141,6 @@
<if test="modifyTime != null">
modify_time,
</if>
- <if test="extParams != null">
- ext_params,
- </if>
</trim>
<trim prefix="values (" suffix=")" suffixOverrides=",">
<if test="id != null">
@@ -172,6 +176,9 @@
<if test="snapshot != null">
#{snapshot,jdbcType=LONGVARCHAR},
</if>
+ <if test="reportTime != null">
+ #{reportTime,jdbcType=TIMESTAMP},
+ </if>
<if test="extParams != null">
#{extParams,jdbcType=LONGVARCHAR},
</if>
@@ -316,6 +323,9 @@
<if test="snapshot != null">
snapshot = #{snapshot,jdbcType=LONGVARCHAR},
</if>
+ <if test="reportTime != null">
+ report_time = #{reportTime,jdbcType=TIMESTAMP},
+ </if>
<if test="extParams != null">
ext_params = #{extParams,jdbcType=LONGVARCHAR},
</if>
@@ -354,7 +364,8 @@
server_name = #{serverName,jdbcType=VARCHAR},
cluster_id = #{clusterId,jdbcType=INTEGER},
cluster_name = #{clusterName,jdbcType=VARCHAR},
- snapshot = #{snapshot,jdbcType=LONGVARCHAR},
+ snapshot = #{snapshot,jdbcType=LONGVARCHAR},
+ report_time = #{reportTime,jdbcType=TIMESTAMP},
ext_params = #{extParams,jdbcType=LONGVARCHAR},
status = #{status,jdbcType=INTEGER},
previous_status = #{previousStatus,jdbcType=INTEGER},
@@ -372,10 +383,10 @@
modify_time = now()
where id = #{id,jdbcType=INTEGER}
</update>
- <update id="updateSnapshot" parameterType="org.apache.inlong.manager.common.pojo.source.SourceSnapshotRequest">
+ <update id="updateSnapshot" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
update stream_source
- set snapshot = #{snapshot,jdbcType=LONGVARCHAR},
- modify_time = now()
+ set snapshot = #{snapshot,jdbcType=LONGVARCHAR},
+ report_time = #{reportTime,jdbcType=TIMESTAMP}
where id = #{id,jdbcType=INTEGER}
</update>
@@ -385,20 +396,21 @@
where id = #{id,jdbcType=INTEGER}
</delete>
- <select id="selectAgentTaskDataConfig" parameterType="org.apache.inlong.common.pojo.agent.TaskRequest" resultMap="DataConfig">
+ <select id="selectAgentTaskDataConfig" parameterType="org.apache.inlong.common.pojo.agent.TaskRequest"
+ resultMap="DataConfig">
select
detail.inlong_group_id,
detail.inlong_stream_id,
detail.modify_time,
detail.uuid,
detail.agent_ip,
- detail.status mod 100 as op,
+ detail.status mod 100 as op,
detail.id,
detail.source_type,
detail.snapshot,
detail.ext_params,
from stream_source detail,
- where detail.is_deleted = 0
+ where detail.is_deleted = 0
<if test="agentIp != null and agentIp != ''">
and detail.agent_ip = #{agentIp, jdbcType=VARCHAR}
</if>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java
new file mode 100644
index 0000000..57c4e32
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.source;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.common.pojo.agent.TaskSnapshotMessage;
+import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
+import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Operate the source snapshot
+ */
+@Service
+public class SourceSnapshotOperation implements AutoCloseable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SourceSnapshotOperation.class);
+
+ public final ExecutorService executorService = new ThreadPoolExecutor(
+ 1,
+ 1,
+ 10L,
+ TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(100),
+ new ThreadFactoryBuilder().setNameFormat("stream-source-snapshot-%s").build(),
+ new CallerRunsPolicy());
+
+ @Autowired
+ private StreamSourceEntityMapper sourceMapper;
+
+ // The queue for transfer source snapshot
+ private LinkedBlockingQueue<TaskSnapshotRequest> snapshotQueue = null;
+
+ @Value("${stream.source.snapshot.batch.size:100}")
+ private int batchSize;
+
+ @Value("${stream.source.snapshot.queue.size:10000}")
+ private int queueSize = 10000;
+
+ private volatile boolean isClose = false;
+
+ /**
+ * Start a thread to operate source snapshot after the app started.
+ */
+ @PostConstruct
+ private void startSaveSnapshotTask() {
+ if (snapshotQueue == null) {
+ snapshotQueue = new LinkedBlockingQueue<>(queueSize);
+ }
+ SaveSnapshotTaskRunnable taskRunnable = new SaveSnapshotTaskRunnable();
+ this.executorService.execute(taskRunnable);
+ LOGGER.info("source snapshot operate thread started successfully");
+ }
+
+ /**
+ * Put snapshot into data queue
+ */
+ public Boolean putData(TaskSnapshotRequest request) {
+ if (request == null || CollectionUtils.isEmpty(request.getSnapshotList())) {
+ LOGGER.info("request received, but snapshot list is empty, just return");
+ return true;
+ }
+ try {
+ snapshotQueue.offer(request);
+ return true;
+ } catch (Throwable t) {
+ LOGGER.error("put source snapshot error", t);
+ return false;
+ }
+ }
+
+ @Override
+ public void close() {
+ this.isClose = true;
+ }
+
+ /**
+ * The task of saving source task snapshot into DB.
+ */
+ private class SaveSnapshotTaskRunnable implements Runnable {
+
+ @Override
+ public void run() {
+ while (!isClose) {
+ try {
+ TaskSnapshotRequest request = snapshotQueue.poll(1, TimeUnit.SECONDS);
+ if (request == null || CollectionUtils.isEmpty(request.getSnapshotList())) {
+ continue;
+ }
+
+ List<TaskSnapshotMessage> requestList = request.getSnapshotList();
+ for (TaskSnapshotMessage message : requestList) {
+ Integer id = message.getJobId();
+ StreamSourceEntity entity = new StreamSourceEntity();
+ entity.setId(id);
+ entity.setSnapshot(message.getSnapshot());
+ entity.setReportTime(request.getReportTime());
+
+ // update snapshot
+ sourceMapper.updateSnapshot(entity);
+ }
+ } catch (Throwable t) {
+ LOGGER.error("source snapshot task runnable error", t);
+ }
+ }
+ }
+ }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
index 380d4b5..01bf199 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
@@ -18,7 +18,7 @@
package org.apache.inlong.manager.service.source;
import com.github.pagehelper.PageInfo;
-import org.apache.inlong.manager.common.pojo.source.SourceSnapshotRequest;
+import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.SourcePageRequest;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
@@ -148,6 +148,6 @@ public interface StreamSourceService {
* @param request Heartbeat request.
* @return Whether succeed.
*/
- Boolean reportSnapshot(SourceSnapshotRequest request);
+ Boolean reportSnapshot(TaskSnapshotRequest request);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 679fc61..cea9fd8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -22,17 +22,16 @@ import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.SourceType;
-import org.apache.inlong.manager.common.pojo.source.SourceSnapshotRequest;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.SourcePageRequest;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.service.CommonOperateService;
@@ -61,6 +60,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
private StreamSourceEntityMapper sourceMapper;
@Autowired
private CommonOperateService commonOperateService;
+ @Autowired
+ private SourceSnapshotOperation heartbeatOperation;
@Override
@Transactional(rollbackFor = Throwable.class)
@@ -72,7 +73,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
// Check if it can be added
String groupId = request.getInlongGroupId();
- InlongGroupEntity inlongGroupEntity = commonOperateService.checkGroupStatus(groupId, operator);
+ commonOperateService.checkGroupStatus(groupId, operator);
// Make sure that there is no source info with the current groupId and streamId
String streamId = request.getInlongStreamId();
@@ -253,11 +254,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
}
@Override
- public Boolean reportSnapshot(SourceSnapshotRequest request) {
- if (request == null || request.getId() == null || request.getSnapshot() == null) {
- return true;
- }
- return sourceMapper.updateSnapshot(request) > 0;
+ public Boolean reportSnapshot(TaskSnapshotRequest request) {
+ return heartbeatOperation.putData(request);
}
private void checkParams(SourceRequest request) {
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java
index f91fcf7..e198b51 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java
@@ -25,14 +25,12 @@ import org.apache.inlong.manager.service.core.AuditService;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.TestComponent;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-@TestComponent
public class AuditServiceTest extends ServiceBaseTest {
@Autowired
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceTest.java
index 4ed7bbc..aaedbb7 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceTest.java
@@ -24,7 +24,6 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper;
-import org.apache.inlong.manager.service.ServiceBaseTest;
import org.apache.inlong.manager.service.core.InlongGroupService;
import org.junit.Assert;
import org.junit.Test;
@@ -38,7 +37,7 @@ import java.util.List;
* Inlong group service test
*/
@TestComponent
-public class InlongGroupServiceTest extends ServiceBaseTest {
+public class InlongGroupServiceTest {
private final String globalGroupId = "b_group1";
private final String globalGroupName = "group1";
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceTest.java
index 300c1fd..5eb136c 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceTest.java
@@ -18,7 +18,6 @@
package org.apache.inlong.manager.service.core.impl;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.service.ServiceBaseTest;
import org.apache.inlong.manager.service.core.InlongStreamService;
import org.junit.Assert;
import org.junit.Test;
@@ -29,7 +28,7 @@ import org.springframework.boot.test.context.TestComponent;
* Inlong stream service test
*/
@TestComponent
-public class InlongStreamServiceTest extends ServiceBaseTest {
+public class InlongStreamServiceTest {
private final String globalGroupId = "b_group1";
private final String globalGroupName = "group1";
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/BinlogStreamSourceServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
similarity index 71%
rename from inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/BinlogStreamSourceServiceTest.java
rename to inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
index a103d9f..e4cb65b 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/BinlogStreamSourceServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
@@ -17,6 +17,8 @@
package org.apache.inlong.manager.service.core.source;
+import org.apache.inlong.common.pojo.agent.TaskSnapshotMessage;
+import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceRequest;
@@ -29,10 +31,13 @@ import org.junit.Assert;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
+import java.util.Collections;
+import java.util.Date;
+
/**
* Stream source service test
*/
-public class BinlogStreamSourceServiceTest extends ServiceBaseTest {
+public class StreamSourceServiceTest extends ServiceBaseTest {
private final String globalGroupId = "b_group1";
private final String globalStreamId = "stream1";
@@ -49,7 +54,7 @@ public class BinlogStreamSourceServiceTest extends ServiceBaseTest {
BinlogSourceRequest sourceInfo = new BinlogSourceRequest();
sourceInfo.setInlongGroupId(globalGroupId);
sourceInfo.setInlongStreamId(globalStreamId);
- sourceInfo.setSourceType(Constant.SINK_HIVE);
+ sourceInfo.setSourceType(Constant.SOURCE_DB_BINLOG);
return sourceService.save(sourceInfo, globalOperator);
}
@@ -59,7 +64,7 @@ public class BinlogStreamSourceServiceTest extends ServiceBaseTest {
Integer id = this.saveSource();
Assert.assertNotNull(id);
- boolean result = sourceService.delete(id, Constant.SINK_HIVE, globalOperator);
+ boolean result = sourceService.delete(id, Constant.SOURCE_DB_BINLOG, globalOperator);
Assert.assertTrue(result);
}
@@ -67,16 +72,16 @@ public class BinlogStreamSourceServiceTest extends ServiceBaseTest {
public void testListByIdentifier() {
Integer id = this.saveSource();
- SourceResponse source = sourceService.get(id, Constant.SINK_HIVE);
+ SourceResponse source = sourceService.get(id, Constant.SOURCE_DB_BINLOG);
Assert.assertEquals(globalGroupId, source.getInlongGroupId());
- sourceService.delete(id, Constant.SINK_HIVE, globalOperator);
+ sourceService.delete(id, Constant.SOURCE_DB_BINLOG, globalOperator);
}
@Test
public void testGetAndUpdate() {
Integer id = this.saveSource();
- SourceResponse response = sourceService.get(id, Constant.SINK_HIVE);
+ SourceResponse response = sourceService.get(id, Constant.SOURCE_DB_BINLOG);
Assert.assertEquals(globalGroupId, response.getInlongGroupId());
BinlogSourceResponse binlogResponse = (BinlogSourceResponse) response;
@@ -84,6 +89,27 @@ public class BinlogStreamSourceServiceTest extends ServiceBaseTest {
BinlogSourceRequest request = CommonBeanUtils.copyProperties(binlogResponse, BinlogSourceRequest::new);
boolean result = sourceService.update(request, globalOperator);
Assert.assertTrue(result);
+
+ sourceService.delete(id, Constant.SOURCE_DB_BINLOG, globalOperator);
+ }
+
+ @Test
+ public void testReportSnapshot() {
+ Integer id = this.saveSource();
+
+ TaskSnapshotRequest request = new TaskSnapshotRequest();
+ request.setAgentIp("127.0.0.1");
+ request.setReportTime(new Date());
+
+ TaskSnapshotMessage message = new TaskSnapshotMessage();
+ message.setJobId(id);
+ message.setSnapshot("{\"offset\": 100}");
+ request.setSnapshotList(Collections.singletonList(message));
+
+ Boolean result = sourceService.reportSnapshot(request);
+ Assert.assertTrue(result);
+
+ sourceService.delete(id, Constant.SOURCE_DB_BINLOG, globalOperator);
}
}
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index fccb595..75d8725 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -566,7 +566,8 @@ CREATE TABLE `stream_source`
`server_name` varchar(50) DEFAULT '' COMMENT 'Name of the source server',
`cluster_id` int(11) DEFAULT NULL COMMENT 'Id of the cluster that collected this source',
`cluster_name` varchar(50) DEFAULT '' COMMENT 'Name of the cluster that collected this source',
- `heartbeat` text DEFAULT NULL COMMENT 'Heartbeat of this source task',
+ `snapshot` text DEFAULT NULL COMMENT 'Snapshot of this source task',
+ `report_time` timestamp DEFAULT NULL COMMENT 'Snapshot time',
`ext_params` text DEFAULT NULL COMMENT 'Another fields will saved as JSON string, such as filePath, dbName, tableName, etc',
`status` int(4) DEFAULT '0' COMMENT 'Data source status',
`previous_status` int(4) DEFAULT '0' COMMENT 'Previous status',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index e921d7a..dbd781b 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -598,7 +598,8 @@ CREATE TABLE `stream_source`
`server_name` varchar(50) DEFAULT '' COMMENT 'Name of the source server',
`cluster_id` int(11) DEFAULT NULL COMMENT 'Id of the cluster that collected this source',
`cluster_name` varchar(50) DEFAULT '' COMMENT 'Name of the cluster that collected this source',
- `snapshot` text DEFAULT NULL COMMENT 'snapshot of this source task',
+ `snapshot` text DEFAULT NULL COMMENT 'Snapshot of this source task',
+ `report_time` timestamp DEFAULT NULL COMMENT 'Snapshot time',
`ext_params` text DEFAULT NULL COMMENT 'Another fields will saved as JSON string, such as filePath, dbName, tableName, etc',
`status` int(4) DEFAULT '0' COMMENT 'Data source status',
`previous_status` int(4) DEFAULT '0' COMMENT 'Previous status',
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
index 787005e..5fe4bfe 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.web.controller.openapi;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
+import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
import org.apache.inlong.common.pojo.agent.TaskRequest;
import org.apache.inlong.common.pojo.agent.TaskResult;
import org.apache.inlong.manager.common.beans.Response;
@@ -35,6 +36,7 @@ import org.apache.inlong.manager.service.core.AgentHeartbeatService;
import org.apache.inlong.manager.service.core.AgentSysConfigService;
import org.apache.inlong.manager.service.core.AgentTaskService;
import org.apache.inlong.manager.service.core.ThirdPartyClusterService;
+import org.apache.inlong.manager.service.source.StreamSourceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
@@ -45,11 +47,13 @@ import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RestController
-@RequestMapping("/openapi")
+@RequestMapping("/openapi/agent")
@Api(tags = "Agent Config")
public class AgentController {
@Autowired
+ private StreamSourceService sourceService;
+ @Autowired
private AgentTaskService agentTaskService;
@Autowired
private AgentSysConfigService agentSysConfigService;
@@ -59,50 +63,56 @@ public class AgentController {
private ThirdPartyClusterService thirdPartyClusterService;
@GetMapping("/getInLongManagerIp")
- @ApiOperation(value = "get inlong manager ip list")
+ @ApiOperation(value = "Get inlong manager ip list")
public Response<List<String>> getInLongManagerIp() {
return Response.success(thirdPartyClusterService.listClusterIpByType("inlong-openapi"));
}
@PostMapping("/getTask")
- @ApiOperation(value = "general fetch task")
+ @ApiOperation(value = "Get source task config")
public Response<TaskResult> getTask(@RequestBody TaskRequest taskRequest) {
return Response.success(agentTaskService.getAgentTask(taskRequest));
}
+ @PostMapping("/reportSnapshot")
+ @ApiOperation(value = "Report source task snapshot")
+ public Response<Boolean> reportSnapshot(TaskSnapshotRequest request) {
+ return Response.success(sourceService.reportSnapshot(request));
+ }
+
@Deprecated
@PostMapping("/fileAgent/getTaskConf")
- @ApiOperation(value = "fetch file access task")
+ @ApiOperation(value = "Get file task")
public Response<FileAgentTaskInfo> getFileAgentTask(@RequestBody FileAgentCommandInfo info) {
return Response.success(agentTaskService.getFileAgentTask(info));
}
@PostMapping("/fileAgent/confirmAgentIp")
- @ApiOperation(value = "confirm current agent ip")
+ @ApiOperation(value = "Confirm current agent ip")
public Response<String> confirmAgentIp(@RequestBody ConfirmAgentIpRequest info) {
return Response.success(agentTaskService.confirmAgentIp(info));
}
@PostMapping("/fileAgent/getAgentSysConf")
- @ApiOperation(value = "get agent system config")
+ @ApiOperation(value = "Get agent system config")
public Response<AgentSysConfig> getAgentSysConf(@RequestBody AgentSysconfRequest info) {
return Response.success(agentSysConfigService.getAgentSysConfig(info));
}
@PostMapping("/fileAgent/heartbeat")
- @ApiOperation(value = "agent heartbeat report")
+ @ApiOperation(value = "Report agent heartbeat")
public Response<String> heartbeat(@RequestBody AgentHeartbeatRequest info) {
return Response.success(heartbeatService.heartbeat(info));
}
@PostMapping("/fileAgent/checkAgentTaskConf")
- @ApiOperation(value = "agent data source comparison")
+ @ApiOperation(value = "Check agent source config")
public Response<List<FileAgentTaskConfig>> checkAgentTaskConf(@RequestBody CheckAgentTaskConfRequest info) {
return Response.success(agentTaskService.checkAgentTaskConf(info));
}
@PostMapping("/fileAgent/reportAgentStatus")
- @ApiOperation(value = "agent status report")
+ @ApiOperation(value = "Report agent status")
public Response<String> reportAgentStatus(@RequestBody AgentStatusReportRequest info) {
return Response.success(agentTaskService.reportAgentStatus(info));
}