You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/06/16 08:52:04 UTC

[flink] 01/02: [FLINK-18065][docs] Document FLIP-65 table and scalar functions

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c534c3e4d63fb4ad454f04a95225d2614aaaf8a6
Author: Timo Walther <tw...@apache.org>
AuthorDate: Fri Jun 12 16:06:02 2020 +0200

    [FLINK-18065][docs] Document FLIP-65 table and scalar functions
    
    This closes #12660.
---
 docs/dev/table/functions/udfs.md | 1021 +++++++++++++++++++++++++++-----------
 1 file changed, 730 insertions(+), 291 deletions(-)

diff --git a/docs/dev/table/functions/udfs.md b/docs/dev/table/functions/udfs.md
index c53a258..7839726 100644
--- a/docs/dev/table/functions/udfs.md
+++ b/docs/dev/table/functions/udfs.md
@@ -22,322 +22,862 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-User-defined functions are an important feature, because they significantly extend the expressiveness of queries.
+User-defined functions (UDFs) are extension points to call frequently used logic or custom logic that cannot be expressed otherwise in queries.
+
+User-defined functions can be implemented in a JVM language (such as Java or Scala) or Python. An implementer can use arbitrary third party libraries within a UDF. This page will focus on JVM-based languages.
 
 * This will be replaced by the TOC
 {:toc}
 
-Register User-Defined Functions
--------------------------------
-In most cases, a user-defined function must be registered before it can be used in an query. It is not necessary to register functions for the Scala Table API. 
-
-Functions are registered at the `TableEnvironment` by calling a `registerFunction()` method. When a user-defined function is registered, it is inserted into the function catalog of the `TableEnvironment` such that the Table API or SQL parser can recognize and properly translate it. 
+Overview
+--------
 
-Please find detailed examples of how to register and how to call each type of user-defined function 
-(`ScalarFunction`, `TableFunction`, and `AggregateFunction`) in the following sub-sessions.
+Currently, Flink distinguishes between the following kinds of functions:
 
+- *Scalar functions* map scalar values to a new scalar value.
+- *Table functions* map scalar values to new rows.
+- *Aggregate functions* map scalar values of multiple rows to a new scalar value.
+- *Table aggregate functions* map scalar values of multiple rows to new rows.
+- *Async table functions* are special functions for table sources that perform a lookup.
 
-{% top %}
+<span class="label label-danger">Attention</span> Scalar and table functions have been updated to the new type system based on [data types]({% link dev/table/types.md %}). Aggregating functions still use the old type system based on `TypeInformation`.
 
-Scalar Functions
-----------------
+The following example shows how to create a simple scalar function and how to call the function in both Table API and SQL.
 
-If a required scalar function is not contained in the built-in functions, it is possible to define custom, user-defined scalar functions for both the Table API and SQL. A user-defined scalar functions maps zero, one, or multiple scalar values to a new scalar value.
+For SQL queries, a function must always be registered under a name. For Table API, a function can be registered or directly used _inline_.
 
 <div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-In order to define a scalar function, one has to extend the base class `ScalarFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a scalar function is determined by the evaluation method. An evaluation method must be declared publicly and named `eval`. The parameter types and return type of the evaluation method also determine the parameter and return types of the scalar function. Evaluation methods can also be overloaded by imp [...]
-
-The following example shows how to define your own hash code function, register it in the TableEnvironment, and call it in a query. Note that you can configure your scalar function via a constructor before it is registered:
 
+<div data-lang="Java" markdown="1">
 {% highlight java %}
-public class HashCode extends ScalarFunction {
-  private int factor = 12;
-  
-  public HashCode(int factor) {
-      this.factor = factor;
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.ScalarFunction;
+import static org.apache.flink.table.api.Expressions.*;
+
+// define function logic
+public static class SubstringFunction extends ScalarFunction {
+  public String eval(String s, Integer begin, Integer end) {
+    return s.substring(begin, end);
   }
-  
-  public int eval(String s) {
-      return s.hashCode() * factor;
+}
+
+TableEnvironment env = TableEnvironment.create(...);
+
+// call function "inline" without registration in Table API
+env.from("MyTable").select(call(SubstringFunction.class, $("myField"), 5, 12));
+
+// register function
+env.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);
+
+// call registered function in Table API
+env.from("MyTable").select(call("SubstringFunction", $("myField"), 5, 12));
+
+// call registered function in SQL
+env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable");
+
+{% endhighlight %}
+</div>
+
+<div data-lang="Scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.table.api._
+import org.apache.flink.table.functions.ScalarFunction
+
+// define function logic
+class SubstringFunction extends ScalarFunction {
+  def eval(s: String, begin: Integer, end: Integer): String = {
+    s.substring(begin, end)
   }
 }
 
-BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
+val env = TableEnvironment.create(...)
 
-// register the function
-tableEnv.registerFunction("hashCode", new HashCode(10));
+// call function "inline" without registration in Table API
+env.from("MyTable").select(call(classOf[SubstringFunction], $"myField", 5, 12))
+
+// register function
+env.createTemporarySystemFunction("SubstringFunction", classOf[SubstringFunction])
 
-// use the function in Java Table API
-myTable.select("string, string.hashCode(), hashCode(string)");
+// call registered function in Table API
+env.from("MyTable").select(call("SubstringFunction", $"myField", 5, 12))
+
+// call registered function in SQL
+env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable")
 
-// use the function in SQL API
-tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable");
 {% endhighlight %}
+</div>
 
-By default the result type of an evaluation method is determined by Flink's type extraction facilities. This is sufficient for basic types or simple POJOs but might be wrong for more complex, custom, or composite types. In these cases `TypeInformation` of the result type can be manually defined by overriding `ScalarFunction#getResultType()`.
+</div>
 
