You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/30 07:38:37 UTC
[inlong] branch master updated: [INLONG-7688][Manager] Creating schema of StreamSource by SQL (#7689)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3ce14049c [INLONG-7688][Manager] Creating schema of StreamSource by SQL (#7689)
3ce14049c is described below
commit 3ce14049c8dd9c7d102608ba08a28046a91f76b2
Author: feat <fe...@outlook.com>
AuthorDate: Thu Mar 30 15:38:30 2023 +0800
[INLONG-7688][Manager] Creating schema of StreamSource by SQL (#7689)
---
.../api/inner/client/InlongStreamClient.java | 25 +++++++-
.../client/api/inner/client/StreamSinkClient.java | 24 +++++++-
.../client/api/service/InlongStreamApi.java | 3 +-
.../manager/client/api/service/StreamSinkApi.java | 3 +-
.../client/api/inner/ClientFactoryTest.java | 5 +-
.../manager/common/consts/InlongConstants.java | 5 ++
.../manager/pojo/sink/ParseFieldRequest.java | 37 ++++++++++++
.../manager/pojo/sort/util/FieldInfoUtils.java | 43 +++++++++++--
.../manager/service/sink/StreamSinkService.java | 7 ++-
.../service/sink/StreamSinkServiceImpl.java | 65 +++++++++++++++++---
.../service/stream/InlongStreamService.java | 7 ++-
.../service/stream/InlongStreamServiceImpl.java | 70 +++++++++++++++++++---
.../manager/service/stream/InlongStreamTest.java | 53 ++++++++++++++--
.../web/controller/InlongStreamController.java | 10 ++--
.../web/controller/StreamSinkController.java | 9 +--
15 files changed, 319 insertions(+), 47 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
index 0213ca436..c5a2f780b 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
@@ -25,6 +25,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest;
@@ -32,6 +33,9 @@ import org.apache.inlong.manager.pojo.stream.StreamField;
import java.util.List;
+import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
+import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL;
+
/**
* Client for {@link InlongStreamApi}.
*/
@@ -218,12 +222,27 @@ public class InlongStreamClient {
/**
* Converts a json string to a streamFields
*
- * @param fieldsJson JSON string for the field information
+ * @param parseFieldRequest the request for the field information
* @return list of stream field
*/
- public List<StreamField> parseFields(String fieldsJson) {
- Response<List<StreamField>> response = ClientUtils.executeHttpCall(inlongStreamApi.parseFields(fieldsJson));
+ public List<StreamField> parseFields(ParseFieldRequest parseFieldRequest) {
+ Response<List<StreamField>> response =
+ ClientUtils.executeHttpCall(inlongStreamApi.parseFields(parseFieldRequest));
ClientUtils.assertRespSuccess(response);
return response.getData();
}
+
+ /**
+ * Converts a JSON or SQL statement to a list of stream fields
+ * @param method the method for the field information: json or sql
+ * @param statement the statement for the field information
+ * @return list of stream field
+ */
+ public List<StreamField> parseFields(String method, String statement) {
+ Preconditions.expectTrue(STATEMENT_TYPE_JSON.equals(method) || STATEMENT_TYPE_SQL.equals(method),
+ "Unsupported parse field method: '" + method + "'");
+ Preconditions.expectNotBlank(statement, "The statement must not empty");
+ ParseFieldRequest request = ParseFieldRequest.builder().method(method).statement(statement).build();
+ return parseFields(request);
+ }
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
index e8f032c61..6cf9ccc03 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
@@ -25,6 +25,7 @@ import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.common.UpdateResult;
+import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
@@ -32,6 +33,9 @@ import org.apache.inlong.manager.pojo.sink.StreamSink;
import java.util.List;
+import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
+import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL;
+
/**
* Client for {@link StreamSinkApi}.
*/
@@ -137,12 +141,26 @@ public class StreamSinkClient {
/**
* Converts a json string to a sinkFields
*
- * @param fieldsJson JSON string for the field information
+ * @param parseFieldRequest the request for the field information
* @return list of sink field
*/
- public List<SinkField> parseFields(String fieldsJson) {
- Response<List<SinkField>> response = ClientUtils.executeHttpCall(streamSinkApi.parseFields(fieldsJson));
+ public List<SinkField> parseFields(ParseFieldRequest parseFieldRequest) {
+ Response<List<SinkField>> response = ClientUtils.executeHttpCall(streamSinkApi.parseFields(parseFieldRequest));
ClientUtils.assertRespSuccess(response);
return response.getData();
}
+
+ /**
+ * Converts a JSON or SQL statement to a list of sink fields
+ * @param method the method for the field information: json or sql
+ * @param statement the statement for the field information
+ * @return list of sink field
+ */
+ public List<SinkField> parseFields(String method, String statement) {
+ Preconditions.expectTrue(STATEMENT_TYPE_JSON.equals(method) || STATEMENT_TYPE_SQL.equals(method),
+ "Unsupported parse field method: '" + method + "'");
+ Preconditions.expectNotBlank(statement, "The statement must not empty");
+ ParseFieldRequest request = ParseFieldRequest.builder().method(method).statement(statement).build();
+ return parseFields(request);
+ }
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
index 2db47beda..0a9c2edac 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.client.api.service;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest;
@@ -70,5 +71,5 @@ public interface InlongStreamApi {
Call<Response<Boolean>> delete(@Query("groupId") String groupId, @Query("streamId") String streamId);
@POST("stream/parseFields")
- Call<Response<List<StreamField>>> parseFields(@Body String fieldsJson);
+ Call<Response<List<StreamField>>> parseFields(@Body ParseFieldRequest parseFieldRequest);
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
index a28242f71..4dfc7de14 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.client.api.service;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.common.UpdateResult;
+import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
@@ -59,6 +60,6 @@ public interface StreamSinkApi {
Call<Response<PageResult<StreamSink>>> list(@Body SinkPageRequest request);
@POST("sink/parseFields")
- Call<Response<List<SinkField>>> parseFields(@Body String fieldsJson);
+ Call<Response<List<SinkField>>> parseFields(@Body ParseFieldRequest parseFieldRequest);
}
diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
index eeaed841e..6e519df7c 100644
--- a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
+++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
@@ -106,6 +106,7 @@ import static com.github.tomakehurst.wiremock.client.WireMock.post;
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
+import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
/**
* Unit test for {@link ClientFactory}.
@@ -1034,7 +1035,7 @@ class ClientFactoryTest {
okJson(JsonUtils.toJsonString(
Response.success(Lists.newArrayList(streamFieldList))))));
- List<StreamField> responseList = streamClient.parseFields("{\"test_name\":\"string\"}");
+ List<StreamField> responseList = streamClient.parseFields(STATEMENT_TYPE_JSON, "{\"test_name\":\"string\"}");
Assertions.assertEquals(JsonUtils.toJsonString(responseList), JsonUtils.toJsonString(streamFieldList));
}
@@ -1052,7 +1053,7 @@ class ClientFactoryTest {
okJson(JsonUtils.toJsonString(
Response.success(Lists.newArrayList(sinkFieldList))))));
- List<SinkField> responseList = sinkClient.parseFields("{\"test_name\":\"string\"}");
+ List<SinkField> responseList = sinkClient.parseFields(STATEMENT_TYPE_JSON, "{\"test_name\":\"string\"}");
Assertions.assertEquals(JsonUtils.toJsonString(responseList), JsonUtils.toJsonString(sinkFieldList));
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index bf643bf22..708042fca 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -148,4 +148,9 @@ public class InlongConstants {
public static final int DEFAULT_ENABLE_VALUE = 1;
+ public static final String STATEMENT_TYPE_SQL = "sql";
+ public static final String STATEMENT_TYPE_JSON = "json";
+
+ public static final String SORT_TYPE_INFO_SUFFIX = "TypeInfo";
+
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ParseFieldRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ParseFieldRequest.java
new file mode 100644
index 000000000..206a4b822
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ParseFieldRequest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * Parse field request - with stream
+ */
+@Data
+@Builder
+@ApiModel("Parse field request - with stream")
+public class ParseFieldRequest {
+
+ @ApiModelProperty("Parse method: json or sql")
+ private String method;
+ @ApiModelProperty("Statement for create fields")
+ private String statement;
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java
index b1d297030..f1cdac202 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java
@@ -18,7 +18,7 @@
package org.apache.inlong.manager.pojo.sort.util;
import com.google.common.collect.Lists;
-import lombok.extern.slf4j.Slf4j;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.enums.MetaField;
import org.apache.inlong.manager.common.enums.FieldType;
@@ -53,6 +53,8 @@ import org.apache.inlong.sort.formats.common.VarBinaryFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.MetaFieldInfo;
+import lombok.extern.slf4j.Slf4j;
+import java.math.BigDecimal;
import java.util.List;
import static org.apache.inlong.manager.common.consts.InlongConstants.LEFT_BRACKET;
@@ -95,11 +97,11 @@ public class FieldInfoUtils {
/*
* public static List<FieldMappingUnit> createFieldInfo( List<StreamField> streamFieldList, List<SinkField>
* fieldList, List<FieldInfo> sourceFields, List<FieldInfo> sinkFields) {
- *
+ *
* // Set source field info list. for (StreamField field : streamFieldList) { FieldInfo sourceField =
* getFieldInfo(field.getFieldName(), field.getFieldType(), field.getIsMetaField() == 1, field.getMetaFieldName(),
* field.getFieldFormat()); sourceFields.add(sourceField); }
- *
+ *
* List<FieldMappingUnit> mappingUnitList = new ArrayList<>(); // Get sink field info list, if the field name equals
* to build-in field, new a build-in field info for (SinkField field : fieldList) { FieldInfo sinkField =
* getFieldInfo(field.getFieldName(), field.getFieldType(), field.getIsMetaField() == 1, field.getMetaFieldName(),
@@ -107,7 +109,7 @@ public class FieldInfoUtils {
* FieldInfo sourceField = getFieldInfo(field.getSourceFieldName(), field.getSourceFieldType(),
* field.getIsMetaField() == 1, field.getMetaFieldName(), field.getFieldFormat()); mappingUnitList.add(new
* FieldMappingUnit(sourceField, sinkField)); } }
- *
+ *
* return mappingUnitList; }
*/
@@ -137,7 +139,7 @@ public class FieldInfoUtils {
* : MetaField.values()) { MetaFieldInfo fieldInfo = new MetaFieldInfo(metaField.name(), metaField);
* sourceFields.add(fieldInfo); sinkFields.add(fieldInfo); mappingUnitList.add(new FieldMappingUnit(fieldInfo,
* fieldInfo)); }
- *
+ *
* return mappingUnitList; }
*/
@@ -166,6 +168,37 @@ public class FieldInfoUtils {
return formatInfo;
}
+ /**
+ * Convert SQL type names to Java types.
+ */
+ public static Class<?> sqlTypeToJavaType(String type) {
+ FieldType fieldType = FieldType.forName(StringUtils.substringBefore(type, LEFT_BRACKET));
+ switch (fieldType) {
+ case BOOLEAN:
+ return Boolean.class;
+ case TINYINT:
+ case SMALLINT:
+ case INT:
+ return int.class;
+ case BIGINT:
+ return Long.class;
+ case FLOAT:
+ return Float.class;
+ case DOUBLE:
+ return Double.class;
+ case DECIMAL:
+ return BigDecimal.class;
+ case VARCHAR:
+ return String.class;
+ case DATE:
+ case TIME:
+ case TIMESTAMP:
+ return java.util.Date.class;
+ default:
+ return Object.class;
+ }
+ }
+
/**
* Get the FieldFormat of Sort according to type string
*
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
index caa99873b..bfaebf883 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.sink;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
import org.apache.inlong.manager.pojo.sink.SinkApproveDTO;
import org.apache.inlong.manager.pojo.sink.SinkBriefInfo;
import org.apache.inlong.manager.pojo.sink.SinkField;
@@ -246,10 +247,10 @@ public interface StreamSinkService {
Boolean updateAfterApprove(List<SinkApproveDTO> sinkApproveList, String operator);
/**
- * Converts a json string to a sinkFields
+ * Converts a statement to a sinkFields
*
- * @param fieldsJson JSON string for the field information
+ * @param parseFieldRequest the request for parse field
* @return list of sink field
*/
- List<SinkField> parseFields(String fieldsJson);
+ List<SinkField> parseFields(ParseFieldRequest parseFieldRequest);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 6ed96ee84..e11532de7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.sink;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.pagehelper.Page;
@@ -45,6 +46,7 @@ import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
import org.apache.inlong.manager.pojo.sink.SinkApproveDTO;
import org.apache.inlong.manager.pojo.sink.SinkBriefInfo;
import org.apache.inlong.manager.pojo.sink.SinkField;
@@ -63,14 +65,25 @@ import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
+import net.sf.jsqlparser.JSQLParserException;
+import net.sf.jsqlparser.parser.CCJSqlParserManager;
+import net.sf.jsqlparser.statement.Statement;
+import net.sf.jsqlparser.statement.create.table.ColDataType;
+import net.sf.jsqlparser.statement.create.table.ColumnDefinition;
+import net.sf.jsqlparser.statement.create.table.CreateTable;
+import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static org.apache.inlong.manager.common.consts.InlongConstants.LEFT_BRACKET;
+import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
+
/**
* Implementation of sink service interface
*/
@@ -687,17 +700,24 @@ public class StreamSinkServiceImpl implements StreamSinkService {
}
@Override
- public List<SinkField> parseFields(String fieldsJson) {
+ public List<SinkField> parseFields(ParseFieldRequest parseFieldRequest) {
try {
- Map<String, String> fieldsMap = objectMapper.readValue(fieldsJson,
- new TypeReference<Map<String, String>>() {
- });
- return fieldsMap.keySet().stream().map(fieldName -> {
+ String method = parseFieldRequest.getMethod();
+ String statement = parseFieldRequest.getStatement();
+
+ Map<String, String> fieldsMap;
+ if (STATEMENT_TYPE_JSON.equals(method)) {
+ fieldsMap = parseFieldsByJson(statement);
+ } else {
+ fieldsMap = parseFieldsBySql(statement);
+ }
+ return fieldsMap.entrySet().stream().map(entry -> {
SinkField field = new SinkField();
- field.setFieldName(fieldName);
- field.setFieldType(fieldsMap.get(fieldName));
+ field.setFieldName(entry.getKey());
+ field.setFieldType(entry.getValue());
return field;
}).collect(Collectors.toList());
+
} catch (Exception e) {
LOGGER.error("parse sink fields error", e);
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
@@ -705,6 +725,37 @@ public class StreamSinkServiceImpl implements StreamSinkService {
}
}
+ private Map<String, String> parseFieldsBySql(String sql) throws JSQLParserException {
+ CCJSqlParserManager pm = new CCJSqlParserManager();
+ Statement statement = pm.parse(new StringReader(sql));
+ LinkedHashMap<String, String> fields = new LinkedHashMap<>();
+ if (statement instanceof CreateTable) {
+ CreateTable createTable = (CreateTable) statement;
+ List<ColumnDefinition> columnDefinitions = createTable.getColumnDefinitions();
+ // get column definition
+ for (ColumnDefinition definition : columnDefinitions) {
+ // get field name
+ String columnName = definition.getColumnName();
+ ColDataType colDataType = definition.getColDataType();
+ String sqlDataType = colDataType.getDataType();
+ // get field type
+ String realDataType = StringUtils.substringBefore(sqlDataType, LEFT_BRACKET).toLowerCase();
+ fields.put(columnName, realDataType);
+ }
+ } else {
+ throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+ "The SQL statement must be a table creation statement");
+ }
+ return fields;
+ }
+
+ private Map<String, String> parseFieldsByJson(String statement) throws JsonProcessingException {
+ // Use LinkedHashMap deserialization to keep the order of the fields
+ return objectMapper.readValue(statement,
+ new TypeReference<LinkedHashMap<String, String>>() {
+ });
+ }
+
private void checkSinkRequestParams(SinkRequest request) {
// check request parameter
// check group id
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
index 51b15be14..b13a05206 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.service.stream;
import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamApproveRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
@@ -232,10 +233,10 @@ public interface InlongStreamService {
void logicDeleteDlqOrRlq(String bid, String topicName, String operator);
/**
- * Converts a json string to a streamFields
+ * Converts a statement to a streamFields
*
- * @param fieldsJson JSON string for the field information
+ * @param parseFieldRequest parse field request
* @return list of stream field
*/
- List<StreamField> parseFields(String fieldsJson);
+ List<StreamField> parseFields(ParseFieldRequest parseFieldRequest);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index 91db42763..e50a4c4be 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.stream;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.pagehelper.Page;
@@ -41,8 +42,10 @@ import org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper;
import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
import org.apache.inlong.manager.pojo.sink.SinkBriefInfo;
import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.stream.InlongStreamApproveRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
@@ -63,14 +66,23 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
+import net.sf.jsqlparser.JSQLParserException;
+import net.sf.jsqlparser.parser.CCJSqlParserManager;
+import net.sf.jsqlparser.statement.Statement;
+import net.sf.jsqlparser.statement.create.table.ColDataType;
+import net.sf.jsqlparser.statement.create.table.ColumnDefinition;
+import net.sf.jsqlparser.statement.create.table.CreateTable;
+import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
+import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
import static org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.packExtParams;
import static org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.unpackExtParams;
@@ -713,17 +725,24 @@ public class InlongStreamServiceImpl implements InlongStreamService {
}
@Override
- public List<StreamField> parseFields(String fieldsJson) {
+ public List<StreamField> parseFields(ParseFieldRequest parseFieldRequest) {
try {
- Map<String, String> fieldsMap = objectMapper.readValue(fieldsJson,
- new TypeReference<Map<String, String>>() {
- });
- return fieldsMap.keySet().stream().map(fieldName -> {
+ String method = parseFieldRequest.getMethod();
+ String statement = parseFieldRequest.getStatement();
+
+ Map<String, String> fieldsMap;
+ if (STATEMENT_TYPE_JSON.equals(method)) {
+ fieldsMap = parseFieldsByJson(statement);
+ } else {
+ fieldsMap = parseFieldsBySql(statement);
+ }
+ return fieldsMap.entrySet().stream().map(entry -> {
StreamField field = new StreamField();
- field.setFieldName(fieldName);
- field.setFieldType(fieldsMap.get(fieldName));
+ field.setFieldName(entry.getKey());
+ field.setFieldType(entry.getValue());
return field;
}).collect(Collectors.toList());
+
} catch (Exception e) {
LOGGER.error("parse inlong stream fields error", e);
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
@@ -731,6 +750,43 @@ public class InlongStreamServiceImpl implements InlongStreamService {
}
}
+ private Map<String, String> parseFieldsBySql(String sql) throws JSQLParserException {
+ CCJSqlParserManager pm = new CCJSqlParserManager();
+ Statement statement = pm.parse(new StringReader(sql));
+ LinkedHashMap<String, String> fields = new LinkedHashMap<>();
+ if (statement instanceof CreateTable) {
+ CreateTable createTable = (CreateTable) statement;
+ List<ColumnDefinition> columnDefinitions = createTable.getColumnDefinitions();
+ // get column definition
+ for (int i = 0; i < columnDefinitions.size(); i++) {
+ ColumnDefinition definition = columnDefinitions.get(i);
+ // get field name
+ String columnName = definition.getColumnName();
+ ColDataType colDataType = definition.getColDataType();
+ String sqlDataType = colDataType.getDataType();
+ // convert SQL type to Java type
+ Class<?> clazz = FieldInfoUtils.sqlTypeToJavaType(sqlDataType);
+ if (clazz == Object.class) {
+ throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+ "Unrecognized SQL field type, line: " + (i + 1) + ", type: " + sqlDataType);
+ }
+ String type = clazz.getSimpleName().toLowerCase();
+ fields.put(columnName, type);
+ }
+ } else {
+ throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+ "The SQL statement must be a table creation statement");
+ }
+ return fields;
+ }
+
+ private Map<String, String> parseFieldsByJson(String statement) throws JsonProcessingException {
+ // Use LinkedHashMap deserialization to keep the order of the fields
+ return objectMapper.readValue(statement,
+ new TypeReference<LinkedHashMap<String, String>>() {
+ });
+ }
+
/**
* Update field information
* <p/>
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java
index 09b36f3b8..ca089815b 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.stream;
+import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.service.ServiceBaseTest;
@@ -28,13 +29,16 @@ import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
+import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL;
+
public class InlongStreamTest extends ServiceBaseTest {
@Autowired
protected StreamSinkService streamSinkService;
@Test
- public void testParseStreamFields() {
+ public void testParseStreamFieldsByJson() {
String streamFieldsJson = "{\"name0\":\"string\",\"name1\":\"string\"}";
List<StreamField> expectStreamFields = new ArrayList<>();
for (int i = 0; i < 2; i++) {
@@ -44,13 +48,15 @@ public class InlongStreamTest extends ServiceBaseTest {
expectStreamFields.add(field);
}
StreamField[] expectResult = expectStreamFields.toArray(new StreamField[0]);
- List<StreamField> streamFields = streamService.parseFields(streamFieldsJson);
+ ParseFieldRequest request =
+ ParseFieldRequest.builder().method(STATEMENT_TYPE_JSON).statement(streamFieldsJson).build();
+ List<StreamField> streamFields = streamService.parseFields(request);
StreamField[] result = streamFields.toArray(new StreamField[0]);
Assertions.assertArrayEquals(expectResult, result);
}
@Test
- public void testParseSinkFields() {
+ public void testParseSinkFieldsByJson() {
String sinkFieldsJson = "{\"sinkFieldName0\":\"string\",\"sinkFieldName1\":\"string\"}";
List<SinkField> expectSinkFields = new ArrayList<>();
for (int i = 0; i < 2; i++) {
@@ -60,8 +66,47 @@ public class InlongStreamTest extends ServiceBaseTest {
expectSinkFields.add(field);
}
SinkField[] expectResult = expectSinkFields.toArray(new SinkField[0]);
- List<SinkField> sinkFields = streamSinkService.parseFields(sinkFieldsJson);
+ ParseFieldRequest parseFieldRequest =
+ ParseFieldRequest.builder().method(STATEMENT_TYPE_JSON).statement(sinkFieldsJson).build();
+ List<SinkField> sinkFields = streamSinkService.parseFields(parseFieldRequest);
SinkField[] result = sinkFields.toArray(new SinkField[0]);
Assertions.assertArrayEquals(expectResult, result);
}
+
+ /**
+ * Checks that a CREATE TABLE statement is parsed into one stream field per column,
+ * in column order, with both VARCHAR columns reported as "string".
+ */
+ @Test
+ public void testParseStreamFieldsBySql() {
+ String createTableSql = "CREATE TABLE my_table (name0 VARCHAR(50), name1 VARCHAR(50))";
+ ParseFieldRequest sqlRequest = ParseFieldRequest.builder()
+ .method(STATEMENT_TYPE_SQL)
+ .statement(createTableSql)
+ .build();
+
+ // Expected outcome: name0 and name1, each typed "string"
+ List<StreamField> expected = new ArrayList<>();
+ for (int idx = 0; idx < 2; idx++) {
+ StreamField expectedField = new StreamField();
+ expectedField.setFieldName("name" + idx);
+ expectedField.setFieldType("string");
+ expected.add(expectedField);
+ }
+
+ List<StreamField> actual = streamService.parseFields(sqlRequest);
+ Assertions.assertArrayEquals(
+ expected.toArray(new StreamField[0]),
+ actual.toArray(new StreamField[0]));
+ }
+
+ /**
+ * Checks that a CREATE TABLE statement is parsed into one sink field per column,
+ * in column order, with both VARCHAR columns reported as "varchar".
+ */
+ @Test
+ public void testParseSinkFieldsBySql() {
+ String createTableSql = "CREATE TABLE my_table (sinkFieldName0 VARCHAR(50), sinkFieldName1 VARCHAR(50))";
+ ParseFieldRequest sqlRequest = ParseFieldRequest.builder()
+ .method(STATEMENT_TYPE_SQL)
+ .statement(createTableSql)
+ .build();
+
+ // NOTE(review): the expected type here is "varchar" while testParseStreamFieldsBySql expects
+ // "string" for the same column type - presumably the sink parser keeps the SQL type name;
+ // confirm against the sink service implementation.
+ List<SinkField> expected = new ArrayList<>();
+ for (int idx = 0; idx < 2; idx++) {
+ SinkField expectedField = new SinkField();
+ expectedField.setFieldName("sinkFieldName" + idx);
+ expectedField.setFieldType("varchar");
+ expected.add(expectedField);
+ }
+
+ List<SinkField> actual = streamSinkService.parseFields(sqlRequest);
+ Assertions.assertArrayEquals(
+ expected.toArray(new SinkField[0]),
+ actual.toArray(new SinkField[0]));
+ }
+
}
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
index 7f9b86f96..8015b47a9 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
@@ -26,6 +26,7 @@ import org.apache.inlong.manager.common.enums.OperationType;
import org.apache.inlong.manager.common.validation.UpdateValidation;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest;
@@ -177,10 +178,11 @@ public class InlongStreamController {
}
+ /**
+ * Parses inlong stream fields from the statement carried by the request.
+ *
+ * @param parseFieldRequest request body describing the statement to parse
+ * @return success response wrapping the stream fields returned by the stream service
+ */
@RequestMapping(value = "/stream/parseFields", method = RequestMethod.POST)
- @ApiOperation(value = "Parse inlong stream fields from JSON string")
- @ApiImplicitParam(name = "fieldsJson", dataTypeClass = String.class, required = true)
- public Response<List<StreamField>> parseFields(@RequestBody String fieldsJson) {
- return Response.success(streamService.parseFields(fieldsJson));
+ @ApiOperation(value = "Parse inlong stream fields from statement")
+ @ApiImplicitParam(name = "parseFieldRequest", dataTypeClass = ParseFieldRequest.class, required = true)
+ public Response<List<StreamField>> parseFields(@RequestBody ParseFieldRequest parseFieldRequest) {
+ return Response.success(streamService.parseFields(parseFieldRequest));
}
}
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
index cdc11351f..59e1a0c9f 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
@@ -27,6 +27,7 @@ import org.apache.inlong.manager.common.validation.UpdateByKeyValidation;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.common.UpdateResult;
+import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
@@ -119,10 +120,10 @@ public class StreamSinkController {
}
+ /**
+ * Parses stream sink fields from the statement carried by the request.
+ *
+ * @param parseFieldRequest request body describing the statement to parse
+ * @return success response wrapping the sink fields returned by the sink service
+ */
@RequestMapping(value = "/sink/parseFields", method = RequestMethod.POST)
- @ApiOperation(value = "parse stream sink fields from JSON string")
- @ApiImplicitParam(name = "fieldsJson", dataTypeClass = String.class, required = true)
- public Response<List<SinkField>> parseFields(@RequestBody String fieldsJson) {
- return Response.success(sinkService.parseFields(fieldsJson));
+ @ApiOperation(value = "parse stream sink fields from statement")
+ @ApiImplicitParam(name = "parseFieldRequest", dataTypeClass = ParseFieldRequest.class, required = true)
+ public Response<List<SinkField>> parseFields(@RequestBody ParseFieldRequest parseFieldRequest) {
+ List<SinkField> parsedFields = sinkService.parseFields(parseFieldRequest);
+ return Response.success(parsedFields);
}
}