You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/04/03 07:51:32 UTC
[inlong] branch master updated: [INLONG-7690][Manager] Creating schema of StreamSource by CSV (#7740)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 73945a3f3 [INLONG-7690][Manager] Creating schema of StreamSource by CSV (#7740)
73945a3f3 is described below
commit 73945a3f37162c3e121f52b1a0d33977bf957fff
Author: feat <fe...@outlook.com>
AuthorDate: Mon Apr 3 15:51:25 2023 +0800
[INLONG-7690][Manager] Creating schema of StreamSource by CSV (#7740)
---
.../manager/common/consts/InlongConstants.java | 13 ++++
.../service/sink/StreamSinkServiceImpl.java | 44 ++++++++++++-
.../service/stream/InlongStreamServiceImpl.java | 74 ++++++++++++++++++----
3 files changed, 118 insertions(+), 13 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index 708042fca..267edd393 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -17,6 +17,11 @@
package org.apache.inlong.manager.common.consts;
+import com.google.common.collect.Sets;
+
+import java.util.Set;
+import java.util.regex.Pattern;
+
/**
* Global constant for the Inlong system.
*/
@@ -49,6 +54,8 @@ public class InlongConstants {
public static final String QUESTION_MARK = "?";
+ public static final String NEW_LINE = "\n";
+
public static final String ADMIN_USER = "admin";
public static final Integer AFFECTED_ONE_ROW = 1;
@@ -150,7 +157,13 @@ public class InlongConstants {
public static final String STATEMENT_TYPE_SQL = "sql";
public static final String STATEMENT_TYPE_JSON = "json";
+ public static final String STATEMENT_TYPE_CSV = "csv";
public static final String SORT_TYPE_INFO_SUFFIX = "TypeInfo";
+ public static final Pattern PATTERN_NORMAL_CHARACTERS = Pattern.compile("^[a-zA-Z0-9_]*$"); // ASCII letters, digits and underscores only; the '*' quantifier means the empty string matches too
+
+ public static final Set<String> STREAM_FORMAT_TYPES = // field types accepted when stream fields are parsed from CSV
+ Sets.newHashSet("string", "int", "long", "float", "double", "date", "timestamp");
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index e11532de7..5db13d989 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -82,7 +82,9 @@ import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.inlong.manager.common.consts.InlongConstants.LEFT_BRACKET;
+import static org.apache.inlong.manager.common.consts.InlongConstants.PATTERN_NORMAL_CHARACTERS;
import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
+import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL;
/**
* Implementation of sink service interface
@@ -91,6 +93,9 @@ import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_
public class StreamSinkServiceImpl implements StreamSinkService {
private static final Logger LOGGER = LoggerFactory.getLogger(StreamSinkServiceImpl.class);
+ private static final String PARSE_FIELD_CSV_SPLITTER = "\t|\\s|,"; // column separator regex: a single tab, a single whitespace character, or a single comma
+ private static final int PARSE_FIELD_CSV_MAX_COLUMNS = 3; // split limit per CSV line; the third column, when present, is the field comment
+ private static final int PARSE_FIELD_CSV_MIN_COLUMNS = 2; // every CSV line needs at least a field name and a field type
@Autowired
private SinkOperatorFactory operatorFactory;
@@ -708,8 +713,10 @@ public class StreamSinkServiceImpl implements StreamSinkService {
Map<String, String> fieldsMap;
if (STATEMENT_TYPE_JSON.equals(method)) {
fieldsMap = parseFieldsByJson(statement);
- } else {
+ } else if (STATEMENT_TYPE_SQL.equals(method)) {
fieldsMap = parseFieldsBySql(statement);
+ } else {
+ return parseFieldsByCsv(statement);
}
return fieldsMap.entrySet().stream().map(entry -> {
SinkField field = new SinkField();
@@ -725,6 +732,41 @@ public class StreamSinkServiceImpl implements StreamSinkService {
}
}
+ /**
+  * Parse sink fields from a CSV statement.
+  * Each non-blank line describes one field as: name, type and an optional comment.
+  *
+  * @param statement CSV statement, lines separated by InlongConstants.NEW_LINE
+  * @return list of SinkField in line order, blank lines are skipped
+  * @throws BusinessException if a line has fewer than two columns, an empty or illegal field name,
+  *         or an empty field type
+  */
+ private List<SinkField> parseFieldsByCsv(String statement) {
+     String[] lines = statement.split(InlongConstants.NEW_LINE);
+     List<SinkField> fields = new ArrayList<>();
+     for (int i = 0; i < lines.length; i++) {
+         if (StringUtils.isBlank(lines[i])) {
+             continue;
+         }
+         // trim first: leading blanks or a trailing '\r' would otherwise be split into empty columns
+         String line = lines[i].trim();
+
+         String[] cols = line.split(PARSE_FIELD_CSV_SPLITTER, PARSE_FIELD_CSV_MAX_COLUMNS);
+         if (cols.length < PARSE_FIELD_CSV_MIN_COLUMNS) {
+             throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+                     "At least two fields are required, line number is " + (i + 1));
+         }
+         String fieldName = cols[0];
+         // the name pattern also matches the empty string, so an empty name must be rejected explicitly
+         if (fieldName.isEmpty()) {
+             throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+                     "Field name in line " + (i + 1) + " cannot be empty");
+         }
+         if (!PATTERN_NORMAL_CHARACTERS.matcher(fieldName).matches()) {
+             throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "Field names in line " + (i + 1) +
+                     " can only contain letters, underscores or numbers");
+         }
+         String fieldType = cols[1];
+         // two adjacent separators (e.g. "name, type") yield an empty type column; fail instead of saving it
+         if (fieldType.isEmpty()) {
+             throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+                     "Field type in line " + (i + 1) + " cannot be empty");
+         }
+
+         String comment = null;
+         if (cols.length == PARSE_FIELD_CSV_MAX_COLUMNS) {
+             comment = cols[PARSE_FIELD_CSV_MAX_COLUMNS - 1];
+         }
+
+         SinkField field = new SinkField();
+         field.setFieldName(fieldName);
+         field.setFieldType(fieldType);
+         field.setFieldComment(comment);
+         fields.add(field);
+     }
+     return fields;
+ }
+
+
private Map<String, String> parseFieldsBySql(String sql) throws JSQLParserException {
CCJSqlParserManager pm = new CCJSqlParserManager();
Statement statement = pm.parse(new StringReader(sql));
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index e50a4c4be..2f19c6bb7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -82,7 +82,10 @@ import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
+import static org.apache.inlong.manager.common.consts.InlongConstants.PATTERN_NORMAL_CHARACTERS;
import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
+import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL;
+import static org.apache.inlong.manager.common.consts.InlongConstants.STREAM_FORMAT_TYPES;
import static org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.packExtParams;
import static org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.unpackExtParams;
@@ -93,6 +96,9 @@ import static org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.unpackE
public class InlongStreamServiceImpl implements InlongStreamService {
private static final Logger LOGGER = LoggerFactory.getLogger(InlongStreamServiceImpl.class);
+ private static final String PARSE_FIELD_CSV_SPLITTER = "\t|\\s|,"; // column separator regex: a single tab, a single whitespace character, or a single comma
+ private static final int PARSE_FIELD_CSV_MAX_COLUMNS = 3; // split limit per CSV line; the third column, when present, is the field comment
+ private static final int PARSE_FIELD_CSV_MIN_COLUMNS = 2; // every CSV line needs at least a field name and a field type
@Autowired
private InlongStreamEntityMapper streamMapper;
@@ -112,7 +118,6 @@ public class InlongStreamServiceImpl implements InlongStreamService {
private UserService userService;
@Transactional(rollbackFor = Throwable.class)
-
@Override
public Integer save(InlongStreamRequest request, String operator) {
LOGGER.debug("begin to save inlong stream info={}", request);
@@ -222,8 +227,8 @@ public class InlongStreamServiceImpl implements InlongStreamService {
streamInfo.setFieldList(streamFields);
// load ext infos
List<InlongStreamExtEntity> extEntities = streamExtMapper.selectByRelatedId(groupId, streamId);
- List<InlongStreamExtInfo> exts = CommonBeanUtils.copyListProperties(extEntities, InlongStreamExtInfo::new);
- streamInfo.setExtList(exts);
+ List<InlongStreamExtInfo> extInfos = CommonBeanUtils.copyListProperties(extEntities, InlongStreamExtInfo::new);
+ streamInfo.setExtList(extInfos);
// load extParams
unpackExtParams(streamEntity.getExtParams(), streamInfo);
@@ -255,8 +260,8 @@ public class InlongStreamServiceImpl implements InlongStreamService {
List<StreamField> streamFields = getStreamFields(groupId, streamId);
streamInfo.setFieldList(streamFields);
List<InlongStreamExtEntity> extEntities = streamExtMapper.selectByRelatedId(groupId, streamId);
- List<InlongStreamExtInfo> exts = CommonBeanUtils.copyListProperties(extEntities, InlongStreamExtInfo::new);
- streamInfo.setExtList(exts);
+ List<InlongStreamExtInfo> extInfos = CommonBeanUtils.copyListProperties(extEntities, InlongStreamExtInfo::new);
+ streamInfo.setExtList(extInfos);
List<StreamSink> sinkList = sinkService.listSink(groupId, streamId);
streamInfo.setSinkList(sinkList);
List<StreamSource> sourceList = sourceService.listSource(groupId, streamId);
@@ -733,8 +738,10 @@ public class InlongStreamServiceImpl implements InlongStreamService {
Map<String, String> fieldsMap;
if (STATEMENT_TYPE_JSON.equals(method)) {
fieldsMap = parseFieldsByJson(statement);
- } else {
+ } else if (STATEMENT_TYPE_SQL.equals(method)) {
fieldsMap = parseFieldsBySql(statement);
+ } else {
+ return parseFieldsByCsv(statement);
}
return fieldsMap.entrySet().stream().map(entry -> {
StreamField field = new StreamField();
@@ -749,7 +756,49 @@ public class InlongStreamServiceImpl implements InlongStreamService {
String.format("parse stream fields error : %s", e.getMessage()));
}
}
+ /**
+  * Parse stream fields from a CSV statement.
+  * Each non-blank line describes one field as: name, type and an optional comment.
+  *
+  * @param statement CSV statement, lines separated by InlongConstants.NEW_LINE
+  * @return List of StreamField in line order, blank lines are skipped
+  * @throws BusinessException if a line has fewer than two columns, an empty or illegal field name,
+  *         or a field type that is not in STREAM_FORMAT_TYPES
+  */
+ private List<StreamField> parseFieldsByCsv(String statement) {
+     String[] lines = statement.split(InlongConstants.NEW_LINE);
+     List<StreamField> fields = new ArrayList<>();
+     for (int i = 0; i < lines.length; i++) {
+         if (StringUtils.isBlank(lines[i])) {
+             continue;
+         }
+         // trim first: leading blanks or a trailing '\r' would otherwise be split into empty columns
+         String line = lines[i].trim();
+
+         String[] cols = line.split(PARSE_FIELD_CSV_SPLITTER, PARSE_FIELD_CSV_MAX_COLUMNS);
+         if (cols.length < PARSE_FIELD_CSV_MIN_COLUMNS) {
+             throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+                     "At least two fields are required, line number is " + (i + 1));
+         }
+         String fieldName = cols[0];
+         // the name pattern also matches the empty string, so an empty name must be rejected explicitly
+         if (fieldName.isEmpty()) {
+             throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+                     "Field name in line " + (i + 1) + " cannot be empty");
+         }
+         if (!PATTERN_NORMAL_CHARACTERS.matcher(fieldName).matches()) {
+             throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "Field names in line " + (i + 1) +
+                     " can only contain letters, underscores or numbers");
+         }
+         String fieldType = cols[1];
+         if (!STREAM_FORMAT_TYPES.contains(fieldType)) {
+             // message fix: a space was missing between "line" and the line number
+             throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "The field type in line " + (i + 1) +
+                     " must be one of " + STREAM_FORMAT_TYPES);
+         }
+
+         String comment = null;
+         if (cols.length == PARSE_FIELD_CSV_MAX_COLUMNS) {
+             comment = cols[PARSE_FIELD_CSV_MAX_COLUMNS - 1];
+         }
+
+         StreamField field = new StreamField();
+         field.setFieldName(fieldName);
+         field.setFieldType(fieldType);
+         field.setFieldComment(comment);
+         fields.add(field);
+     }
+     return fields;
+ }
private Map<String, String> parseFieldsBySql(String sql) throws JSQLParserException {
CCJSqlParserManager pm = new CCJSqlParserManager();
Statement statement = pm.parse(new StringReader(sql));
@@ -793,7 +842,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
* First physically delete the existing field information, and then add the field information of this batch
*/
@Transactional(rollbackFor = Throwable.class)
- void updateField(String groupId, String streamId, List<StreamField> fieldList) {
+ public void updateField(String groupId, String streamId, List<StreamField> fieldList) {
LOGGER.debug("begin to update inlong stream field, groupId={}, streamId={}, field={}", groupId, streamId,
fieldList);
try {
@@ -807,7 +856,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
}
@Transactional(rollbackFor = Throwable.class)
- void saveField(String groupId, String streamId, List<StreamField> infoList) {
+ public void saveField(String groupId, String streamId, List<StreamField> infoList) {
if (CollectionUtils.isEmpty(infoList)) {
return;
}
@@ -823,14 +872,15 @@ public class InlongStreamServiceImpl implements InlongStreamService {
}
@Transactional(rollbackFor = Throwable.class)
- void saveOrUpdateExt(String groupId, String streamId, List<InlongStreamExtInfo> exts) {
+ public void saveOrUpdateExt(String groupId, String streamId, List<InlongStreamExtInfo> extInfos) {
LOGGER.info("begin to save or update inlong stream ext info, groupId={}, streamId={}, ext={}", groupId,
- streamId, exts);
- if (CollectionUtils.isEmpty(exts)) {
+ streamId, extInfos);
+ if (CollectionUtils.isEmpty(extInfos)) {
return;
}
- List<InlongStreamExtEntity> entityList = CommonBeanUtils.copyListProperties(exts, InlongStreamExtEntity::new);
+ List<InlongStreamExtEntity> entityList =
+ CommonBeanUtils.copyListProperties(extInfos, InlongStreamExtEntity::new);
entityList.forEach(streamEntity -> {
streamEntity.setInlongGroupId(groupId);
streamEntity.setInlongStreamId(streamId);