You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink on the original archive page) was not preserved in this version.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/08/08 02:33:42 UTC
[inlong] branch master updated: [INLONG-5382][Manager][Agent] Optimized the file collection configuration (#5383)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1915e9e90 [INLONG-5382][Manager][Agent] Optimized the file collection configuration (#5383)
1915e9e90 is described below
commit 1915e9e902264c2abcf5987eb12e4fdf47abd3d0
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Mon Aug 8 10:33:38 2022 +0800
[INLONG-5382][Manager][Agent] Optimized the file collection configuration (#5383)
---
.../apache/inlong/agent/constant/JobConstants.java | 1 +
.../java/org/apache/inlong/agent/pojo/FileJob.java | 14 ++++++++++-
.../apache/inlong/agent/pojo/JobProfileDto.java | 1 +
.../manager/pojo/source/file/FileSource.java | 21 +++++++++++++++++
.../manager/pojo/source/file/FileSourceDTO.java | 27 ++++++++++++++++++++++
.../pojo/source/file/FileSourceRequest.java | 21 +++++++++++++++++
.../service/core/impl/AgentServiceImpl.java | 19 ++++++++++++++-
7 files changed, 102 insertions(+), 2 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
index 9ca8b4d0f..27330859d 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
@@ -54,6 +54,7 @@ public class JobConstants extends CommonConstants {
public static final String JOB_FILE_LINE_END_PATTERN = "job.fileJob.line.endPattern";
public static final String JOB_FILE_CONTENT_COLLECT_TYPE = "job.fileJob.contentCollectType";
public static final String JOB_FILE_META_ENV_LIST = "job.fileJob.envList";
+ public static final String JOB_FILE_DATA_SOURCE_COLUMN_SEPARATOR = "job.fileJob.dataSeparator";
//Binlog job
public static final String JOB_DATABASE_USER = "job.binlogJob.user";
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
index 2780366b3..a8bbfceb4 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
@@ -43,6 +43,8 @@ public class FileJob {
private List<Map<String, String>> metaFields;
+ private String dataSeparator;
+
@Data
public static class Dir {
@@ -83,11 +85,21 @@ public class FileJob {
private String lineEndPattern;
+ // Type of file content, for example: FULL, INCREMENT
private String contentCollectType;
+ // File needs to collect environment information, for example: kubernetes
private String envList;
-
+ // Metadata of data, for example:
+ // [{data:field1,field2},{kubernetes:namespace,labels,name,uuid}] and so on
private List<Map<String, String>> metaFields;
+ // Type of data result for column separator
+ // CSV format, set this parameter to a custom separator: , | :
+ // Json format, set this parameter to json
+ private String dataContentStyle;
+
+ // Column separator of data source
+ private String dataSeparator;
}
}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index 02beda28a..0679a384f 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -106,6 +106,7 @@ public class JobProfileDto {
fileJob.setDir(dir);
fileJob.setCollectType(fileJobTaskConfig.getCollectType());
fileJob.setContentCollectType(fileJobTaskConfig.getContentCollectType());
+ fileJob.setDataSeparator(fileJobTaskConfig.getDataSeparator());
if (fileJobTaskConfig.getTimeOffset() != null) {
fileJob.setTimeOffset(fileJobTaskConfig.getTimeOffset());
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java
index d01c3e8e3..61577bf58 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java
@@ -30,6 +30,9 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
+import java.util.List;
+import java.util.Map;
+
/**
* File source info
*/
@@ -58,6 +61,24 @@ public class FileSource extends StreamSource {
+ "Null or blank means from current timestamp")
private String timeOffset;
+ @ApiModelProperty("Line end regex pattern, for example: &end&")
+ private String lineEndPattern;
+
+ @ApiModelProperty("Type of file content, for example: FULL, INCREMENT")
+ private String contentCollectType;
+
+ @ApiModelProperty("File needs to collect environment information, for example: kubernetes")
+ private String envList;
+
+ @ApiModelProperty("Metadata of data, for example: "
+ + "[{data:field1,field2},{kubernetes:namespace,labels,name,uuid}] and so on")
+ private List<Map<String, String>> metaFields;
+
+ @ApiModelProperty(" Type of data result for column separator"
+ + " CSV format, set this parameter to a custom separator: , | : "
+ + " Json format, set this parameter to json ")
+ private String dataContentStyle;
+
public FileSource() {
this.setSourceType(SourceType.FILE);
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
index 21b186767..f481b77d4 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
@@ -28,6 +28,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import javax.validation.constraints.NotNull;
+import java.util.List;
import java.util.Map;
/**
@@ -60,11 +61,37 @@ public class FileSourceDTO {
@ApiModelProperty("Properties for File")
private Map<String, Object> properties;
+ @ApiModelProperty("Line end regex pattern, for example: &end&")
+ private String lineEndPattern;
+
+ @ApiModelProperty("Type of file content, for example: FULL, INCREMENT")
+ private String contentCollectType;
+
+ @ApiModelProperty("File needs to collect environment information, for example: kubernetes")
+ private String envList;
+
+ @ApiModelProperty("Metadata of data, for example: [{data:field1,field2},"
+ + "{kubernetes:namespace,labels,name,uuid}] and so on")
+ private List<Map<String, String>> metaFields;
+
+ @ApiModelProperty(" Type of data result for column separator"
+ + " CSV format, set this parameter to a custom separator: , | : "
+ + " Json format, set this parameter to json ")
+ private String dataContentStyle;
+
+ @ApiModelProperty("Column separator of data source ")
+ private String dataSeparator;
+
public static FileSourceDTO getFromRequest(@NotNull FileSourceRequest fileSourceRequest) {
return FileSourceDTO.builder()
.clusterTag(fileSourceRequest.getClusterTag())
.ip(fileSourceRequest.getIp())
.pattern(fileSourceRequest.getPattern())
+ .lineEndPattern(fileSourceRequest.getLineEndPattern())
+ .contentCollectType(fileSourceRequest.getContentCollectType())
+ .envList(fileSourceRequest.getEnvList())
+ .dataContentStyle(fileSourceRequest.getDataContentStyle())
+ .metaFields(fileSourceRequest.getMetaFields())
.timeOffset(fileSourceRequest.getTimeOffset())
.properties(fileSourceRequest.getProperties())
.build();
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java
index 39d45eab6..c68bce85e 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java
@@ -27,6 +27,9 @@ import org.apache.inlong.manager.common.enums.DataFormat;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.source.SourceRequest;
+import java.util.List;
+import java.util.Map;
+
/**
* File source request
*/
@@ -53,6 +56,24 @@ public class FileSourceRequest extends SourceRequest {
+ "Null or blank means from current timestamp")
private String timeOffset;
+ @ApiModelProperty("Line end regex pattern, for example: &end&")
+ private String lineEndPattern;
+
+ @ApiModelProperty("Type of file content, for example: FULL, INCREMENT")
+ private String contentCollectType;
+
+ @ApiModelProperty("File needs to collect environment information, for example: kubernetes")
+ private String envList;
+
+ @ApiModelProperty("Metadata of data, for example: "
+ + "[{data:field1,field2},{kubernetes:namespace,labels,name,uuid}] and so on")
+ private List<Map<String, String>> metaFields;
+
+ @ApiModelProperty(" Type of data result for column separator"
+ + " CSV format, set this parameter to a custom separator: , | : "
+ + " Json format, set this parameter to json ")
+ private String dataContentStyle;
+
public FileSourceRequest() {
this.setSourceType(SourceType.FILE);
this.setSerializationType(DataFormat.CSV.getName());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 1d001e107..06185049c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -311,7 +311,6 @@ public class AgentServiceImpl implements AgentService {
dataConfig.setTaskType(getTaskType(entity));
dataConfig.setTaskName(entity.getSourceName());
dataConfig.setSnapshot(entity.getSnapshot());
- dataConfig.setExtParams(entity.getExtParams());
dataConfig.setVersion(entity.getVersion());
String groupId = entity.getInlongGroupId();
@@ -319,15 +318,33 @@ public class AgentServiceImpl implements AgentService {
dataConfig.setInlongGroupId(groupId);
dataConfig.setInlongStreamId(streamId);
InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
+ String extParams = entity.getExtParams();
if (streamEntity != null) {
dataConfig.setSyncSend(streamEntity.getSyncSend());
+ if (SourceType.FILE.equalsIgnoreCase(streamEntity.getDataType())) {
+ String dataSeparator = streamEntity.getDataSeparator();
+ extParams = null != dataSeparator ? getExtParams(extParams, dataSeparator) : extParams;
+ }
} else {
dataConfig.setSyncSend(0);
LOGGER.warn("set syncSend=[0] as the stream not exists for groupId={}, streamId={}", groupId, streamId);
}
+ dataConfig.setExtParams(extParams);
return dataConfig;
}
+ private String getExtParams(String extParams, String dataSeparator) { // Returns extParams with dataSeparator written into it, treating extParams as FileSourceDTO JSON.
+ if (Objects.isNull(extParams)) { // No ext params: nothing to update, return null.
+ return null;
+ }
+ FileSourceDTO fileSourceDTO = JsonUtils.parseObject(extParams, FileSourceDTO.class); // NOTE(review): whether this throws or returns null on malformed JSON depends on JsonUtils — confirm.
+ if (Objects.nonNull(fileSourceDTO)) {
+ fileSourceDTO.setDataSeparator(dataSeparator);
+ return JsonUtils.toJsonString(fileSourceDTO); // Re-serialize so the returned JSON carries dataSeparator; NOTE(review): keys not declared on FileSourceDTO may not survive this round trip — confirm.
+ }
+ return extParams; // Parsed DTO was null: pass the original string through unchanged.
+ } // NOTE(review): the visible caller only invokes this when streamEntity.getDataType() equals SourceType.FILE (a stream data type checked against a source type) — confirm that condition is intended.
+
/**
* Get the Task type from the stream source entity.
*