You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Jark Wu (JIRA)" <ji...@apache.org> on 2016/08/30 04:52:21 UTC

[jira] [Comment Edited] (FLINK-4469) Add support for user defined table function in Table API & SQL

    [ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15448024#comment-15448024 ] 

Jark Wu edited comment on FLINK-4469 at 8/30/16 4:51 AM:
---------------------------------------------------------

Agree with that. Using a collector will be more user-friendly and can optimize further. Instead of defining a {{Collector}} as parameter, I prefer to define a {{collect(T)}} protected method. So that user do not need to define the `Collector` in {{eval}} method's parameter list, and the parameter list can keep consistent with the calling in SQL. 

Maybe the UDTF signature looks like this: 

{code}
abstract class UDTF[T] {

  var collector: Collector[T] = null
  
  def setCollector(collector: Collector[T]): Unit = {
    this.collector = collector
  }
  
  def collect(ele: T): Unit = {
    collector.collect(ele)
  }
}
{code}

And we can define  specific UDTF like this:  

{code}
public class SplitStringUDTF extends UDTF<Word> {
    public void eval(String str) {
        if (str != null) {
            for (String s : str.split(",")) {
                Word word = new Word(s, s.length());
                collect(word);
            }
        }
    }
}

// calling in SQL as usual
tableEnv.registerFunction("split", new SplitStringUDTF())
tableEnv.sql("SELECT a, b, t.* FROM MyTable CROSS APPLY split(c) AS t(w,l)")
{code}


was (Author: jark):
Agree with that. Using a collector will be more user-friendly and can optimize further. Instead of defining a {Collector} as parameter, I prefer to define a {collect(T)} protected method. So that user do not need to define the `Collector` in {eval} method's parameter list, and the parameter list can keep consistent with the calling in SQL. 

Maybe the UDTF signature looks like this: 

{code}
abstract class UDTF[T] {

  var collector: Collector[T] = null
  
  def setCollector(collector: Collector[T]): Unit = {
    this.collector = collector
  }
  
  def collect(ele: T): Unit = {
    collector.collect(ele)
  }
}
{code}

And we can define  specific UDTF like this:  

{code}
public class SplitStringUDTF extends UDTF<Word> {
    public void eval(String str) {
        if (str != null) {
            for (String s : str.split(",")) {
                Word word = new Word(s, s.length());
                collect(word);
            }
        }
    }
}

// calling in SQL as usual
tableEnv.registerFunction("split", new SplitStringUDTF())
tableEnv.sql("SELECT a, b, t.* FROM MyTable CROSS APPLY split(c) AS t(w,l)")
{code}

> Add support for user defined table function in Table API & SQL
> --------------------------------------------------------------
>
>                 Key: FLINK-4469
>                 URL: https://issues.apache.org/jira/browse/FLINK-4469
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>
> Normal user-defined functions, such as concat(), take in a single input row and output a single output row. In contrast, table-generating functions transform a single input row to multiple output rows. It is very useful in some cases, such as look up in HBase by rowkey and return one or more rows.
> Adding a user defined table function should:
> 1. inherit from UDTF class with specific generic type T
> 2. define one or more evel function. 
> NOTE: 
> 1. the eval method must be public and non-static.
> 2. eval should always return java.lang.Iterable or scala.collection.Iterable with the generic type T.
> 3. the generic type T is the row type returned by table function. Because of Java type erasure, we can’t extract T from the Iterable.
> 4. eval method can be overload. Blink will choose the best match eval method to call according to parameter types and number.
> {code}
> public class Word {
>   public String word;
>   public Integer length;
> }
> public class SplitStringUDTF extends UDTF<Word> {
>     public Iterable<Word> eval(String str) {
>         if (str == null) {
>             return new ArrayList<>();
>         } else {
>             List<Word> list = new ArrayList<>();
>             for (String s : str.split(",")) {
>                 Word word = new Word(s, s.length());
>                 list.add(word);
>             }
>             return list;
>         }
>     }
> }
> // in SQL
> tableEnv.registerFunction("split", new SplitStringUDTF())
> tableEnv.sql("SELECT a, b, t.* FROM MyTable CROSS APPLY split(c) AS t(w,l)")
> // in Java Table API
> tableEnv.registerFunction("split", new SplitStringUDTF())
> // rename split table columns to “w” and “l”
> table.crossApply("split(c)", "w, l")	
>      .select("a, b, w, l")
> // without renaming, we will use the origin field names in the POJO/case/...
> table.crossApply("split(c)")
>      .select("a, b, word, length")
> // in Scala Table API
> val split = new SplitStringUDTF()
> table.crossApply(split('c), 'w, 'l)
>      .select('a, 'b, 'w, 'l)
> // outerApply for outer join to a UDTF
> table.outerApply(split('c))
>      .select('a, 'b, 'word, 'length)
> {code}
> Here we introduce CROSS/OUTER APPLY keywords to join table functions , which is used in SQL Server. We can discuss the API in the comment. 
> Maybe the {{UDTF}} class should be replaced by {{TableFunction}} or something others, because we have introduced {{ScalarFunction}} for custom functions, we need to keep consistent. Although, I prefer {{UDTF}} rather than {{TableFunction}} as the former is more SQL-like and the latter maybe confused with DataStream functions. 
> **This issue is blocked by CALCITE-1309, so we need to wait Calcite fix this and release.**
> See [1] for more information about UDTF design.
> [1] https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit#



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)