You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/06/16 05:34:05 UTC

[zeppelin] branch master updated: [ZEPPELIN-4837]. Add property to only check some packages for udf finding

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 37953f4  [ZEPPELIN-4837]. Add property to only check some packages for udf finding
37953f4 is described below

commit 37953f4f4757831a29436cfe6aa49cf0e449ae98
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Thu Jun 11 13:06:15 2020 +0800

    [ZEPPELIN-4837]. Add property to only check some packages for udf finding
    
    ### What is this PR for?
    
    Add property `flink.udf.jars.packages` to specify the packages that will be searched for UDFs; otherwise all classes will be checked, which can be very time-consuming, especially when the UDF jars are large (e.g. if your UDF depends on other third-party libraries).
    
    ### What type of PR is it?
    [Feature]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4837
    
    ### How should this be tested?
    * CI passed and manually tested
    
    ### Screenshots (if appropriate)
    ![image](https://user-images.githubusercontent.com/164491/84347276-b3767700-abe4-11ea-9688-f4334dc4d395.png)
    
    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3798 from zjffdu/ZEPPELIN-4837 and squashes the following commits:
    
    78816e548 [Jeff Zhang] [ZEPPELIN-4837]. Add property to only check some packages for udf finding
---
 docs/interpreter/flink.md                          |  5 ++++
 .../src/main/resources/interpreter-setting.json    |  7 +++++
 .../zeppelin/flink/FlinkScalaInterpreter.scala     | 35 ++++++++++++----------
 3 files changed, 31 insertions(+), 16 deletions(-)

diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md
index 57bc7f8..23973a8 100644
--- a/docs/interpreter/flink.md
+++ b/docs/interpreter/flink.md
@@ -147,6 +147,11 @@ You can also set other flink properties which are not listed in the table. For a
     <td>Flink udf jars (comma separated), zeppelin will register udf in this jar automatically for user. The udf name is the class name.</td>
   </tr>
   <tr>
+    <td>flink.udf.jars.packages</td>
+    <td></td>
+    <td>Packages (comma separated) that would be searched for the udf defined in `flink.udf.jars`.</td>
+  </tr>
+  <tr>
     <td>flink.execution.jars</td>
     <td></td>
     <td>Additional user jars (comma separated)</td>
diff --git a/flink/interpreter/src/main/resources/interpreter-setting.json b/flink/interpreter/src/main/resources/interpreter-setting.json
index 00625b5..cba12c3 100644
--- a/flink/interpreter/src/main/resources/interpreter-setting.json
+++ b/flink/interpreter/src/main/resources/interpreter-setting.json
@@ -103,6 +103,13 @@
         "description": "Flink udf jars (comma separated), Zeppelin will register udfs in this jar for user automatically",
         "type": "string"
       },
+      "flink.udf.jars.packages": {
+        "envName": null,
+        "propertyName": null,
+        "defaultValue": "",
+        "description": "Packages (comma separated) that would be searched for the udf defined in `flink.udf.jars`",
+        "type": "string"
+      },
       "flink.execution.jars": {
         "envName": null,
         "propertyName": null,
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 3c3162e..aaedc82 100644
--- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -463,6 +463,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
     val jarFile = new JarFile(jar)
     val entries = jarFile.entries
 
+    val udfPackages = properties.getProperty("flink.udf.jars.packages", "").split(",").toSet
     val urls = Array(new URL("jar:file:" + jar + "!/"))
     val cl = new URLClassLoader(urls)
 
@@ -473,22 +474,24 @@ class FlinkScalaInterpreter(val properties: Properties) {
           // -6 because of .class
           var className = je.getName.substring(0, je.getName.length - 6)
           className = className.replace('/', '.')
-          val c = cl.loadClass(className)
-          val udf = c.newInstance()
-          if (udf.isInstanceOf[ScalarFunction]) {
-            val scalarUDF = udf.asInstanceOf[ScalarFunction]
-            btenv.registerFunction(c.getSimpleName, scalarUDF)
-          } else if (udf.isInstanceOf[TableFunction[_]]) {
-            val tableUDF = udf.asInstanceOf[TableFunction[_]]
-            flinkShims.registerTableFunction(btenv, c.getSimpleName, tableUDF)
-          } else if (udf.isInstanceOf[AggregateFunction[_,_]]) {
-            val aggregateUDF = udf.asInstanceOf[AggregateFunction[_,_]]
-            flinkShims.registerAggregateFunction(btenv, c.getSimpleName, aggregateUDF)
-          } else if (udf.isInstanceOf[TableAggregateFunction[_,_]]) {
-            val tableAggregateUDF = udf.asInstanceOf[TableAggregateFunction[_,_]]
-            flinkShims.registerTableAggregateFunction(btenv, c.getSimpleName, tableAggregateUDF)
-          } else {
-            LOGGER.warn("No UDF definition found in class file: " + je.getName)
+          if (udfPackages.isEmpty || udfPackages.exists( p => className.startsWith(p))) {
+            val c = cl.loadClass(className)
+            val udf = c.newInstance()
+            if (udf.isInstanceOf[ScalarFunction]) {
+              val scalarUDF = udf.asInstanceOf[ScalarFunction]
+              btenv.registerFunction(c.getSimpleName, scalarUDF)
+            } else if (udf.isInstanceOf[TableFunction[_]]) {
+              val tableUDF = udf.asInstanceOf[TableFunction[_]]
+              flinkShims.registerTableFunction(btenv, c.getSimpleName, tableUDF)
+            } else if (udf.isInstanceOf[AggregateFunction[_, _]]) {
+              val aggregateUDF = udf.asInstanceOf[AggregateFunction[_, _]]
+              flinkShims.registerAggregateFunction(btenv, c.getSimpleName, aggregateUDF)
+            } else if (udf.isInstanceOf[TableAggregateFunction[_, _]]) {
+              val tableAggregateUDF = udf.asInstanceOf[TableAggregateFunction[_, _]]
+              flinkShims.registerTableAggregateFunction(btenv, c.getSimpleName, tableAggregateUDF)
+            } else {
+              LOGGER.warn("No UDF definition found in class file: " + je.getName)
+            }
           }
         } catch {
           case e : Exception =>