You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ji...@apache.org on 2019/07/25 06:09:58 UTC

[flink] branch master updated: [FLINK-13409][python] Supported java UDFs in python API.

This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 366e916  [FLINK-13409][python] Supported java UDFs in python API.
366e916 is described below

commit 366e916e71172b1b73f802b5b4bd19ef252e68ea
Author: Wei Zhong <we...@gmail.com>
AuthorDate: Thu Jul 25 10:03:19 2019 +0800

    [FLINK-13409][python] Supported java UDFs in python API.
    
    This closes #9222
---
 docs/dev/table/tableApi.md                         |  22 +++-
 docs/dev/table/tableApi.zh.md                      |  22 +++-
 docs/dev/table/udfs.md                             | 135 +++++++++++++++++++++
 docs/dev/table/udfs.zh.md                          | 135 +++++++++++++++++++++
 flink-python/pyflink/table/table.py                |  51 ++++++++
 flink-python/pyflink/table/table_environment.py    |  33 +++++
 flink-python/pyflink/table/tests/test_correlate.py |  71 +++++++++++
 .../table/tests/test_environment_completeness.py   |   3 +-
 .../table/tests/test_table_environment_api.py      |  26 ++++
 .../java/org/apache/flink/table/api/Table.java     |   8 +-
 10 files changed, 496 insertions(+), 10 deletions(-)

diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 2a90811..36a9942 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -1336,7 +1336,16 @@ full_outer_result = left.full_outer_join(right, "a = d").select("a, b, e")
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
     	<td>
-        <p>Currently not supported in python API.</p>
+        <p>Joins a table with the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function. A row of the left (outer) table is dropped, if its table function call returns an empty result.
+        </p>
+{% highlight python %}
+# register Java User-Defined Table Function
+table_env.register_java_function("split", "com.my.udf.MySplitUDTF")
+
+# join
+orders = table_env.scan("Orders")
+result = orders.join_lateral("split(c).as(s, t, v)").select("a, b, s, t, v")
+{% endhighlight %}
       </td>
     </tr>
     <tr>
@@ -1345,7 +1354,16 @@ full_outer_result = left.full_outer_join(right, "a = d").select("a, b, e")
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
       <td>
-        <p>Currently not supported in python API.</p>
+        <p>Joins a table with the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function. If a table function call returns an empty result, the corresponding outer row is preserved and the result padded with null values.</p>
+        <p><b>Note:</b> Currently, the predicate of a table function left outer join can only be empty or literal <code>true</code>.</p>
+{% highlight python %}
+# register Java User-Defined Table Function
+table_env.register_java_function("split", "com.my.udf.MySplitUDTF")
+
+# join
+orders = table_env.scan("Orders")
+result = orders.left_outer_join_lateral("split(c).as(s, t, v)").select("a, b, s, t, v")
+{% endhighlight %}
       </td>
     </tr>
     <tr>
diff --git a/docs/dev/table/tableApi.zh.md b/docs/dev/table/tableApi.zh.md
index 729a531..28802a6 100644
--- a/docs/dev/table/tableApi.zh.md
+++ b/docs/dev/table/tableApi.zh.md
@@ -1335,7 +1335,16 @@ full_outer_result = left.full_outer_join(right, "a = d").select("a, b, e")
         <span class="label label-primary">批处理</span> <span class="label label-primary">流处理</span>
       </td>
     	<td>
-        <p>Python API暂不支持。</p>
+        <p>将一张表与一个表函数的执行结果执行内连接操作。左表的每一行都会进行一次表函数调用,调用将会返回0个,1个或多个结果,再与这些结果执行连接操作。如果一行数据对应的表函数调用返回了一个空的结果集,则这行数据会被丢弃。
+        </p>
+{% highlight python %}
+# register Java User-Defined Table Function
+table_env.register_java_function("split", "com.my.udf.MySplitUDTF")
+
+# join
+orders = table_env.scan("Orders")
+result = orders.join_lateral("split(c).as(s, t, v)").select("a, b, s, t, v")
+{% endhighlight %}
       </td>
     </tr>
     <tr>
@@ -1344,7 +1353,16 @@ full_outer_result = left.full_outer_join(right, "a = d").select("a, b, e")
         <span class="label label-primary">批处理</span> <span class="label label-primary">流处理</span>
       </td>
       <td>
