You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/04/08 08:04:18 UTC
[incubator-inlong] branch master updated: [INLONG-3550] Support File source in client and server (#3572)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 268b3dbd7 [INLONG-3550] Support File source in client and server (#3572)
268b3dbd7 is described below
commit 268b3dbd782085fc245f67d5d2bb2bb70d205033
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Fri Apr 8 16:04:12 2022 +0800
[INLONG-3550] Support File source in client and server (#3572)
---
.../apache/inlong/agent/constant/JobConstants.java | 2 -
.../java/org/apache/inlong/agent/pojo/FileJob.java | 12 ++--
.../apache/inlong/agent/pojo/JobProfileDto.java | 20 +++---
.../inlong/agent/plugin/sinks/ProxySink.java | 70 +++++++--------------
.../manager/client/api/source/AgentFileSource.java | 59 +++++++++++++++++
.../manager/client/api/util/InlongParser.java | 40 +++++++-----
.../api/util/InlongStreamSourceTransfer.java | 59 ++++++++++++++++-
.../inlong/manager/common/enums/Constant.java | 12 ----
.../inlong/manager/common/enums/SourceType.java | 7 +++
.../pojo/source/binlog/BinlogSourceRequest.java | 3 +-
.../common/pojo/source/file/FileSourceDTO.java | 73 ++++++++++++++++++++++
.../pojo/source/file/FileSourceListResponse.java | 51 +++++++++++++++
.../common/pojo/source/file/FileSourceRequest.java | 56 +++++++++++++++++
.../pojo/source/file/FileSourceResponse.java | 51 +++++++++++++++
.../pojo/source/kafka/KafkaSourceRequest.java | 3 +-
.../dao/mapper/StreamSourceEntityMapper.java | 8 ++-
.../resources/mappers/StreamSourceEntityMapper.xml | 19 +++++-
.../service/core/impl/AgentServiceImpl.java | 31 ++++++---
.../source/AbstractStreamSourceOperation.java | 3 +-
.../service/source/StreamSourceServiceImpl.java | 2 +-
.../source/binlog/BinlogStreamSourceOperation.java | 5 +-
.../FileStreamSourceOperation.java} | 57 ++++++-----------
.../source/kafka/KafkaStreamSourceOperation.java | 5 +-
.../service/core/impl/AgentServiceTest.java | 6 +-
.../core/source/StreamSourceServiceTest.java | 14 ++---
.../web/controller/CommonDBServerController.java | 1 +
.../web/controller/CommonFileServerController.java | 1 +
27 files changed, 500 insertions(+), 170 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
index 146dd0601..034e43f73 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
@@ -47,9 +47,7 @@ public class JobConstants extends CommonConstants {
public static final String JOB_DIR_FILTER_PATTERN = "job.fileJob.dir.pattern";
public static final String JOB_FILE_TIME_OFFSET = "job.fileJob.timeOffset";
public static final String JOB_FILE_MAX_WAIT = "job.fileJob.file.max.wait";
- public static final String JOB_ADDITION_STR = "job.fileJob.additionStr";
public static final String JOB_CYCLE_UNIT = "job.fileJob.cycleUnit";
- public static final String JOB_DIR_FILTER_PATH = "job.fileJob.dir.path";
//Binlog job
public static final String JOB_DATABASE_USER = "job.binlogJob.user";
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
index 733d88665..21c94d34b 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
@@ -27,15 +27,12 @@ public class FileJob {
private Dir dir;
private Thread thread;
private int id;
- private String pattern;
- private String cycleUnit;
private String timeOffset;
private String addictiveString;
@Data
public static class Dir {
- private String path;
private String pattern;
}
@@ -54,12 +51,13 @@ public class FileJob {
@Data
public static class FileJobTaskConfig {
- private String dataName;
- private String path;
- private int taskId;
private String pattern;
- private String cycleUnit;
+ // '1m' means one minute after, '-1m' means one minute before
+ // '1h' means one hour after, '-1h' means one hour before
+ // '1d' means one day after, '-1d' means one day before
+ // Null means from current timestamp
private String timeOffset;
+ //For example: a=b&c=b&e=f
private String additionalAttr;
}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index 6a7587972..19a5f9dd1 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -17,11 +17,6 @@
package org.apache.inlong.agent.pojo;
-import static java.util.Objects.requireNonNull;
-import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
-import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
-import static org.apache.inlong.agent.constant.JobConstants.SYNC_SEND_OPEN;
-
import com.google.gson.Gson;
import lombok.Data;
import org.apache.inlong.agent.conf.AgentConfiguration;
@@ -29,6 +24,11 @@ import org.apache.inlong.agent.conf.TriggerProfile;
import org.apache.inlong.common.enums.TaskTypeEnum;
import org.apache.inlong.common.pojo.agent.DataConfig;
+import static java.util.Objects.requireNonNull;
+import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
+import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
+import static org.apache.inlong.agent.constant.JobConstants.SYNC_SEND_OPEN;
+
@Data
public class JobProfileDto {
@@ -92,26 +92,20 @@ public class JobProfileDto {
private static FileJob getFileJob(DataConfig dataConfigs) {
FileJob fileJob = new FileJob();
+ fileJob.setId(dataConfigs.getTaskId());
fileJob.setTrigger(DEFAULT_TRIGGER);
FileJob.FileJobTaskConfig fileJobTaskConfig = GSON.fromJson(dataConfigs.getExtParams(),
FileJob.FileJobTaskConfig.class);
FileJob.Dir dir = new FileJob.Dir();
- dir.setPattern(fileJobTaskConfig.getDataName());
- dir.setPath(fileJobTaskConfig.getPath());
+ dir.setPattern(fileJobTaskConfig.getPattern());
fileJob.setDir(dir);
-
- fileJob.setId(fileJobTaskConfig.getTaskId());
fileJob.setTimeOffset(fileJobTaskConfig.getTimeOffset());
if (!fileJobTaskConfig.getAdditionalAttr().isEmpty()) {
fileJob.setAddictiveString(fileJobTaskConfig.getAdditionalAttr());
}
- if (fileJobTaskConfig.getCycleUnit() != null) {
- fileJob.setCycleUnit(fileJobTaskConfig.getCycleUnit());
- }
-
return fileJob;
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index 514887e7f..8d31a380f 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -17,43 +17,12 @@
package org.apache.inlong.agent.plugin.sinks;
-import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FIELD_SPLITTER;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_AGENT_IP;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_ID;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_OCEANUS_BL;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_OCEANUS_F;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_SYNC;
-import static org.apache.inlong.agent.constant.JobConstants.PROXY_BATCH_FLUSH_INTERVAL;
-import static org.apache.inlong.agent.constant.JobConstants.PROXY_PACKAGE_MAX_SIZE;
-import static org.apache.inlong.agent.constant.JobConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS;
-import static org.apache.inlong.agent.constant.JobConstants.PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
-import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
-import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
-import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS;
-import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_ADDITION_STR;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_IP;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_RETRY;
-
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.constant.CommonConstants;
-import org.apache.inlong.agent.message.ProxyMessage;
import org.apache.inlong.agent.message.EndMessage;
+import org.apache.inlong.agent.message.ProxyMessage;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.MessageFilter;
@@ -62,6 +31,27 @@ import org.apache.inlong.agent.utils.AgentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FIELD_SPLITTER;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_SYNC;
+import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
+import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
+import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
+import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
+import static org.apache.inlong.agent.constant.JobConstants.PROXY_BATCH_FLUSH_INTERVAL;
+import static org.apache.inlong.agent.constant.JobConstants.PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
+import static org.apache.inlong.agent.constant.JobConstants.PROXY_PACKAGE_MAX_SIZE;
+import static org.apache.inlong.agent.constant.JobConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS;
+
public class ProxySink extends AbstractSink {
private static final Logger LOGGER = LoggerFactory.getLogger(ProxySink.class);
@@ -212,22 +202,6 @@ public class ProxySink extends AbstractSink {
}
}
- private HashMap<String, String> parseAttrFromJobProfile(JobProfile jobProfile) {
- HashMap<String, String> attr = new HashMap<>();
- String additionStr = jobProfile.get(JOB_ADDITION_STR, "");
- if (!additionStr.isEmpty()) {
- Map<String, String> addAttr = AgentUtils.getAdditionAttr(additionStr);
- attr.putAll(addAttr);
- }
- if (jobProfile.getBoolean(JOB_RETRY, false)) {
- // used for online compute filter consume
- attr.put(PROXY_OCEANUS_F, PROXY_OCEANUS_BL);
- }
- attr.put(PROXY_KEY_ID, jobProfile.get(JOB_ID));
- attr.put(PROXY_KEY_AGENT_IP, jobProfile.get(JOB_IP));
- return attr;
- }
-
@Override
public void destroy() {
LOGGER.info("destroy sink which sink from source name {}", sourceName);
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AgentFileSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AgentFileSource.java
new file mode 100644
index 000000000..92953ab93
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AgentFileSource.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.source;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.client.api.DataFormat;
+import org.apache.inlong.manager.client.api.StreamSource;
+import org.apache.inlong.manager.common.enums.SourceType;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+@AllArgsConstructor
+@NoArgsConstructor
+@ApiModel("Base configuration for Agent file collection")
+public class AgentFileSource extends StreamSource {
+
+ @ApiModelProperty(value = "DataSource type", required = true)
+ private SourceType sourceType = SourceType.FILE;
+
+ @ApiModelProperty("SyncType for Kafka")
+ private SyncType syncType = SyncType.INCREMENT;
+
+ @ApiModelProperty("Data format type for kafka")
+ private DataFormat dataFormat = DataFormat.NONE;
+
+ @ApiModelProperty(value = "Agent IP address", required = true)
+ private String ip;
+
+ @ApiModelProperty(value = "Path regex pattern for file, such as /a/b/*.txt", required = true)
+ private String pattern;
+
+ @ApiModelProperty("TimeOffset for collection, "
+ + "'1m' means from one minute after, '-1m' means from one minute before, "
+ + "'1h' means from one hour after, '-1h' means from one minute before"
+ + "'1d' means from one day after, '-1d' means from one minute before"
+ + "Null means from current timestamp")
+ private String timeOffset;
+
+}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
index 4042fa1ff..af45811cd 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
@@ -41,6 +41,8 @@ import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceListResponse;
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
+import org.apache.inlong.manager.common.pojo.source.file.FileSourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.file.FileSourceResponse;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceListResponse;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
@@ -52,9 +54,6 @@ import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
import java.util.List;
-import static org.apache.inlong.manager.common.enums.SourceType.BINLOG;
-import static org.apache.inlong.manager.common.enums.SourceType.KAFKA;
-
/**
* Parser for Inlong entity
*/
@@ -136,6 +135,11 @@ public class InlongParser {
KafkaSourceResponse.class);
sourceResponses.add(kafkaSourceResponse);
break;
+ case FILE:
+ FileSourceResponse fileSourceResponse = GsonUtil.fromJson(sourceJson.toString(),
+ FileSourceResponse.class);
+ sourceResponses.add(fileSourceResponse);
+ break;
default:
throw new RuntimeException(String.format("Unsupport sourceType=%s for Inlong", sourceType));
}
@@ -187,18 +191,23 @@ public class InlongParser {
if (pageInfo.getList() != null && !pageInfo.getList().isEmpty()) {
SourceListResponse sourceListResponse = pageInfo.getList().get(0);
SourceType sourceType = SourceType.forType(sourceListResponse.getSourceType());
- if (sourceType == BINLOG) {
- return GsonUtil.fromJson(pageInfoJson,
- new TypeToken<PageInfo<BinlogSourceListResponse>>() {
- }.getType());
+ switch (sourceType) {
+ case BINLOG:
+ return GsonUtil.fromJson(pageInfoJson,
+ new TypeToken<PageInfo<BinlogSourceListResponse>>() {
+ }.getType());
+ case KAFKA:
+ return GsonUtil.fromJson(pageInfoJson,
+ new TypeToken<PageInfo<KafkaSourceListResponse>>() {
+ }.getType());
+ case FILE:
+ return GsonUtil.fromJson(pageInfoJson,
+ new TypeToken<PageInfo<FileSourceListResponse>>() {
+ }.getType());
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unsupported sourceType=%s for Inlong", sourceType));
}
- if (sourceType == KAFKA) {
- return GsonUtil.fromJson(pageInfoJson,
- new TypeToken<PageInfo<KafkaSourceListResponse>>() {
- }.getType());
- }
- throw new IllegalArgumentException(
- String.format("Unsupported sourceType=%s for Inlong", sourceType));
} else {
return new PageInfo<>();
}
@@ -221,7 +230,8 @@ public class InlongParser {
if (mqExtInfo != null && mqExtInfo.get(MIDDLEWARE_TYPE) != null) {
MQType mqType = MQType.forType(mqExtInfo.get(MIDDLEWARE_TYPE).getAsString());
if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
- InlongGroupPulsarInfo pulsarInfo = GsonUtil.fromJson(mqExtInfo.toString(), InlongGroupPulsarInfo.class);
+ InlongGroupPulsarInfo pulsarInfo = GsonUtil.fromJson(mqExtInfo.toString(),
+ InlongGroupPulsarInfo.class);
groupApproveInfo.setAckQuorum(pulsarInfo.getAckQuorum());
groupApproveInfo.setEnsemble(pulsarInfo.getEnsemble());
groupApproveInfo.setWriteQuorum(pulsarInfo.getWriteQuorum());
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index 115eb37da..93f513037 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -26,6 +26,7 @@ import org.apache.inlong.manager.client.api.StreamSource;
import org.apache.inlong.manager.client.api.StreamSource.State;
import org.apache.inlong.manager.client.api.StreamSource.SyncType;
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
+import org.apache.inlong.manager.client.api.source.AgentFileSource;
import org.apache.inlong.manager.client.api.source.KafkaSource;
import org.apache.inlong.manager.client.api.source.MySQLBinlogSource;
import org.apache.inlong.manager.common.enums.SourceType;
@@ -35,6 +36,9 @@ import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceListResponse;
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceRequest;
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
+import org.apache.inlong.manager.common.pojo.source.file.FileSourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.file.FileSourceRequest;
+import org.apache.inlong.manager.common.pojo.source.file.FileSourceResponse;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceListResponse;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceRequest;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
@@ -54,6 +58,8 @@ public class InlongStreamSourceTransfer {
return createKafkaSourceRequest((KafkaSource) streamSource, streamInfo);
case BINLOG:
return createBinlogSourceRequest((MySQLBinlogSource) streamSource, streamInfo);
+ case FILE:
+ return createFileSourceRequest((AgentFileSource) streamSource, streamInfo);
default:
throw new RuntimeException(String.format("Unsupported source=%s for Inlong", sourceType));
}
@@ -68,6 +74,9 @@ public class InlongStreamSourceTransfer {
if (sourceType == SourceType.BINLOG && sourceResponse instanceof BinlogSourceResponse) {
return parseMySQLBinlogSource((BinlogSourceResponse) sourceResponse);
}
+ if (sourceType == SourceType.FILE && sourceResponse instanceof FileSourceResponse) {
+ return parseAgentFileSource((FileSourceResponse) sourceResponse);
+ }
throw new IllegalArgumentException(String.format("Unsupported source type : %s for Inlong", sourceType));
}
@@ -80,6 +89,9 @@ public class InlongStreamSourceTransfer {
if (sourceType == SourceType.BINLOG && sourceListResponse instanceof BinlogSourceListResponse) {
return parseMySQLBinlogSource((BinlogSourceListResponse) sourceListResponse);
}
+ if (sourceType == SourceType.FILE && sourceListResponse instanceof FileSourceListResponse) {
+ return parseAgentFileSource((FileSourceListResponse) sourceListResponse);
+ }
throw new IllegalArgumentException(String.format("Unsupported source type : %s for Inlong", sourceType));
}
@@ -176,12 +188,34 @@ public class InlongStreamSourceTransfer {
return binlogSource;
}
+ private static AgentFileSource parseAgentFileSource(FileSourceResponse response) {
+ AgentFileSource fileSource = new AgentFileSource();
+ fileSource.setSourceName(response.getSourceName());
+ fileSource.setState(State.parseByStatus(response.getStatus()));
+ fileSource.setDataFormat(DataFormat.NONE);
+ fileSource.setPattern(response.getPattern());
+ fileSource.setIp(response.getIp());
+ fileSource.setTimeOffset(response.getTimeOffset());
+ return fileSource;
+ }
+
+ private static AgentFileSource parseAgentFileSource(FileSourceListResponse response) {
+ AgentFileSource fileSource = new AgentFileSource();
+ fileSource.setSourceName(response.getSourceName());
+ fileSource.setState(State.parseByStatus(response.getStatus()));
+ fileSource.setDataFormat(DataFormat.NONE);
+ fileSource.setPattern(response.getPattern());
+ fileSource.setIp(response.getIp());
+ fileSource.setTimeOffset(response.getTimeOffset());
+ return fileSource;
+ }
+
private static KafkaSourceRequest createKafkaSourceRequest(KafkaSource kafkaSource, InlongStreamInfo stream) {
KafkaSourceRequest sourceRequest = new KafkaSourceRequest();
sourceRequest.setSourceName(kafkaSource.getSourceName());
sourceRequest.setInlongGroupId(stream.getInlongGroupId());
sourceRequest.setInlongStreamId(stream.getInlongStreamId());
- sourceRequest.setSourceType(kafkaSource.getSourceType().name());
+ sourceRequest.setSourceType(kafkaSource.getSourceType().getType());
sourceRequest.setAgentIp(kafkaSource.getAgentIp());
sourceRequest.setBootstrapServers(kafkaSource.getBootstrapServers());
sourceRequest.setTopic(kafkaSource.getTopic());
@@ -204,8 +238,7 @@ public class InlongStreamSourceTransfer {
sourceRequest.setSourceName(binlogSource.getSourceName());
sourceRequest.setInlongGroupId(streamInfo.getInlongGroupId());
sourceRequest.setInlongStreamId(streamInfo.getInlongStreamId());
- sourceRequest.setSourceType(binlogSource.getSourceType().name());
- sourceRequest.setAgentIp(binlogSource.getAgentIp());
+ sourceRequest.setSourceType(binlogSource.getSourceType().getType());
DefaultAuthentication authentication = binlogSource.getAuthentication();
sourceRequest.setUser(authentication.getUserName());
sourceRequest.setPassword(authentication.getPassword());
@@ -228,4 +261,24 @@ public class InlongStreamSourceTransfer {
sourceRequest.setTimestampFormatStandard(binlogSource.getTimestampFormatStandard());
return sourceRequest;
}
+
+ private static FileSourceRequest createFileSourceRequest(AgentFileSource fileSource, InlongStreamInfo streamInfo) {
+ FileSourceRequest sourceRequest = new FileSourceRequest();
+ sourceRequest.setSourceName(fileSource.getSourceName());
+ sourceRequest.setInlongGroupId(streamInfo.getInlongGroupId());
+ sourceRequest.setInlongStreamId(streamInfo.getInlongStreamId());
+ sourceRequest.setSourceType(fileSource.getSourceType().getType());
+ if (StringUtils.isEmpty(fileSource.getIp())) {
+ throw new IllegalArgumentException(
+ String.format("AgentIp should not be null for fileSource=%s", fileSource));
+ }
+ sourceRequest.setIp(fileSource.getIp());
+ if (StringUtils.isEmpty(fileSource.getPattern())) {
+ throw new IllegalArgumentException(
+ String.format("SourcePattern should not be null for fileSource=%s", fileSource));
+ }
+ sourceRequest.setPattern(fileSource.getPattern());
+ sourceRequest.setTimeOffset(fileSource.getTimeOffset());
+ return sourceRequest;
+ }
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
index f614460d9..44c185d71 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
@@ -28,14 +28,6 @@ public class Constant {
public static final String URL_SPLITTER = ",";
public static final String HOST_SPLITTER = ":";
- public static final String SOURCE_FILE = "FILE";
-
- public static final String SOURCE_SQL = "SQL";
-
- public static final String SOURCE_BINLOG = "BINLOG";
-
- public static final String SOURCE_KAFKA = "KAFKA";
-
public static final String SINK_HIVE = "HIVE";
public static final String SINK_KAFKA = "KAFKA";
@@ -77,10 +69,6 @@ public class Constant {
public static final String REQUEST_IS_EMPTY = "request is empty";
- public static final String SOURCE_TYPE_IS_EMPTY = "sourceType is empty";
-
- public static final String SOURCE_TYPE_NOT_SAME = "Expected sourceType is %s, but found %s";
-
public static final String SINK_TYPE_IS_EMPTY = "sinkType is empty";
public static final String SINK_TYPE_NOT_SAME = "Expected sinkType is %s, but found %s";
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
index 19c17af6a..33ecdca8f 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
@@ -33,6 +33,13 @@ public enum SourceType {
BINLOG("BINLOG", TaskTypeEnum.BINLOG),
KAFKA("KAFKA", TaskTypeEnum.KAFKA);
+ public static final String SOURCE_FILE = "FILE";
+ public static final String SOURCE_SQL = "SQL";
+ public static final String SOURCE_BINLOG = "BINLOG";
+ public static final String SOURCE_KAFKA = "KAFKA";
+ public static final String SOURCE_TYPE_IS_EMPTY = "sourceType is empty";
+ public static final String SOURCE_TYPE_NOT_SAME = "Expected sourceType is %s, but found %s";
+
@Getter
private final String type;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
index 4af744106..f5b16d044 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
@@ -22,7 +22,6 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
-import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
@@ -34,7 +33,7 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@ApiModel(value = "Request of the binlog source info")
-@JsonTypeDefine(value = Constant.SOURCE_BINLOG)
+@JsonTypeDefine(value = SourceType.SOURCE_BINLOG)
public class BinlogSourceRequest extends SourceRequest {
@ApiModelProperty("Username of the DB server")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceDTO.java
new file mode 100644
index 000000000..7d5f2338a
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceDTO.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.file;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * File source information data transfer object
+ */
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+@Data
+public class FileSourceDTO {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ @ApiModelProperty("Agent IP address")
+ private String ip;
+
+ @ApiModelProperty("Path regex pattern for file, such as /a/b/*.txt")
+ private String pattern;
+
+ @ApiModelProperty("TimeOffset for collection, "
+ + "'1m' means from one minute after, '-1m' means from one minute before, "
+ + "'1h' means from one hour after, '-1h' means from one minute before"
+ + "'1d' means from one day after, '-1d' means from one minute before"
+ + "Null means from current timestamp")
+ private String timeOffset;
+
+ public static FileSourceDTO getFromRequest(@NotNull FileSourceRequest fileSourceRequest) {
+ return FileSourceDTO.builder()
+ .ip(fileSourceRequest.getIp())
+ .pattern(fileSourceRequest.getPattern())
+ .timeOffset(fileSourceRequest.getTimeOffset())
+ .build();
+ }
+
+ public static FileSourceDTO getFromJson(@NotNull String extParams) {
+ try {
+ OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ return OBJECT_MAPPER.readValue(extParams, FileSourceDTO.class);
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ }
+ }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceListResponse.java
new file mode 100644
index 000000000..84871259e
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceListResponse.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.file;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+
+/**
+ * Response of File source list
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+@ApiModel("Response of File source paging list")
+public class FileSourceListResponse extends SourceListResponse {
+
+ @ApiModelProperty("Agent IP address")
+ private String ip;
+
+ @ApiModelProperty("Path regex pattern for file, such as /a/b/*.txt")
+ private String pattern;
+
+ @ApiModelProperty("TimeOffset for collection, "
+ + "'1m' means from one minute after, '-1m' means from one minute before, "
+ + "'1h' means from one hour after, '-1h' means from one minute before"
+ + "'1d' means from one day after, '-1d' means from one minute before"
+ + "Null means from current timestamp")
+ private String timeOffset;
+
+ public FileSourceListResponse() {
+ this.setSourceType(SourceType.FILE.getType());
+ }
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceRequest.java
new file mode 100644
index 000000000..11ec99bae
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceRequest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.file;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * Request of File Source Info
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Request of the kafka source info")
+@JsonTypeDefine(value = SourceType.SOURCE_FILE)
+public class FileSourceRequest extends SourceRequest {
+
+ @ApiModelProperty("Agent IP address")
+ private String ip;
+
+ @ApiModelProperty("Path regex pattern for file, such as /a/b/*.txt")
+ private String pattern;
+
+ @ApiModelProperty("TimeOffset for collection, "
+ + "'1m' means from one minute after, '-1m' means from one minute before, "
+ + "'1h' means from one hour after, '-1h' means from one minute before"
+ + "'1d' means from one day after, '-1d' means from one minute before"
+ + "Null means from current timestamp")
+ private String timeOffset;
+
+ public FileSourceRequest() {
+ this.setSourceType(SourceType.FILE.toString());
+ }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceResponse.java
new file mode 100644
index 000000000..d8c0e85a2
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceResponse.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.file;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+
+/**
+ * Response of the File source
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Response of the File source")
+public class FileSourceResponse extends SourceResponse {
+
+ @ApiModelProperty("Agent IP address")
+ private String ip;
+
+ @ApiModelProperty("Path regex pattern for file, such as /a/b/*.txt")
+ private String pattern;
+
+ @ApiModelProperty("TimeOffset for collection, "
+ + "'1m' means one minute before, '1h' means one hour before, '1d' means one day before, "
+ + "Null means from current timestamp")
+ private String timeOffset;
+
+ public FileSourceResponse() {
+ this.setSourceType(SourceType.FILE.getType());
+ }
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
index 0b6a23091..86e0168dc 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
@@ -22,7 +22,6 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
-import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
@@ -34,7 +33,7 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@ApiModel(value = "Request of the kafka source info")
-@JsonTypeDefine(value = Constant.SOURCE_KAFKA)
+@JsonTypeDefine(value = SourceType.SOURCE_KAFKA)
public class KafkaSourceRequest extends SourceRequest {
@ApiModelProperty("Kafka topic")
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 1cd75af2a..d8f7d2e36 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -57,7 +57,13 @@ public interface StreamSourceEntityMapper {
/**
* Query the tasks by the given status list.
*/
- List<StreamSourceEntity> selectByStatus(@Param("list") List<Integer> list);
+ List<StreamSourceEntity> selectByStatus(@Param("list") List<Integer> list, @Param("limit") int limit);
+
+ /**
+ * Query the tasks by the given status list and type List.
+ */
+ List<StreamSourceEntity> selectByStatusAndType(@Param("list") List<Integer> list,
+ @Param("sourceType") List<String> sourceTypes, @Param("limit") int limit);
/**
* Query the sources with status 20x by the given agent IP and agent UUID.
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index fddab4050..666bb6029 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -156,7 +156,24 @@
<foreach item="item" index="index" collection="list" open="(" close=")" separator=",">
#{item}
</foreach>
- limit 2
+ limit #{limit, jdbcType=INTEGER}
+ </where>
+ </select>
+ <select id="selectByStatusAndType" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from stream_source
+ <where>
+ is_deleted = 0
+ and status in
+ <foreach item="item" index="index" collection="list" open="(" close=")" separator=",">
+ #{item}
+ </foreach>
+ and source_type in
+ <foreach item="item" index="index" collection="sourceType" open="(" close=")" separator=",">
+ #{item}
+ </foreach>
+ limit #{limit, jdbcType=INTEGER}
</where>
</select>
<select id="selectByStatusAndIp" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 62723bb1a..feb3a1e40 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -32,13 +32,12 @@ import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
import org.apache.inlong.manager.common.enums.SourceState;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.source.file.FileSourceDTO;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogRequest;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.DataSourceCmdConfigEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
-import org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper;
-import org.apache.inlong.manager.dao.mapper.SourceFileDetailEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.service.core.AgentService;
import org.apache.inlong.manager.service.core.StreamConfigLogService;
@@ -65,18 +64,15 @@ public class AgentServiceImpl implements AgentService {
private static final int UNISSUED_STATUS = 2;
private static final int ISSUED_STATUS = 3;
private static final int MODULUS_100 = 100;
+ private static final int TASK_FETCH_SIZE = 2;
@Autowired
private StreamSourceEntityMapper sourceMapper;
@Autowired
private SourceSnapshotOperation snapshotOperation;
@Autowired
- private SourceFileDetailEntityMapper fileDetailMapper;
- @Autowired
private DataSourceCmdConfigEntityMapper sourceCmdConfigMapper;
@Autowired
- private InlongStreamFieldEntityMapper streamFieldMapper;
- @Autowired
private InlongStreamEntityMapper streamMapper;
@Autowired
private StreamConfigLogService streamConfigLogService;
@@ -154,6 +150,8 @@ public class AgentServiceImpl implements AgentService {
throw new BusinessException("agent request or agent ip was empty, just return");
}
+ final String agentIp = request.getAgentIp();
+ final String uuid = request.getUuid();
// Query the tasks that needed to add or active - without agentIp and uuid
List<Integer> needAddStatusList;
if (PullJobTypeEnum.NEVER != PullJobTypeEnum.getPullJobType(request.getPullJobType())) {
@@ -163,10 +161,11 @@ public class AgentServiceImpl implements AgentService {
LOGGER.warn("agent pull job type is [NEVER], just pull to be active tasks");
needAddStatusList = Collections.singletonList(SourceState.TO_BE_ISSUED_ACTIVE.getCode());
}
- List<StreamSourceEntity> entityList = sourceMapper.selectByStatus(needAddStatusList);
+ List<String> sourceTypes = Lists.newArrayList(SourceType.BINLOG.getType(), SourceType.KAFKA.getType(),
+ SourceType.SQL.getType());
+ List<StreamSourceEntity> entityList = sourceMapper.selectByStatusAndType(needAddStatusList, sourceTypes,
+ TASK_FETCH_SIZE);
- String agentIp = request.getAgentIp();
- String uuid = request.getUuid();
// Query other tasks by agentIp and uuid - not included status with TO_BE_ISSUED_ADD and TO_BE_ISSUED_ACTIVE
List<Integer> statusList = Arrays.asList(SourceState.TO_BE_ISSUED_DELETE.getCode(),
SourceState.TO_BE_ISSUED_RETRY.getCode(), SourceState.TO_BE_ISSUED_BACKTRACK.getCode(),
@@ -175,6 +174,15 @@ public class AgentServiceImpl implements AgentService {
List<StreamSourceEntity> needIssuedList = sourceMapper.selectByStatusAndIp(statusList, agentIp, uuid);
entityList.addAll(needIssuedList);
+ List<StreamSourceEntity> fileEntityList = sourceMapper.selectByStatusAndType(needAddStatusList,
+ Lists.newArrayList(SourceType.FILE.getType()), TASK_FETCH_SIZE * 2);
+ for (StreamSourceEntity fileEntity : fileEntityList) {
+ FileSourceDTO fileSourceDTO = FileSourceDTO.getFromJson(fileEntity.getExtParams());
+ if (agentIp.equals(fileSourceDTO.getIp())) {
+ entityList.add(fileEntity);
+ }
+ }
+
List<DataConfig> dataConfigs = Lists.newArrayList();
for (StreamSourceEntity entity : entityList) {
// Change 20x to 30x
@@ -208,6 +216,11 @@ public class AgentServiceImpl implements AgentService {
return TaskResult.builder().dataConfigs(dataConfigs).cmdConfigs(cmdConfigs).build();
}
+ /**
+ * If status of source is failed, record.
+ *
+ * @param entity
+ */
private void logFailedStreamSource(StreamSourceEntity entity) {
InlongStreamConfigLogRequest request = new InlongStreamConfigLogRequest();
request.setInlongGroupId(entity.getInlongGroupId());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
index 8c1dbbc0b..7ab709784 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
@@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupState;
import org.apache.inlong.manager.common.enums.SourceState;
+import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
@@ -109,7 +110,7 @@ public abstract class AbstractStreamSourceOperation implements StreamSourceOpera
Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
String existType = entity.getSourceType();
Preconditions.checkTrue(getSourceType().equals(existType),
- String.format(Constant.SOURCE_TYPE_NOT_SAME, getSourceType(), existType));
+ String.format(SourceType.SOURCE_TYPE_NOT_SAME, getSourceType(), existType));
return this.getFromEntity(entity, this::getResponse);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 40bf56b35..69b4c10b8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -295,7 +295,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
String streamId = request.getInlongStreamId();
Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
String sourceType = request.getSourceType();
- Preconditions.checkNotNull(sourceType, Constant.SOURCE_TYPE_IS_EMPTY);
+ Preconditions.checkNotNull(sourceType, SourceType.SOURCE_TYPE_IS_EMPTY);
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogStreamSourceOperation.java
index 6531900ba..2c0d296c6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogStreamSourceOperation.java
@@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageInfo;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -57,7 +56,7 @@ public class BinlogStreamSourceOperation extends AbstractStreamSourceOperation {
@Override
protected String getSourceType() {
- return Constant.SOURCE_BINLOG;
+ return SourceType.BINLOG.getType();
}
@Override
@@ -93,7 +92,7 @@ public class BinlogStreamSourceOperation extends AbstractStreamSourceOperation {
}
String existType = entity.getSourceType();
Preconditions.checkTrue(getSourceType().equals(existType),
- String.format(Constant.SOURCE_TYPE_NOT_SAME, getSourceType(), existType));
+ String.format(SourceType.SOURCE_TYPE_NOT_SAME, getSourceType(), existType));
BinlogSourceDTO dto = BinlogSourceDTO.getFromJson(entity.getExtParams());
CommonBeanUtils.copyProperties(entity, result, true);
CommonBeanUtils.copyProperties(dto, result, true);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileStreamSourceOperation.java
similarity index 64%
copy from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaStreamSourceOperation.java
copy to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileStreamSourceOperation.java
index a4562ee17..29e1e4938 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileStreamSourceOperation.java
@@ -15,23 +15,17 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.source.kafka;
+package org.apache.inlong.manager.service.source.file;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.github.pagehelper.Page;
-import com.github.pagehelper.PageInfo;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
-import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceDTO;
-import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceListResponse;
-import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceRequest;
-import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
+import org.apache.inlong.manager.common.pojo.source.file.FileSourceDTO;
+import org.apache.inlong.manager.common.pojo.source.file.FileSourceRequest;
+import org.apache.inlong.manager.common.pojo.source.file.FileSourceResponse;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
@@ -41,48 +35,37 @@ import org.springframework.stereotype.Service;
import java.util.function.Supplier;
-/**
- * kafka stream source operation.
- */
@Service
-public class KafkaStreamSourceOperation extends AbstractStreamSourceOperation {
+public class FileStreamSourceOperation extends AbstractStreamSourceOperation {
@Autowired
private ObjectMapper objectMapper;
@Override
- public Boolean accept(SourceType sourceType) {
- return SourceType.KAFKA == sourceType;
+ protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) {
+ FileSourceRequest sourceRequest = (FileSourceRequest) request;
+ CommonBeanUtils.copyProperties(sourceRequest, targetEntity, true);
+ try {
+ FileSourceDTO dto = FileSourceDTO.getFromRequest(sourceRequest);
+ targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ }
}
@Override
protected String getSourceType() {
- return Constant.SOURCE_KAFKA;
+ return SourceType.SOURCE_FILE;
}
@Override
protected SourceResponse getResponse() {
- return new KafkaSourceResponse();
+ return new FileSourceResponse();
}
@Override
- public PageInfo<? extends SourceListResponse> getPageInfo(Page<StreamSourceEntity> entityPage) {
- if (CollectionUtils.isEmpty(entityPage)) {
- return new PageInfo<>();
- }
- return entityPage.toPageInfo(entity -> this.getFromEntity(entity, KafkaSourceListResponse::new));
- }
-
- @Override
- protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) {
- KafkaSourceRequest sourceRequest = (KafkaSourceRequest) request;
- CommonBeanUtils.copyProperties(sourceRequest, targetEntity, true);
- try {
- KafkaSourceDTO dto = KafkaSourceDTO.getFromRequest(sourceRequest);
- targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
- } catch (Exception e) {
- throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
- }
+ public Boolean accept(SourceType sourceType) {
+ return sourceType == SourceType.FILE;
}
@Override
@@ -93,8 +76,8 @@ public class KafkaStreamSourceOperation extends AbstractStreamSourceOperation {
}
String existType = entity.getSourceType();
Preconditions.checkTrue(getSourceType().equals(existType),
- String.format(Constant.SOURCE_TYPE_NOT_SAME, getSourceType(), existType));
- KafkaSourceDTO dto = KafkaSourceDTO.getFromJson(entity.getExtParams());
+ String.format(SourceType.SOURCE_TYPE_NOT_SAME, getSourceType(), existType));
+ FileSourceDTO dto = FileSourceDTO.getFromJson(entity.getExtParams());
CommonBeanUtils.copyProperties(entity, result, true);
CommonBeanUtils.copyProperties(dto, result, true);
return result;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaStreamSourceOperation.java
index a4562ee17..f54a3a782 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaStreamSourceOperation.java
@@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageInfo;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -57,7 +56,7 @@ public class KafkaStreamSourceOperation extends AbstractStreamSourceOperation {
@Override
protected String getSourceType() {
- return Constant.SOURCE_KAFKA;
+ return SourceType.KAFKA.getType();
}
@Override
@@ -93,7 +92,7 @@ public class KafkaStreamSourceOperation extends AbstractStreamSourceOperation {
}
String existType = entity.getSourceType();
Preconditions.checkTrue(getSourceType().equals(existType),
- String.format(Constant.SOURCE_TYPE_NOT_SAME, getSourceType(), existType));
+ String.format(SourceType.SOURCE_TYPE_NOT_SAME, getSourceType(), existType));
KafkaSourceDTO dto = KafkaSourceDTO.getFromJson(entity.getExtParams());
CommonBeanUtils.copyProperties(entity, result, true);
CommonBeanUtils.copyProperties(dto, result, true);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
index 003936bda..e387afef2 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
@@ -19,7 +19,7 @@ package org.apache.inlong.manager.service.core.impl;
import org.apache.inlong.common.pojo.agent.TaskSnapshotMessage;
import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceRequest;
import org.apache.inlong.manager.service.ServiceBaseTest;
import org.apache.inlong.manager.service.core.AgentService;
@@ -49,7 +49,7 @@ public class AgentServiceTest extends ServiceBaseTest {
BinlogSourceRequest sourceInfo = new BinlogSourceRequest();
sourceInfo.setInlongGroupId(globalGroupId);
sourceInfo.setInlongStreamId(globalStreamId);
- sourceInfo.setSourceType(Constant.SOURCE_BINLOG);
+ sourceInfo.setSourceType(SourceType.BINLOG.getType());
return sourceService.save(sourceInfo, globalOperator);
}
@@ -70,7 +70,7 @@ public class AgentServiceTest extends ServiceBaseTest {
Boolean result = agentService.reportSnapshot(request);
Assert.assertTrue(result);
- sourceService.delete(id, Constant.SOURCE_BINLOG, globalOperator);
+ sourceService.delete(id, SourceType.BINLOG.getType(), globalOperator);
}
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
index 4b84ecad0..38e14ac92 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.core.source;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceRequest;
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
@@ -51,7 +51,7 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
sourceInfo.setInlongGroupId(globalGroupId);
sourceInfo.setInlongStreamId(globalStreamId);
sourceInfo.setSourceName(sourceName);
- sourceInfo.setSourceType(Constant.SOURCE_BINLOG);
+ sourceInfo.setSourceType(SourceType.BINLOG.getType());
return sourceService.save(sourceInfo, globalOperator);
}
@@ -60,7 +60,7 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
Integer id = this.saveSource();
Assert.assertNotNull(id);
- boolean result = sourceService.delete(id, Constant.SOURCE_BINLOG, globalOperator);
+ boolean result = sourceService.delete(id, SourceType.BINLOG.getType(), globalOperator);
Assert.assertTrue(result);
}
@@ -68,16 +68,16 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
public void testListByIdentifier() {
Integer id = this.saveSource();
- SourceResponse source = sourceService.get(id, Constant.SOURCE_BINLOG);
+ SourceResponse source = sourceService.get(id, SourceType.BINLOG.getType());
Assert.assertEquals(globalGroupId, source.getInlongGroupId());
- sourceService.delete(id, Constant.SOURCE_BINLOG, globalOperator);
+ sourceService.delete(id, SourceType.BINLOG.getType(), globalOperator);
}
@Test
public void testGetAndUpdate() {
Integer id = this.saveSource();
- SourceResponse response = sourceService.get(id, Constant.SOURCE_BINLOG);
+ SourceResponse response = sourceService.get(id, SourceType.BINLOG.getType());
Assert.assertEquals(globalGroupId, response.getInlongGroupId());
BinlogSourceResponse binlogResponse = (BinlogSourceResponse) response;
@@ -86,7 +86,7 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
boolean result = sourceService.update(request, globalOperator);
Assert.assertTrue(result);
- sourceService.delete(id, Constant.SOURCE_BINLOG, globalOperator);
+ sourceService.delete(id, SourceType.BINLOG.getType(), globalOperator);
}
}
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/CommonDBServerController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/CommonDBServerController.java
index 37e1ceb0d..55f60ab0f 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/CommonDBServerController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/CommonDBServerController.java
@@ -66,6 +66,7 @@ import org.springframework.web.multipart.MultipartFile;
@RestController
@RequestMapping("commonserver/db")
@Api(tags = "Common Server - DB")
+@Deprecated
public class CommonDBServerController {
private static final Logger LOGGER = LoggerFactory.getLogger(CommonDBServerController.class);
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/CommonFileServerController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/CommonFileServerController.java
index 09e2cb5b4..d1164b1d4 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/CommonFileServerController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/CommonFileServerController.java
@@ -67,6 +67,7 @@ import org.springframework.web.multipart.MultipartFile;
@RestController
@RequestMapping("commonserver/file")
@Api(tags = "Common Server - File")
+@Deprecated
public class CommonFileServerController {
private static final Logger LOGGER = LoggerFactory.getLogger(CommonFileServerController.class);