You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/08/08 02:33:42 UTC

[inlong] branch master updated: [INLONG-5382][Manager][Agent] Optimized the file collection configuration (#5383)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 1915e9e90 [INLONG-5382][Manager][Agent] Optimized the file collection configuration (#5383)
1915e9e90 is described below

commit 1915e9e902264c2abcf5987eb12e4fdf47abd3d0
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Mon Aug 8 10:33:38 2022 +0800

    [INLONG-5382][Manager][Agent] Optimized the file collection configuration (#5383)
---
 .../apache/inlong/agent/constant/JobConstants.java |  1 +
 .../java/org/apache/inlong/agent/pojo/FileJob.java | 14 ++++++++++-
 .../apache/inlong/agent/pojo/JobProfileDto.java    |  1 +
 .../manager/pojo/source/file/FileSource.java       | 21 +++++++++++++++++
 .../manager/pojo/source/file/FileSourceDTO.java    | 27 ++++++++++++++++++++++
 .../pojo/source/file/FileSourceRequest.java        | 21 +++++++++++++++++
 .../service/core/impl/AgentServiceImpl.java        | 19 ++++++++++++++-
 7 files changed, 102 insertions(+), 2 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
index 9ca8b4d0f..27330859d 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
@@ -54,6 +54,7 @@ public class JobConstants extends CommonConstants {
     public static final String JOB_FILE_LINE_END_PATTERN = "job.fileJob.line.endPattern";
     public static final String JOB_FILE_CONTENT_COLLECT_TYPE = "job.fileJob.contentCollectType";
     public static final String JOB_FILE_META_ENV_LIST = "job.fileJob.envList";
+    public static final String JOB_FILE_DATA_SOURCE_COLUMN_SEPARATOR = "job.fileJob.dataSeparator";
 
     //Binlog job
     public static final String JOB_DATABASE_USER = "job.binlogJob.user";
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
index 2780366b3..a8bbfceb4 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
@@ -43,6 +43,8 @@ public class FileJob {
 
     private List<Map<String, String>> metaFields;
 
+    private String dataSeparator;
+
     @Data
     public static class Dir {
 
@@ -83,11 +85,21 @@ public class FileJob {
 
         private String lineEndPattern;
 
+        // Type of file content, for example: FULL, INCREMENT
         private String contentCollectType;
 
+        // File needs to collect environment information, for example: kubernetes
         private String envList;
-
+        // Metadata of data, for example:
+        // [{data:field1,field2},{kubernetes:namespace,labels,name,uuid}] and so on
         private List<Map<String, String>> metaFields;
+        // Type of data result for column separator
+        // CSV format, set this parameter to a custom separator: , | :
+        // Json format, set this parameter to json 
+        private String dataContentStyle;
+
+        // Column separator of data source 
+        private String dataSeparator;
     }
 
 }
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index 02beda28a..0679a384f 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -106,6 +106,7 @@ public class JobProfileDto {
         fileJob.setDir(dir);
         fileJob.setCollectType(fileJobTaskConfig.getCollectType());
         fileJob.setContentCollectType(fileJobTaskConfig.getContentCollectType());
+        fileJob.setDataSeparator(fileJobTaskConfig.getDataSeparator());
 
         if (fileJobTaskConfig.getTimeOffset() != null) {
             fileJob.setTimeOffset(fileJobTaskConfig.getTimeOffset());
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java
index d01c3e8e3..61577bf58 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java
@@ -30,6 +30,9 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 import org.apache.inlong.manager.pojo.source.StreamSource;
 
+import java.util.List;
+import java.util.Map;
+
 /**
  * File source info
  */
@@ -58,6 +61,24 @@ public class FileSource extends StreamSource {
             + "Null or blank means from current timestamp")
     private String timeOffset;
 
+    @ApiModelProperty("Line end regex pattern, for example: &end&")
+    private String lineEndPattern;
+
+    @ApiModelProperty("Type of file content, for example: FULL, INCREMENT")
+    private String contentCollectType;
+
+    @ApiModelProperty("File needs to collect environment information, for example: kubernetes")
+    private String envList;
+
+    @ApiModelProperty("Metadata of data, for example: "
+            + "[{data:field1,field2},{kubernetes:namespace,labels,name,uuid}] and so on")
+    private List<Map<String, String>> metaFields;
+
+    @ApiModelProperty(" Type of data result for column separator"
+            + "         CSV format, set this parameter to a custom separator: , | : "
+            + "         Json format, set this parameter to json ")
+    private String dataContentStyle;
+
     public FileSource() {
         this.setSourceType(SourceType.FILE);
     }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
index 21b186767..f481b77d4 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
@@ -28,6 +28,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 
 import javax.validation.constraints.NotNull;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -60,11 +61,37 @@ public class FileSourceDTO {
     @ApiModelProperty("Properties for File")
     private Map<String, Object> properties;
 
+    @ApiModelProperty("Line end regex pattern, for example: &end&")
+    private String lineEndPattern;
+
+    @ApiModelProperty("Type of file content, for example: FULL, INCREMENT")
+    private String contentCollectType;
+
+    @ApiModelProperty("File needs to collect environment information, for example: kubernetes")
+    private String envList;
+
+    @ApiModelProperty("Metadata of data, for example: [{data:field1,field2},"
+            + "{kubernetes:namespace,labels,name,uuid}] and so on")
+    private List<Map<String, String>> metaFields;
+
+    @ApiModelProperty(" Type of data result for column separator"
+            + "         CSV format, set this parameter to a custom separator: , | : "
+            + "         Json format, set this parameter to json ")
+    private String dataContentStyle;
+
+    @ApiModelProperty("Column separator of data source ")
+    private String dataSeparator;
+
     public static FileSourceDTO getFromRequest(@NotNull FileSourceRequest fileSourceRequest) {
         return FileSourceDTO.builder()
                 .clusterTag(fileSourceRequest.getClusterTag())
                 .ip(fileSourceRequest.getIp())
                 .pattern(fileSourceRequest.getPattern())
+                .lineEndPattern(fileSourceRequest.getLineEndPattern())
+                .contentCollectType(fileSourceRequest.getContentCollectType())
+                .envList(fileSourceRequest.getEnvList())
+                .dataContentStyle(fileSourceRequest.getDataContentStyle())
+                .metaFields(fileSourceRequest.getMetaFields())
                 .timeOffset(fileSourceRequest.getTimeOffset())
                 .properties(fileSourceRequest.getProperties())
                 .build();
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java
index 39d45eab6..c68bce85e 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java
@@ -27,6 +27,9 @@ import org.apache.inlong.manager.common.enums.DataFormat;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 
+import java.util.List;
+import java.util.Map;
+
 /**
  * File source request
  */
@@ -53,6 +56,24 @@ public class FileSourceRequest extends SourceRequest {
             + "Null or blank means from current timestamp")
     private String timeOffset;
 
+    @ApiModelProperty("Line end regex pattern, for example: &end&")
+    private String lineEndPattern;
+
+    @ApiModelProperty("Type of file content, for example: FULL, INCREMENT")
+    private String contentCollectType;
+
+    @ApiModelProperty("File needs to collect environment information, for example: kubernetes")
+    private String envList;
+
+    @ApiModelProperty("Metadata of data, for example: "
+            + "[{data:field1,field2},{kubernetes:namespace,labels,name,uuid}] and so on")
+    private List<Map<String, String>> metaFields;
+
+    @ApiModelProperty(" Type of data result for column separator"
+            + "         CSV format, set this parameter to a custom separator: , | : "
+            + "         Json format, set this parameter to json ")
+    private String dataContentStyle;
+
     public FileSourceRequest() {
         this.setSourceType(SourceType.FILE);
         this.setSerializationType(DataFormat.CSV.getName());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 1d001e107..06185049c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -311,7 +311,6 @@ public class AgentServiceImpl implements AgentService {
         dataConfig.setTaskType(getTaskType(entity));
         dataConfig.setTaskName(entity.getSourceName());
         dataConfig.setSnapshot(entity.getSnapshot());
-        dataConfig.setExtParams(entity.getExtParams());
         dataConfig.setVersion(entity.getVersion());
 
         String groupId = entity.getInlongGroupId();
@@ -319,15 +318,33 @@ public class AgentServiceImpl implements AgentService {
         dataConfig.setInlongGroupId(groupId);
         dataConfig.setInlongStreamId(streamId);
         InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
+        String extParams = entity.getExtParams();
         if (streamEntity != null) {
             dataConfig.setSyncSend(streamEntity.getSyncSend());
+            if (SourceType.FILE.equalsIgnoreCase(streamEntity.getDataType())) {
+                String dataSeparator = streamEntity.getDataSeparator();
+                extParams = null != dataSeparator ? getExtParams(extParams, dataSeparator) : extParams;
+            }
         } else {
             dataConfig.setSyncSend(0);
             LOGGER.warn("set syncSend=[0] as the stream not exists for groupId={}, streamId={}", groupId, streamId);
         }
+        dataConfig.setExtParams(extParams);
         return dataConfig;
     }
 
+    private String getExtParams(String extParams, String dataSeparator) {
+        if (Objects.isNull(extParams)) {
+            return null;
+        }
+        FileSourceDTO fileSourceDTO = JsonUtils.parseObject(extParams, FileSourceDTO.class);
+        if (Objects.nonNull(fileSourceDTO)) {
+            fileSourceDTO.setDataSeparator(dataSeparator);
+            return JsonUtils.toJsonString(fileSourceDTO);
+        }
+        return extParams;
+    }
+
     /**
      * Get the Task type from the stream source entity.
      *