You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/08/18 06:10:41 UTC

[inlong] branch master updated: [INLONG-5577][Sort][Manager] Support function fieldType (#5579)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 281c6e7c5 [INLONG-5577][Sort][Manager] Support function fieldType (#5579)
281c6e7c5 is described below

commit 281c6e7c57c849715bbe0279ae92e5ae60e71047
Author: emhui <em...@foxmail.com>
AuthorDate: Thu Aug 18 14:10:35 2022 +0800

    [INLONG-5577][Sort][Manager] Support function fieldType (#5579)
---
 .../inlong/manager/common/enums/FieldType.java     |   3 +-
 .../manager/pojo/sort/util/FieldRelationUtils.java |   6 +-
 .../manager/pojo/sort/util/LoadNodeUtils.java      |   4 +
 .../protocol/transformation/FunctionParam.java     |   4 +-
 .../transformation/function/CustomFunction.java    |  71 +++++++++++
 .../sort/parser/CustomFunctionSqlParseTest.java    | 131 +++++++++++++++++++++
 6 files changed, 214 insertions(+), 5 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
index 80a6501ed..cd54828b4 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
@@ -48,7 +48,8 @@ public enum FieldType {
     LOCAL_ZONE_TIMESTAMP,
     ARRAY,
     MAP,
-    STRUCT;
+    STRUCT,
+    FUNCTION;
 
     public static FieldType forName(String name) {
         Preconditions.checkNotNull(name, "FieldType should not be null");
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldRelationUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldRelationUtils.java
index 2713a7e13..870ad42ee 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldRelationUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldRelationUtils.java
@@ -199,7 +199,7 @@ public class FieldRelationUtils {
      */
     private static List<FieldRelation> createEncryptFieldRelations(List<StreamField> fieldList, String transformName,
             EncryptDefinition encryptDefinition, String preNodes, Map<String, StreamField> constantFieldMap) {
-        Preconditions.checkNotEmpty(preNodes, "PreNodes of splitter should not be null");
+        Preconditions.checkNotEmpty(preNodes, "PreNodes of encrypt should not be null");
         String preNode = preNodes.split(",")[0];
         List<EncryptRule> encryptRules = encryptDefinition.getEncryptRules();
         Set<String> encryptFields = Sets.newHashSet();
@@ -264,7 +264,7 @@ public class FieldRelationUtils {
     /**
      * Parse rule of encrypt.
      */
-    private static FieldRelation parseEncryptRule(EncryptRule encryptRule, Set<String> replaceFields,
+    private static FieldRelation parseEncryptRule(EncryptRule encryptRule, Set<String> encryptFields,
             String transformName, String preNode) {
         StreamField sourceField = encryptRule.getSourceField();
         final String fieldName = sourceField.getFieldName();
@@ -274,7 +274,7 @@ public class FieldRelationUtils {
         fieldInfo.setNodeId(preNode);
         FieldInfo targetFieldInfo = new FieldInfo(fieldName, transformName,
                 FieldInfoUtils.convertFieldFormat(FieldType.STRING.name()));
-        replaceFields.add(fieldName);
+        encryptFields.add(fieldName);
         EncryptFunction encryptFunction = new EncryptFunction(fieldInfo, new StringConstantParam(key),
                 new StringConstantParam(encrypt));
         return new FieldRelation(encryptFunction, targetFieldInfo);
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
index f88803989..6e96d7bfd 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
@@ -23,6 +23,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.FieldType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -75,6 +76,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.apache.inlong.sort.protocol.transformation.function.CustomFunction;
 
 /**
  * Util for load node info.
@@ -515,6 +517,8 @@ public class LoadNodeUtils {
                         } else {
                             inputField = new ConstantParam(constantField.getFieldValue());
                         }
+                    } else if (FieldType.FUNCTION.name().equalsIgnoreCase(field.getSourceFieldType())) {
+                        inputField = new CustomFunction(field.getSourceFieldName());
                     } else {
                         inputField = new FieldInfo(field.getSourceFieldName(), field.getOriginNodeName(),
                                 FieldInfoUtils.convertFieldFormat(field.getSourceFieldType()));
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
index 8ea1b5261..effd06200 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
@@ -23,6 +23,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.MetaFieldInfo;
 import org.apache.inlong.sort.protocol.transformation.function.CascadeFunctionWrapper;
+import org.apache.inlong.sort.protocol.transformation.function.CustomFunction;
 import org.apache.inlong.sort.protocol.transformation.function.EncryptFunction;
 import org.apache.inlong.sort.protocol.transformation.function.HopEndFunction;
 import org.apache.inlong.sort.protocol.transformation.function.HopFunction;
@@ -98,7 +99,8 @@ import org.apache.inlong.sort.protocol.transformation.operator.OrOperator;
         @JsonSubTypes.Type(value = RegexpReplaceFirstFunction.class, name = "regexpReplaceFirst"),
         @JsonSubTypes.Type(value = CascadeFunctionWrapper.class, name = "cascadeFunctionWrapper"),
         @JsonSubTypes.Type(value = EncryptFunction.class, name = "encrypt"),
-        @JsonSubTypes.Type(value = JsonGetterFunction.class, name = "jsonGetterFunction")
+        @JsonSubTypes.Type(value = JsonGetterFunction.class, name = "jsonGetterFunction"),
+        @JsonSubTypes.Type(value = CustomFunction.class, name = "customFunction")
 })
 public interface FunctionParam {
 
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/CustomFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/CustomFunction.java
new file mode 100644
index 000000000..93e65c6d1
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/CustomFunction.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import java.util.Collections;
+import java.util.List;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
+
+/**
+ * A {@link Function} that carries a user-supplied expression and emits it unchanged from {@link #format()}.
+ */
+@Data
+@JsonTypeName("customFunction")
+@EqualsAndHashCode(callSuper = false)
+public class CustomFunction implements Function {
+
+    private final String content;
+
+    @JsonCreator
+    public CustomFunction(@JsonProperty("content") String content) {
+        this.content = content;
+    }
+
+    @Override
+    public List<FunctionParam> getParams() {
+        return Collections.singletonList(new StringConstantParam(content));
+    }
+
+    /**
+     * Unsupported: the content itself is the complete function expression, so there is no function name.
+     *
+     * @return nothing; this method always throws {@link UnsupportedOperationException}
+     */
+    @Override
+    public String getName() {
+        throw new UnsupportedOperationException("Custom function is used to pass the function script that the user "
+                + "has organized, using content as the real function content, so there is no specific function name.");
+    }
+
+    /**
+     * Returns the content as-is; no quoting, escaping or wrapping is applied here.
+     *
+     * @return the {@code content} string passed to the constructor
+     */
+    @Override
+    public String format() {
+        return content;
+    }
+}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/CustomFunctionSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/CustomFunctionSqlParseTest.java
new file mode 100644
index 000000000..771065890
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/CustomFunctionSqlParseTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.ParseResult;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.function.CustomFunction;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for SQL parsing of field relations built with {@link CustomFunction}
+ */
+public class CustomFunctionSqlParseTest extends AbstractTestBase {
+
+    private MySqlExtractNode buildMySQLExtractNode() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()));
+        Map<String, String> map = new HashMap<>();
+        return new MySqlExtractNode("1", "mysql_input", fields,
+                null, map, "id",
+                Collections.singletonList("table_input"), "localhost", "inlong", "inlong",
+                "inlong", null, null,
+                null, null);
+    }
+
+    private Node buildMysqlLoadNode() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo())
+        );
+        List<FieldRelation> relations = buildFieldRelationByCustomFunction();
+        return new MySqlLoadNode("2", "mysql_output", fields, relations, null,
+                null, null, null, "jdbc:mysql://localhost:3306/inlong",
+                "inlong", "inlong", "table_output", "id");
+    }
+
+    /**
+     * Maps {@code id} directly and derives {@code name} and {@code age} through {@link CustomFunction}.
+     * @return field relations for the load node
+     */
+    private List<FieldRelation> buildFieldRelationByCustomFunction() {
+        return Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
+                                new FieldInfo("id", new LongFormatInfo())),
+                        new FieldRelation(new CustomFunction("`name`"),
+                                new FieldInfo("name", new StringFormatInfo())),
+                        new FieldRelation(new CustomFunction("ABS(age)"),
+                                new FieldInfo("age", new IntFormatInfo()))
+                );
+    }
+
+    /**
+     * Builds a node relation from the ids of the given nodes.
+     *
+     * @param inputs  input nodes; their ids are collected in list order
+     * @param outputs output nodes; their ids are collected in list order
+     * @return node relation created from the collected input and output ids
+     */
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+        List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+        List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+        return new NodeRelation(inputIds, outputIds);
+    }
+
+    /**
+     * Tests a Flink SQL task that extracts with {@link MySqlExtractNode} and loads with {@link MySqlLoadNode}.
+     *
+     * @throws Exception if parsing or executing the task fails
+     */
+    @Test
+    public void testCustomFunctionSqlParse() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        env.disableOperatorChaining();
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node inputNode = buildMySQLExtractNode();
+        Node outputNode = buildMysqlLoadNode();
+        StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode),
+                Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+        ParseResult result = parser.parse();
+        Assert.assertTrue(result.tryExecute());
+    }
+
+}