You are viewing a plain-text version of this content. (The hyperlink to its canonical version was lost in this plain-text copy.)
Posted to commits@inlong.apache.org by he...@apache.org on 2022/08/18 06:10:41 UTC
[inlong] branch master updated: [INLONG-5577][Sort][Manager] Support function fieldType (#5579)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 281c6e7c5 [INLONG-5577][Sort][Manager] Support function fieldType (#5579)
281c6e7c5 is described below
commit 281c6e7c57c849715bbe0279ae92e5ae60e71047
Author: emhui <em...@foxmail.com>
AuthorDate: Thu Aug 18 14:10:35 2022 +0800
[INLONG-5577][Sort][Manager] Support function fieldType (#5579)
---
.../inlong/manager/common/enums/FieldType.java | 3 +-
.../manager/pojo/sort/util/FieldRelationUtils.java | 6 +-
.../manager/pojo/sort/util/LoadNodeUtils.java | 4 +
.../protocol/transformation/FunctionParam.java | 4 +-
.../transformation/function/CustomFunction.java | 71 +++++++++++
.../sort/parser/CustomFunctionSqlParseTest.java | 131 +++++++++++++++++++++
6 files changed, 214 insertions(+), 5 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
index 80a6501ed..cd54828b4 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
@@ -48,7 +48,8 @@ public enum FieldType {
LOCAL_ZONE_TIMESTAMP,
ARRAY,
MAP,
- STRUCT;
+ STRUCT,
+ FUNCTION;
public static FieldType forName(String name) {
Preconditions.checkNotNull(name, "FieldType should not be null");
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldRelationUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldRelationUtils.java
index 2713a7e13..870ad42ee 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldRelationUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldRelationUtils.java
@@ -199,7 +199,7 @@ public class FieldRelationUtils {
*/
private static List<FieldRelation> createEncryptFieldRelations(List<StreamField> fieldList, String transformName,
EncryptDefinition encryptDefinition, String preNodes, Map<String, StreamField> constantFieldMap) {
- Preconditions.checkNotEmpty(preNodes, "PreNodes of splitter should not be null");
+ Preconditions.checkNotEmpty(preNodes, "PreNodes of encrypt should not be null");
String preNode = preNodes.split(",")[0];
List<EncryptRule> encryptRules = encryptDefinition.getEncryptRules();
Set<String> encryptFields = Sets.newHashSet();
@@ -264,7 +264,7 @@ public class FieldRelationUtils {
/**
* Parse rule of encrypt.
*/
- private static FieldRelation parseEncryptRule(EncryptRule encryptRule, Set<String> replaceFields,
+ private static FieldRelation parseEncryptRule(EncryptRule encryptRule, Set<String> encryptFields,
String transformName, String preNode) {
StreamField sourceField = encryptRule.getSourceField();
final String fieldName = sourceField.getFieldName();
@@ -274,7 +274,7 @@ public class FieldRelationUtils {
fieldInfo.setNodeId(preNode);
FieldInfo targetFieldInfo = new FieldInfo(fieldName, transformName,
FieldInfoUtils.convertFieldFormat(FieldType.STRING.name()));
- replaceFields.add(fieldName);
+ encryptFields.add(fieldName);
EncryptFunction encryptFunction = new EncryptFunction(fieldInfo, new StringConstantParam(key),
new StringConstantParam(encrypt));
return new FieldRelation(encryptFunction, targetFieldInfo);
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
index f88803989..6e96d7bfd 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
@@ -23,6 +23,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.FieldType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -75,6 +76,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import org.apache.inlong.sort.protocol.transformation.function.CustomFunction;
/**
* Util for load node info.
@@ -515,6 +517,8 @@ public class LoadNodeUtils {
} else {
inputField = new ConstantParam(constantField.getFieldValue());
}
+ } else if (FieldType.FUNCTION.name().equalsIgnoreCase(field.getSourceFieldType())) {
+ inputField = new CustomFunction(field.getSourceFieldName());
} else {
inputField = new FieldInfo(field.getSourceFieldName(), field.getOriginNodeName(),
FieldInfoUtils.convertFieldFormat(field.getSourceFieldType()));
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
index 8ea1b5261..effd06200 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
@@ -23,6 +23,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.MetaFieldInfo;
import org.apache.inlong.sort.protocol.transformation.function.CascadeFunctionWrapper;
+import org.apache.inlong.sort.protocol.transformation.function.CustomFunction;
import org.apache.inlong.sort.protocol.transformation.function.EncryptFunction;
import org.apache.inlong.sort.protocol.transformation.function.HopEndFunction;
import org.apache.inlong.sort.protocol.transformation.function.HopFunction;
@@ -98,7 +99,8 @@ import org.apache.inlong.sort.protocol.transformation.operator.OrOperator;
@JsonSubTypes.Type(value = RegexpReplaceFirstFunction.class, name = "regexpReplaceFirst"),
@JsonSubTypes.Type(value = CascadeFunctionWrapper.class, name = "cascadeFunctionWrapper"),
@JsonSubTypes.Type(value = EncryptFunction.class, name = "encrypt"),
- @JsonSubTypes.Type(value = JsonGetterFunction.class, name = "jsonGetterFunction")
+ @JsonSubTypes.Type(value = JsonGetterFunction.class, name = "jsonGetterFunction"),
+ @JsonSubTypes.Type(value = CustomFunction.class, name = "customFunction")
})
public interface FunctionParam {
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/CustomFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/CustomFunction.java
new file mode 100644
index 000000000..93e65c6d1
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/CustomFunction.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import java.util.Collections;
+import java.util.List;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
+
+/**
+ * A function whose formatted SQL is exactly the raw {@code content} string supplied by the user; see {@link #format()}.
+ */
+@Data
+@JsonTypeName("customFunction")
+@EqualsAndHashCode(callSuper = false)
+public class CustomFunction implements Function {
+ /** Raw function text (e.g. {@code ABS(age)}), returned unchanged by {@link #format()}. */
+ private final String content;
+
+ @JsonCreator
+ public CustomFunction(@JsonProperty("content") String content) {
+ this.content = content;
+ }
+
+ @Override
+ public List<FunctionParam> getParams() {
+ return Collections.singletonList(new StringConstantParam(content));
+ }
+
+ /**
+ * Not supported: a custom function carries a raw script in {@code content} and has no function name.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public String getName() {
+ throw new UnsupportedOperationException("Custom function is used to pass the function script that the user "
+ + "has organized, using content as the real function content, so there is no specific function name.");
+ }
+
+ /**
+ * Returns {@code content} verbatim, with no quoting or escaping (presumably embedded as-is in generated SQL).
+ *
+ * @return the raw, user-supplied function content
+ */
+ @Override
+ public String format() {
+ return content;
+ }
+}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/CustomFunctionSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/CustomFunctionSqlParseTest.java
new file mode 100644
index 000000000..771065890
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/CustomFunctionSqlParseTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.ParseResult;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.function.CustomFunction;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks that field relations whose input is a {@link CustomFunction} parse through {@link FlinkSqlParser}.
+ */
+public class CustomFunctionSqlParseTest extends AbstractTestBase {
+ /** Builds the MySQL extract node (id "1", table "table_input") with fields id, name and age. */
+ private MySqlExtractNode buildMySQLExtractNode() {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()));
+ Map<String, String> map = new HashMap<>();
+ return new MySqlExtractNode("1", "mysql_input", fields,
+ null, map, "id",
+ Collections.singletonList("table_input"), "localhost", "inlong", "inlong",
+ "inlong", null, null,
+ null, null);
+ }
+ /** Builds the MySQL load node (id "2", table "table_output") using {@link #buildFieldRelationByCustomFunction()}. */
+ private Node buildMysqlLoadNode() {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo())
+ );
+ List<FieldRelation> relations = buildFieldRelationByCustomFunction();
+ return new MySqlLoadNode("2", "mysql_output", fields, relations, null,
+ null, null, null, "jdbc:mysql://localhost:3306/inlong",
+ "inlong", "inlong", "table_output", "id");
+ }
+
+ /**
+ * Maps id field-to-field, and fills name and age from {@link CustomFunction}s ("`name`" and "ABS(age)").
+ * @return field relations for the load node's id, name and age fields
+ */
+ private List<FieldRelation> buildFieldRelationByCustomFunction() {
+ return Arrays
+ .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("id", new LongFormatInfo())),
+ new FieldRelation(new CustomFunction("`name`"),
+ new FieldInfo("name", new StringFormatInfo())),
+ new FieldRelation(new CustomFunction("ABS(age)"),
+ new FieldInfo("age", new IntFormatInfo()))
+ );
+ }
+
+ /**
+ * Builds a {@link NodeRelation} from the ids of the given input and output nodes.
+ *
+ * @param inputs upstream (extract) nodes
+ * @param outputs downstream (load) nodes
+ * @return node relation over the collected node ids
+ */
+ private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+ List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+ return new NodeRelation(inputIds, outputIds);
+ }
+
+ /**
+ * Parses a MySQL-to-MySQL stream whose load node uses {@link CustomFunction} relations and asserts tryExecute() succeeds.
+ *
+ * @throws Exception if a checked exception escapes parsing or {@code tryExecute()}
+ */
+ @Test
+ public void testCustomFunctionSqlParse() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(10000);
+ env.disableOperatorChaining();
+ EnvironmentSettings settings = EnvironmentSettings
+ .newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build();
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+ Node inputNode = buildMySQLExtractNode();
+ Node outputNode = buildMysqlLoadNode();
+ StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode),
+ Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+ Collections.singletonList(outputNode))));
+ GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+ FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+ ParseResult result = parser.parse();
+ Assert.assertTrue(result.tryExecute());
+ }
+
+}