You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2023/03/24 07:50:07 UTC
[incubator-seatunnel] branch dev updated: [Feature][Transform2] Add UDF SPI and a example implement for SQL Transform plugin (#4392)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new afee5de55 [Feature][Transform2] Add UDF SPI and a example implement for SQL Transform plugin (#4392)
afee5de55 is described below
commit afee5de55dde69d258678f2e9a0e92df66ba4d0a
Author: Marvin <29...@qq.com>
AuthorDate: Fri Mar 24 15:49:59 2023 +0800
[Feature][Transform2] Add UDF SPI and a example implement for SQL Transform plugin (#4392)
---------
Co-authored-by: mcy <re...@163.com>
---
docs/en/transform-v2/sql-udf.md | 118 +++++++++++++++++++++
.../test/resources/sql_transform/func_string.conf | 9 +-
.../transform/sqlengine/zeta/ZetaSQLEngine.java | 10 +-
.../transform/sqlengine/zeta/ZetaSQLFunction.java | 11 +-
.../transform/sqlengine/zeta/ZetaSQLType.java | 22 +++-
.../transform/sqlengine/zeta/ZetaUDF.java | 47 ++++++++
.../sqlengine/zeta/functions/udf/DESUtil.java | 81 ++++++++++++++
.../sqlengine/zeta/functions/udf/DesDecrypt.java | 50 +++++++++
.../sqlengine/zeta/functions/udf/DesEncrypt.java | 50 +++++++++
9 files changed, 393 insertions(+), 5 deletions(-)
diff --git a/docs/en/transform-v2/sql-udf.md b/docs/en/transform-v2/sql-udf.md
new file mode 100644
index 000000000..c0beca721
--- /dev/null
+++ b/docs/en/transform-v2/sql-udf.md
@@ -0,0 +1,118 @@
+# SQL UDF
+
+> User-defined functions (UDFs) for the SQL transform plugin
+
+## Description
+
+Use the UDF SPI to extend the SQL transform plugin's function library with your own functions.
+
+## UDF API
+
+```java
+package org.apache.seatunnel.transform.sqlengine.zeta;
+
+public interface ZetaUDF {
+ /**
+ * Function name
+ *
+ * @return function name
+ */
+ String functionName();
+
+ /**
+ * The type of function result
+ *
+ * @param argsType input arguments type
+ * @return result type
+ */
+ SeaTunnelDataType<?> resultType(List<SeaTunnelDataType<?>> argsType);
+
+ /**
+ * Evaluate
+ *
+ * @param args input arguments
+ * @return result value
+ */
+ Object evaluate(List<Object> args);
+}
+```
+
+## UDF Implementation Example
+
+Add the `seatunnel-transforms-v2` dependency with `provided` scope to your Maven project:
+
+```xml
+
+<dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-transforms-v2</artifactId>
+ <version>2.3.x</version>
+ <scope>provided</scope>
+</dependency>
+```
+
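+Add a Java class that implements `ZetaUDF`. The class is discovered through Java's `ServiceLoader`; the `@AutoService` annotation generates the required `META-INF/services` registration. For example: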
+
+```java
+
+@AutoService(ZetaUDF.class)
+public class ExampleUDF implements ZetaUDF {
+ @Override
+ public String functionName() {
+ return "EXAMPLE";
+ }
+
+ @Override
+ public SeaTunnelDataType<?> resultType(List<SeaTunnelDataType<?>> argsType) {
+ return BasicType.STRING_TYPE;
+ }
+
+ @Override
+ public Object evaluate(List<Object> args) {
+ String arg = (String) args.get(0);
+ if (arg == null) return null;
+ return "UDF: " + arg;
+ }
+}
+```
+
+Package the UDF project and copy the resulting jar to `${SEATUNNEL_HOME}/lib`.
+
+## Example
+
+The data read from source is a table like this:
+
+| id | name | age |
+|----|----------|-----|
+| 1 | Joy Ding | 20 |
+| 2 | May Ding | 21 |
+| 3 | Kin Dom | 24 |
+| 4 | Joy Dom | 22 |
+
+Call the UDF in the SQL query to transform the source data (function names are matched case-insensitively, so `example(...)` invokes the `EXAMPLE` function):
+
+```
+transform {
+ Sql {
+ source_table_name = "fake"
+ result_table_name = "fake1"
+ query = "select id, example(name) as name, age from fake"
+ }
+}
+```
+
+The data in the result table `fake1` will then be:
+
+| id | name | age |
+|----|---------------|-----|
+| 1 | UDF: Joy Ding | 20 |
+| 2 | UDF: May Ding | 21 |
+| 3 | UDF: Kin Dom | 24 |
+| 4 | UDF: Joy Dom | 22 |
+
+## Changelog
+
+### new version
+
+- Add UDF support to the SQL transform plugin
+
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_string.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_string.conf
index 4a3cd84da..c752ff37b 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_string.conf
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_string.conf
@@ -54,7 +54,7 @@ transform {
sql {
source_table_name = "fake"
result_table_name = "fake1"
- query = "select ascii(c1) as c1_1, ascii(c2) as c2_1, bit_length(c4) as c4_1, length(c4) as c4_2, octet_length(c4) as c4_3, char(c5) as c5_1, concat(c1,id,'!') as c1_2, hextoraw(c6) as c6_1, rawtohex(c7) as c7_1, insert(name,2,2,'**') as name1, lower(name) as name2, upper(name) as name3, left(name, 3) as name4, right(name, 4) as name5, lpad(name, 10, '*') as name6, rpad(name, 10, '*') as name7, ltrim(c8, '*') as c8_1, rtrim(c8, '*') as c8_2, trim(c8, '*') as c8_3, regexp_replace(c9, [...]
+ query = "select ascii(c1) as c1_1, ascii(c2) as c2_1, bit_length(c4) as c4_1, length(c4) as c4_2, octet_length(c4) as c4_3, char(c5) as c5_1, concat(c1,id,'!') as c1_2, hextoraw(c6) as c6_1, rawtohex(c7) as c7_1, insert(name,2,2,'**') as name1, lower(name) as name2, upper(name) as name3, left(name, 3) as name4, right(name, 4) as name5, lpad(name, 10, '*') as name6, rpad(name, 10, '*') as name7, ltrim(c8, '*') as c8_1, rtrim(c8, '*') as c8_2, trim(c8, '*') as c8_3, regexp_replace(c9, [...]
}
}
@@ -282,6 +282,13 @@ sink {
field_value = [
{ equals_to = "Joy DING" }
]
+ },
+ {
+ field_name = "name15"
+ field_type = "string"
+ field_value = [
+ { equals_to = "Joy Ding" }
+ ]
}
]
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLEngine.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLEngine.java
index 4e6d6eec4..a1a77788c 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLEngine.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLEngine.java
@@ -37,7 +37,9 @@ import net.sf.jsqlparser.statement.select.Select;
import net.sf.jsqlparser.statement.select.SelectExpressionItem;
import net.sf.jsqlparser.statement.select.SelectItem;
+import java.util.ArrayList;
import java.util.List;
+import java.util.ServiceLoader;
public class ZetaSQLEngine implements SQLEngine {
private String inputTableName;
@@ -56,8 +58,12 @@ public class ZetaSQLEngine implements SQLEngine {
this.inputTableName = inputTableName;
this.sql = sql;
- this.zetaSQLType = new ZetaSQLType(inputRowType);
- this.zetaSQLFunction = new ZetaSQLFunction(inputRowType, zetaSQLType);
+ List<ZetaUDF> udfList = new ArrayList<>();
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ ServiceLoader.load(ZetaUDF.class, classLoader).forEach(udfList::add);
+
+ this.zetaSQLType = new ZetaSQLType(inputRowType, udfList);
+ this.zetaSQLFunction = new ZetaSQLFunction(inputRowType, zetaSQLType, udfList);
this.zetaSQLFilter = new ZetaSQLFilter(zetaSQLFunction);
parseSQL();
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLFunction.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLFunction.java
index f73b60a89..c5d34f5ee 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLFunction.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLFunction.java
@@ -164,9 +164,13 @@ public class ZetaSQLFunction {
private final SeaTunnelRowType inputRowType;
private final ZetaSQLType zetaSQLType;
- public ZetaSQLFunction(SeaTunnelRowType inputRowType, ZetaSQLType zetaSQLType) {
+ private final List<ZetaUDF> udfList;
+
+ public ZetaSQLFunction(
+ SeaTunnelRowType inputRowType, ZetaSQLType zetaSQLType, List<ZetaUDF> udfList) {
this.inputRowType = inputRowType;
this.zetaSQLType = zetaSQLType;
+ this.udfList = udfList;
}
public Object computeForValue(Expression expression, Object[] inputFields) {
@@ -403,6 +407,11 @@ public class ZetaSQLFunction {
case NULLIF:
return SystemFunction.nullif(args);
default:
+ for (ZetaUDF udf : udfList) {
+ if (udf.functionName().equalsIgnoreCase(functionName)) {
+ return udf.evaluate(args);
+ }
+ }
throw new TransformException(
CommonErrorCode.UNSUPPORTED_OPERATION,
String.format("Unsupported function: %s", functionName));
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLType.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLType.java
index 4d58f0670..4d30b02b7 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLType.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLType.java
@@ -38,8 +38,10 @@ import net.sf.jsqlparser.expression.Parenthesis;
import net.sf.jsqlparser.expression.StringValue;
import net.sf.jsqlparser.expression.TimeKeyExpression;
import net.sf.jsqlparser.expression.operators.arithmetic.Concat;
+import net.sf.jsqlparser.expression.operators.relational.ExpressionList;
import net.sf.jsqlparser.schema.Column;
+import java.util.ArrayList;
import java.util.List;
public class ZetaSQLType {
@@ -60,8 +62,11 @@ public class ZetaSQLType {
private final SeaTunnelRowType inputRowType;
- public ZetaSQLType(SeaTunnelRowType inputRowType) {
+ private final List<ZetaUDF> udfList;
+
+ public ZetaSQLType(SeaTunnelRowType inputRowType, List<ZetaUDF> udfList) {
this.inputRowType = inputRowType;
+ this.udfList = udfList;
}
public SeaTunnelDataType<?> getExpressionType(Expression expression) {
@@ -300,6 +305,21 @@ public class ZetaSQLType {
// Result has the same type as second argument
return getExpressionType(function.getParameters().getExpressions().get(1));
default:
+ for (ZetaUDF udf : udfList) {
+ if (udf.functionName().equalsIgnoreCase(function.getName())) {
+ List<SeaTunnelDataType<?>> argsType = new ArrayList<>();
+ ExpressionList expressionList = function.getParameters();
+ if (expressionList != null) {
+ List<Expression> expressions = expressionList.getExpressions();
+ if (expressions != null) {
+ for (Expression expression : expressions) {
+ argsType.add(getExpressionType(expression));
+ }
+ }
+ }
+ return udf.resultType(argsType);
+ }
+ }
throw new TransformException(
CommonErrorCode.UNSUPPORTED_OPERATION,
String.format("Unsupported function: %s ", function.getName()));
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaUDF.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaUDF.java
new file mode 100644
index 000000000..bc935fa8d
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaUDF.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.sqlengine.zeta;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import java.util.List;
+
+public interface ZetaUDF {
+ /**
+ * Function name
+ *
+ * @return function name
+ */
+ String functionName();
+
+ /**
+ * The type of function result
+ *
+ * @param argsType input arguments type
+ * @return result type
+ */
+ SeaTunnelDataType<?> resultType(List<SeaTunnelDataType<?>> argsType);
+
+ /**
+ * Evaluate
+ *
+ * @param args input arguments
+ * @return result value
+ */
+ Object evaluate(List<Object> args);
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/functions/udf/DESUtil.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/functions/udf/DESUtil.java
new file mode 100644
index 000000000..ed50f7027
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/functions/udf/DESUtil.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.sqlengine.zeta.functions.udf;
+
+import javax.crypto.Cipher;
+import javax.crypto.SecretKeyFactory;
+import javax.crypto.spec.DESKeySpec;
+import javax.crypto.spec.IvParameterSpec;
+
+import java.security.Key;
+import java.util.Base64;
+
/**
 * DES helper backing the {@code DES_ENCRYPT} / {@code DES_DECRYPT} SQL functions.
 *
 * <p>Uses {@code DES/CBC/PKCS5Padding} with a fixed IV; ciphertext is exchanged as Base64 text and
 * all strings are encoded as UTF-8. {@link DESKeySpec} uses only the first 8 bytes of the encoded
 * password as the key, so passwords sharing those bytes are interchangeable.
 *
 * <p>NOTE: DES with a constant IV is cryptographically weak. The parameters are kept as-is so that
 * values produced by earlier versions can still be decrypted.
 */
public final class DESUtil {

    private static final java.nio.charset.Charset CHARSET = java.nio.charset.StandardCharsets.UTF_8;

    /** Fixed CBC initialisation vector; must be exactly one DES block (8 bytes). */
    private static final String IV_PARAMETER = "12345678";

    private static final String ALGORITHM = "DES";

    private static final String CIPHER_ALGORITHM = "DES/CBC/PKCS5Padding";

    /** DESKeySpec needs 8 key bytes; 8 characters always encode to at least 8 UTF-8 bytes. */
    private static final int MIN_PASSWORD_LENGTH = 8;

    private DESUtil() {}

    /**
     * Encrypts {@code data} with {@code password}.
     *
     * @param password key material, at least 8 characters
     * @param data plaintext; may be {@code null}
     * @return Base64-encoded ciphertext, or {@code null} if {@code data} is {@code null}
     * @throws IllegalArgumentException if the password is {@code null} or shorter than 8 characters
     * @throws IllegalStateException if the JCE cannot perform the encryption
     */
    public static String encrypt(String password, String data) {
        checkPassword(password, "Encrypt");
        if (data == null) {
            return null;
        }
        try {
            byte[] cipherText =
                    newCipher(Cipher.ENCRYPT_MODE, password).doFinal(data.getBytes(CHARSET));
            // encodeToString avoids decoding the Base64 bytes with the platform default charset.
            return Base64.getEncoder().encodeToString(cipherText);
        } catch (java.security.GeneralSecurityException e) {
            // Never fall back to the plaintext: the caller explicitly asked for ciphertext.
            throw new IllegalStateException("Encrypt failed", e);
        }
    }

    /**
     * Decrypts Base64-encoded ciphertext produced by {@link #encrypt(String, String)}.
     *
     * @param password key material, at least 8 characters
     * @param data Base64 ciphertext; may be {@code null}
     * @return the decrypted text, or {@code null} if {@code data} is {@code null}
     * @throws IllegalArgumentException if the password is {@code null} or shorter than 8 characters
     * @throws IllegalStateException if {@code data} is not valid Base64 or cannot be decrypted
     *     (for example because of a wrong password)
     */
    public static String decrypt(String password, String data) {
        checkPassword(password, "Decrypt");
        if (data == null) {
            return null;
        }
        try {
            byte[] cipherText = Base64.getDecoder().decode(data);
            byte[] plainText = newCipher(Cipher.DECRYPT_MODE, password).doFinal(cipherText);
            return new String(plainText, CHARSET);
        } catch (java.security.GeneralSecurityException | IllegalArgumentException e) {
            // IllegalArgumentException comes from malformed Base64 input.
            throw new IllegalStateException("Decrypt failed", e);
        }
    }

    /** Rejects passwords that cannot form a DES key; {@code operation} prefixes the message. */
    private static void checkPassword(String password, String operation) {
        if (password == null || password.length() < MIN_PASSWORD_LENGTH) {
            throw new IllegalArgumentException(
                    operation
                            + " failed, password length must be at least "
                            + MIN_PASSWORD_LENGTH);
        }
    }

    /** Creates a cipher initialised for {@code mode} with the key derived from {@code password}. */
    private static Cipher newCipher(int mode, String password)
            throws java.security.GeneralSecurityException {
        DESKeySpec keySpec = new DESKeySpec(password.getBytes(CHARSET));
        Key secretKey = SecretKeyFactory.getInstance(ALGORITHM).generateSecret(keySpec);
        Cipher cipher = Cipher.getInstance(CIPHER_ALGORITHM);
        cipher.init(mode, secretKey, new IvParameterSpec(IV_PARAMETER.getBytes(CHARSET)));
        return cipher;
    }
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/functions/udf/DesDecrypt.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/functions/udf/DesDecrypt.java
new file mode 100644
index 000000000..84df51fee
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/functions/udf/DesDecrypt.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.sqlengine.zeta.functions.udf;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.transform.sqlengine.zeta.ZetaUDF;
+
+import com.google.auto.service.AutoService;
+
+import java.util.List;
+
+@AutoService(ZetaUDF.class)
+public class DesDecrypt implements ZetaUDF {
+
+ @Override
+ public String functionName() {
+ return "DES_DECRYPT";
+ }
+
+ @Override
+ public SeaTunnelDataType<?> resultType(List<SeaTunnelDataType<?>> argsType) {
+ return BasicType.STRING_TYPE;
+ }
+
+ @Override
+ public Object evaluate(List<Object> args) {
+ String password = (String) args.get(0);
+ String data = (String) args.get(1);
+ if (password == null || data == null) {
+ return null;
+ }
+ return DESUtil.decrypt(password, data);
+ }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/functions/udf/DesEncrypt.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/functions/udf/DesEncrypt.java
new file mode 100644
index 000000000..05c133297
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/functions/udf/DesEncrypt.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.sqlengine.zeta.functions.udf;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.transform.sqlengine.zeta.ZetaUDF;
+
+import com.google.auto.service.AutoService;
+
+import java.util.List;
+
+@AutoService(ZetaUDF.class)
+public class DesEncrypt implements ZetaUDF {
+
+ @Override
+ public String functionName() {
+ return "DES_ENCRYPT";
+ }
+
+ @Override
+ public SeaTunnelDataType<?> resultType(List<SeaTunnelDataType<?>> argsType) {
+ return BasicType.STRING_TYPE;
+ }
+
+ @Override
+ public Object evaluate(List<Object> args) {
+ String password = (String) args.get(0);
+ String data = (String) args.get(1);
+ if (password == null || data == null) {
+ return null;
+ }
+ return DESUtil.encrypt(password, data);
+ }
+}