Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 20:04:57 UTC

flink git commit: [FLINK-6334] [table] Refactor Table API TableFunction join methods.

Repository: flink
Updated Branches:
  refs/heads/master 8ed85fe49 -> c969237fc


[FLINK-6334] [table] Refactor Table API TableFunction join methods.

This closes #3791.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c969237f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c969237f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c969237f

Branch: refs/heads/master
Commit: c969237fce4fe5394e1cfdbb1186db63333d73d0
Parents: 8ed85fe
Author: Xpray <le...@gmail.com>
Authored: Wed May 3 16:52:43 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed May 3 21:51:21 2017 +0200

----------------------------------------------------------------------
 .../flink/table/api/TableEnvironment.scala      |   2 +-
 .../api/scala/TableFunctionConversions.scala    |  56 ++++
 .../apache/flink/table/api/scala/package.scala  |   4 +
 .../org/apache/flink/table/api/table.scala      | 288 +++++++++----------
 .../flink/table/functions/TableFunction.scala   |  16 --
 .../utils/UserDefinedFunctionUtils.scala        |  39 ++-
 .../flink/table/plan/logical/operators.scala    |  22 +-
 .../api/scala/batch/table/JoinITCase.scala      |  25 +-
 .../table/UserDefinedTableFunctionTest.scala    |  18 +-
 .../table/UserDefinedTableFunctionTest.scala    | 126 +++++++-
 10 files changed, 404 insertions(+), 192 deletions(-)
----------------------------------------------------------------------
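For orientation, a minimal sketch of the API change (MySplitUDTF, table, and
tableEnv are illustrative names, not part of this commit): a TableFunction is
no longer joined through dedicated join(Expression)/join(String) overloads;
instead, the function call itself is converted into a Table and joined like
any other table.

    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.Table
    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.functions.TableFunction

    // Hypothetical UDTF, used only for this sketch.
    class MySplitUDTF extends TableFunction[String] {
      def eval(str: String): Unit = str.split("#").foreach(collect)
    }

    val split = new MySplitUDTF()

    // Scala: split('c) is implicitly converted into a Table (see
    // TableFunctionConversions below) and joined as a correlated call.
    table.join(split('c) as 's).select('a, 'b, 'c, 's)

    // Java-style: the call is parsed from a String expression.
    tableEnv.registerFunction("split", split)
    table.join(new Table(tableEnv, "split(c)").as("s")).select("a, b, c, s")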


http://git-wip-us.apache.org/repos/asf/flink/blob/c969237f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index bd974b0..45267d2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -79,7 +79,7 @@ abstract class TableEnvironment(val config: TableConfig) {
   private val rootSchema: SchemaPlus = internalSchema.plus()
 
   // Table API/SQL function catalog
-  private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuiltIns
+  private[flink] val functionCatalog: FunctionCatalog = FunctionCatalog.withBuiltIns
 
   // the configuration to create a Calcite planner
   private lazy val frameworkConfig: FrameworkConfig = Frameworks
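(The visibility change from private to private[flink] lets the new
UserDefinedFunctionUtils.createLogicalFunctionCall, added below, look up
functions in the catalog when parsing a String expression into a table
function call.)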

http://git-wip-us.apache.org/repos/asf/flink/blob/c969237f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala
new file mode 100644
index 0000000..692876f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.Table
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.functions.TableFunction
+import org.apache.flink.table.plan.logical.LogicalTableFunctionCall
+
+/**
+  * Holds methods to convert a [[TableFunction]] call in the Scala Table API into a [[Table]].
+  *
+  * @param tf The TableFunction to convert.
+  */
+class TableFunctionConversions[T](tf: TableFunction[T]) {
+
+  /**
+    * Creates a [[Table]] from a [[TableFunction]] in Scala Table API.
+    *
+    * @param args The arguments of the table function call.
+    * @return A [[Table]] which represents the [[LogicalTableFunctionCall]].
+    */
+  final def apply(args: Expression*)(implicit typeInfo: TypeInformation[T]): Table = {
+
+    val resultType = if (tf.getResultType == null) typeInfo else tf.getResultType
+
+    new Table(
+      tableEnv = null, // Table environment will be set later.
+      LogicalTableFunctionCall(
+        tf.getClass.getCanonicalName,
+        tf,
+        args.toList,
+        resultType,
+        Array.empty,
+        child = null // Child will be set later.
+      )
+    )
+  }
+}
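A sketch of what this conversion produces (MySplitUDTF as in the sketch above;
the implicit TypeInformation is supplied by the Scala API imports): a Table
with no TableEnvironment whose plan is a LogicalTableFunctionCall with a null
child. Both are filled in once the Table is passed to join or leftOuterJoin.

    val split = new MySplitUDTF()
    val call: Table = split('c)  // new TableFunctionConversions(split).apply('c)
    // call wraps an "unbounded" LogicalTableFunctionCall (child == null) and is
    // only valid as an argument to join/leftOuterJoin (see table.scala below).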

http://git-wip-us.apache.org/repos/asf/flink/blob/c969237f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
index 5b431ec..e8a2017 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
@@ -24,6 +24,7 @@ import org.apache.flink.api.scala.DataSet
 import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTableEnv}
 import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv}
+import org.apache.flink.table.functions.TableFunction
 
 import _root_.scala.language.implicitConversions
 
@@ -89,4 +90,7 @@ package object scala extends ImplicitExpressionConversions {
     tableEnv.toDataStream[Row](table)
   }
 
+  implicit def tableFunctionCall2Table[T](tf: TableFunction[T]): TableFunctionConversions[T] = {
+    new TableFunctionConversions[T](tf)
+  }
 }
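This implicit conversion is what makes the Scala syntax resolve: TableFunction
itself no longer defines an apply method (it is removed from TableFunction.scala
below), so applying a function instance to expressions goes through
TableFunctionConversions. A one-line illustration, assuming a
TableFunction[String] named split is in scope:

    val asTable: Table = split('c)  // tableFunctionCall2Table(split).apply('c)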

http://git-wip-us.apache.org/repos/asf/flink/blob/c969237f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 0953611..9606979 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -20,9 +20,10 @@ package org.apache.flink.table.api
 import org.apache.calcite.rel.RelNode
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.operators.join.JoinType
-import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionParser, Ordering, UnresolvedAlias, UnresolvedFieldReference}
 import org.apache.flink.table.plan.logical.Minus
-import org.apache.flink.table.expressions.{Alias, Asc, Call, Expression, ExpressionParser, Ordering, TableFunctionCall, UnresolvedAlias}
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
 import org.apache.flink.table.plan.ProjectionTranslator._
 import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.sinks.TableSink
@@ -64,13 +65,34 @@ class Table(
     private[flink] val tableEnv: TableEnvironment,
     private[flink] val logicalPlan: LogicalNode) {
 
-  private val tableSchema: TableSchema = new TableSchema(
+  // Check if the plan has an unbounded TableFunctionCall as a child node.
+  //   A TableFunctionCall is tolerated as the root node because the Table holds the initial call.
+  if (containsUnboundedUDTFCall(logicalPlan) &&
+    !logicalPlan.isInstanceOf[LogicalTableFunctionCall]) {
+    throw new ValidationException("TableFunction can only be used in join and leftOuterJoin.")
+  }
+
+  /**
+    * Creates a [[Table]] for a TableFunction call from a String expression.
+    *
+    * @param tableEnv The TableEnvironment in which the call is created.
+    * @param udtfCall A String expression of the TableFunction call.
+    */
+  def this(tableEnv: TableEnvironment, udtfCall: String) {
+    this(tableEnv, UserDefinedFunctionUtils.createLogicalFunctionCall(tableEnv, udtfCall))
+  }
+
+  private lazy val tableSchema: TableSchema = new TableSchema(
     logicalPlan.output.map(_.name).toArray,
     logicalPlan.output.map(_.resultType).toArray)
 
-  def relBuilder = tableEnv.getRelBuilder
+  def relBuilder: FlinkRelBuilder = tableEnv.getRelBuilder
 
-  def getRelNode: RelNode = logicalPlan.toRelNode(relBuilder)
+  def getRelNode: RelNode = if (containsUnboundedUDTFCall(logicalPlan)) {
+    throw new ValidationException("Cannot translate a query with an unbounded table function call.")
+  } else {
+    logicalPlan.toRelNode(relBuilder)
+  }
 
   /**
     * Returns the schema of this table.
@@ -143,7 +165,35 @@ class Table(
     * }}}
     */
   def as(fields: Expression*): Table = {
-    new Table(tableEnv, AliasNode(fields, logicalPlan).validate(tableEnv))
+
+    logicalPlan match {
+      case functionCall: LogicalTableFunctionCall if functionCall.child == null => {
+        // If the logical plan is a TableFunctionCall, we replace its field names to avoid special
+        //   cases during the validation.
+        if (fields.length != functionCall.output.length) {
+          throw new ValidationException(
+            "List of column aliases must have same degree as TableFunction's output")
+        }
+        if (!fields.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+          throw new ValidationException(
+            "Alias field must be an instance of UnresolvedFieldReference"
+          )
+        }
+        new Table(
+          tableEnv,
+          new LogicalTableFunctionCall(
+            functionCall.functionName,
+            functionCall.tableFunction,
+            functionCall.parameters,
+            functionCall.resultType,
+            fields.map(_.asInstanceOf[UnresolvedFieldReference].name).toArray,
+            functionCall.child)
+        )
+      }
+      case _ =>
+        // prepend an AliasNode
+        new Table(tableEnv, AliasNode(fields, logicalPlan).validate(tableEnv))
+    }
   }
 
   /**
@@ -310,6 +360,41 @@ class Table(
   }
 
   /**
+    * Joins this [[Table]] with a user-defined [[org.apache.flink.table.functions.TableFunction]].
+    * This join is similar to a SQL left outer join with an ON TRUE predicate, but it works with a
+    * table function. Each row of the outer table is joined with all rows produced by the table
+    * function. If the table function does not produce any row, the outer row is padded with nulls.
+    *
+    * Scala Example:
+    * {{{
+    *   class MySplitUDTF extends TableFunction[String] {
+    *     def eval(str: String): Unit = {
+    *       str.split("#").foreach(collect)
+    *     }
+    *   }
+    *
+    *   val split = new MySplitUDTF()
+    *   table.leftOuterJoin(split('c) as ('s)).select('a,'b,'c,'s)
+    * }}}
+    *
+    * Java Example:
+    * {{{
+    *   class MySplitUDTF extends TableFunction<String> {
+    *     public void eval(String str) {
+    *       str.split("#").forEach(this::collect);
+    *     }
+    *   }
+    *
+    *   TableFunction<String> split = new MySplitUDTF();
+    *   tableEnv.registerFunction("split", split);
+    *   table.leftOuterJoin(new Table(tableEnv, "split(c)").as("s")).select("a, b, c, s");
+    * }}}
+    */
+  def leftOuterJoin(right: Table): Table = {
+    join(right, None, JoinType.LEFT_OUTER)
+  }
+
+  /**
     * Joins two [[Table]]s. Similar to an SQL left outer join. The fields of the two joined
     * operations must not overlap, use [[as]] to rename fields if necessary.
     *
@@ -417,14 +502,46 @@ class Table(
   }
 
   private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {
-    // check that right table belongs to the same TableEnvironment
-    if (right.tableEnv != this.tableEnv) {
-      throw new ValidationException("Only tables from the same TableEnvironment can be joined.")
+
+    // check if we join with a table or a table function
+    if (!containsUnboundedUDTFCall(right.logicalPlan)) {
+      // regular table-table join
+
+      // check that the TableEnvironment of the right table is not null
+      // and that the right table belongs to the same TableEnvironment
+      if (right.tableEnv != this.tableEnv) {
+        throw new ValidationException("Only tables from the same TableEnvironment can be joined.")
+      }
+
+      new Table(
+        tableEnv,
+        Join(this.logicalPlan, right.logicalPlan, joinType, joinPredicate, correlated = false)
+          .validate(tableEnv))
+
+    } else {
+      // join with a table function
+
+      // check join type
+      if (joinType != JoinType.INNER && joinType != JoinType.LEFT_OUTER) {
+        throw new ValidationException(
+          "TableFunctions are currently supported for join and leftOuterJoin.")
+      }
+
+      val udtf = right.logicalPlan.asInstanceOf[LogicalTableFunctionCall]
+      val udtfCall = LogicalTableFunctionCall(
+        udtf.functionName,
+        udtf.tableFunction,
+        udtf.parameters,
+        udtf.resultType,
+        udtf.fieldNames,
+        this.logicalPlan
+      ).validate(tableEnv)
+
+      new Table(
+        tableEnv,
+        Join(this.logicalPlan, udtfCall, joinType, joinPredicate, correlated = true)
+          .validate(tableEnv))
     }
-    new Table(
-      tableEnv,
-      Join(this.logicalPlan, right.logicalPlan, joinType, joinPredicate, correlated = false)
-        .validate(tableEnv))
   }
 
   /**
@@ -633,136 +750,6 @@ class Table(
   }
 
   /**
-    * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
-    * to an SQL cross join, but it works with a table function. It returns rows from the outer
-    * table (table on the left of the operator) that produces matching values from the table
-    * function (which is defined in the expression on the right side of the operator).
-    *
-    * Example:
-    *
-    * {{{
-    *   class MySplitUDTF extends TableFunction[String] {
-    *     def eval(str: String): Unit = {
-    *       str.split("#").foreach(collect)
-    *     }
-    *   }
-    *
-    *   val split = new MySplitUDTF()
-    *   table.join(split('c) as ('s)).select('a,'b,'c,'s)
-    * }}}
-    */
-  def join(udtf: Expression): Table = {
-    joinUdtfInternal(udtf, JoinType.INNER)
-  }
-
-  /**
-    * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
-    * to an SQL cross join, but it works with a table function. It returns rows from the outer
-    * table (table on the left of the operator) that produces matching values from the table
-    * function (which is defined in the expression on the right side of the operator).
-    *
-    * Example:
-    *
-    * {{{
-    *   class MySplitUDTF extends TableFunction<String> {
-    *     public void eval(String str) {
-    *       str.split("#").forEach(this::collect);
-    *     }
-    *   }
-    *
-    *   TableFunction<String> split = new MySplitUDTF();
-    *   tableEnv.registerFunction("split", split);
-    *
-    *   table.join("split(c) as (s)").select("a, b, c, s");
-    * }}}
-    */
-  def join(udtf: String): Table = {
-    joinUdtfInternal(udtf, JoinType.INNER)
-  }
-
-  /**
-    * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
-    * to an SQL left outer join with ON TRUE, but it works with a table function. It returns all
-    * the rows from the outer table (table on the left of the operator), and rows that do not match
-    * the condition from the table function (which is defined in the expression on the right
-    * side of the operator). Rows with no matching condition are filled with null values.
-    *
-    * Example:
-    *
-    * {{{
-    *   class MySplitUDTF extends TableFunction[String] {
-    *     def eval(str: String): Unit = {
-    *       str.split("#").foreach(collect)
-    *     }
-    *   }
-    *
-    *   val split = new MySplitUDTF()
-    *   table.leftOuterJoin(split('c) as ('s)).select('a,'b,'c,'s)
-    * }}}
-    */
-  def leftOuterJoin(udtf: Expression): Table = {
-    joinUdtfInternal(udtf, JoinType.LEFT_OUTER)
-  }
-
-  /**
-    * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
-    * to an SQL left outer join with ON TRUE, but it works with a table function. It returns all
-    * the rows from the outer table (table on the left of the operator), and rows that do not match
-    * the condition from the table function (which is defined in the expression on the right
-    * side of the operator). Rows with no matching condition are filled with null values.
-    *
-    * Example:
-    *
-    * {{{
-    *   class MySplitUDTF extends TableFunction<String> {
-    *     public void eval(String str) {
-    *       str.split("#").forEach(this::collect);
-    *     }
-    *   }
-    *
-    *   TableFunction<String> split = new MySplitUDTF();
-    *   tableEnv.registerFunction("split", split);
-    *
-    *   table.leftOuterJoin("split(c) as (s)").select("a, b, c, s");
-    * }}}
-    */
-  def leftOuterJoin(udtf: String): Table = {
-    joinUdtfInternal(udtf, JoinType.LEFT_OUTER)
-  }
-
-  private def joinUdtfInternal(udtfString: String, joinType: JoinType): Table = {
-    val udtf = ExpressionParser.parseExpression(udtfString)
-    joinUdtfInternal(udtf, joinType)
-  }
-
-  private def joinUdtfInternal(udtf: Expression, joinType: JoinType): Table = {
-    var alias: Option[Seq[String]] = None
-
-    // unwrap an Expression until we get a TableFunctionCall
-    def unwrap(expr: Expression): TableFunctionCall = expr match {
-      case Alias(child, name, extraNames) =>
-        alias = Some(Seq(name) ++ extraNames)
-        unwrap(child)
-      case Call(name, args) =>
-        val function = tableEnv.getFunctionCatalog.lookupFunction(name, args)
-        unwrap(function)
-      case c: TableFunctionCall => c
-      case _ =>
-        throw new TableException(
-          "Cross/Outer Apply operators only accept expressions that define table functions.")
-    }
-
-    val call = unwrap(udtf)
-      .as(alias)
-      .toLogicalTableFunctionCall(this.logicalPlan)
-      .validate(tableEnv)
-
-    new Table(
-      tableEnv,
-      Join(this.logicalPlan, call, joinType, None, correlated = true).validate(tableEnv))
-  }
-
-  /**
     * Writes the [[Table]] to a [[TableSink]]. A [[TableSink]] defines an external storage location.
     *
     * A batch [[Table]] can only be written to a
@@ -773,7 +760,6 @@ class Table(
     * @tparam T The data type that the [[TableSink]] expects.
     */
   def writeToSink[T](sink: TableSink[T]): Unit = {
-
     // get schema information of table
     val rowType = getRelNode.getRowType
     val fieldNames: Array[String] = rowType.getFieldNames.asScala.toArray
@@ -859,6 +845,20 @@ class Table(
     }
     tableName
   }
+
+  /**
+    * Checks if the plan represented by a [[LogicalNode]] contains an unbounded UDTF call.
+    * @param n the node to check
+    * @return true if the plan contains an unbounded UDTF call, false otherwise.
+    */
+  private def containsUnboundedUDTFCall(n: LogicalNode): Boolean = {
+    n match {
+      case functionCall: LogicalTableFunctionCall if functionCall.child == null => true
+      case u: UnaryNode => containsUnboundedUDTFCall(u.child)
+      case b: BinaryNode => containsUnboundedUDTFCall(b.left) || containsUnboundedUDTFCall(b.right)
+      case _: LeafNode => false
+    }
+  }
 }
 
 /**

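To make the new validation concrete, a hedged sketch of what is accepted and
rejected after this change (split, table, and tableEnv as in the sketches above;
the rejected cases mirror the tests added below):

    val call = split('c)             // unbounded call: tolerated as a root Table

    call.select('f0)                 // throws ValidationException: "TableFunction
                                     // can only be used in join and leftOuterJoin."

    table.join(call as 's)           // OK: join sets this table's plan as the
      .select('a, 'b, 'c, 's)        // call's child and correlates the join

    table.leftOuterJoin(call as 's)  // OK: outer rows without a match are padded
      .select('a, 'b, 'c, 's)        // with nulls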
http://git-wip-us.apache.org/repos/asf/flink/blob/c969237f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala
index d4c5021..5354349 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.table.functions
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.expressions.{Expression, TableFunctionCall}
 import org.apache.flink.util.Collector
 
 /**
@@ -79,21 +78,6 @@ import org.apache.flink.util.Collector
   */
 abstract class TableFunction[T] extends UserDefinedFunction {
 
-  /**
-    * Creates a call to a [[TableFunction]] in Scala Table API.
-    *
-    * @param params actual parameters of function
-    * @return [[Expression]] in form of a [[TableFunctionCall]]
-    */
-  final def apply(params: Expression*)(implicit typeInfo: TypeInformation[T]): Expression = {
-    val resultType = if (getResultType == null) {
-      typeInfo
-    } else {
-      getResultType
-    }
-    TableFunctionCall(getClass.getSimpleName, this, params, resultType)
-  }
-
   override def toString: String = getClass.getCanonicalName
 
   // ----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c969237f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index c1cfe06..d108e31 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -19,7 +19,7 @@
 
 package org.apache.flink.table.functions.utils
 
-import java.lang.{Long => JLong, Integer => JInt}
+import java.lang.{Integer => JInt, Long => JLong}
 import java.lang.reflect.{Method, Modifier}
 import java.sql.{Date, Time, Timestamp}
 
@@ -30,8 +30,10 @@ import org.apache.flink.api.common.functions.InvalidTypesException
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
+import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.{ScalarFunction, TableFunction, UserDefinedFunction}
+import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl
 import org.apache.flink.util.InstantiationUtil
 
@@ -358,4 +360,37 @@ object UserDefinedFunctionUtils {
     InstantiationUtil
       .deserializeObject[UserDefinedFunction](byteData, Thread.currentThread.getContextClassLoader)
   }
+
+  /**
+    * Creates a [[LogicalTableFunctionCall]] by parsing a String expression.
+    *
+    * @param tableEnv The TableEnvironment in which to look up the function.
+    * @param udtf A String expression of a TableFunctionCall, such as "split(c)".
+    * @return A LogicalTableFunctionCall.
+    */
+  def createLogicalFunctionCall(
+      tableEnv: TableEnvironment,
+      udtf: String): LogicalTableFunctionCall = {
+
+    var alias: Option[Seq[String]] = None
+
+    // unwrap an Expression until we get a TableFunctionCall
+    def unwrap(expr: Expression): TableFunctionCall = expr match {
+      case Alias(child, name, extraNames) =>
+        alias = Some(Seq(name) ++ extraNames)
+        unwrap(child)
+      case Call(name, args) =>
+        val function = tableEnv.functionCatalog.lookupFunction(name, args)
+        unwrap(function)
+      case c: TableFunctionCall => c
+      case _ =>
+        throw new TableException(
+          "Table(TableEnv, String) constructor only accept String that " +
+            "define table function followed by some Alias.")
+    }
+
+    unwrap(ExpressionParser.parseExpression(udtf))
+      .as(alias)
+      .toLogicalTableFunctionCall(child = null) // child will be set at join time
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c969237f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index dbfd3ce..c67bfd1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -170,7 +170,7 @@ case class Limit(offset: Int, fetch: Int = -1, child: LogicalNode) extends Unary
     if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
       failValidation(s"Limit on stream tables is currently not supported.")
     }
-    if (!child.validate(tableEnv).isInstanceOf[Sort]) {
+    if (!child.isInstanceOf[Sort]) {
       failValidation(s"Limit operator must be preceded by an OrderBy operator.")
     }
     if (offset < 0) {
@@ -662,11 +662,19 @@ case class LogicalTableFunctionCall(
     child: LogicalNode)
   extends UnaryNode {
 
-  private val (_, fieldIndexes, fieldTypes) = getFieldInfo(resultType)
+  private val (generatedNames, fieldIndexes, fieldTypes) = getFieldInfo(resultType)
   private var evalMethod: Method = _
 
-  override def output: Seq[Attribute] = fieldNames.zip(fieldTypes).map {
-    case (n, t) => ResolvedFieldReference(n, t)
+  override def output: Seq[Attribute] = {
+    if (fieldNames.isEmpty) {
+      generatedNames.zip(fieldTypes).map {
+        case (n, t) => ResolvedFieldReference(n, t)
+      }
+    } else {
+      fieldNames.zip(fieldTypes).map {
+        case (n, t) => ResolvedFieldReference(n, t)
+      }
+    }
   }
 
   override def validate(tableEnv: TableEnvironment): LogicalNode = {
@@ -691,7 +699,11 @@ case class LogicalTableFunctionCall(
 
   override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
     val fieldIndexes = getFieldInfo(resultType)._2
-    val function = new FlinkTableFunctionImpl(resultType, fieldIndexes, fieldNames, evalMethod)
+    val function = new FlinkTableFunctionImpl(
+      resultType,
+      fieldIndexes,
+      if (fieldNames.isEmpty) generatedNames else fieldNames,
+      evalMethod)
     val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
     val sqlFunction = TableSqlFunction(
       tableFunction.functionIdentifier,

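The generated-name fallback above is what makes unaliased calls usable. A short
sketch, assuming split yields a single String column (atomic result types are
typically exposed under a generated name such as f0):

    table.join(split('c)).select('f0)       // no alias: generated field name
    table.join(split('c) as 's).select('s)  // explicit alias takes precedence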
http://git-wip-us.apache.org/repos/asf/flink/blob/c969237f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
index 5993728..8085a3c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
@@ -24,8 +24,9 @@ import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableC
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.types.Row
-import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
+import org.apache.flink.table.api.{TableEnvironment, ValidationException}
 import org.apache.flink.table.expressions.Literal
+import org.apache.flink.table.utils.TableFunc2
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
@@ -345,4 +346,26 @@ class JoinITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Test
+  def testUDTFJoinOnTuples(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val data = List("hi#world", "how#are#you")
+
+    val ds1 = env.fromCollection(data).toTable(tEnv, 'a)
+    val func2 = new TableFunc2
+
+    val joinDs = ds1.join(func2('a) as ('name, 'len))
+
+    val results = joinDs.toDataSet[Row].collect()
+    val expected = Seq(
+      "hi#world,hi,2",
+      "hi#world,world,5",
+      "how#are#you,how,3",
+      "how#are#you,are,3",
+      "how#are#you,you,3").mkString("\n")
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c969237f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala
index 2dbcccf..4d7f6cb 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.types.Row
 import org.apache.flink.table.utils.TableTestUtil._
 import org.apache.flink.table.utils.{PojoTableFunc, TableFunc2, _}
-import org.apache.flink.table.api.{TableEnvironment, Types}
+import org.apache.flink.table.api.{Table, TableEnvironment, Types}
 import org.junit.Test
 import org.mockito.Mockito._
 
@@ -54,24 +54,24 @@ class UserDefinedTableFunctionTest extends TableTestBase {
     val func1 = new TableFunc1
     javaTableEnv.registerFunction("func1", func1)
     var scalaTable = in1.join(func1('c) as 's).select('c, 's)
-    var javaTable = in2.join("func1(c).as(s)").select("c, s")
+    var javaTable = in2.join(new Table(javaTableEnv, "func1(c).as(s)")).select("c, s")
     verifyTableEquals(scalaTable, javaTable)
 
     // test left outer join
     scalaTable = in1.leftOuterJoin(func1('c) as 's).select('c, 's)
-    javaTable = in2.leftOuterJoin("as(func1(c), s)").select("c, s")
+    javaTable = in2.leftOuterJoin(new Table(javaTableEnv, "as(func1(c), s)")).select("c, s")
     verifyTableEquals(scalaTable, javaTable)
 
     // test overloading
     scalaTable = in1.join(func1('c, "$") as 's).select('c, 's)
-    javaTable = in2.join("func1(c, '$') as (s)").select("c, s")
+    javaTable = in2.join(new Table(javaTableEnv, "func1(c, '$') as (s)")).select("c, s")
     verifyTableEquals(scalaTable, javaTable)
 
     // test custom result type
     val func2 = new TableFunc2
     javaTableEnv.registerFunction("func2", func2)
     scalaTable = in1.join(func2('c) as ('name, 'len)).select('c, 'name, 'len)
-    javaTable = in2.join("func2(c).as(name, len)").select("c, name, len")
+    javaTable = in2.join(new Table(javaTableEnv, "func2(c).as(name, len)")).select("c, name, len")
     verifyTableEquals(scalaTable, javaTable)
 
     // test hierarchy generic type
@@ -79,7 +79,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
     javaTableEnv.registerFunction("hierarchy", hierarchy)
     scalaTable = in1.join(hierarchy('c) as ('name, 'adult, 'len))
       .select('c, 'name, 'len, 'adult)
-    javaTable = in2.join("AS(hierarchy(c), name, adult, len)")
+    javaTable = in2.join(new Table(javaTableEnv, "AS(hierarchy(c), name, adult, len)"))
       .select("c, name, len, adult")
     verifyTableEquals(scalaTable, javaTable)
 
@@ -88,21 +88,21 @@ class UserDefinedTableFunctionTest extends TableTestBase {
     javaTableEnv.registerFunction("pojo", pojo)
     scalaTable = in1.join(pojo('c))
       .select('c, 'name, 'age)
-    javaTable = in2.join("pojo(c)")
+    javaTable = in2.join(new Table(javaTableEnv, "pojo(c)"))
       .select("c, name, age")
     verifyTableEquals(scalaTable, javaTable)
 
     // test with filter
     scalaTable = in1.join(func2('c) as ('name, 'len))
       .select('c, 'name, 'len).filter('len > 2)
-    javaTable = in2.join("func2(c) as (name, len)")
+    javaTable = in2.join(new Table(javaTableEnv, "func2(c) as (name, len)"))
       .select("c, name, len").filter("len > 2")
     verifyTableEquals(scalaTable, javaTable)
 
     // test with scalar function
     scalaTable = in1.join(func1('c.substring(2)) as 's)
       .select('a, 'c, 's)
-    javaTable = in2.join("func1(substring(c, 2)) as (s)")
+    javaTable = in2.join(new Table(javaTableEnv, "func1(substring(c, 2)) as (s)"))
       .select("a, c, s")
     verifyTableEquals(scalaTable, javaTable)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/c969237f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala
index 56b9fdb..3edfd8c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala
@@ -57,24 +57,24 @@ class UserDefinedTableFunctionTest extends TableTestBase {
     val func1 = new TableFunc1
     javaTableEnv.registerFunction("func1", func1)
     var scalaTable = in1.join(func1('c) as 's).select('c, 's)
-    var javaTable = in2.join("func1(c).as(s)").select("c, s")
+    var javaTable = in2.join(new Table(javaTableEnv, "func1(c).as(s)")).select("c, s")
     verifyTableEquals(scalaTable, javaTable)
 
     // test left outer join
     scalaTable = in1.leftOuterJoin(func1('c) as 's).select('c, 's)
-    javaTable = in2.leftOuterJoin("as(func1(c), s)").select("c, s")
+    javaTable = in2.leftOuterJoin(new Table(javaTableEnv, "func1(c)").as("s")).select("c, s")
     verifyTableEquals(scalaTable, javaTable)
 
     // test overloading
     scalaTable = in1.join(func1('c, "$") as 's).select('c, 's)
-    javaTable = in2.join("func1(c, '$') as (s)").select("c, s")
+    javaTable = in2.join(new Table(javaTableEnv, "func1(c, '$') as (s)")).select("c, s")
     verifyTableEquals(scalaTable, javaTable)
 
     // test custom result type
     val func2 = new TableFunc2
     javaTableEnv.registerFunction("func2", func2)
     scalaTable = in1.join(func2('c) as ('name, 'len)).select('c, 'name, 'len)
-    javaTable = in2.join("func2(c).as(name, len)").select("c, name, len")
+    javaTable = in2.join(new Table(javaTableEnv, "func2(c).as(name, len)")).select("c, name, len")
     verifyTableEquals(scalaTable, javaTable)
 
     // test hierarchy generic type
@@ -82,7 +82,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
     javaTableEnv.registerFunction("hierarchy", hierarchy)
     scalaTable = in1.join(hierarchy('c) as ('name, 'adult, 'len))
       .select('c, 'name, 'len, 'adult)
-    javaTable = in2.join("AS(hierarchy(c), name, adult, len)")
+    javaTable = in2.join(new Table(javaTableEnv, "AS(hierarchy(c), name, adult, len)"))
       .select("c, name, len, adult")
     verifyTableEquals(scalaTable, javaTable)
 
@@ -91,21 +91,21 @@ class UserDefinedTableFunctionTest extends TableTestBase {
     javaTableEnv.registerFunction("pojo", pojo)
     scalaTable = in1.join(pojo('c))
       .select('c, 'name, 'age)
-    javaTable = in2.join("pojo(c)")
+    javaTable = in2.join(new Table(javaTableEnv, "pojo(c)"))
       .select("c, name, age")
     verifyTableEquals(scalaTable, javaTable)
 
     // test with filter
     scalaTable = in1.join(func2('c) as ('name, 'len))
       .select('c, 'name, 'len).filter('len > 2)
-    javaTable = in2.join("func2(c) as (name, len)")
+    javaTable = in2.join(new Table(javaTableEnv, "func2(c) as (name, len)"))
       .select("c, name, len").filter("len > 2")
     verifyTableEquals(scalaTable, javaTable)
 
     // test with scalar function
     scalaTable = in1.join(func1('c.substring(2)) as 's)
       .select('a, 'c, 's)
-    javaTable = in2.join("func1(substring(c, 2)) as (s)")
+    javaTable = in2.join(new Table(javaTableEnv, "func1(substring(c, 2)) as (s)"))
       .select("a, c, s")
     verifyTableEquals(scalaTable, javaTable)
 
@@ -114,8 +114,103 @@ class UserDefinedTableFunctionTest extends TableTestBase {
       tableEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
     expectExceptionThrown(
       javaTableEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
+    expectExceptionThrown(in1.join(ObjectTableFunction('a, 1)), "Scala object")
+
+  }
+
+  @Test
+  def testInvalidTableFunctions(): Unit = {
+    // mock
+    val ds = mock(classOf[DataStream[Row]])
+    val jDs = mock(classOf[JDataStream[Row]])
+    val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
+    when(ds.javaStream).thenReturn(jDs)
+    when(jDs.getType).thenReturn(typeInfo)
+
+    // Scala environment
+    val env = mock(classOf[ScalaExecutionEnv])
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val in1 = ds.toTable(tableEnv).as('a, 'b, 'c)
+
+    // Java environment
+    val javaEnv = mock(classOf[JavaExecutionEnv])
+    val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
+    val in2 = javaTableEnv.fromDataStream(jDs).as("a, b, c")
+
+    val func1 = new TableFunc1
+    javaTableEnv.registerFunction("func1", func1)
+
+    // table function call select
+    expectExceptionThrown(
+      func1('c).select("f0"),
+      "TableFunction can only be used in join and leftOuterJoin."
+    )
+
+    // table function call select
+    expectExceptionThrown(
+      func1('c).select('f0),
+      "TableFunction can only be used in join and leftOuterJoin."
+    )
+
+    // table function call writeToSink
     expectExceptionThrown(
-      in1.join(ObjectTableFunction('a, 1)), "Scala object")
+      func1('c).writeToSink(null),
+      "Cannot translate a query with an unbounded table function call."
+    )
+
+    // table function call distinct
+    expectExceptionThrown(
+      func1('c).distinct(),
+      "TableFunction can only be used in join and leftOuterJoin."
+    )
+
+    // table function call filter
+    expectExceptionThrown(
+      func1('c).filter('f0 === "?"),
+      "TableFunction can only be used in join and leftOuterJoin."
+    )
+
+    // table function call filter
+    expectExceptionThrown(
+      func1('c).filter("f0 = '?'"),
+      "TableFunction can only be used in join and leftOuterJoin."
+    )
+
+    // table function call limit
+    expectExceptionThrown(
+      func1('c).orderBy('f0).limit(3),
+      "TableFunction can only be used in join and leftOuterJoin."
+    )
+
+    // table function call limit
+    expectExceptionThrown(
+      func1('c).orderBy('f0).limit(0, 3),
+      "TableFunction can only be used in join and leftOuterJoin."
+    )
+
+    // table function call orderBy
+    expectExceptionThrown(
+      func1('c).orderBy("f0"),
+      "TableFunction can only be used in join and leftOuterJoin."
+    )
+
+    // table function call orderBy
+    expectExceptionThrown(
+      func1('c).orderBy('f0),
+      "TableFunction can only be used in join and leftOuterJoin."
+    )
+
+    // table function call where
+    expectExceptionThrown(
+      func1('c).where("f0 = '?'"),
+      "TableFunction can only be used in join and leftOuterJoin."
+    )
+
+    // table function call where
+    expectExceptionThrown(
+      func1('c).where('f0 === "?"),
+      "TableFunction can only be used in join and leftOuterJoin."
+    )
 
   }
 
@@ -137,7 +232,9 @@ class UserDefinedTableFunctionTest extends TableTestBase {
 
     //============ throw exception when table function is not registered =========
     // Java Table API call
-    expectExceptionThrown(t.join("nonexist(a)"), "Undefined function: NONEXIST")
+    expectExceptionThrown(
+      t.join(new Table(util.tEnv, "nonexist(a)")),
+      "Undefined function: NONEXIST")
     // SQL API call
     expectExceptionThrown(
       util.tEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(nonexist(a))"),
@@ -145,11 +242,12 @@ class UserDefinedTableFunctionTest extends TableTestBase {
 
 
     //========= throw exception when the called function is a scalar function ====
-    util.addFunction("func0", Func0)
+    util.tEnv.registerFunction("func0", Func0)
+
     // Java Table API call
     expectExceptionThrown(
-      t.join("func0(a)"),
-      "only accept expressions that define table functions",
+      t.join(new Table(util.tEnv, "func0(a)")),
+      "only accept String that define table function",
       classOf[TableException])
     // SQL API call
     // NOTE: it doesn't throw an exception but an AssertionError, maybe a Calcite bug
@@ -162,7 +260,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
     // Java Table API call
     util.addFunction("func2", new TableFunc2)
     expectExceptionThrown(
-      t.join("func2(c, c)"),
+      t.join(new Table(util.tEnv, "func2(c, c)")),
       "Given parameters of function 'FUNC2' do not match any signature")
     // SQL API call
     expectExceptionThrown(