You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2023/03/24 07:50:07 UTC

[incubator-seatunnel] branch dev updated: [Feature][Transform2] Add UDF SPI and a example implement for SQL Transform plugin (#4392)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new afee5de55 [Feature][Transform2] Add UDF SPI and a example implement for SQL Transform plugin (#4392)
afee5de55 is described below

commit afee5de55dde69d258678f2e9a0e92df66ba4d0a
Author: Marvin <29...@qq.com>
AuthorDate: Fri Mar 24 15:49:59 2023 +0800

    [Feature][Transform2] Add UDF SPI and a example implement for SQL Transform plugin (#4392)
    
    
    ---------
    
    Co-authored-by: mcy <re...@163.com>
---
 docs/en/transform-v2/sql-udf.md                    | 118 +++++++++++++++++++++
 .../test/resources/sql_transform/func_string.conf  |   9 +-
 .../transform/sqlengine/zeta/ZetaSQLEngine.java    |  10 +-
 .../transform/sqlengine/zeta/ZetaSQLFunction.java  |  11 +-
 .../transform/sqlengine/zeta/ZetaSQLType.java      |  22 +++-
 .../transform/sqlengine/zeta/ZetaUDF.java          |  47 ++++++++
 .../sqlengine/zeta/functions/udf/DESUtil.java      |  81 ++++++++++++++
 .../sqlengine/zeta/functions/udf/DesDecrypt.java   |  50 +++++++++
 .../sqlengine/zeta/functions/udf/DesEncrypt.java   |  50 +++++++++
 9 files changed, 393 insertions(+), 5 deletions(-)

diff --git a/docs/en/transform-v2/sql-udf.md b/docs/en/transform-v2/sql-udf.md
new file mode 100644
index 000000000..c0beca721
--- /dev/null
+++ b/docs/en/transform-v2/sql-udf.md
@@ -0,0 +1,118 @@
+# SQL UDF
+
+> User-defined functions (UDFs) for the SQL transform plugin
+
+## Description
+
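+Use the UDF SPI to extend the function library of the SQL transform plugin with your own functions. A UDF is only used when its name (matched case-insensitively) does not match a built-in function.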
+
+## UDF API
+
+```java
+package org.apache.seatunnel.transform.sqlengine.zeta;
+
+/**
+ * A user-defined scalar function that extends the Zeta SQL engine of the SQL transform.
+ *
+ * <p>Implementations are loaded by {@code ZetaSQLEngine} through {@link java.util.ServiceLoader}
+ * with the current thread's context class loader. Each implementation must therefore be registered
+ * as a service provider of this interface, for example with {@code @AutoService(ZetaUDF.class)}.
+ * A UDF is only consulted when the function name in the query matches no built-in function. If
+ * several UDFs share a name, the first one loaded wins.
+ */
+public interface ZetaUDF {
+    /**
+     * Function name. It is compared case-insensitively with the function name used in the SQL
+     * query.
+     *
+     * @return function name
+     */
+    String functionName();
+
+    /**
+     * The type of function result, i.e. the type of the values returned by {@link #evaluate(List)}.
+     *
+     * @param argsType types of the argument expressions, in call order; empty when the call has no
+     *     arguments
+     * @return result type
+     */
+    SeaTunnelDataType<?> resultType(List<SeaTunnelDataType<?>> argsType);
+
+    /**
+     * Evaluate the function for one invocation.
+     *
+     * @param args argument values, in call order; individual values may be {@code null}
+     * @return result value, which should conform to the type returned by {@link #resultType(List)}
+     */
+    Object evaluate(List<Object> args);
+}
+```
+
+## UDF Implementation Example
+
+Add `seatunnel-transforms-v2` to your Maven project as a dependency with `provided` scope:
+
+```xml
+
+<dependency>
+    <groupId>org.apache.seatunnel</groupId>
+    <artifactId>seatunnel-transforms-v2</artifactId>
+    <version>2.3.x</version>
+    <scope>provided</scope>
+</dependency>
+```
+
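+Add a Java class that implements `ZetaUDF`, like the one below. The `@AutoService` annotation (from Google AutoService) registers the class as a `ZetaUDF` service provider, so that SeaTunnel can discover it through Java's `ServiceLoader`: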
+
+```java
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.transform.sqlengine.zeta.ZetaUDF;
+
+import com.google.auto.service.AutoService;
+
+import java.util.List;
+
+/** EXAMPLE(str): returns "UDF: " followed by str, or NULL when str is NULL. */
+@AutoService(ZetaUDF.class)
+public class ExampleUDF implements ZetaUDF {
+    @Override
+    public String functionName() {
+        // Matched case-insensitively, so a query may call it as example(...).
+        return "EXAMPLE";
+    }
+
+    @Override
+    public SeaTunnelDataType<?> resultType(List<SeaTunnelDataType<?>> argsType) {
+        return BasicType.STRING_TYPE;
+    }
+
+    @Override
+    public Object evaluate(List<Object> args) {
+        String arg = (String) args.get(0);
+        if (arg == null) {
+            return null;
+        }
+        return "UDF: " + arg;
+    }
+}
+```
+
+Package the UDF project and copy the jar to `${SEATUNNEL_HOME}/lib`.
+
+## Example
+
+Suppose the data read from the source is the following table:
+
+| id |   name   | age |
+|----|----------|-----|
+| 1  | Joy Ding | 20  |
+| 2  | May Ding | 21  |
+| 3  | Kin Dom  | 24  |
+| 4  | Joy Dom  | 22  |
+
+Call the UDF in the SQL query to transform the source data:
+
+```
+transform {
+  Sql {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    query = "select id, example(name) as name, age from fake"
+  }
+}
+```
+
+Then the data in the result table `fake1` becomes:
+
+| id |     name      | age |
+|----|---------------|-----|
+| 1  | UDF: Joy Ding | 20  |
+| 2  | UDF: May Ding | 21  |
+| 3  | UDF: Kin Dom  | 24  |
+| 4  | UDF: Joy Dom  | 22  |
+
+## Changelog
+
+### new version
+
+- Add UDF support to the SQL Transform plugin
+
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_string.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_string.conf
index 4a3cd84da..c752ff37b 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_string.conf
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_string.conf
@@ -54,7 +54,7 @@ transform {
   sql {
     source_table_name = "fake"
     result_table_name = "fake1"
-    query = "select ascii(c1) as c1_1, ascii(c2) as c2_1, bit_length(c4) as c4_1, length(c4) as c4_2, octet_length(c4) as c4_3, char(c5) as c5_1, concat(c1,id,'!') as c1_2, hextoraw(c6) as c6_1, rawtohex(c7) as c7_1, insert(name,2,2,'**') as name1, lower(name) as name2, upper(name) as name3, left(name, 3) as name4, right(name, 4) as name5, lpad(name, 10, '*') as name6, rpad(name, 10, '*') as name7, ltrim(c8, '*') as c8_1, rtrim(c8, '*') as c8_2, trim(c8, '*') as c8_3, regexp_replace(c9,  [...]
+    query = "select ascii(c1) as c1_1, ascii(c2) as c2_1, bit_length(c4) as c4_1, length(c4) as c4_2, octet_length(c4) as c4_3, char(c5) as c5_1, concat(c1,id,'!') as c1_2, hextoraw(c6) as c6_1, rawtohex(c7) as c7_1, insert(name,2,2,'**') as name1, lower(name) as name2, upper(name) as name3, left(name, 3) as name4, right(name, 4) as name5, lpad(name, 10, '*') as name6, rpad(name, 10, '*') as name7, ltrim(c8, '*') as c8_1, rtrim(c8, '*') as c8_2, trim(c8, '*') as c8_3, regexp_replace(c9,  [...]
   }
 }
 
@@ -282,6 +282,13 @@ sink {
           field_value = [
             { equals_to = "Joy DING" }
           ]
+        },
+        {
+          field_name = "name15"
+          field_type = "string"
+          field_value = [
+            { equals_to = "Joy Ding" }
+          ]
         }
       ]
     }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLEngine.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLEngine.java
index 4e6d6eec4..a1a77788c 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLEngine.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLEngine.java
@@ -37,7 +37,9 @@ import net.sf.jsqlparser.statement.select.Select;
 import net.sf.jsqlparser.statement.select.SelectExpressionItem;
 import net.sf.jsqlparser.statement.select.SelectItem;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.ServiceLoader;
 
 public class ZetaSQLEngine implements SQLEngine {
     private String inputTableName;
@@ -56,8 +58,12 @@ public class ZetaSQLEngine implements SQLEngine {
         this.inputTableName = inputTableName;
         this.sql = sql;
 
-        this.zetaSQLType = new ZetaSQLType(inputRowType);
-        this.zetaSQLFunction = new ZetaSQLFunction(inputRowType, zetaSQLType);
+        List<ZetaUDF> udfList = new ArrayList<>();
+        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+        ServiceLoader.load(ZetaUDF.class, classLoader).forEach(udfList::add);
+
+        this.zetaSQLType = new ZetaSQLType(inputRowType, udfList);
+        this.zetaSQLFunction = new ZetaSQLFunction(inputRowType, zetaSQLType, udfList);
         this.zetaSQLFilter = new ZetaSQLFilter(zetaSQLFunction);
 
         parseSQL();
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLFunction.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLFunction.java
index f73b60a89..c5d34f5ee 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLFunction.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLFunction.java
@@ -164,9 +164,13 @@ public class ZetaSQLFunction {
     private final SeaTunnelRowType inputRowType;
     private final ZetaSQLType zetaSQLType;
 
-    public ZetaSQLFunction(SeaTunnelRowType inputRowType, ZetaSQLType zetaSQLType) {
+    private final List<ZetaUDF> udfList;
+
+    /**
+     * @param inputRowType row type of the rows this function evaluator reads from
+     * @param zetaSQLType expression type resolver
+     * @param udfList user-defined functions; when a function name matches no built-in function,
+     *     the list is searched in order and the first UDF whose name matches case-insensitively
+     *     is evaluated
+     */
+    public ZetaSQLFunction(
+            SeaTunnelRowType inputRowType, ZetaSQLType zetaSQLType, List<ZetaUDF> udfList) {
         this.inputRowType = inputRowType;
         this.zetaSQLType = zetaSQLType;
+        this.udfList = udfList;
     }
 
     public Object computeForValue(Expression expression, Object[] inputFields) {
@@ -403,6 +407,11 @@ public class ZetaSQLFunction {
             case NULLIF:
                 return SystemFunction.nullif(args);
             default:
+                for (ZetaUDF udf : udfList) {
+                    if (udf.functionName().equalsIgnoreCase(functionName)) {
+                        return udf.evaluate(args);
+                    }
+                }
                 throw new TransformException(
                         CommonErrorCode.UNSUPPORTED_OPERATION,
                         String.format("Unsupported function: %s", functionName));
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLType.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLType.java
index 4d58f0670..4d30b02b7 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLType.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLType.java
@@ -38,8 +38,10 @@ import net.sf.jsqlparser.expression.Parenthesis;
 import net.sf.jsqlparser.expression.StringValue;
 import net.sf.jsqlparser.expression.TimeKeyExpression;
 import net.sf.jsqlparser.expression.operators.arithmetic.Concat;
+import net.sf.jsqlparser.expression.operators.relational.ExpressionList;
 import net.sf.jsqlparser.schema.Column;
 
+import java.util.ArrayList;
 import java.util.List;
 
 public class ZetaSQLType {
@@ -60,8 +62,11 @@ public class ZetaSQLType {
 
     private final SeaTunnelRowType inputRowType;
 
-    public ZetaSQLType(SeaTunnelRowType inputRowType) {
+    private final List<ZetaUDF> udfList;
+
+    /**
+     * @param inputRowType row type used to resolve column types
+     * @param udfList user-defined functions; when a function name matches no built-in function,
+     *     the first UDF whose name matches case-insensitively supplies the result type
+     */
+    public ZetaSQLType(SeaTunnelRowType inputRowType, List<ZetaUDF> udfList) {
         this.inputRowType = inputRowType;
+        this.udfList = udfList;
     }
 
     public SeaTunnelDataType<?> getExpressionType(Expression expression) {
@@ -300,6 +305,21 @@ public class ZetaSQLType {
                 // Result has the same type as second argument
                 return getExpressionType(function.getParameters().getExpressions().get(1));
             default:
+                for (ZetaUDF udf : udfList) {
+                    if (udf.functionName().equalsIgnoreCase(function.getName())) {
+                        List<SeaTunnelDataType<?>> argsType = new ArrayList<>();
+                        ExpressionList expressionList = function.getParameters();
+                        if (expressionList != null) {
+                            List<Expression> expressions = expressionList.getExpressions();
+                            if (expressions != null) {
+                                for (Expression expression : expressions) {
+                                    argsType.add(getExpressionType(expression));
+                                }
+                            }
+                        }
+                        return udf.resultType(argsType);
+                    }
+                }
                 throw new TransformException(
                         CommonErrorCode.UNSUPPORTED_OPERATION,
                         String.format("Unsupported function: %s ", function.getName()));
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaUDF.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaUDF.java
new file mode 100644
index 000000000..bc935fa8d
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaUDF.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.sqlengine.zeta;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import java.util.List;
+
+/**
+ * A user-defined scalar function that extends the Zeta SQL engine of the SQL transform.
+ *
+ * <p>Implementations are loaded by {@code ZetaSQLEngine} through {@link java.util.ServiceLoader}
+ * with the current thread's context class loader. Each implementation must therefore be registered
+ * as a service provider of this interface, for example with {@code @AutoService(ZetaUDF.class)}.
+ * A UDF is only consulted when the function name in the query matches no built-in function. If
+ * several UDFs share a name, the first one loaded wins.
+ */
+public interface ZetaUDF {
+    /**
+     * Function name. It is compared case-insensitively with the function name used in the SQL
+     * query.
+     *
+     * @return function name
+     */
+    String functionName();
+
+    /**
+     * The type of function result, i.e. the type of the values returned by {@link #evaluate(List)}.
+     *
+     * @param argsType types of the argument expressions, in call order; empty when the call has no
+     *     arguments
+     * @return result type
+     */
+    SeaTunnelDataType<?> resultType(List<SeaTunnelDataType<?>> argsType);
+
+    /**
+     * Evaluate the function for one invocation.
+     *
+     * @param args argument values, in call order; individual values may be {@code null}
+     * @return result value, which should conform to the type returned by {@link #resultType(List)}
+     */
+    Object evaluate(List<Object> args);
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/functions/udf/DESUtil.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/functions/udf/DESUtil.java
new file mode 100644
index 000000000..ed50f7027
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/functions/udf/DESUtil.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.sqlengine.zeta.functions.udf;
+
+import javax.crypto.Cipher;
+import javax.crypto.SecretKeyFactory;
+import javax.crypto.spec.DESKeySpec;
+import javax.crypto.spec.IvParameterSpec;
+
+import java.nio.charset.StandardCharsets;
+import java.security.GeneralSecurityException;
+import java.security.Key;
+import java.util.Base64;
+
+/**
+ * Helpers behind the DES_ENCRYPT / DES_DECRYPT UDFs.
+ *
+ * <p>They use DES in CBC mode with PKCS#5 padding, Base64 text encoding of the cipher text, and
+ * UTF-8 for every string/byte conversion.
+ *
+ * <p>NOTE(review): DES (56-bit keys) and the fixed, hard-coded IV are cryptographically weak. The
+ * fixed IV also makes encryption deterministic: equal plaintexts under the same password give
+ * equal cipher texts. Changing either would make existing cipher texts undecryptable, so they are
+ * kept; do not rely on this for protecting sensitive data.
+ */
+public final class DESUtil {
+
+    /** Fixed initialization vector; changing it breaks decryption of existing values. */
+    private static final String IV_PARAMETER = "12345678";
+
+    private static final String ALGORITHM = "DES";
+
+    private static final String CIPHER_ALGORITHM = "DES/CBC/PKCS5Padding";
+
+    /** {@link DESKeySpec} needs 8 key bytes and uses only the first 8 bytes of the password. */
+    private static final int MIN_PASSWORD_LENGTH = 8;
+
+    private DESUtil() {}
+
+    /**
+     * Encrypts {@code data} with a key derived from {@code password}.
+     *
+     * @param password password of at least 8 characters; only its first 8 UTF-8 bytes form the key
+     * @param data plaintext to encrypt, may be {@code null}
+     * @return Base64-encoded cipher text, or {@code null} if {@code data} is {@code null}
+     * @throws IllegalArgumentException if the password is {@code null} or shorter than 8
+     *     characters, or if the cipher operation fails
+     */
+    public static String encrypt(String password, String data) {
+        checkPassword(password, "Encrypt");
+        if (data == null) {
+            return null;
+        }
+        byte[] plainText = data.getBytes(StandardCharsets.UTF_8);
+        byte[] cipherText = runCipher(Cipher.ENCRYPT_MODE, password, plainText, "Encrypt");
+        return Base64.getEncoder().encodeToString(cipherText);
+    }
+
+    /**
+     * Decrypts Base64-encoded cipher text produced by {@link #encrypt(String, String)}.
+     *
+     * @param password password of at least 8 characters; only its first 8 UTF-8 bytes form the key
+     * @param data Base64-encoded cipher text, may be {@code null}
+     * @return the decrypted UTF-8 string, or {@code null} if {@code data} is {@code null}
+     * @throws IllegalArgumentException if the password is {@code null} or shorter than 8
+     *     characters, if {@code data} is not valid Base64, or if decryption fails (e.g. wrong
+     *     password or corrupted data)
+     */
+    public static String decrypt(String password, String data) {
+        checkPassword(password, "Decrypt");
+        if (data == null) {
+            return null;
+        }
+        byte[] cipherText;
+        try {
+            cipherText = Base64.getDecoder().decode(data);
+        } catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException("Decrypt failed, data is not valid Base64", e);
+        }
+        byte[] plainText = runCipher(Cipher.DECRYPT_MODE, password, cipherText, "Decrypt");
+        return new String(plainText, StandardCharsets.UTF_8);
+    }
+
+    /** Rejects passwords that cannot provide the 8 key bytes DES needs. */
+    private static void checkPassword(String password, String operation) {
+        if (password == null || password.length() < MIN_PASSWORD_LENGTH) {
+            throw new IllegalArgumentException(
+                    operation
+                            + " failed, password length must be at least "
+                            + MIN_PASSWORD_LENGTH);
+        }
+    }
+
+    /** Runs one DES/CBC cipher pass ({@code mode} is encrypt or decrypt) over {@code input}. */
+    private static byte[] runCipher(int mode, String password, byte[] input, String operation) {
+        try {
+            DESKeySpec keySpec = new DESKeySpec(password.getBytes(StandardCharsets.UTF_8));
+            Key secretKey = SecretKeyFactory.getInstance(ALGORITHM).generateSecret(keySpec);
+            Cipher cipher = Cipher.getInstance(CIPHER_ALGORITHM);
+            IvParameterSpec iv = new IvParameterSpec(IV_PARAMETER.getBytes(StandardCharsets.UTF_8));
+            cipher.init(mode, secretKey, iv);
+            return cipher.doFinal(input);
+        } catch (GeneralSecurityException e) {
+            // Never fall back to returning the input unchanged. That would make DES_ENCRYPT emit
+            // plaintext, or DES_DECRYPT emit cipher text, without any error.
+            throw new IllegalArgumentException(operation + " failed", e);
+        }
+    }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/functions/udf/DesDecrypt.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/functions/udf/DesDecrypt.java
new file mode 100644
index 000000000..84df51fee
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/functions/udf/DesDecrypt.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.sqlengine.zeta.functions.udf;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.transform.sqlengine.zeta.ZetaUDF;
+
+import com.google.auto.service.AutoService;
+
+import java.util.List;
+
+/**
+ * SQL function {@code DES_DECRYPT(password, data)}: decrypts {@code data}, Base64-encoded DES
+ * cipher text, with a key derived from {@code password} (see {@link DESUtil#decrypt}).
+ */
+@AutoService(ZetaUDF.class)
+public class DesDecrypt implements ZetaUDF {
+
+    @Override
+    public String functionName() {
+        return "DES_DECRYPT";
+    }
+
+    /** The result is always a string, whatever the argument types. */
+    @Override
+    public SeaTunnelDataType<?> resultType(List<SeaTunnelDataType<?>> argsType) {
+        return BasicType.STRING_TYPE;
+    }
+
+    /**
+     * Decrypts the second argument using the first as the password.
+     *
+     * @param args {@code [password, data]}; both must be strings or {@code null}, and any further
+     *     arguments are ignored
+     * @return the decrypted string, or {@code null} if the password or the data is {@code null}
+     * @throws IllegalArgumentException if fewer than two arguments are given. Exceptions thrown by
+     *     {@link DESUtil#decrypt} propagate unchanged.
+     */
+    @Override
+    public Object evaluate(List<Object> args) {
+        int argCount = args == null ? 0 : args.size();
+        if (argCount < 2) {
+            throw new IllegalArgumentException(
+                    "DES_DECRYPT requires 2 arguments (password, data), but got " + argCount);
+        }
+        String password = (String) args.get(0);
+        String data = (String) args.get(1);
+        if (password == null || data == null) {
+            return null;
+        }
+        return DESUtil.decrypt(password, data);
+    }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/functions/udf/DesEncrypt.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/functions/udf/DesEncrypt.java
new file mode 100644
index 000000000..05c133297
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/functions/udf/DesEncrypt.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.sqlengine.zeta.functions.udf;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.transform.sqlengine.zeta.ZetaUDF;
+
+import com.google.auto.service.AutoService;
+
+import java.util.List;
+
+/**
+ * SQL function {@code DES_ENCRYPT(password, data)}: encrypts {@code data} with a key derived from
+ * {@code password} and returns Base64-encoded DES cipher text (see {@link DESUtil#encrypt}).
+ */
+@AutoService(ZetaUDF.class)
+public class DesEncrypt implements ZetaUDF {
+
+    @Override
+    public String functionName() {
+        return "DES_ENCRYPT";
+    }
+
+    /** The result is always a string, whatever the argument types. */
+    @Override
+    public SeaTunnelDataType<?> resultType(List<SeaTunnelDataType<?>> argsType) {
+        return BasicType.STRING_TYPE;
+    }
+
+    /**
+     * Encrypts the second argument using the first as the password.
+     *
+     * @param args {@code [password, data]}; both must be strings or {@code null}, and any further
+     *     arguments are ignored
+     * @return the Base64-encoded cipher text, or {@code null} if the password or the data is
+     *     {@code null}
+     * @throws IllegalArgumentException if fewer than two arguments are given. Exceptions thrown by
+     *     {@link DESUtil#encrypt} propagate unchanged.
+     */
+    @Override
+    public Object evaluate(List<Object> args) {
+        int argCount = args == null ? 0 : args.size();
+        if (argCount < 2) {
+            throw new IllegalArgumentException(
+                    "DES_ENCRYPT requires 2 arguments (password, data), but got " + argCount);
+        }
+        String password = (String) args.get(0);
+        String data = (String) args.get(1);
+        if (password == null || data == null) {
+            return null;
+        }
+        return DESUtil.encrypt(password, data);
+    }
+}