-The following example shows an advanced example which takes the internal timestamp representation and also returns the internal timestamp representation as a long value. By overriding `ScalarFunction#getResultType()` we define that the returned long value should be interpreted as a `Types.TIMESTAMP` by the code generation.
+For interactive sessions, it is also possible to parameterize functions before using or
+registering them. In this case, function _instances_ instead of function _classes_ can be
+used as temporary functions.
 
+It requires that the parameters are serializable for shipping
+function instances to the cluster.
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="Java" markdown="1">
 {% highlight java %}
-public static class TimestampModifier extends ScalarFunction {
-  public long eval(long t) {
-    return t % 1000;
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.ScalarFunction;
+import static org.apache.flink.table.api.Expressions.*;
+
+// define parameterizable function logic
+public static class SubstringFunction extends ScalarFunction {
+
+  private boolean endInclusive;
+
+  public SubstringFunction(boolean endInclusive) {
+    this.endInclusive = endInclusive;
   }
 
-  public TypeInformation<?> getResultType(Class<?>[] signature) {
-    return Types.SQL_TIMESTAMP;
+  public String eval(String s, Integer begin, Integer end) {
+    return s.substring(a, endInclusive ? end + 1 : end);
   }
 }
-{% endhighlight %}
-</div>
 
-<div data-lang="scala" markdown="1">
-In order to define a scalar function, one has to extend the base class `ScalarFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a scalar function is determined by the evaluation method. An evaluation method must be declared publicly and named `eval`. The parameter types and return type of the evaluation method also determine the parameter and return types of the scalar function. Evaluation methods can also be overloaded by imp [...]
+TableEnvironment env = TableEnvironment.create(...);
+
+// call function "inline" without registration in Table API
+env.from("MyTable").select(call(new SubstringFunction(true), $("myField"), 5, 12));
 
-The following example shows how to define your own hash code function, register it in the TableEnvironment, and call it in a query. Note that you can configure your scalar function via a constructor before it is registered:
+// register function
+env.createTemporarySystemFunction("SubstringFunction", new SubstringFunction(true));
 
+{% endhighlight %}
+</div>
+
+<div data-lang="Scala" markdown="1">
 {% highlight scala %}
-// must be defined in static/object context
-class HashCode(factor: Int) extends ScalarFunction {
-  def eval(s: String): Int = {
-    s.hashCode() * factor
+import org.apache.flink.table.api._
+import org.apache.flink.table.functions.ScalarFunction
+
+// define parameterizable function logic
+class SubstringFunction(val endInclusive) extends ScalarFunction {
+  def eval(s: String, begin: Integer, end: Integer): String = {
+    s.substring(endInclusive ? end + 1 : end)
   }
 }
 
-val tableEnv = BatchTableEnvironment.create(env)
+val env = TableEnvironment.create(...)
 
-// use the function in Scala Table API
-val hashCode = new HashCode(10)
-myTable.select('string, hashCode('string))
+// call function "inline" without registration in Table API
+env.from("MyTable").select(call(new SubstringFunction(true), $"myField", 5, 12))
+
+// register function
+env.createTemporarySystemFunction("SubstringFunction", new SubstringFunction(true))
 
-// register and use the function in SQL
-tableEnv.registerFunction("hashCode", new HashCode(10))
-tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable")
 {% endhighlight %}
+</div>
+
+</div>
+
+{% top %}
+
+Implementation Guide
+--------------------
+
+<span class="label label-danger">Attention</span> This section only applies to scalar and table functions for now; until aggregate functions have been updated to the new type system.
+
+Independent of the kind of function, all user-defined functions follow some basic implementation principles.
+
+### Function Class
 
-By default the result type of an evaluation method is determined by Flink's type extraction facilities. This is sufficient for basic types or simple POJOs but might be wrong for more complex, custom, or composite types. In these cases `TypeInformation` of the result type can be manually defined by overriding `ScalarFunction#getResultType()`.
+An implementation class must extend from one of the available base classes (e.g. `org.apache.flink.table.functions.ScalarFunction`).
 
-The following example shows an advanced example which takes the internal timestamp representation and also returns the internal timestamp representation as a long value. By overriding `ScalarFunction#getResultType()` we define that the returned long value should be interpreted as a `Types.TIMESTAMP` by the code generation.
+The class must be declared `public`, not `abstract`, and should be globally accessible. Thus, non-static inner or anonymous classes are not allowed.
 
+For storing a user-defined function in a persistent catalog, the class must have a default constructor and must be instantiable during runtime.
+
+### Evaluation Methods
+
+The base class provides a set of methods that can be overridden such as `open()`, `close()`, or `isDeterministic()`.
+
+However, in addition to those declared methods, the main runtime logic that is applied to every incoming record must be implemented through specialized _evaluation methods_.
+
+Depending on the function kind, evaluation methods such as `eval()`, `accumulate()`, or `retract()` are called by code-generated operators during runtime.
+
+The methods must be declared `public` and take a well-defined set of arguments.
+
+Regular JVM method calling semantics apply. Therefore, it is possible to:
+- implement overloaded methods such as `eval(Integer)` and `eval(LocalDateTime)`,
+- use var-args such as `eval(Integer...)`,
+- use object inheritance such as `eval(Object)` that takes both `LocalDateTime` and `Integer`,
+- and combinations of the above such as `eval(Object...)` that takes all kinds of arguments.
+
+The following snippets shows an example of an overloaded function:
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="Java" markdown="1">
+{% highlight java %}
+import org.apache.flink.table.functions.ScalarFunction;
+
+// function with overloaded evaluation methods
+public static class SumFunction extends ScalarFunction {
+
+  public Integer eval(Integer a, Integer b) {
+    return a + b;
+  }
+
+  public Integer eval(String a, String b) {
+    return Integer.valueOf(a) + Integer.valueOf();
+  }
+
+  public Integer eval(Double... d) {
+    double result = 0;
+    for (double value : d)
+      result += value;
+    return (int) result;
+  }
+}
+
+{% endhighlight %}
+</div>
+
+<div data-lang="Scala" markdown="1">
 {% highlight scala %}
-object TimestampModifier extends ScalarFunction {
-  def eval(t: Long): Long = {
-    t % 1000
+import org.apache.flink.table.functions.ScalarFunction
+import scala.annotation.varargs
+
+// function with overloaded evaluation methods
+class SumFunction extends ScalarFunction {
+
+  def eval(a: Integer, b: Integer): Integer = {
+    a + b
+  }
+
+  def eval(a: String, b: String): Integer = {
+    Integer.valueOf(a) + Integer.valueOf(b)
   }
 
-  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
-    Types.TIMESTAMP
+  @varargs // generate var-args like Java
+  def eval(d: Double*): Integer = {
+    d.sum.toInt
   }
 }
+
 {% endhighlight %}
 </div>
 
-<div data-lang="python" markdown="1">
-In order to define a Python scalar function, one can extend the base class `ScalarFunction` in `pyflink.table.udf` and implement an evaluation method. The behavior of a Python scalar function is determined by the evaluation method which is named `eval`.
+</div>
 
-The following example shows how to define your own Python hash code function, register it in the TableEnvironment, and call it in a query. Note that you can configure your scalar function via a constructor before it is registered:
+### Type Inference
 
-{% highlight python %}
-class HashCode(ScalarFunction):
-  def __init__(self):
-    self.factor = 12
+The table ecosystem (similar to the SQL standard) is a strongly typed API. Therefore, both function parameters and return types must be mapped to a [data type]({% link dev/table/types.md %}).
+
+From a logical perspective, the planner needs information about expected types, precision, and scale. From a JVM perspective, the planner needs information about how internal data structures are represented as JVM objects when calling a user-defined function.
 
-  def eval(self, s):
-    return hash(s) * self.factor
+The logic for validating input arguments and deriving data types for both the parameters and the result of a function is summarized under the term _type inference_.
 
-table_env = BatchTableEnvironment.create(env)
+Flink's user-defined functions implement an automatic type inference extraction that derives data types from the function's class and its evaluation methods via reflection. If this implicit reflective extraction approach is not successful, the extraction process can be supported by annotating affected parameters, classes, or methods with `@DataTypeHint` and `@FunctionHint`. More examples on how to annotate functions are shown below.
 
-# register the Python function
-table_env.register_function("hash_code", udf(HashCode(), DataTypes.BIGINT(), DataTypes.BIGINT()))
+If more advanced type inference logic is required, an implementer can explicitly override the `getTypeInference()` method in every user-defined function. However, the annotation approach is recommended because it keeps custom type inference logic close to the affected locations and falls back to the default behavior for the remaining implementation.
 
-# use the function in Python Table API
-my_table.select("string, bigint, string.hash_code(), hash_code(string)")
+#### Automatic Type Inference
+
+The automatic type inference inspects the function's class and evaluation methods to derive data types for the arguments and result of a function. `@DataTypeHint` and `@FunctionHint` annotations support the automatic extraction.
+
+For a full list of classes that can be implicitly mapped to a data type, see the [data type section]({% link dev/table/types.md %}#data-type-annotations).
+
+**`@DataTypeHint`**
+
+In many scenarios, it is required to support the automatic extraction _inline_ for paramaters and return types of a function
+
+The following example shows how to use data type hints. More information can be found in the documentation of the annotation class.
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="Java" markdown="1">
+{% highlight java %}
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.InputGroup;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.types.Row;
+
+// function with overloaded evaluation methods
+public static class OverloadedFunction extends ScalarFunction {
+
+  // no hint required
+  public Long eval(long a, long b) {
+    return a + b;
+  }
+
+  // define the precision and scale of a decimal
+  public @DataTypeHint("DECIMAL(12, 3)") BigDecimal eval(double a, double b) {
+    return BigDecimal.valueOf(a + b);
+  }
+
+  // define a nested data type
+  @DataTypeHint("ROW<s STRING, t TIMESTAMP(3) WITH LOCAL TIME ZONE>")
+  public Row eval(int i) {
+    return Row.of(String.valueOf(i), Instant.ofEpochSecond(i));
+  }
+
+  // allow wildcard input and customly serialized output
+  @DataTypeHint(value = "RAW", bridgedTo = ByteBuffer.class)
+  public ByteBuffer eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
+    return MyUtils.serializeToByteBuffer(o);
+  }
+}
 
-# use the function in SQL API
-table_env.sql_query("SELECT string, bigint, hash_code(bigint) FROM MyTable")
 {% endhighlight %}
+</div>
+
+<div data-lang="Scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.table.annotation.DataTypeHint
+import org.apache.flink.table.annotation.InputGroup
+import org.apache.flink.table.functions.ScalarFunction
+import org.apache.flink.types.Row
+import scala.annotation.varargs
+
+// function with overloaded evaluation methods
+class OverloadedFunction extends ScalarFunction {
+
+  // no hint required
+  def eval(a: Long, b: Long): Long = {
+    a + b
+  }
+
+  // define the precision and scale of a decimal
+  @DataTypeHint("DECIMAL(12, 3)")
+  def eval(double a, double b): BigDecimal = {
+    java.lang.BigDecimal.valueOf(a + b)
+  }
 
-There are many ways to define a Python scalar function besides extending the base class `ScalarFunction`.
-Please refer to the [Python Scalar Function]({{ site.baseurl }}/dev/table/python/python_udfs.html#scalar-functions) documentation for more details.
+  // define a nested data type
+  @DataTypeHint("ROW<s STRING, t TIMESTAMP(3) WITH LOCAL TIME ZONE>")
+  def eval(Int i): Row = {
+    Row.of(java.lang.String.valueOf(i), java.time.Instant.ofEpochSecond(i))
+  }
+
+  // allow wildcard input and customly serialized output
+  @DataTypeHint(value = "RAW", bridgedTo = classOf[java.nio.ByteBuffer])
+  def eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o): java.nio.ByteBuffer = {
+    MyUtils.serializeToByteBuffer(o)
+  }
+}
+
+{% endhighlight %}
 </div>
+
 </div>
 
-{% top %}
+**`@FunctionHint`**
 
-Table Functions
----------------
+In some scenarios, it is desirable that one evaluation method handles multiple different data types at the same time. Furthermore, in some scenarios, overloaded evaluation methods have a common result type that should be declared only once.
+
+The `@FunctionHint` annotation can provide a mapping from argument data types to a result data type. It enables annotating entire function classes or evaluation methods for input, accumulator, and result data types. One or more annotations can be declared on top of a class or individually for each evaluation method for overloading function signatures. All hint parameters are optional. If a parameter is not defined, the default reflection-based extraction is used. Hint parameters defined  [...]
 
-Similar to a user-defined scalar function, a user-defined table function takes zero, one, or multiple scalar values as input parameters. However in contrast to a scalar function, it can return an arbitrary number of rows as output instead of a single value. The returned rows may consist of one or more columns. 
+The following example shows how to use function hints. More information can be found in the documentation of the annotation class.
 
 <div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-In order to define a table function one has to extend the base class `TableFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a table function is determined by its evaluation methods. An evaluation method must be declared `public` and named `eval`. The `TableFunction` can be overloaded by implementing multiple methods named `eval`. The parameter types of the evaluation methods determine all valid parameters of the table functio [...]
 
-In the Table API, a table function is used with `.joinLateral` or `.leftOuterJoinLateral`. The `joinLateral` operator (cross) joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the operator). The `leftOuterJoinLateral` operator joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the o [...]
+<div data-lang="Java" markdown="1">
+{% highlight java %}
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+
+// function with overloaded evaluation methods
+// but globally defined output type
+@FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
+public static class OverloadedFunction extends TableFunction<Row> {
+
+  public void eval(int a, int b) {
+    collect(Row.of("Sum", a + b));
+  }
 
-The following example shows how to define table-valued function, register it in the TableEnvironment, and call it in a query. Note that you can configure your table function via a constructor before it is registered: 
+  // overloading of arguments is still possible
+  public void eval() {
+    collect(Row.of("Empty args", -1));
+  }
+}
 
-{% highlight java %}
-// The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer).
-public class Split extends TableFunction<Tuple2<String, Integer>> {
-    private String separator = " ";
-    
-    public Split(String separator) {
-        this.separator = separator;
-    }
-    
-    public void eval(String str) {
-        for (String s : str.split(separator)) {
-            // use collect(...) to emit a row
-            collect(new Tuple2<String, Integer>(s, s.length()));
-        }
+// decouples the type inference from evaluation methods,
+// the type inference is entirely determined by the function hints
+@FunctionHint(
+  input = [@DataTypeHint("INT"), @DataTypeHint("INT")],
+  output = @DataTypeHint("INT")
+)
+@FunctionHint(
+  input = [@DataTypeHint("LONG"), @DataTypeHint("LONG")],
+  output = @DataTypeHint("LONG")
+)
+@FunctionHint(
+  input = [],
+  output = @DataTypeHint("BOOLEAN")
+)
+public static class OverloadedFunction extends TableFunction<Object> {
+
+  // an implementer just needs to make sure that a method exists
+  // that can be called by the JVM
+  public void eval(Object... o) {
+    if (o.length == 0) {
+      collect(false);
     }
+    collect(o[0]);
+  }
 }
 
-BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
-Table myTable = ...         // table schema: [a: String]
+{% endhighlight %}
+</div>
+
+<div data-lang="Scala" markdown="1">
+{% highlight scala %}
+
+import org.apache.flink.table.annotation.DataTypeHint
+import org.apache.flink.table.annotation.FunctionHint
+import org.apache.flink.table.functions.TableFunction
+import org.apache.flink.types.Row
+
+// function with overloaded evaluation methods
+// but globally defined output type
+@FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
+class OverloadedFunction extends TableFunction[Row] {
+
+  def eval(a: Int, b: Int): Unit = {
+    collect(Row.of("Sum", Int.box(a + b)))
+  }
 
-// Register the function.
-tableEnv.registerFunction("split", new Split("#"));
+  // overloading of arguments is still possible
+  def eval(): Unit = {
+    collect(Row.of("Empty args", Int.box(-1)))
+  }
+}
 
-// Use the table function in the Java Table API. "as" specifies the field names of the table.
-myTable.joinLateral("split(a) as (word, length)")
-    .select("a, word, length");
-myTable.leftOuterJoinLateral("split(a) as (word, length)")
-    .select("a, word, length");
+// decouples the type inference from evaluation methods,
+// the type inference is entirely determined by the function hints
+@FunctionHint(
+  input = Array(@DataTypeHint("INT"), @DataTypeHint("INT")),
+  output = @DataTypeHint("INT")
+)
+@FunctionHint(
+  input = Array(@DataTypeHint("LONG"), @DataTypeHint("LONG")),
+  output = @DataTypeHint("LONG")
+)
+@FunctionHint(
+  input = Array(),
+  output = @DataTypeHint("BOOLEAN")
+)
+class OverloadedFunction extends TableFunction[AnyRef] {
+
+  // an implementer just needs to make sure that a method exists
+  // that can be called by the JVM
+  @varargs
+  def eval(o: AnyRef*) = {
+    if (o.length == 0) {
+      collect(Boolean.box(false))
+    }
+    collect(o(0))
+  }
+}
 
-// Use the table function in SQL with LATERAL and TABLE keywords.
-// CROSS JOIN a table function (equivalent to "join" in Table API).
-tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
-// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
-tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
 {% endhighlight %}
 </div>
 
-Please note that POJO types do not have a deterministic field order. Therefore, you cannot rename the fields of POJO returned by a table function using `AS`.
+</div>
 
-By default the result type of a `TableFunction` is determined by Flink’s automatic type extraction facilities. This works well for basic types and simple POJOs but might be wrong for more complex, custom, or composite types. In such a case, the type of the result can be manually specified by overriding `TableFunction#getResultType()` which returns its `TypeInformation`.
+#### Custom Type Inference
 
-The following example shows an example of a `TableFunction` that returns a `Row` type which requires explicit type information. We define that the returned table type should be `RowTypeInfo(String, Integer)` by overriding `TableFunction#getResultType()`.
+For most scenarios, `@DataTypeHint` and `@FunctionHint` should be sufficient to model user-defined functions. However, by overriding the automatic type inference defined in `getTypeInference()`, implementers can create arbitrary functions that behave like built-in system functions.
 
+The following example implemented in Java illustrates the potential of a custom type inference logic. It uses a string literal argument to determine the result type of a function. The function takes two string arguments: the first argument represents the string to be parsed, the second argument represents the target type.
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="Java" markdown="1">
 {% highlight java %}
-public class CustomTypeSplit extends TableFunction<Row> {
-    public void eval(String str) {
-        for (String s : str.split(" ")) {
-            Row row = new Row(2);
-            row.setField(0, s);
-            row.setField(1, s.length());
-            collect(row);
-        }
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.types.Row;
+
+public static class LiteralFunction extends ScalarFunction {
+  public Object eval(String s, String type) {
+    switch (type) {
+      case "INT":
+        return Integer.valueOf(s);
+      case "DOUBLE":
+        return Double.valueOf(s);
+      case "STRING":
+      default:
+        return s;
     }
+  }
+
+  // the automatic, reflection-based type inference is disabled and
+  // replaced by the following logic
+  @Override
+  public TypeInference getTypeInference(DataTypeFactory typeFactory) {
+    return TypeInference.newBuilder()
+      // specify typed arguments
+      // parameters will be casted implicitly to those types if necessary
+      .typedArguments(DataTypes.STRING(), DataTypes.STRING())
+      // specify a strategy for the result data type of the function
+      .outputTypeStrategy(callContext -> {
+        if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
+          throw callContext.newValidationError("Literal expected for second argument.");
+        }
+        // return a data type based on a literal
+        final String literal = callContext.getArgumentValue(1, String.class).orElse("STRING");
+        switch (literal) {
+          case "INT":
+            return Optional.of(DataTypes.INT().notNull());
+          case "DOUBLE":
+            return Optional.of(DataTypes.DOUBLE().notNull());
+          case "STRING":
+          default:
+            return Optional.of(DataTypes.STRING());
+        }
+      })
+      .build();
+  }
+}
+
+{% endhighlight %}
+</div>
+
+</div>
+
+Runtime Integration
+-------------------
+
+Sometimes it might be necessary for a user-defined function to get global runtime information or do some setup/clean-up work before the actual work. User-defined functions provide `open()` and `close()` methods that can be overridden and provide similar functionality as the methods in `RichFunction` of DataStream API.
+
+The `open()` method is called once before the evaluation method. The `close()` method after the last call to the evaluation method.
+
+The `open()` method provides a `FunctionContext` that contains information about the context in which user-defined functions are executed, such as the metric group, the distributed cache files, or the global job parameters.
+
+The following information can be obtained by calling the corresponding methods of `FunctionContext`:
+
+| Method                                | Description                                            |
+| :------------------------------------ | :----------------------------------------------------- |
+| `getMetricGroup()`                    | Metric group for this parallel subtask.                |
+| `getCachedFile(name)`                 | Local temporary file copy of a distributed cache file. |
+| `getJobParameter(name, defaultValue)` | Global job parameter value associated with given key.  |
+
+The following example snippet shows how to use `FunctionContext` in a scalar function for accessing a global job parameter:
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.ScalarFunction;
+
+public static class HashCodeFunction extends ScalarFunction {
+
+    private int factor = 0;
 
     @Override
-    public TypeInformation<Row> getResultType() {
-        return Types.ROW(Types.STRING(), Types.INT());
+    public void open(FunctionContext context) throws Exception {
+        // access the global "hashcode_factor" parameter
+        // "12" would be the default value if the parameter does not exist
+        factor = Integer.parseInt(context.getJobParameter("hashcode_factor", "12"));
+    }
+
+    public int eval(String s) {
+        return s.hashCode() * factor;
     }
 }
+
+TableEnvironment env = TableEnvironment.create(...);
+
+// add job parameter
+env.getConfig().addJobParameter("hashcode_factor", "31");
+
+// register the function
+env.createTemporarySystemFunction("hashCode", HashCodeFunction.class);
+
+// use the function
+env.sqlQuery("SELECT myField, hashCode(myField) FROM MyTable");
+
 {% endhighlight %}
+</div>
 
 <div data-lang="scala" markdown="1">
-In order to define a table function one has to extend the base class `TableFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a table function is determined by its evaluation methods. An evaluation method must be declared `public` and named `eval`. The `TableFunction` can be overloaded by implementing multiple methods named `eval`. The parameter types of the evaluation methods determine all valid parameters of the table functio [...]
+{% highlight scala %}
+import org.apache.flink.table.api._
+import org.apache.flink.table.functions.FunctionContext
+import org.apache.flink.table.functions.ScalarFunction
 
-In the Table API, a table function is used with `.joinLateral` or `.leftOuterJoinLateral`. The `joinLateral` operator (cross) joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the operator). The `leftOuterJoinLateral` operator joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the o [...]
+class HashCodeFunction extends ScalarFunction {
 
-The following example shows how to define table-valued function, register it in the TableEnvironment, and call it in a query. Note that you can configure your table function via a constructor before it is registered: 
+  private var factor: Int = 0
 
-{% highlight scala %}
-// The generic type "(String, Int)" determines the schema of the returned table as (String, Integer).
-class Split(separator: String) extends TableFunction[(String, Int)] {
-  def eval(str: String): Unit = {
-    // use collect(...) to emit a row.
-    str.split(separator).foreach(x => collect((x, x.length)))
+  override def open(context: FunctionContext): Unit = {
+    // access the global "hashcode_factor" parameter
+    // "12" would be the default value if the parameter does not exist
+    factor = context.getJobParameter("hashcode_factor", "12").toInt
+  }
+
+  def eval(s: String): Int = {
+    s.hashCode * factor
   }
 }
 
-val tableEnv = BatchTableEnvironment.create(env)
-val myTable = ...         // table schema: [a: String]
+val env = TableEnvironment.create(...)
+
+// add job parameter
+env.getConfig.addJobParameter("hashcode_factor", "31")
 
-// Use the table function in the Scala Table API (Note: No registration required in Scala Table API).
-val split = new Split("#")
-// "as" specifies the field names of the generated table.
-myTable.joinLateral(split('a) as ('word, 'length)).select('a, 'word, 'length)
-myTable.leftOuterJoinLateral(split('a) as ('word, 'length)).select('a, 'word, 'length)
+// register the function
+env.createTemporarySystemFunction("hashCode", classOf[HashCodeFunction])
 
-// Register the table function to use it in SQL queries.
-tableEnv.registerFunction("split", new Split("#"))
+// use the function
+env.sqlQuery("SELECT myField, hashCode(myField) FROM MyTable")
 
-// Use the table function in SQL with LATERAL and TABLE keywords.
-// CROSS JOIN a table function (equivalent to "join" in Table API)
-tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)")
-// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API)
-tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE")
 {% endhighlight %}
-**IMPORTANT:** Do not implement TableFunction as a Scala object. Scala object is a singleton and will cause concurrency issues.
+</div>
 
-Please note that POJO types do not have a deterministic field order. Therefore, you cannot rename the fields of POJO returned by a table function using `AS`.
+</div>
 
-By default the result type of a `TableFunction` is determined by Flink’s automatic type extraction facilities. This works well for basic types and simple POJOs but might be wrong for more complex, custom, or composite types. In such a case, the type of the result can be manually specified by overriding `TableFunction#getResultType()` which returns its `TypeInformation`.
+{% top %}
 
-The following example shows an example of a `TableFunction` that returns a `Row` type which requires explicit type information. We define that the returned table type should be `RowTypeInfo(String, Integer)` by overriding `TableFunction#getResultType()`.
+Scalar Functions
+----------------
 
-{% highlight scala %}
-class CustomTypeSplit extends TableFunction[Row] {
-  def eval(str: String): Unit = {
-    str.split(" ").foreach({ s =>
-      val row = new Row(2)
-      row.setField(0, s)
-      row.setField(1, s.length)
-      collect(row)
-    })
+A user-defined scalar function maps zero, one, or multiple scalar values to a new scalar value. Any data type listed in the [data types section]({% link dev/table/types.md %}) can be used as a parameter or return type of an evaluation method.
+
+In order to define a scalar function, one has to extend the base class `ScalarFunction` in `org.apache.flink.table.functions` and implement one or more evaluation methods named `eval(...)`.
+
+The following example shows how to define your own hash code function and call it in a query. See the [Implementation Guide](#implementation-guide) for more details.
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="Java" markdown="1">
+{% highlight java %}
+import org.apache.flink.table.annotation.InputGroup;
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.ScalarFunction;
+import static org.apache.flink.table.api.Expressions.*;
+
+public static class HashFunction extends ScalarFunction {
+
+  // take any data type and return INT
+  public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
+    return o.hashCode();
   }
+}
+
+TableEnvironment env = TableEnvironment.create(...);
+
+// call function "inline" without registration in Table API
+env.from("MyTable").select(call(HashFunction.class, $("myField")));
+
+// register function
+env.createTemporarySystemFunction("HashFunction", HashFunction.class);
+
+// call registered function in Table API
+env.from("MyTable").select(call("HashFunction", $("myField")));
 
-  override def getResultType: TypeInformation[Row] = {
-    Types.ROW(Types.STRING, Types.INT)
+// call registered function in SQL
+env.sqlQuery("SELECT HashFunction(myField) FROM MyTable");
+
+{% endhighlight %}
+</div>
+
+<div data-lang="Scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.table.annotation.InputGroup
+import org.apache.flink.table.api._
+import org.apache.flink.table.functions.ScalarFunction
+
+class HashFunction extends ScalarFunction {
+
+  // take any data type and return INT
+  def eval(@DataTypeHint(inputGroup = InputGroup.ANY) o: AnyRef): Int {
+    return o.hashCode();
   }
 }
+
+val env = TableEnvironment.create(...)
+
+// call function "inline" without registration in Table API
+env.from("MyTable").select(call(classOf[HashFunction], $"myField"))
+
+// register function
+env.createTemporarySystemFunction("HashFunction", classOf[HashFunction])
+
+// call registered function in Table API
+env.from("MyTable").select(call("HashFunction", $"myField"))
+
+// call registered function in SQL
+env.sqlQuery("SELECT HashFunction(myField) FROM MyTable")
+
 {% endhighlight %}
+</div>
 
 </div>
 
-<div data-lang="python" markdown="1">
-In order to define a Python table function, one can extend the base class `TableFunction` in `pyflink.table.udtf` and Implement an evaluation method. The behavior of a Python table function is determined by the evaluation method which is named eval.
+If you intend to implement or call functions in Python, please refer to the [Python Scalar Functions]({% link dev/table/python/python_udfs.md %}#scalar-functions) documentation for more details.
 
-In the Python Table API, a Python table function is used with `.join_lateral` or `.left_outer_join_lateral`. The `join_lateral` operator (cross) joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the operator). The `left_outer_join_lateral` operator joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on t [...]
+{% top %}
 
-The following example shows how to define a Python table function, registered it in the TableEnvironment, and call it in a query. Note that you can configure your table function via a constructor before it is registered:
+Table Functions
+---------------
 
-{% highlight python %}
-class Split(TableFunction):
-    def eval(self, string):
-        for s in string.split(" "):
-            yield s, len(s)
+Similar to a user-defined scalar function, a user-defined table function takes zero, one, or multiple scalar values as input arguments. However, in contrast to a scalar function, it can return an arbitrary number of rows (or structured types) as output instead of a single value. The returned record may consist of one or more fields. If an output record consists of only one field, the structured record can be omitted and a scalar value can be emitted. It will be wrapped into an implicit r [...]
 
-env = StreamExecutionEnvironment.get_execution_environment()
-table_env = StreamTableEnvironment.create(env)
-my_table = ...  # type: Table, table schema: [a: String]
+In order to define a table function, one has to extend the base class `TableFunction` in `org.apache.flink.table.functions` and implement one or more evaluation methods named `eval(...)`. Similar to other functions, input and output data types are automatically extracted using reflection. This includes the generic argument `T` of the class for determining an output data type. In contrast to scalar functions, the evaluation method itself must not have a return type, instead, table functio [...]
 
-# register the Python Table Function
-table_env.register_function("split", udtf(Split(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.INT()]))
+In the Table API, a table function is used with `.joinLateral(...)` or `.leftOuterJoinLateral(...)`. The `joinLateral` operator (cross) joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the operator). The `leftOuterJoinLateral` operator joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right sid [...]
 
-# use the Python Table Function in Python Table API
-my_table.join_lateral("split(a) as (word, length)")
-my_table.left_outer_join_lateral("split(a) as (word, length)")
+In SQL, use `LATERAL TABLE(<TableFunction>)` with `JOIN` or `LEFT JOIN` with an `ON TRUE` join condition.
 
-# use the Python Table function in SQL API
-table_env.sql_query("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)")
-table_env.sql_query("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE")
+The following example shows how to define your own split function and call it in a query. See the [Implementation Guide](#implementation-guide) for more details.
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="Java" markdown="1">
+{% highlight java %}
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+import static org.apache.flink.table.api.Expressions.*;
+
+@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
+public static class SplitFunction extends TableFunction<Row> {
+
+  public void eval(String str) {
+    for (String s : str.split(" ")) {
+      // use collect(...) to emit a row
+      collect(Row.of(s, s.length()));
+    }
+  }
+}
+
+TableEnvironment env = TableEnvironment.create(...);
+
+// call function "inline" without registration in Table API
+env
+  .from("MyTable")
+  .joinLateral(call(SplitFunction.class, $("myField")))
+  .select($("myField"), $("word"), $("length"));
+env
+  .from("MyTable")
+  .leftOuterJoinLateral(call(SplitFunction.class, $("myField")))
+  .select($("myField"), $("word"), $("length"));
+
+// rename fields of the function in Table API
+env
+  .from("MyTable")
+  .leftOuterJoinLateral(call(SplitFunction.class, $("myField")).as("newWord", "newLength"))
+  .select($("myField"), $("newWord"), $("newLength"));
+
+// register function
+env.createTemporarySystemFunction("SplitFunction", SplitFunction.class);
+
+// call registered function in Table API
+env
+  .from("MyTable")
+  .joinLateral(call("SplitFunction", $("myField")))
+  .select($("myField"), $("word"), $("length"));
+env
+  .from("MyTable")
+  .leftOuterJoinLateral(call("SplitFunction", $("myField")))
+  .select($("myField"), $("word"), $("length"));
+
+// call registered function in SQL
+env.sqlQuery(
+  "SELECT myField, word, length " +
+  "FROM MyTable, LATERAL TABLE(SplitFunction(myField))");
+env.sqlQuery(
+  "SELECT myField, word, length " +
+  "FROM MyTable " +
+  "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE");
+
+// rename fields of the function in SQL
+env.sqlQuery(
+  "SELECT myField, newWord, newLength " +
+  "FROM MyTable " +
+  "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength) ON TRUE");
 
 {% endhighlight %}
+</div>
 
-There are many ways to define a Python table function besides extending the base class `TableFunction`.
-Please refer to the [Python Table Function]({{ site.baseurl }}/dev/table/python/python_udfs.html#table-functions) documentation for more details.
- 
+<div data-lang="Scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.table.annotation.DataTypeHint
+import org.apache.flink.table.annotation.FunctionHint
+import org.apache.flink.table.api._
+import org.apache.flink.table.functions.TableFunction
+import org.apache.flink.types.Row
+
+@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
+class SplitFunction extends TableFunction[Row] {
+
+  def eval(str: String): Unit = {
+    // use collect(...) to emit a row
+    str.split(" ").foreach(s => collect(Row.of(s, s.length)))
+  }
+}
+
+val env = TableEnvironment.create(...)
+
+// call function "inline" without registration in Table API
+env
+  .from("MyTable")
+  .joinLateral(call(classOf[SplitFunction], $"myField")
+  .select($"myField", $"word", $"length")
+env
+  .from("MyTable")
+  .leftOuterJoinLateral(call(classOf[SplitFunction], $"myField"))
+  .select($"myField", $"word", $"length")
+
+// rename fields of the function in Table API
+env
+  .from("MyTable")
+  .leftOuterJoinLateral(call(classOf[SplitFunction], $"myField").as("newWord", "newLength"))
+  .select($"myField", $"newWord", $"newLength")
+
+// register function
+env.createTemporarySystemFunction("SplitFunction", classOf[SplitFunction])
+
+// call registered function in Table API
+env
+  .from("MyTable")
+  .joinLateral(call("SplitFunction", $"myField"))
+  .select($"myField", $"word", $"length")
+env
+  .from("MyTable")
+  .leftOuterJoinLateral(call("SplitFunction", $"myField"))
+  .select($"myField", $"word", $"length")
+
+// call registered function in SQL
+env.sqlQuery(
+  "SELECT myField, word, length " +
+  "FROM MyTable, LATERAL TABLE(SplitFunction(myField))");
+env.sqlQuery(
+  "SELECT myField, word, length " +
+  "FROM MyTable " +
+  "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE")
+
+// rename fields of the function in SQL
+env.sqlQuery(
+  "SELECT myField, newWord, newLength " +
+  "FROM MyTable " +
+  "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength) ON TRUE")
+
+{% endhighlight %}
 </div>
+
 </div>
 
-{% top %}
+If you intend to implement functions in Scala, do not implement a table function as a Scala `object`. Scala `object`s are singletons and will cause concurrency issues.
 
+If you intend to implement or call functions in Python, please refer to the [Python Table Functions]({% link dev/table/python/python_udfs.md %}#table-functions) documentation for more details.
+
+{% top %}
 
 Aggregation Functions
 ---------------------
@@ -816,7 +1356,6 @@ t_env.sql_query("SELECT user, wAvg(points, level) AS avgPoints FROM userScores G
 </div>
 </div>
 
-
 {% top %}
 
 Table Aggregation Functions
@@ -1415,104 +1954,4 @@ tab
 </div>
 </div>
 
-
-{% top %}
-
-Best Practices for Implementing UDFs
-------------------------------------
-
-The Table API and SQL code generation internally tries to work with primitive values as much as possible. A user-defined function can introduce much overhead through object creation, casting, and (un)boxing. Therefore, it is highly recommended to declare parameters and result types as primitive types instead of their boxed classes. `Types.DATE` and `Types.TIME` can also be represented as `int`. `Types.TIMESTAMP` can be represented as `long`. 
-
-We recommended that user-defined functions should be written by Java instead of Scala as Scala types pose a challenge for Flink's type extractor.
-
-{% top %}
-
-Integrating UDFs with the Runtime
----------------------------------
-
-Sometimes it might be necessary for a user-defined function to get global runtime information or do some setup/clean-up work before the actual work. User-defined functions provide `open()` and `close()` methods that can be overridden and provide similar functionality as the methods in `RichFunction` of DataSet or DataStream API.
-
-The `open()` method is called once before the evaluation method. The `close()` method after the last call to the evaluation method.
-
-The `open()` method provides a `FunctionContext` that contains information about the context in which user-defined functions are executed, such as the metric group, the distributed cache files, or the global job parameters.
-
-The following information can be obtained by calling the corresponding methods of `FunctionContext`:
-
-| Method                                | Description                                            |
-| :------------------------------------ | :----------------------------------------------------- |
-| `getMetricGroup()`                    | Metric group for this parallel subtask.                |
-| `getCachedFile(name)`                 | Local temporary file copy of a distributed cache file. |
-| `getJobParameter(name, defaultValue)` | Global job parameter value associated with given key.  |
-
-The following example snippet shows how to use `FunctionContext` in a scalar function for accessing a global job parameter:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-public class HashCode extends ScalarFunction {
-
-    private int factor = 0;
-
-    @Override
-    public void open(FunctionContext context) throws Exception {
-        // access "hashcode_factor" parameter
-        // "12" would be the default value if parameter does not exist
-        factor = Integer.valueOf(context.getJobParameter("hashcode_factor", "12")); 
-    }
-
-    public int eval(String s) {
-        return s.hashCode() * factor;
-    }
-}
-
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
-
-// set job parameter
-Configuration conf = new Configuration();
-conf.setString("hashcode_factor", "31");
-env.getConfig().setGlobalJobParameters(conf);
-
-// register the function
-tableEnv.registerFunction("hashCode", new HashCode());
-
-// use the function in Java Table API
-myTable.select("string, string.hashCode(), hashCode(string)");
-
-// use the function in SQL
-tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-object hashCode extends ScalarFunction {
-
-  var hashcode_factor = 12
-
-  override def open(context: FunctionContext): Unit = {
-    // access "hashcode_factor" parameter
-    // "12" would be the default value if parameter does not exist
-    hashcode_factor = context.getJobParameter("hashcode_factor", "12").toInt
-  }
-
-  def eval(s: String): Int = {
-    s.hashCode() * hashcode_factor
-  }
-}
-
-val tableEnv = BatchTableEnvironment.create(env)
-
-// use the function in Scala Table API
-myTable.select('string, hashCode('string))
-
-// register and use the function in SQL
-tableEnv.registerFunction("hashCode", hashCode)
-tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable")
-{% endhighlight %}
-
-</div>
-</div>
-
 {% top %}
-