-        <p>Python API暂不支持。</p>
+        <p>将一张表与一个表函数的执行结果执行左连接操作。左表的每一行都会进行一次表函数调用,调用将会返回0个,1个或多个结果,再与这些结果执行连接操作。如果一行数据对应的表函数调用返回了一个空的结果集,这行数据依然会被保留,对应的右表数值用null(python为None)填充。</p>
+        <p><b>注意:</b>目前,表函数的左连接操作的连接条件(join predicate)只能为空或者为"true"常量。</p>
+{% highlight python %}
+# register Java User-Defined Table Function
+table_env.register_java_function("split", "com.my.udf.MySplitUDTF")
+
+# join
+orders = table_env.scan("Orders")
+result = orders.left_outer_join_lateral("split(c).as(s, t, v)").select("a, b, s, t, v")
+{% endhighlight %}
       </td>
     </tr>
     <tr>
diff --git a/docs/dev/table/udfs.md b/docs/dev/table/udfs.md
index 87bc804..dbb9a97 100644
--- a/docs/dev/table/udfs.md
+++ b/docs/dev/table/udfs.md
@@ -96,6 +96,34 @@ tableEnv.registerFunction("hashCode", new HashCode(10))
 tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable")
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+'''
+Java code:
+
+// The java class must have a public no-argument constructor and can be founded in current java classloader.
+public class HashCode extends ScalarFunction {
+  private int factor = 12;
+
+  public int eval(String s) {
+      return s.hashCode() * factor;
+  }
+}
+'''
+
+table_env = BatchTableEnvironment.create(env)
+
+# register the java function
+table_env.register_java_function("hashCode", "my.java.function.HashCode")
+
+# use the function in Python Table API
+my_table.select("string, string.hashCode(), hashCode(string)")
+
+# use the function in SQL API
+table_env.sql_query("SELECT string, hashCode(string) FROM MyTable")
+{% endhighlight %}
+</div>
 </div>
 
 By default the result type of an evaluation method is determined by Flink's type extraction facilities. This is sufficient for basic types or simple POJOs but might be wrong for more complex, custom, or composite types. In these cases `TypeInformation` of the result type can be manually defined by overriding `ScalarFunction#getResultType()`.
