You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/04/03 07:51:32 UTC

[inlong] branch master updated: [INLONG-7690][Manager] Creating schema of StreamSource by CSV (#7740)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 73945a3f3 [INLONG-7690][Manager] Creating schema of StreamSource by CSV (#7740)
73945a3f3 is described below

commit 73945a3f37162c3e121f52b1a0d33977bf957fff
Author: feat <fe...@outlook.com>
AuthorDate: Mon Apr 3 15:51:25 2023 +0800

    [INLONG-7690][Manager] Creating schema of StreamSource by CSV (#7740)
---
 .../manager/common/consts/InlongConstants.java     | 13 ++++
 .../service/sink/StreamSinkServiceImpl.java        | 44 ++++++++++++-
 .../service/stream/InlongStreamServiceImpl.java    | 74 ++++++++++++++++++----
 3 files changed, 118 insertions(+), 13 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index 708042fca..267edd393 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -17,6 +17,11 @@
 
 package org.apache.inlong.manager.common.consts;
 
+import com.google.common.collect.Sets;
+
+import java.util.Set;
+import java.util.regex.Pattern;
+
 /**
  * Global constant for the Inlong system.
  */
@@ -49,6 +54,8 @@ public class InlongConstants {
 
     public static final String QUESTION_MARK = "?";
 
+    public static final String NEW_LINE = "\n";
+
     public static final String ADMIN_USER = "admin";
 
     public static final Integer AFFECTED_ONE_ROW = 1;
@@ -150,7 +157,13 @@ public class InlongConstants {
 
     public static final String STATEMENT_TYPE_SQL = "sql";
     public static final String STATEMENT_TYPE_JSON = "json";
+    public static final String STATEMENT_TYPE_CSV = "csv";
 
     public static final String SORT_TYPE_INFO_SUFFIX = "TypeInfo";
 
+    public static final Pattern PATTERN_NORMAL_CHARACTERS = Pattern.compile("^[a-zA-Z0-9_]*$");
+
+    public static final Set<String> STREAM_FORMAT_TYPES =
+            Sets.newHashSet("string", "int", "long", "float", "double", "date", "timestamp");
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index e11532de7..5db13d989 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -82,7 +82,9 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.apache.inlong.manager.common.consts.InlongConstants.LEFT_BRACKET;
+import static org.apache.inlong.manager.common.consts.InlongConstants.PATTERN_NORMAL_CHARACTERS;
 import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
+import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL;
 
 /**
  * Implementation of sink service interface
@@ -91,6 +93,9 @@ import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_
 public class StreamSinkServiceImpl implements StreamSinkService {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(StreamSinkServiceImpl.class);
+    private static final String PARSE_FIELD_CSV_SPLITTER = "[\\s,]+"; // a run of whitespace and/or commas
+    private static final int PARSE_FIELD_CSV_MAX_COLUMNS = 3;
+    private static final int PARSE_FIELD_CSV_MIN_COLUMNS = 2;
 
     @Autowired
     private SinkOperatorFactory operatorFactory;
@@ -708,8 +713,10 @@ public class StreamSinkServiceImpl implements StreamSinkService {
             Map<String, String> fieldsMap;
             if (STATEMENT_TYPE_JSON.equals(method)) {
                 fieldsMap = parseFieldsByJson(statement);
-            } else {
+            } else if (STATEMENT_TYPE_SQL.equals(method)) {
                 fieldsMap = parseFieldsBySql(statement);
+            } else {
+                return parseFieldsByCsv(statement);
             }
             return fieldsMap.entrySet().stream().map(entry -> {
                 SinkField field = new SinkField();
@@ -725,6 +732,41 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         }
     }
 
+    /**
+     * Parse sink fields from a CSV statement, one field per line: {@code fieldName, fieldType[, comment]}.
+     * @param statement CSV statement whose columns are split by PARSE_FIELD_CSV_SPLITTER
+     * @return list of SinkField, blank lines are skipped
+     */
+    private List<SinkField> parseFieldsByCsv(String statement) {
+        String[] lines = statement.split(InlongConstants.NEW_LINE);
+        List<SinkField> fields = new ArrayList<>();
+        for (int i = 0; i < lines.length; i++) {
+            // trim drops the '\r' left by CRLF line endings and any leading whitespace before the name
+            String line = lines[i].trim();
+            if (line.isEmpty()) {
+                continue;
+            }
+            String[] cols = line.split(PARSE_FIELD_CSV_SPLITTER, PARSE_FIELD_CSV_MAX_COLUMNS);
+            if (cols.length < PARSE_FIELD_CSV_MIN_COLUMNS) {
+                throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+                        "At least two fields are required, line number is " + (i + 1));
+            }
+            String fieldName = cols[0];
+            if (fieldName.isEmpty() || !PATTERN_NORMAL_CHARACTERS.matcher(fieldName).matches()) {
+                throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "Field names in line " + (i + 1) +
+                        " can only contain letters, underscores or numbers");
+            }
+            String fieldType = cols[1];
+            String comment = cols.length == PARSE_FIELD_CSV_MAX_COLUMNS ? StringUtils.trimToNull(cols[2]) : null;
+            SinkField field = new SinkField();
+            field.setFieldName(fieldName);
+            field.setFieldType(fieldType);
+            field.setFieldComment(comment);
+            fields.add(field);
+        }
+        return fields;
+    }
+
     private Map<String, String> parseFieldsBySql(String sql) throws JSQLParserException {
         CCJSqlParserManager pm = new CCJSqlParserManager();
         Statement statement = pm.parse(new StringReader(sql));
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index e50a4c4be..2f19c6bb7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -82,7 +82,10 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
+import static org.apache.inlong.manager.common.consts.InlongConstants.PATTERN_NORMAL_CHARACTERS;
 import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
+import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL;
+import static org.apache.inlong.manager.common.consts.InlongConstants.STREAM_FORMAT_TYPES;
 import static org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.packExtParams;
 import static org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.unpackExtParams;
 
@@ -93,6 +96,9 @@ import static org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.unpackE
 public class InlongStreamServiceImpl implements InlongStreamService {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(InlongStreamServiceImpl.class);
+    private static final String PARSE_FIELD_CSV_SPLITTER = "[\\s,]+"; // a run of whitespace and/or commas
+    private static final int PARSE_FIELD_CSV_MAX_COLUMNS = 3;
+    private static final int PARSE_FIELD_CSV_MIN_COLUMNS = 2;
 
     @Autowired
     private InlongStreamEntityMapper streamMapper;
@@ -112,7 +118,6 @@ public class InlongStreamServiceImpl implements InlongStreamService {
     private UserService userService;
 
     @Transactional(rollbackFor = Throwable.class)
-
     @Override
     public Integer save(InlongStreamRequest request, String operator) {
         LOGGER.debug("begin to save inlong stream info={}", request);
@@ -222,8 +227,8 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         streamInfo.setFieldList(streamFields);
         // load ext infos
         List<InlongStreamExtEntity> extEntities = streamExtMapper.selectByRelatedId(groupId, streamId);
-        List<InlongStreamExtInfo> exts = CommonBeanUtils.copyListProperties(extEntities, InlongStreamExtInfo::new);
-        streamInfo.setExtList(exts);
+        List<InlongStreamExtInfo> extInfos = CommonBeanUtils.copyListProperties(extEntities, InlongStreamExtInfo::new);
+        streamInfo.setExtList(extInfos);
         // load extParams
         unpackExtParams(streamEntity.getExtParams(), streamInfo);
 
@@ -255,8 +260,8 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         List<StreamField> streamFields = getStreamFields(groupId, streamId);
         streamInfo.setFieldList(streamFields);
         List<InlongStreamExtEntity> extEntities = streamExtMapper.selectByRelatedId(groupId, streamId);
-        List<InlongStreamExtInfo> exts = CommonBeanUtils.copyListProperties(extEntities, InlongStreamExtInfo::new);
-        streamInfo.setExtList(exts);
+        List<InlongStreamExtInfo> extInfos = CommonBeanUtils.copyListProperties(extEntities, InlongStreamExtInfo::new);
+        streamInfo.setExtList(extInfos);
         List<StreamSink> sinkList = sinkService.listSink(groupId, streamId);
         streamInfo.setSinkList(sinkList);
         List<StreamSource> sourceList = sourceService.listSource(groupId, streamId);
@@ -733,8 +738,10 @@ public class InlongStreamServiceImpl implements InlongStreamService {
             Map<String, String> fieldsMap;
             if (STATEMENT_TYPE_JSON.equals(method)) {
                 fieldsMap = parseFieldsByJson(statement);
-            } else {
+            } else if (STATEMENT_TYPE_SQL.equals(method)) {
                 fieldsMap = parseFieldsBySql(statement);
+            } else {
+                return parseFieldsByCsv(statement);
             }
             return fieldsMap.entrySet().stream().map(entry -> {
                 StreamField field = new StreamField();
@@ -749,7 +756,49 @@ public class InlongStreamServiceImpl implements InlongStreamService {
                     String.format("parse stream fields error : %s", e.getMessage()));
         }
     }
+
+    /**
+     * Parse fields from a CSV statement, one field per line: {@code fieldName, fieldType[, comment]}.
+     * Columns are split by PARSE_FIELD_CSV_SPLITTER; the optional comment keeps the rest of the line.
+     * @param statement CSV statement
+     * @return list of StreamField, blank lines are skipped
+     * @throws BusinessException if a line has too few columns, an invalid field name or an unsupported type
+     */
+    private List<StreamField> parseFieldsByCsv(String statement) {
+        String[] lines = statement.split(InlongConstants.NEW_LINE);
+        List<StreamField> fields = new ArrayList<>();
+        for (int i = 0; i < lines.length; i++) {
+            // trim drops the '\r' left by CRLF line endings and any leading whitespace before the name
+            String line = lines[i].trim();
+            if (line.isEmpty()) {
+                continue;
+            }
 
+            String[] cols = line.split(PARSE_FIELD_CSV_SPLITTER, PARSE_FIELD_CSV_MAX_COLUMNS);
+            if (cols.length < PARSE_FIELD_CSV_MIN_COLUMNS) {
+                throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+                        "At least two fields are required, line number is " + (i + 1));
+            }
+            String fieldName = cols[0];
+            if (fieldName.isEmpty() || !PATTERN_NORMAL_CHARACTERS.matcher(fieldName).matches()) {
+                throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "Field names in line " + (i + 1) +
+                        " can only contain letters, underscores or numbers");
+            }
+            String fieldType = cols[1];
+            if (!STREAM_FORMAT_TYPES.contains(fieldType)) {
+                throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "The field type in line " + (i + 1) +
+                        " must be one of " + STREAM_FORMAT_TYPES);
+            }
+            String comment = cols.length == PARSE_FIELD_CSV_MAX_COLUMNS ? StringUtils.trimToNull(cols[2]) : null;
+            StreamField field = new StreamField();
+            field.setFieldName(fieldName);
+            field.setFieldType(fieldType);
+            field.setFieldComment(comment);
+            fields.add(field);
+        }
+        return fields;
+    }
+
     private Map<String, String> parseFieldsBySql(String sql) throws JSQLParserException {
         CCJSqlParserManager pm = new CCJSqlParserManager();
         Statement statement = pm.parse(new StringReader(sql));
@@ -793,7 +842,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
      * First physically delete the existing field information, and then add the field information of this batch
      */
     @Transactional(rollbackFor = Throwable.class)
-    void updateField(String groupId, String streamId, List<StreamField> fieldList) {
+    public void updateField(String groupId, String streamId, List<StreamField> fieldList) {
         LOGGER.debug("begin to update inlong stream field, groupId={}, streamId={}, field={}", groupId, streamId,
                 fieldList);
         try {
@@ -807,7 +856,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
     }
 
     @Transactional(rollbackFor = Throwable.class)
-    void saveField(String groupId, String streamId, List<StreamField> infoList) {
+    public void saveField(String groupId, String streamId, List<StreamField> infoList) {
         if (CollectionUtils.isEmpty(infoList)) {
             return;
         }
@@ -823,14 +872,15 @@ public class InlongStreamServiceImpl implements InlongStreamService {
     }
 
     @Transactional(rollbackFor = Throwable.class)
-    void saveOrUpdateExt(String groupId, String streamId, List<InlongStreamExtInfo> exts) {
+    public void saveOrUpdateExt(String groupId, String streamId, List<InlongStreamExtInfo> extInfos) {
         LOGGER.info("begin to save or update inlong stream ext info, groupId={}, streamId={}, ext={}", groupId,
-                streamId, exts);
-        if (CollectionUtils.isEmpty(exts)) {
+                streamId, extInfos);
+        if (CollectionUtils.isEmpty(extInfos)) {
             return;
         }
 
-        List<InlongStreamExtEntity> entityList = CommonBeanUtils.copyListProperties(exts, InlongStreamExtEntity::new);
+        List<InlongStreamExtEntity> entityList =
+                CommonBeanUtils.copyListProperties(extInfos, InlongStreamExtEntity::new);
         entityList.forEach(streamEntity -> {
             streamEntity.setInlongGroupId(groupId);
             streamEntity.setInlongStreamId(streamId);