You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/30 07:38:37 UTC

[inlong] branch master updated: [INLONG-7688][Manager] Creating schema of StreamSource by SQL (#7689)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ce14049c [INLONG-7688][Manager] Creating schema of StreamSource by SQL (#7689)
3ce14049c is described below

commit 3ce14049c8dd9c7d102608ba08a28046a91f76b2
Author: feat <fe...@outlook.com>
AuthorDate: Thu Mar 30 15:38:30 2023 +0800

    [INLONG-7688][Manager] Creating schema of StreamSource by SQL (#7689)
---
 .../api/inner/client/InlongStreamClient.java       | 25 +++++++-
 .../client/api/inner/client/StreamSinkClient.java  | 24 +++++++-
 .../client/api/service/InlongStreamApi.java        |  3 +-
 .../manager/client/api/service/StreamSinkApi.java  |  3 +-
 .../client/api/inner/ClientFactoryTest.java        |  5 +-
 .../manager/common/consts/InlongConstants.java     |  5 ++
 .../manager/pojo/sink/ParseFieldRequest.java       | 37 ++++++++++++
 .../manager/pojo/sort/util/FieldInfoUtils.java     | 43 +++++++++++--
 .../manager/service/sink/StreamSinkService.java    |  7 ++-
 .../service/sink/StreamSinkServiceImpl.java        | 65 +++++++++++++++++---
 .../service/stream/InlongStreamService.java        |  7 ++-
 .../service/stream/InlongStreamServiceImpl.java    | 70 +++++++++++++++++++---
 .../manager/service/stream/InlongStreamTest.java   | 53 ++++++++++++++--
 .../web/controller/InlongStreamController.java     | 10 ++--
 .../web/controller/StreamSinkController.java       |  9 +--
 15 files changed, 319 insertions(+), 47 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
index 0213ca436..c5a2f780b 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
@@ -25,6 +25,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
 import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest;
@@ -32,6 +33,9 @@ import org.apache.inlong.manager.pojo.stream.StreamField;
 
 import java.util.List;
 
+import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
+import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL;
+
 /**
  * Client for {@link InlongStreamApi}.
  */
@@ -218,12 +222,27 @@ public class InlongStreamClient {
     /**
      * Converts a json string to a streamFields
      *
-     * @param fieldsJson JSON string for the field information
+     * @param parseFieldRequest the request for the field information
      * @return list of stream field
      */
-    public List<StreamField> parseFields(String fieldsJson) {
-        Response<List<StreamField>> response = ClientUtils.executeHttpCall(inlongStreamApi.parseFields(fieldsJson));
+    public List<StreamField> parseFields(ParseFieldRequest parseFieldRequest) {
+        Response<List<StreamField>> response =
+                ClientUtils.executeHttpCall(inlongStreamApi.parseFields(parseFieldRequest));
         ClientUtils.assertRespSuccess(response);
         return response.getData();
     }
+
+    /**
+     * Parses stream fields from a statement after validating the method and the statement
+     * @param method the parse method, must equal STATEMENT_TYPE_JSON or STATEMENT_TYPE_SQL
+     * @param statement the statement for the field information, checked with expectNotBlank
+     * @return list of stream field
+     */
+    public List<StreamField> parseFields(String method, String statement) {
+        Preconditions.expectTrue(STATEMENT_TYPE_JSON.equals(method) || STATEMENT_TYPE_SQL.equals(method),
+                "Unsupported parse field method: '" + method + "'");
+        Preconditions.expectNotBlank(statement, "The statement must not empty");
+        ParseFieldRequest request = ParseFieldRequest.builder().method(method).statement(statement).build();
+        return parseFields(request);
+    }
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
index e8f032c61..6cf9ccc03 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
@@ -25,6 +25,7 @@ import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.common.UpdateResult;
+import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
 import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
@@ -32,6 +33,9 @@ import org.apache.inlong.manager.pojo.sink.StreamSink;
 
 import java.util.List;
 
+import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
+import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL;
+
 /**
  * Client for {@link StreamSinkApi}.
  */
@@ -137,12 +141,26 @@ public class StreamSinkClient {
     /**
      * Converts a json string to a sinkFields
      *
-     * @param fieldsJson JSON string for the field information
+     * @param parseFieldRequest the request for the field information
      * @return list of sink field
      */
-    public List<SinkField> parseFields(String fieldsJson) {
-        Response<List<SinkField>> response = ClientUtils.executeHttpCall(streamSinkApi.parseFields(fieldsJson));
+    public List<SinkField> parseFields(ParseFieldRequest parseFieldRequest) {
+        Response<List<SinkField>> response = ClientUtils.executeHttpCall(streamSinkApi.parseFields(parseFieldRequest));
         ClientUtils.assertRespSuccess(response);
         return response.getData();
     }
+
+    /**
+     * Parses sink fields from a statement after validating the method and the statement
+     * @param method the parse method, must equal STATEMENT_TYPE_JSON or STATEMENT_TYPE_SQL
+     * @param statement the statement for the field information, checked with expectNotBlank
+     * @return list of sink field
+     */
+    public List<SinkField> parseFields(String method, String statement) {
+        Preconditions.expectTrue(STATEMENT_TYPE_JSON.equals(method) || STATEMENT_TYPE_SQL.equals(method),
+                "Unsupported parse field method: '" + method + "'");
+        Preconditions.expectNotBlank(statement, "The statement must not empty");
+        ParseFieldRequest request = ParseFieldRequest.builder().method(method).statement(statement).build();
+        return parseFields(request);
+    }
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
index 2db47beda..0a9c2edac 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.client.api.service;
 
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
 import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest;
@@ -70,5 +71,5 @@ public interface InlongStreamApi {
     Call<Response<Boolean>> delete(@Query("groupId") String groupId, @Query("streamId") String streamId);
 
     @POST("stream/parseFields")
-    Call<Response<List<StreamField>>> parseFields(@Body String fieldsJson);
+    Call<Response<List<StreamField>>> parseFields(@Body ParseFieldRequest parseFieldRequest);
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
index a28242f71..4dfc7de14 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.client.api.service;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.common.UpdateResult;
+import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
 import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
@@ -59,6 +60,6 @@ public interface StreamSinkApi {
     Call<Response<PageResult<StreamSink>>> list(@Body SinkPageRequest request);
 
     @POST("sink/parseFields")
-    Call<Response<List<SinkField>>> parseFields(@Body String fieldsJson);
+    Call<Response<List<SinkField>>> parseFields(@Body ParseFieldRequest parseFieldRequest);
 
 }
diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
index eeaed841e..6e519df7c 100644
--- a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
+++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
@@ -106,6 +106,7 @@ import static com.github.tomakehurst.wiremock.client.WireMock.post;
 import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
 import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
 import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
+import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
 
 /**
  * Unit test for {@link ClientFactory}.
@@ -1034,7 +1035,7 @@ class ClientFactoryTest {
                                 okJson(JsonUtils.toJsonString(
                                         Response.success(Lists.newArrayList(streamFieldList))))));
 
-        List<StreamField> responseList = streamClient.parseFields("{\"test_name\":\"string\"}");
+        List<StreamField> responseList = streamClient.parseFields(STATEMENT_TYPE_JSON, "{\"test_name\":\"string\"}");
         Assertions.assertEquals(JsonUtils.toJsonString(responseList), JsonUtils.toJsonString(streamFieldList));
 
     }
@@ -1052,7 +1053,7 @@ class ClientFactoryTest {
                                 okJson(JsonUtils.toJsonString(
                                         Response.success(Lists.newArrayList(sinkFieldList))))));
 
-        List<SinkField> responseList = sinkClient.parseFields("{\"test_name\":\"string\"}");
+        List<SinkField> responseList = sinkClient.parseFields(STATEMENT_TYPE_JSON, "{\"test_name\":\"string\"}");
         Assertions.assertEquals(JsonUtils.toJsonString(responseList), JsonUtils.toJsonString(sinkFieldList));
 
     }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index bf643bf22..708042fca 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -148,4 +148,9 @@ public class InlongConstants {
 
     public static final int DEFAULT_ENABLE_VALUE = 1;
 
+    public static final String STATEMENT_TYPE_SQL = "sql";
+    public static final String STATEMENT_TYPE_JSON = "json";
+
+    public static final String SORT_TYPE_INFO_SUFFIX = "TypeInfo";
+
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ParseFieldRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ParseFieldRequest.java
new file mode 100644
index 000000000..206a4b822
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ParseFieldRequest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * Request to parse fields. NOTE(review): no no-args constructor is declared; confirm JSON binding works.
+ */
+@Data
+@Builder
+@ApiModel("Parse field request - with stream")
+public class ParseFieldRequest {
+
+    @ApiModelProperty("Parse method: json or sql")
+    private String method;
+    @ApiModelProperty("Statement for create fields")
+    private String statement;
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java
index b1d297030..f1cdac202 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.manager.pojo.sort.util;
 
 import com.google.common.collect.Lists;
-import lombok.extern.slf4j.Slf4j;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.enums.MetaField;
 import org.apache.inlong.manager.common.enums.FieldType;
@@ -53,6 +53,8 @@ import org.apache.inlong.sort.formats.common.VarBinaryFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.MetaFieldInfo;
 
+import lombok.extern.slf4j.Slf4j;
+import java.math.BigDecimal;
 import java.util.List;
 
 import static org.apache.inlong.manager.common.consts.InlongConstants.LEFT_BRACKET;
@@ -95,11 +97,11 @@ public class FieldInfoUtils {
     /*
      * public static List<FieldMappingUnit> createFieldInfo( List<StreamField> streamFieldList, List<SinkField>
      * fieldList, List<FieldInfo> sourceFields, List<FieldInfo> sinkFields) {
-     * 
+     *
      * // Set source field info list. for (StreamField field : streamFieldList) { FieldInfo sourceField =
      * getFieldInfo(field.getFieldName(), field.getFieldType(), field.getIsMetaField() == 1, field.getMetaFieldName(),
      * field.getFieldFormat()); sourceFields.add(sourceField); }
-     * 
+     *
      * List<FieldMappingUnit> mappingUnitList = new ArrayList<>(); // Get sink field info list, if the field name equals
      * to build-in field, new a build-in field info for (SinkField field : fieldList) { FieldInfo sinkField =
      * getFieldInfo(field.getFieldName(), field.getFieldType(), field.getIsMetaField() == 1, field.getMetaFieldName(),
@@ -107,7 +109,7 @@ public class FieldInfoUtils {
      * FieldInfo sourceField = getFieldInfo(field.getSourceFieldName(), field.getSourceFieldType(),
      * field.getIsMetaField() == 1, field.getMetaFieldName(), field.getFieldFormat()); mappingUnitList.add(new
      * FieldMappingUnit(sourceField, sinkField)); } }
-     * 
+     *
      * return mappingUnitList; }
      */
 
@@ -137,7 +139,7 @@ public class FieldInfoUtils {
      * : MetaField.values()) { MetaFieldInfo fieldInfo = new MetaFieldInfo(metaField.name(), metaField);
      * sourceFields.add(fieldInfo); sinkFields.add(fieldInfo); mappingUnitList.add(new FieldMappingUnit(fieldInfo,
      * fieldInfo)); }
-     * 
+     *
      * return mappingUnitList; }
      */
 
@@ -166,6 +168,37 @@ public class FieldInfoUtils {
         return formatInfo;
     }
 
+    /**
+     * Map a SQL type name (text from LEFT_BRACKET on is dropped) to a Java class; unlisted types give Object.class.
+     */
+    public static Class<?> sqlTypeToJavaType(String type) {
+        FieldType fieldType = FieldType.forName(StringUtils.substringBefore(type, LEFT_BRACKET));
+        switch (fieldType) {
+            case BOOLEAN:
+                return Boolean.class;
+            case TINYINT:
+            case SMALLINT:
+            case INT:
+                return int.class; // primitive class: its simple name is "int", not "Integer"
+            case BIGINT:
+                return Long.class;
+            case FLOAT:
+                return Float.class;
+            case DOUBLE:
+                return Double.class;
+            case DECIMAL:
+                return BigDecimal.class;
+            case VARCHAR:
+                return String.class;
+            case DATE:
+            case TIME:
+            case TIMESTAMP:
+                return java.util.Date.class;
+            default:
+                return Object.class;
+        }
+    }
+
     /**
      * Get the FieldFormat of Sort according to type string
      *
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
index caa99873b..bfaebf883 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.sink;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.UpdateResult;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
 import org.apache.inlong.manager.pojo.sink.SinkApproveDTO;
 import org.apache.inlong.manager.pojo.sink.SinkBriefInfo;
 import org.apache.inlong.manager.pojo.sink.SinkField;
@@ -246,10 +247,10 @@ public interface StreamSinkService {
     Boolean updateAfterApprove(List<SinkApproveDTO> sinkApproveList, String operator);
 
     /**
-     * Converts a json string to a sinkFields
+     * Converts a statement to a sinkFields
      *
-     * @param fieldsJson JSON string for the field information
+     * @param parseFieldRequest the request for parse field
      * @return list of sink field
      */
-    List<SinkField> parseFields(String fieldsJson);
+    List<SinkField> parseFields(ParseFieldRequest parseFieldRequest);
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 6ed96ee84..e11532de7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.sink;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.github.pagehelper.Page;
@@ -45,6 +46,7 @@ import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.UpdateResult;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
 import org.apache.inlong.manager.pojo.sink.SinkApproveDTO;
 import org.apache.inlong.manager.pojo.sink.SinkBriefInfo;
 import org.apache.inlong.manager.pojo.sink.SinkField;
@@ -63,14 +65,25 @@ import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
+import net.sf.jsqlparser.JSQLParserException;
+import net.sf.jsqlparser.parser.CCJSqlParserManager;
+import net.sf.jsqlparser.statement.Statement;
+import net.sf.jsqlparser.statement.create.table.ColDataType;
+import net.sf.jsqlparser.statement.create.table.ColumnDefinition;
+import net.sf.jsqlparser.statement.create.table.CreateTable;
+import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.inlong.manager.common.consts.InlongConstants.LEFT_BRACKET;
+import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
+
 /**
  * Implementation of sink service interface
  */
@@ -687,17 +700,24 @@ public class StreamSinkServiceImpl implements StreamSinkService {
     }
 
     @Override
-    public List<SinkField> parseFields(String fieldsJson) {
+    public List<SinkField> parseFields(ParseFieldRequest parseFieldRequest) {
         try {
-            Map<String, String> fieldsMap = objectMapper.readValue(fieldsJson,
-                    new TypeReference<Map<String, String>>() {
-                    });
-            return fieldsMap.keySet().stream().map(fieldName -> {
+            String method = parseFieldRequest.getMethod();
+            String statement = parseFieldRequest.getStatement();
+
+            Map<String, String> fieldsMap;
+            if (STATEMENT_TYPE_JSON.equals(method)) {
+                fieldsMap = parseFieldsByJson(statement);
+            } else {
+                fieldsMap = parseFieldsBySql(statement);
+            }
+            return fieldsMap.entrySet().stream().map(entry -> {
                 SinkField field = new SinkField();
-                field.setFieldName(fieldName);
-                field.setFieldType(fieldsMap.get(fieldName));
+                field.setFieldName(entry.getKey());
+                field.setFieldType(entry.getValue());
                 return field;
             }).collect(Collectors.toList());
+
         } catch (Exception e) {
             LOGGER.error("parse sink fields error", e);
             throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
@@ -705,6 +725,37 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         }
     }
 
+    private Map<String, String> parseFieldsBySql(String sql) throws JSQLParserException {
+        CCJSqlParserManager pm = new CCJSqlParserManager();
+        Statement statement = pm.parse(new StringReader(sql));
+        LinkedHashMap<String, String> fields = new LinkedHashMap<>();
+        if (statement instanceof CreateTable) {
+            CreateTable createTable = (CreateTable) statement;
+            List<ColumnDefinition> columnDefinitions = createTable.getColumnDefinitions();
+            // collect each column's name and type, in declaration order
+            for (ColumnDefinition definition : columnDefinitions) {
+                // get field name
+                String columnName = definition.getColumnName();
+                ColDataType colDataType = definition.getColDataType();
+                String sqlDataType = colDataType.getDataType();
+                // get field type: the part of the data type before LEFT_BRACKET, lower-cased
+                String realDataType = StringUtils.substringBefore(sqlDataType, LEFT_BRACKET).toLowerCase();
+                fields.put(columnName, realDataType);
+            }
+        } else {
+            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+                    "The SQL statement must be a table creation statement");
+        }
+        return fields;
+    }
+
+    private Map<String, String> parseFieldsByJson(String statement) throws JsonProcessingException {
+        // Deserialize into a LinkedHashMap so the fields keep the order they have in the statement
+        return objectMapper.readValue(statement,
+                new TypeReference<LinkedHashMap<String, String>>() {
+                });
+    }
+
     private void checkSinkRequestParams(SinkRequest request) {
         // check request parameter
         // check group id
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
index 51b15be14..b13a05206 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.service.stream;
 
 import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
 import org.apache.inlong.manager.pojo.stream.InlongStreamApproveRequest;
 import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
@@ -232,10 +233,10 @@ public interface InlongStreamService {
     void logicDeleteDlqOrRlq(String bid, String topicName, String operator);
 
     /**
-     * Converts a json string to a streamFields
+     * Converts a statement to a streamFields
      *
-     * @param fieldsJson JSON string for the field information
+     * @param parseFieldRequest    parse field request
      * @return list of stream field
      */
-    List<StreamField> parseFields(String fieldsJson);
+    List<StreamField> parseFields(ParseFieldRequest parseFieldRequest);
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index 91db42763..e50a4c4be 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.stream;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.github.pagehelper.Page;
@@ -41,8 +42,10 @@ import org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper;
 import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
 import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
 import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
 import org.apache.inlong.manager.pojo.sink.SinkBriefInfo;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
 import org.apache.inlong.manager.pojo.source.StreamSource;
 import org.apache.inlong.manager.pojo.stream.InlongStreamApproveRequest;
 import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
@@ -63,14 +66,23 @@ import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
+import net.sf.jsqlparser.JSQLParserException;
+import net.sf.jsqlparser.parser.CCJSqlParserManager;
+import net.sf.jsqlparser.statement.Statement;
+import net.sf.jsqlparser.statement.create.table.ColDataType;
+import net.sf.jsqlparser.statement.create.table.ColumnDefinition;
+import net.sf.jsqlparser.statement.create.table.CreateTable;
+import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
+import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
 import static org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.packExtParams;
 import static org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.unpackExtParams;
 
@@ -713,17 +725,24 @@ public class InlongStreamServiceImpl implements InlongStreamService {
     }
 
     @Override
-    public List<StreamField> parseFields(String fieldsJson) {
+    public List<StreamField> parseFields(ParseFieldRequest parseFieldRequest) {
         try {
-            Map<String, String> fieldsMap = objectMapper.readValue(fieldsJson,
-                    new TypeReference<Map<String, String>>() {
-                    });
-            return fieldsMap.keySet().stream().map(fieldName -> {
+            String method = parseFieldRequest.getMethod();
+            String statement = parseFieldRequest.getStatement();
+
+            Map<String, String> fieldsMap;
+            if (STATEMENT_TYPE_JSON.equals(method)) {
+                fieldsMap = parseFieldsByJson(statement);
+            } else {
+                fieldsMap = parseFieldsBySql(statement);
+            }
+            return fieldsMap.entrySet().stream().map(entry -> {
                 StreamField field = new StreamField();
-                field.setFieldName(fieldName);
-                field.setFieldType(fieldsMap.get(fieldName));
+                field.setFieldName(entry.getKey());
+                field.setFieldType(entry.getValue());
                 return field;
             }).collect(Collectors.toList());
+
         } catch (Exception e) {
             LOGGER.error("parse inlong stream fields error", e);
             throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
@@ -731,6 +750,43 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         }
     }
 
+    /**
+     * Maps the columns of a CREATE TABLE statement, in column order, to lower-cased Java type names.
+     * Throws BusinessException for any other statement, a missing column list, or an unrecognized SQL type.
+     */
+    private Map<String, String> parseFieldsBySql(String sql) throws JSQLParserException {
+        Statement statement = new CCJSqlParserManager().parse(new StringReader(sql));
+        if (!(statement instanceof CreateTable)) {
+            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+                    "The SQL statement must be a table creation statement");
+        }
+        List<ColumnDefinition> columnDefinitions = ((CreateTable) statement).getColumnDefinitions();
+        // a CREATE TABLE without a column list (e.g. CREATE TABLE ... AS SELECT) may carry no definitions at all
+        if (columnDefinitions == null) {
+            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "No column definitions found");
+        }
+        Map<String, String> fields = new LinkedHashMap<>();
+        for (int i = 0; i < columnDefinitions.size(); i++) {
+            ColumnDefinition definition = columnDefinitions.get(i);
+            String sqlDataType = definition.getColDataType().getDataType();
+            Class<?> clazz = FieldInfoUtils.sqlTypeToJavaType(sqlDataType);
+            if (clazz == Object.class) {
+                throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+                        "Unrecognized SQL field type, line: " + (i + 1) + ", type: " + sqlDataType);
+            }
+            // Locale.ROOT: the default locale must not alter the type name (Turkish lower-cases 'I' to a dotless i)
+            fields.put(definition.getColumnName(), clazz.getSimpleName().toLowerCase(java.util.Locale.ROOT));
+        }
+        return fields;
+    }
+
+    /** Reads a JSON object of field name to field type; LinkedHashMap keeps the order of the fields. */
+    private Map<String, String> parseFieldsByJson(String statement) throws JsonProcessingException {
+        TypeReference<LinkedHashMap<String, String>> mapType = new TypeReference<LinkedHashMap<String, String>>() {
+        };
+        return objectMapper.readValue(statement, mapType);
+    }
+
     /**
      * Update field information
      * <p/>
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java
index 09b36f3b8..ca089815b 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.stream;
 
+import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.service.ServiceBaseTest;
@@ -28,13 +29,16 @@ import org.springframework.beans.factory.annotation.Autowired;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
+import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL;
+
 public class InlongStreamTest extends ServiceBaseTest {
 
     @Autowired
     protected StreamSinkService streamSinkService;
 
     @Test
-    public void testParseStreamFields() {
+    public void testParseStreamFieldsByJson() {
         String streamFieldsJson = "{\"name0\":\"string\",\"name1\":\"string\"}";
         List<StreamField> expectStreamFields = new ArrayList<>();
         for (int i = 0; i < 2; i++) {
@@ -44,13 +48,15 @@ public class InlongStreamTest extends ServiceBaseTest {
             expectStreamFields.add(field);
         }
         StreamField[] expectResult = expectStreamFields.toArray(new StreamField[0]);
-        List<StreamField> streamFields = streamService.parseFields(streamFieldsJson);
+        ParseFieldRequest request =
+                ParseFieldRequest.builder().method(STATEMENT_TYPE_JSON).statement(streamFieldsJson).build();
+        List<StreamField> streamFields = streamService.parseFields(request);
         StreamField[] result = streamFields.toArray(new StreamField[0]);
         Assertions.assertArrayEquals(expectResult, result);
     }
 
     @Test
-    public void testParseSinkFields() {
+    public void testParseSinkFieldsByJson() {
         String sinkFieldsJson = "{\"sinkFieldName0\":\"string\",\"sinkFieldName1\":\"string\"}";
         List<SinkField> expectSinkFields = new ArrayList<>();
         for (int i = 0; i < 2; i++) {
@@ -60,8 +66,47 @@ public class InlongStreamTest extends ServiceBaseTest {
             expectSinkFields.add(field);
         }
         SinkField[] expectResult = expectSinkFields.toArray(new SinkField[0]);
-        List<SinkField> sinkFields = streamSinkService.parseFields(sinkFieldsJson);
+        ParseFieldRequest parseFieldRequest =
+                ParseFieldRequest.builder().method(STATEMENT_TYPE_JSON).statement(sinkFieldsJson).build();
+        List<SinkField> sinkFields = streamSinkService.parseFields(parseFieldRequest);
         SinkField[] result = sinkFields.toArray(new SinkField[0]);
         Assertions.assertArrayEquals(expectResult, result);
     }
+
+    /** Parsing a CREATE TABLE statement for a stream must yield one stream field per column, in column order. */
+    @Test
+    public void testParseStreamFieldsBySql() {
+        String createTableSql = "CREATE TABLE my_table (name0 VARCHAR(50), name1 VARCHAR(50))";
+        StreamField[] expected = new StreamField[2];
+        for (int index = 0; index < expected.length; index++) {
+            StreamField column = new StreamField();
+            column.setFieldName("name" + index);
+            // both VARCHAR columns are expected to come back with the "string" field type
+            column.setFieldType("string");
+            expected[index] = column;
+        }
+        ParseFieldRequest sqlRequest =
+                ParseFieldRequest.builder().method(STATEMENT_TYPE_SQL).statement(createTableSql).build();
+        StreamField[] actual = streamService.parseFields(sqlRequest).toArray(new StreamField[0]);
+        Assertions.assertArrayEquals(expected, actual);
+    }
+
+    /** Parsing a CREATE TABLE statement for a sink must yield one sink field per column, in column order. */
+    @Test
+    public void testParseSinkFieldsBySql() {
+        String createTableSql = "CREATE TABLE my_table (sinkFieldName0 VARCHAR(50), sinkFieldName1 VARCHAR(50))";
+        SinkField[] expected = new SinkField[2];
+        for (int index = 0; index < expected.length; index++) {
+            SinkField column = new SinkField();
+            column.setFieldName("sinkFieldName" + index);
+            // NOTE(review): sinks expect the SQL type name here, while the stream test expects "string" - confirm
+            column.setFieldType("varchar");
+            expected[index] = column;
+        }
+        ParseFieldRequest sqlRequest =
+                ParseFieldRequest.builder().method(STATEMENT_TYPE_SQL).statement(createTableSql).build();
+        SinkField[] actual = streamSinkService.parseFields(sqlRequest).toArray(new SinkField[0]);
+        Assertions.assertArrayEquals(expected, actual);
+    }
+
 }
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
index 7f9b86f96..8015b47a9 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
@@ -26,6 +26,7 @@ import org.apache.inlong.manager.common.enums.OperationType;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
 import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest;
@@ -177,10 +178,11 @@ public class InlongStreamController {
     }
 
     @RequestMapping(value = "/stream/parseFields", method = RequestMethod.POST)
-    @ApiOperation(value = "Parse inlong stream fields from JSON string")
-    @ApiImplicitParam(name = "fieldsJson", dataTypeClass = String.class, required = true)
-    public Response<List<StreamField>> parseFields(@RequestBody String fieldsJson) {
-        return Response.success(streamService.parseFields(fieldsJson));
+    @ApiOperation(value = "Parse inlong stream fields from statement")
+    @ApiImplicitParam(name = "parseFieldRequest", dataTypeClass = ParseFieldRequest.class, required = true)
+    public Response<List<StreamField>> parseFields(@RequestBody ParseFieldRequest parseFieldRequest) {
+        // the parameter name matches the @ApiImplicitParam above; the service parses the request's statement
+        return Response.success(streamService.parseFields(parseFieldRequest));
     }
 
 }
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
index cdc11351f..59e1a0c9f 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
@@ -27,6 +27,7 @@ import org.apache.inlong.manager.common.validation.UpdateByKeyValidation;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.common.UpdateResult;
+import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
@@ -119,10 +120,10 @@ public class StreamSinkController {
     }
 
     @RequestMapping(value = "/sink/parseFields", method = RequestMethod.POST)
-    @ApiOperation(value = "parse stream sink fields from JSON string")
-    @ApiImplicitParam(name = "fieldsJson", dataTypeClass = String.class, required = true)
-    public Response<List<SinkField>> parseFields(@RequestBody String fieldsJson) {
-        return Response.success(sinkService.parseFields(fieldsJson));
+    @ApiOperation(value = "parse stream sink fields from statement")
+    @ApiImplicitParam(name = "parseFieldRequest", dataTypeClass = ParseFieldRequest.class, required = true)
+    public Response<List<SinkField>> parseFields(@RequestBody ParseFieldRequest parseFieldRequest) {
+        return Response.success(sinkService.parseFields(parseFieldRequest)); // parsing is delegated to the service
     }
 
 }