@@ -214,6 +242,43 @@ tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(s
 {% endhighlight %}
 **IMPORTANT:** Do not implement TableFunction as a Scala object. Scala object is a singleton and will cause concurrency issues.
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+'''
+Java code:
+
+// The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer).
+// The java class must have a public no-argument constructor and can be founded in current java classloader.
+public class Split extends TableFunction<Tuple2<String, Integer>> {
+    private String separator = " ";
+    
+    public void eval(String str) {
+        for (String s : str.split(separator)) {
+            // use collect(...) to emit a row
+            collect(new Tuple2<String, Integer>(s, s.length()));
+        }
+    }
+}
+'''
+
+table_env = BatchTableEnvironment.create(env)
+my_table = ...  # type: Table, table schema: [a: String]
+
+# Register the java function.
+table_env.register_java_function("split", "my.java.function.Split")
+
+# Use the table function in the Python Table API. "as" specifies the field names of the table.
+my_table.join_lateral("split(a) as (word, length)").select("a, word, length")
+my_table.left_outer_join_lateral("split(a) as (word, length)").select("a, word, length")
+
+# Use the table function in SQL with LATERAL and TABLE keywords.
+# CROSS JOIN a table function (equivalent to "join" in Table API).
+table_env.sql_query("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)")
+# LEFT JOIN a table function (equivalent to "left_outer_join" in Table API).
+table_env.sql_query("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE")
+{% endhighlight %}
+</div>
 </div>
 
 Please note that POJO types do not have a deterministic field order. Therefore, you cannot rename the fields of POJO returned by a table function using `AS`.
@@ -671,6 +736,76 @@ tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GRO
 
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+'''
+Java code:
+
+/**
+ * Accumulator for WeightedAvg.
+ */
+public static class WeightedAvgAccum {
+    public long sum = 0;
+    public int count = 0;
+}
+
+// The java class must have a public no-argument constructor and can be founded in current java classloader.
+
+/**
+ * Weighted Average user-defined aggregate function.
+ */
+public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {
+
+    @Override
+    public WeightedAvgAccum createAccumulator() {
+        return new WeightedAvgAccum();
+    }
+
+    @Override
+    public Long getValue(WeightedAvgAccum acc) {
+        if (acc.count == 0) {
+            return null;
+        } else {
+            return acc.sum / acc.count;
+        }
+    }
+
+    public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {
+        acc.sum += iValue * iWeight;
+        acc.count += iWeight;
+    }
+
+    public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {
+        acc.sum -= iValue * iWeight;
+        acc.count -= iWeight;
+    }
+    
+    public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
+        Iterator<WeightedAvgAccum> iter = it.iterator();
+        while (iter.hasNext()) {
+            WeightedAvgAccum a = iter.next();
+            acc.count += a.count;
+            acc.sum += a.sum;
+        }
+    }
+    
+    public void resetAccumulator(WeightedAvgAccum acc) {
+        acc.count = 0;
+        acc.sum = 0L;
+    }
+}
+'''
+
+# register function
+t_env = ...  # type: StreamTableEnvironment
+t_env.register_java_function("wAvg", "my.java.function.WeightedAvg")
+
+# use function
+t_env.sql_query("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user")
+
+{% endhighlight %}
+</div>
 </div>
 
 
diff --git a/docs/dev/table/udfs.zh.md b/docs/dev/table/udfs.zh.md
index 724e5bb..42623ce 100644
--- a/docs/dev/table/udfs.zh.md
+++ b/docs/dev/table/udfs.zh.md
@@ -96,6 +96,34 @@ tableEnv.registerFunction("hashCode", new HashCode(10))
 tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable")
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+'''
+Java code:
+
+// The java class must have a public no-argument constructor and can be founded in current java classloader.
+public class HashCode extends ScalarFunction {
+  private int factor = 12;
+
+  public int eval(String s) {
+      return s.hashCode() * factor;
+  }
+}
+'''
+
+table_env = BatchTableEnvironment.create(env)
+
+# register the java function
+table_env.register_java_function("hashCode", "my.java.function.HashCode")
+
+# use the function in Python Table API
+my_table.select("string, string.hashCode(), hashCode(string)")
+
+# use the function in SQL API
+table_env.sql_query("SELECT string, hashCode(string) FROM MyTable")
+{% endhighlight %}
+</div>
 </div>
 
 By default the result type of an evaluation method is determined by Flink's type extraction facilities. This is sufficient for basic types or simple POJOs but might be wrong for more complex, custom, or composite types. In these cases `TypeInformation` of the result type can be manually defined by overriding `ScalarFunction#getResultType()`.
@@ -214,6 +242,43 @@ tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(s
 {% endhighlight %}
 **IMPORTANT:** Do not implement TableFunction as a Scala object. Scala object is a singleton and will cause concurrency issues.
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+'''
+Java code:
+
+// The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer).
+// The java class must have a public no-argument constructor and can be founded in current java classloader.
+public class Split extends TableFunction<Tuple2<String, Integer>> {
+    private String separator = " ";
+    
+    public void eval(String str) {
+        for (String s : str.split(separator)) {
+            // use collect(...) to emit a row
+            collect(new Tuple2<String, Integer>(s, s.length()));
+        }
+    }
+}
+'''
+
+table_env = BatchTableEnvironment.create(env)
+my_table = ...  # type: Table, table schema: [a: String]
+
+# Register the java function.
+table_env.register_java_function("split", "my.java.function.Split")
+
+# Use the table function in the Python Table API. "as" specifies the field names of the table.
+my_table.join_lateral("split(a) as (word, length)").select("a, word, length")
+my_table.left_outer_join_lateral("split(a) as (word, length)").select("a, word, length")
+
+# Use the table function in SQL with LATERAL and TABLE keywords.
+# CROSS JOIN a table function (equivalent to "join" in Table API).
+table_env.sql_query("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)")
+# LEFT JOIN a table function (equivalent to "left_outer_join" in Table API).
+table_env.sql_query("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE")
+{% endhighlight %}
+</div>
 </div>
 
 Please note that POJO types do not have a deterministic field order. Therefore, you cannot rename the fields of POJO returned by a table function using `AS`.
@@ -671,6 +736,76 @@ tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GRO
 
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+'''
+Java code:
+
+/**
+ * Accumulator for WeightedAvg.
+ */
+public static class WeightedAvgAccum {
+    public long sum = 0;
+    public int count = 0;
+}
+
+// The java class must have a public no-argument constructor and can be founded in current java classloader.
+
+/**
+ * Weighted Average user-defined aggregate function.
+ */
+public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {
+
+    @Override
+    public WeightedAvgAccum createAccumulator() {
+        return new WeightedAvgAccum();
+    }
+
+    @Override
+    public Long getValue(WeightedAvgAccum acc) {
+        if (acc.count == 0) {
+            return null;
+        } else {
+            return acc.sum / acc.count;
+        }
+    }
+
+    public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {
+        acc.sum += iValue * iWeight;
+        acc.count += iWeight;
+    }
+
+    public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {
+        acc.sum -= iValue * iWeight;
+        acc.count -= iWeight;
+    }
+    
+    public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
+        Iterator<WeightedAvgAccum> iter = it.iterator();
+        while (iter.hasNext()) {
+            WeightedAvgAccum a = iter.next();
+            acc.count += a.count;
+            acc.sum += a.sum;
+        }
+    }
+    
+    public void resetAccumulator(WeightedAvgAccum acc) {
+        acc.count = 0;
+        acc.sum = 0L;
+    }
+}
+'''
+
+# register function
+t_env = ...  # type: StreamTableEnvironment
+t_env.register_java_function("wAvg", "my.java.function.WeightedAvg")
+
+# use function
+t_env.sql_query("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user")
+
+{% endhighlight %}
+</div>
 </div>
 
 
diff --git a/flink-python/pyflink/table/table.py b/flink-python/pyflink/table/table.py
index bb336f4..920a372 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -247,6 +247,57 @@ class Table(object):
         """
         return Table(self._j_table.fullOuterJoin(right._j_table, join_predicate))
 
+    def join_lateral(self, table_function_call, join_predicate=None):
+        """
+        Joins this Table with an user-defined TableFunction. This join is similar to a SQL inner
+        join but works with a table function. Each row of the table is joined with the rows
+        produced by the table function.
+
+        Example:
+        ::
+
+            >>> t_env.register_java_function("split", "java.table.function.class.name")
+            >>> tab.join_lateral("split(text, ' ') as (b)", "a = b")
+
+        :param table_function_call: An expression representing a table function call.
+        :type table_function_call: str
+        :param join_predicate: Optional, The join predicate expression string, join ON TRUE if not
+                               exist.
+        :type join_predicate: str
+        :return: The result Table.
+        :rtype: Table
+        """
+        if join_predicate is None:
+            return Table(self._j_table.joinLateral(table_function_call))
+        else:
+            return Table(self._j_table.joinLateral(table_function_call, join_predicate))
+
+    def left_outer_join_lateral(self, table_function_call, join_predicate=None):
+        """
+        Joins this Table with an user-defined TableFunction. This join is similar to
+        a SQL left outer join but works with a table function. Each row of the table is joined
+        with all rows produced by the table function. If the join does not produce any row, the
+        outer row is padded with nulls.
+
+        Example:
+        ::
+
+            >>> t_env.register_java_function("split", "java.table.function.class.name")
+            >>> tab.left_outer_join_lateral("split(text, ' ') as (b)")
+
+        :param table_function_call: An expression representing a table function call.
+        :type table_function_call: str
+        :param join_predicate: Optional, The join predicate expression string, join ON TRUE if not
+                               exist.
+        :type join_predicate: str
+        :return: The result Table.
+        :rtype: Table
+        """
+        if join_predicate is None:
+            return Table(self._j_table.leftOuterJoinLateral(table_function_call))
+        else:
+            return Table(self._j_table.leftOuterJoinLateral(table_function_call, join_predicate))
+
     def minus(self, right):
         """
         Minus of two :class:`Table` with duplicate records removed.
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 071c1f5..2c23deb 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -224,6 +224,16 @@ class TableEnvironment(object):
         j_table_name_array = self._j_tenv.listTables()
         return [item for item in j_table_name_array]
 
+    def list_user_defined_functions(self):
+        """
+        Gets the names of all user defined functions registered in this environment.
+
+        :return: List of the names of all user defined functions registered in this environment.
+        :rtype: list[str]
+        """
+        j_udf_name_array = self._j_tenv.listUserDefinedFunctions()
+        return [item for item in j_udf_name_array]
+
     def explain(self, table=None, extended=False):
         """
         Returns the AST of the specified Table API and SQL queries and the execution plan to compute
@@ -497,6 +507,29 @@ class TableEnvironment(object):
         """
         pass
 
+    def register_java_function(self, name, function_class_name):
+        """
+        Registers a java user defined function under a unique name. Replaces already existing
+        user-defined functions under this name. The acceptable function type contains
+        **ScalarFunction**, **TableFunction** and **AggregateFunction**.
+
+        Example:
+        ::
+
+            >>> table_env.register_java_function("func1", "java.user.defined.function.class.name")
+
+        :param name: The name under which the function is registered.
+        :type name: str
+        :param function_class_name: The java full qualified class name of the function to register.
+                                    The function must have a public no-argument constructor and can
+                                    be founded in current Java classloader.
+        :type function_class_name: str
+        """
+        gateway = get_gateway()
+        java_function = gateway.jvm.Thread.currentThread().getContextClassLoader()\
+            .loadClass(function_class_name).newInstance()
+        self._j_tenv.registerFunction(name, java_function)
+
     def execute(self, job_name):
         """
         Triggers the program execution. The environment will execute all parts of
diff --git a/flink-python/pyflink/table/tests/test_correlate.py b/flink-python/pyflink/table/tests/test_correlate.py
new file mode 100644
index 0000000..bfd70d5
--- /dev/null
+++ b/flink-python/pyflink/table/tests/test_correlate.py
@@ -0,0 +1,71 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase
+
+
+class CorrelateTests(PyFlinkStreamTableTestCase):
+
+    def test_join_lateral(self):
+        t_env = self.t_env
+        t_env.register_java_function("split", "org.apache.flink.table.utils.TableFunc1")
+        source = t_env.from_elements([("1", "1#3#5#7"), ("2", "2#4#6#8")], ["id", "words"])
+
+        result = source.join_lateral("split(words) as (word)")
+
+        query_operation = result._j_table.getQueryOperation()
+        self.assertEqual('INNER', query_operation.getJoinType().toString())
+        self.assertTrue(query_operation.isCorrelated())
+        self.assertEqual('true', query_operation.getCondition().toString())
+
+    def test_join_lateral_with_join_predicate(self):
+        t_env = self.t_env
+        t_env.register_java_function("split", "org.apache.flink.table.utils.TableFunc1")
+        source = t_env.from_elements([("1", "1#3#5#7"), ("2", "2#4#6#8")], ["id", "words"])
+
+        result = source.join_lateral("split(words) as (word)", "id = word")
+
+        query_operation = result._j_table.getQueryOperation()
+        self.assertEqual('INNER', query_operation.getJoinType().toString())
+        self.assertTrue(query_operation.isCorrelated())
+        self.assertEqual('`default_catalog`.`default_database`.`equals`(id, word)',
+                         query_operation.getCondition().toString())
+
+    def test_left_outer_join_lateral(self):
+        t_env = self.t_env
+        t_env.register_java_function("split", "org.apache.flink.table.utils.TableFunc1")
+        source = t_env.from_elements([("1", "1#3#5#7"), ("2", "2#4#6#8")], ["id", "words"])
+
+        result = source.left_outer_join_lateral("split(words) as (word)")
+
+        query_operation = result._j_table.getQueryOperation()
+        self.assertEqual('LEFT_OUTER', query_operation.getJoinType().toString())
+        self.assertTrue(query_operation.isCorrelated())
+        self.assertEqual('true', query_operation.getCondition().toString())
+
+    def test_left_outer_join_lateral_with_join_predicate(self):
+        t_env = self.t_env
+        t_env.register_java_function("split", "org.apache.flink.table.utils.TableFunc1")
+        source = t_env.from_elements([("1", "1#3#5#7"), ("2", "2#4#6#8")], ["id", "words"])
+
+        # only support "true" as the join predicate currently
+        result = source.left_outer_join_lateral("split(words) as (word)", "true")
+
+        query_operation = result._j_table.getQueryOperation()
+        self.assertEqual('LEFT_OUTER', query_operation.getJoinType().toString())
+        self.assertTrue(query_operation.isCorrelated())
+        self.assertEqual('true', query_operation.getCondition().toString())
diff --git a/flink-python/pyflink/table/tests/test_environment_completeness.py b/flink-python/pyflink/table/tests/test_environment_completeness.py
index 1f82445..18e7ed2 100644
--- a/flink-python/pyflink/table/tests/test_environment_completeness.py
+++ b/flink-python/pyflink/table/tests/test_environment_completeness.py
@@ -43,8 +43,7 @@ class EnvironmentAPICompletenessTests(PythonAPICompletenessTestCase, unittest.Te
         # getCompletionHints has been deprecated. It will be removed in the next release.
         # TODO add TableEnvironment#create method with EnvironmentSettings as a parameter
         return {'registerExternalCatalog', 'getRegisteredExternalCatalog', 'registerCatalog',
-                'getCatalog', 'registerFunction', 'listUserDefinedFunctions', 'listTables',
-                'getCompletionHints', 'create'}
+                'getCatalog', 'registerFunction', 'listTables', 'getCompletionHints', 'create'}
 
 
 if __name__ == '__main__':
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 3a2ac8b..3e2dd6f 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -169,6 +169,19 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
         expected = ['1,Hi,Hello', '2,Hello,Hello']
         self.assert_equals(actual, expected)
 
+    def test_register_java_function(self):
+        t_env = self.t_env
+
+        t_env.register_java_function("scalar_func",
+                                     "org.apache.flink.table.expressions.utils.RichFunc0")
+        t_env.register_java_function(
+            "agg_func", "org.apache.flink.table.functions.aggfunctions.ByteMaxAggFunction")
+        t_env.register_java_function("table_func", "org.apache.flink.table.utils.TableFunc1")
+
+        actual = t_env.list_user_defined_functions()
+        expected = ['scalar_func', 'agg_func', 'table_func']
+        self.assert_equals(actual, expected)
+
     def test_create_table_environment(self):
         table_config = TableConfig()
         table_config.set_max_generated_code_length(32000)
@@ -233,6 +246,19 @@ class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase):
         with self.assertRaises(TableException):
             t_env.explain(extended=True)
 
+    def test_register_java_function(self):
+        t_env = self.t_env
+
+        t_env.register_java_function("scalar_func",
+                                     "org.apache.flink.table.expressions.utils.RichFunc0")
+        t_env.register_java_function(
+            "agg_func", "org.apache.flink.table.functions.aggfunctions.ByteMaxAggFunction")
+        t_env.register_java_function("table_func", "org.apache.flink.table.utils.TableFunc1")
+
+        actual = t_env.list_user_defined_functions()
+        expected = ['scalar_func', 'agg_func', 'table_func']
+        self.assert_equals(actual, expected)
+
     def test_create_table_environment(self):
         table_config = TableConfig()
         table_config.set_max_generated_code_length(32000)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index 3d03e3d..0087f94 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -499,8 +499,8 @@ public interface Table {
 
 	/**
 	 * Joins this {@link Table} with an user-defined {@link TableFunction}. This join is similar to
-	 * a SQL inner join with ON TRUE predicate but works with a table function. Each row of the
-	 * table is joined with all rows produced by the table function.
+	 * a SQL inner join but works with a table function. Each row of the table is joined with all
+	 * rows produced by the table function.
 	 *
 	 * <p>Example:
 	 *
@@ -522,8 +522,8 @@ public interface Table {
 
 	/**
 	 * Joins this {@link Table} with an user-defined {@link TableFunction}. This join is similar to
-	 * a SQL inner join with ON TRUE predicate but works with a table function. Each row of the
-	 * table is joined with all rows produced by the table function.
+	 * a SQL inner join but works with a table function. Each row of the table is joined with all
+	 * rows produced by the table function.
 	 *
 	 * <p>Scala Example:
 	 *