Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/13 13:34:18 UTC

[1/3] flink git commit: [FLINK-5223] [doc] Add documentation of UDTF in Table API & SQL

Repository: flink
Updated Branches:
  refs/heads/master 270140a1d -> 5baea3f2e


[FLINK-5223] [doc] Add documentation of UDTF in Table API & SQL

This closes #2956.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c86efbb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c86efbb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c86efbb

Branch: refs/heads/master
Commit: 5c86efbb449c631aea0b1b490cec706ad7596b44
Parents: da4af12
Author: Jark Wu <wu...@alibaba-inc.com>
Authored: Wed Dec 7 21:18:58 2016 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Dec 13 14:13:17 2016 +0100

----------------------------------------------------------------------
 docs/dev/table_api.md | 123 +++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 123 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5c86efbb/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 2b42ab2..9271803 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -3897,6 +3897,129 @@ object TimestampModifier extends ScalarFunction {
 </div>
 </div>
 
+### User-defined Table Functions
+
+A user-defined table function is implemented similarly to a user-defined scalar function but can return a set of values instead of a single value. Like a standard table, the returned set of values can consist of multiple columns and multiple rows. A user-defined table function takes zero, one, or multiple scalar values as input and returns multiple rows as output.
+
+In order to define a table function, one has to extend the base class `TableFunction` in `org.apache.flink.api.table.functions` and implement one or more evaluation methods. The behavior of a table function is determined by its evaluation methods. An evaluation method must be declared `public` and named `eval`. A `TableFunction` can be overloaded by implementing multiple methods named `eval`. The parameter types of the evaluation methods determine all valid parameters of the table function. The type of the returned table is determined by the generic type of `TableFunction`. Evaluation methods emit output rows using the protected `collect(T)` method.
+
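+For illustration, a minimal sketch of a table function with two overloaded `eval` methods might look as follows (the class name and the second parameter are hypothetical, not part of the examples further below):
+
+{% highlight java %}
+// hypothetical table function with two eval signatures
+public class MySplit extends TableFunction<String> {
+
+    // splits the input on whitespace
+    public void eval(String str) {
+        for (String s : str.split(" ")) {
+            collect(s);
+        }
+    }
+
+    // overloaded variant: splits the input on a user-provided separator
+    public void eval(String str, String separator) {
+        for (String s : str.split(separator)) {
+            collect(s);
+        }
+    }
+}
+{% endhighlight %}
+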
+In the Table API, a table function is used with `.join(Expression)` or `.leftOuterJoin(Expression)` for Scala users and `.join(String)` or `.leftOuterJoin(String)` for Java users. The `join` operator (cross) joins each row of the outer table (the table on the left side of the operator) with all rows produced by the table-valued function (on the right side of the operator). The `leftOuterJoin` operator does the same but additionally preserves outer rows for which the table function returns an empty table; these rows are padded with null values. In SQL, use `LATERAL TABLE(<TableFunction>)` with a CROSS JOIN or a LEFT JOIN with an ON TRUE join condition (see the examples below).
+
+The following examples show how to define a table-valued function and use it:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// the generic type "Tuple2<String, Integer>" determines the returned table type has two columns,
+// the first is a String type and the second is an Integer type
+public class Split extends TableFunction<Tuple2<String, Integer>> {
+    public void evel(String str) {
+        for (String s : str.split(" ")) {
+            // use collect(...) to emit an output row
+            collect(new Tuple2<String, Integer>(s, s.length()));
+        }
+    }
+}
+
+BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+Table myTable = ...         // table schema: [a: String]
+
+// register the function
+tableEnv.registerFunction("split", new Split());
+
+// use the function in Java Table API
+// use AS to rename column names
+myTable.join("split(a) as (word, length)").select("a, word, length");
+myTable.leftOuterJoin("split(a) as (word, length)").select("a, word, length");
+
+// use the function in SQL API, LATERAL and TABLE keywords are required
+// CROSS JOIN a table function (equivalent to "join" in Table API)
+tableEnv.sql("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
+// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API)
+tableEnv.sql("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// the generic type "(String, Integer)" determines the returned table type has two columns,
+// the first is a String type and the second is an Integer type
+class Split extends TableFunction[(String, Integer)] {
+  def evel(str: String): Unit = {
+    // use collect(...) to emit an output row
+    str.split(" ").foreach(x -> collect((x, x.length))
+  }
+}
+
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+val myTable = ...         // table schema: [a: String]
+
+// use the function in Scala Table API (Note: No registration required in Scala Table API)
+val split = new Split()
+// use AS to rename column names
+myTable.join(split('a) as ('word, 'length)).select('a, 'word, 'length)
+myTable.leftOuterJoin(split('a) as ('word, 'length)).select('a, 'word, 'length)
+
+// register and use the function in SQL API, LATERAL and TABLE keywords are required
+tableEnv.registerFunction("split", new Split())
+// CROSS JOIN a table function (equivalent to "join" in Table API)
+tableEnv.sql("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
+// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API)
+tableEnv.sql("SELECT a, word, length FROM MyTable LEFT JOIN TABLE(split(a)) as T(word, length) ON TRUE");
+{% endhighlight %}
+**IMPORTANT:** Do not implement `TableFunction` as a Scala object. A Scala object is a singleton and can cause concurrency issues.
+</div>
+</div>
+
+Please note that POJO types do not have a deterministic field order. Therefore, you cannot rename the fields of a POJO returned by a table function using `AS`.
+
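+For example, a table function returning a (hypothetical) `Person` POJO could be used as sketched below; its columns are addressed by the POJO field names instead of an `AS` clause (the class, the parsing logic, and the registered name "parsePerson" are illustrative assumptions):
+
+{% highlight java %}
+// hypothetical POJO result type with the fields "name" and "age"
+public class Person {
+    public String name;
+    public int age;
+}
+
+// table function that emits Person rows
+public class ParsePerson extends TableFunction<Person> {
+    public void eval(String str) {
+        String[] parts = str.split(",");
+        Person p = new Person();
+        p.name = parts[0];
+        p.age = Integer.parseInt(parts[1]);
+        collect(p);
+    }
+}
+
+// register and use the function; the POJO field names "name" and "age" are used directly
+tableEnv.registerFunction("parsePerson", new ParsePerson());
+myTable.join("parsePerson(a)").select("a, name, age");
+{% endhighlight %}
+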
+By default the result type of a `TableFunction` is determined by Flink's automatic type extraction facilities. This works well for basic types and simple POJOs but might be wrong for more complex, custom, or composite types. In such a case, the type of the result can be manually specified by overriding `TableFunction#getResultType()` which returns its `TypeInformation`.
+
+The following example shows a `TableFunction` that returns a `Row` type, which requires explicit type information. We define that the returned table type should be `RowTypeInfo(String, Integer)` by overriding `TableFunction#getResultType()`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class CustomTypeSplit extends TableFunction<Row> {
+    public void eval(String str) {
+        for (String s : str.split(" ")) {
+            Row row = new Row(2);
+            row.setField(0, s);
+            row.setField(1, s.length());
+            collect(row);
+        }
+    }
+
+    @Override
+    public TypeInformation<Row> getResultType() {
+        return new RowTypeInfo(new TypeInformation[]{
+                BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO});
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class CustomTypeSplit extends TableFunction[Row] {
+  def eval(str: String): Unit = {
+    str.split(" ").foreach({ s =>
+      val row = new Row(2)
+      row.setField(0, s)
+      row.setField(1, s.length)
+      collect(row)
+    })
+  }
+
+  override def getResultType: TypeInformation[Row] = {
+    new RowTypeInfo(Seq(BasicTypeInfo.STRING_TYPE_INFO,
+                        BasicTypeInfo.INT_TYPE_INFO))
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
 ### Limitations
 
 The following operations are not supported yet:


[3/3] flink git commit: [FLINK-3848] [table] Add ProjectableTableSource and push projections into BatchTableSourceScan.

Posted by fh...@apache.org.
[FLINK-3848] [table] Add ProjectableTableSource and push projections into BatchTableSourceScan.

This closes #2923.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5baea3f2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5baea3f2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5baea3f2

Branch: refs/heads/master
Commit: 5baea3f2e13cd2b6d904c617092372f368f12b55
Parents: 5c86efb
Author: beyond1920 <be...@126.com>
Authored: Fri Dec 2 11:33:12 2016 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Dec 13 14:13:18 2016 +0100

----------------------------------------------------------------------
 .../nodes/dataset/BatchTableSourceScan.scala    |  13 +-
 .../api/table/plan/rules/FlinkRuleSets.scala    |   4 +-
 ...ushProjectIntoBatchTableSourceScanRule.scala |  84 +++++++++++
 .../rules/util/RexProgramProjectExtractor.scala | 120 +++++++++++++++
 .../table/sources/ProjectableTableSource.scala  |  38 +++++
 .../batch/ProjectableTableSourceITCase.scala    | 145 +++++++++++++++++++
 .../util/RexProgramProjectExtractorTest.scala   | 120 +++++++++++++++
 7 files changed, 522 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5baea3f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala
index 14da862..e368219 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala
@@ -19,7 +19,8 @@
 package org.apache.flink.api.table.plan.nodes.dataset
 
 import org.apache.calcite.plan._
-import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.table.{BatchTableEnvironment, FlinkTypeFactory}
@@ -39,6 +40,11 @@ class BatchTableSourceScan(
     flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes)
   }
 
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val rowCnt = metadata.getRowCount(this)
+    planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType))
+  }
+
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new BatchTableSourceScan(
       cluster,
@@ -48,6 +54,11 @@ class BatchTableSourceScan(
     )
   }
 
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("fields", tableSource.getFieldsNames.mkString(", "))
+  }
+
   override def translateToPlan(
       tableEnv: BatchTableEnvironment,
       expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/5baea3f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index 6847425..183065c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -109,7 +109,9 @@ object FlinkRuleSets {
     DataSetSortRule.INSTANCE,
     DataSetValuesRule.INSTANCE,
     DataSetCorrelateRule.INSTANCE,
-    BatchTableSourceScanRule.INSTANCE
+    BatchTableSourceScanRule.INSTANCE,
+    // project pushdown optimization
+    PushProjectIntoBatchTableSourceScanRule.INSTANCE
   )
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/5baea3f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
new file mode 100644
index 0000000..301a45b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.flink.api.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
+import org.apache.flink.api.table.plan.rules.util.RexProgramProjectExtractor._
+import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource}
+
+/**
+  * This rule tries to push projections into a BatchTableSourceScan.
+  */
+class PushProjectIntoBatchTableSourceScanRule extends RelOptRule(
+  operand(classOf[DataSetCalc],
+          operand(classOf[BatchTableSourceScan], none)),
+  "PushProjectIntoBatchTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall) = {
+    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
+    scan.tableSource match {
+      case _: ProjectableTableSource[_] => true
+      case _ => false
+    }
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+    val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
+    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
+
+    val usedFields: Array[Int] = extractRefInputFields(calc.calcProgram)
+
+    // if not all fields are used, the projection can be pushed into the table source
+    if (scan.tableSource.getNumberOfFields != usedFields.length) {
+      val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]]
+      val newTableSource = originTableSource.projectFields(usedFields)
+      val newScan = new BatchTableSourceScan(
+        scan.getCluster,
+        scan.getTraitSet,
+        scan.getTable,
+        newTableSource.asInstanceOf[BatchTableSource[_]])
+
+      val newCalcProgram = rewriteRexProgram(
+        calc.calcProgram,
+        newScan.getRowType,
+        usedFields,
+        calc.getCluster.getRexBuilder)
+
+      // if the program only projects its input and has no filter, remove the DataSetCalc node
+      if (newCalcProgram.isTrivial) {
+        call.transformTo(newScan)
+      } else {
+        val newCalc = new DataSetCalc(
+          calc.getCluster,
+          calc.getTraitSet,
+          newScan,
+          calc.getRowType,
+          newCalcProgram,
+          description)
+        call.transformTo(newCalc)
+      }
+    }
+  }
+}
+
+object PushProjectIntoBatchTableSourceScanRule {
+  val INSTANCE: RelOptRule = new PushProjectIntoBatchTableSourceScanRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5baea3f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/util/RexProgramProjectExtractor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/util/RexProgramProjectExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/util/RexProgramProjectExtractor.scala
new file mode 100644
index 0000000..d78e07f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/util/RexProgramProjectExtractor.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.util
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex._
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+object RexProgramProjectExtractor {
+
+  /**
+    * Extracts the indexes of input fields accessed by the RexProgram.
+    *
+    * @param rexProgram The RexProgram to analyze
+    * @return The indexes of accessed input fields
+    */
+  def extractRefInputFields(rexProgram: RexProgram): Array[Int] = {
+    val visitor = new RefFieldsVisitor
+    // extract input fields from project expressions
+    rexProgram.getProjectList.foreach(exp => rexProgram.expandLocalRef(exp).accept(visitor))
+    val condition = rexProgram.getCondition
+    // extract input fields from condition expression
+    if (condition != null) {
+      rexProgram.expandLocalRef(condition).accept(visitor)
+    }
+    visitor.getFields
+  }
+
+  /**
+    * Generates a new RexProgram based on mapped input fields.
+    *
+    * @param rexProgram      original RexProgram
+    * @param inputRowType    input row type
+    * @param usedInputFields indexes of used input fields
+    * @param rexBuilder      builder for Rex expressions
+    *
+    * @return A RexProgram with mapped input field expressions.
+    */
+  def rewriteRexProgram(
+      rexProgram: RexProgram,
+      inputRowType: RelDataType,
+      usedInputFields: Array[Int],
+      rexBuilder: RexBuilder): RexProgram = {
+
+    val inputRewriter = new InputRewriter(usedInputFields)
+    val newProjectExpressions = rexProgram.getProjectList.map(
+      exp => rexProgram.expandLocalRef(exp).accept(inputRewriter)
+    ).toList.asJava
+
+    val oldCondition = rexProgram.getCondition
+    val newConditionExpression = {
+      oldCondition match {
+        case ref: RexLocalRef => rexProgram.expandLocalRef(ref).accept(inputRewriter)
+        case _ => null // null does not match any type
+      }
+    }
+    RexProgram.create(
+      inputRowType,
+      newProjectExpressions,
+      newConditionExpression,
+      rexProgram.getOutputRowType,
+      rexBuilder
+    )
+  }
+}
+
+/**
+  * A RexVisitor to extract used input fields
+  */
+class RefFieldsVisitor extends RexVisitorImpl[Unit](true) {
+  private var fields = mutable.LinkedHashSet[Int]()
+
+  def getFields: Array[Int] = fields.toArray
+
+  override def visitInputRef(inputRef: RexInputRef): Unit = fields += inputRef.getIndex
+
+  override def visitCall(call: RexCall): Unit =
+    call.operands.foreach(operand => operand.accept(this))
+}
+
+/**
+  * A RexShuttle to rewrite field accesses of a RexProgram.
+  *
+  * @param fields fields mapping
+  */
+class InputRewriter(fields: Array[Int]) extends RexShuttle {
+
+  /** old input fields ref index -> new input fields ref index mappings */
+  private val fieldMap: Map[Int, Int] =
+    fields.zipWithIndex.toMap
+
+  override def visitInputRef(inputRef: RexInputRef): RexNode =
+    new RexInputRef(relNodeIndex(inputRef), inputRef.getType)
+
+  override def visitLocalRef(localRef: RexLocalRef): RexNode =
+    new RexInputRef(relNodeIndex(localRef), localRef.getType)
+
+  private def relNodeIndex(ref: RexSlot): Int =
+    fieldMap.getOrElse(ref.getIndex,
+      throw new IllegalArgumentException("input field contains invalid index"))
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5baea3f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/ProjectableTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/ProjectableTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/ProjectableTableSource.scala
new file mode 100644
index 0000000..c04138a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/ProjectableTableSource.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.sources
+
+/**
+  * Adds support for projection push-down to a [[TableSource]].
+  * A [[TableSource]] extending this interface is able to project the fields of the return table.
+  *
+  * @tparam T The return type of the [[ProjectableTableSource]].
+  */
+trait ProjectableTableSource[T] {
+
+  /**
+    * Creates a copy of the [[ProjectableTableSource]] that projects its output on the specified
+    * fields.
+    *
+    * @param fields The indexes of the fields to return.
+    * @return A copy of the [[ProjectableTableSource]] that projects its output.
+    */
+  def projectFields(fields: Array[Int]): ProjectableTableSource[T]
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5baea3f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ProjectableTableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ProjectableTableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ProjectableTableSourceITCase.scala
new file mode 100644
index 0000000..42b9de0
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ProjectableTableSourceITCase.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.batch
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.{DataSet => JavaSet, ExecutionEnvironment => JavaExecEnv}
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource}
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.{Before, Test}
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class ProjectableTableSourceITCase(mode: TestExecutionMode,
+  configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  private val tableName = "MyTable"
+  private var tableEnv: BatchTableEnvironment = null
+
+  @Before
+  def initTableEnv(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    tableEnv = TableEnvironment.getTableEnvironment(env, config)
+    tableEnv.registerTableSource(tableName, new TestProjectableTableSource)
+  }
+
+  @Test
+  def testTableAPI(): Unit = {
+    val results = tableEnv
+                  .scan(tableName)
+                  .where("amount < 4")
+                  .select("id, name")
+                  .collect()
+
+    val expected = Seq(
+      "0,Record_0", "1,Record_1", "2,Record_2", "3,Record_3", "16,Record_16",
+      "17,Record_17", "18,Record_18", "19,Record_19", "32,Record_32").mkString("\n")
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+
+  @Test
+  def testSQL(): Unit = {
+    val results = tableEnv
+                  .sql(s"select id, name from $tableName where amount < 4 ")
+                  .collect()
+
+    val expected = Seq(
+      "0,Record_0", "1,Record_1", "2,Record_2", "3,Record_3", "16,Record_16",
+      "17,Record_17", "18,Record_18", "19,Record_19", "32,Record_32").mkString("\n")
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+}
+
+class TestProjectableTableSource(
+  fieldTypes: Array[TypeInformation[_]],
+  fieldNames: Array[String])
+  extends BatchTableSource[Row] with ProjectableTableSource[Row] {
+
+  def this() = this(
+    fieldTypes = Array(
+      BasicTypeInfo.STRING_TYPE_INFO,
+      BasicTypeInfo.LONG_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.DOUBLE_TYPE_INFO),
+    fieldNames = Array[String]("name", "id", "amount", "price")
+  )
+
+  /** Returns the data of the table as a [[org.apache.flink.api.java.DataSet]]. */
+  override def getDataSet(execEnv: JavaExecEnv): JavaSet[Row] = {
+    execEnv.fromCollection(generateDynamicCollection(33, fieldNames).asJava, getReturnType)
+  }
+
+  /** Returns the types of the table fields. */
+  override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes
+
+  /** Returns the names of the table fields. */
+  override def getFieldsNames: Array[String] = fieldNames
+
+  /** Returns the [[TypeInformation]] for the return type. */
+  override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes)
+
+  /** Returns the number of fields of the table. */
+  override def getNumberOfFields: Int = fieldNames.length
+
+  override def projectFields(fields: Array[Int]): TestProjectableTableSource = {
+    val projectedFieldTypes = new Array[TypeInformation[_]](fields.length)
+    val projectedFieldNames = new Array[String](fields.length)
+
+    fields.zipWithIndex.foreach(f => {
+      projectedFieldTypes(f._2) = fieldTypes(f._1)
+      projectedFieldNames(f._2) = fieldNames(f._1)
+    })
+    new TestProjectableTableSource(projectedFieldTypes, projectedFieldNames)
+  }
+
+  private def generateDynamicCollection(num: Int, fieldNames: Array[String]): Seq[Row] = {
+    for {cnt <- 0 until num}
+      yield {
+        val row = new Row(fieldNames.length)
+        fieldNames.zipWithIndex.foreach(
+          f =>
+            f._1 match {
+              case "name" =>
+                row.setField(f._2, "Record_" + cnt)
+              case "id" =>
+                row.setField(f._2, cnt.toLong)
+              case "amount" =>
+                row.setField(f._2, cnt.toInt % 16)
+              case "price" =>
+                row.setField(f._2, cnt.toDouble / 3)
+              case _ =>
+                throw new IllegalArgumentException(s"unknown field name ${f._1}")
+            }
+        )
+        row
+      }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5baea3f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/plan/rules/util/RexProgramProjectExtractorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/plan/rules/util/RexProgramProjectExtractorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/plan/rules/util/RexProgramProjectExtractorTest.scala
new file mode 100644
index 0000000..156f281
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/plan/rules/util/RexProgramProjectExtractorTest.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.util
+
+import java.math.BigDecimal
+
+import org.apache.calcite.adapter.java.JavaTypeFactory
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
+import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, DOUBLE, INTEGER, VARCHAR}
+import org.apache.calcite.rex.{RexBuilder, RexProgram, RexProgramBuilder}
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+
+import scala.collection.JavaConverters._
+import org.apache.flink.api.table.plan.rules.util.RexProgramProjectExtractor._
+import org.junit.{Assert, Before, Test}
+
+/**
+  * This class is responsible for testing RexProgramProjectExtractor
+  */
+class RexProgramProjectExtractorTest {
+  private var typeFactory: JavaTypeFactory = null
+  private var rexBuilder: RexBuilder = null
+  private var allFieldTypes: Seq[RelDataType] = null
+  private val allFieldNames = List("name", "id", "amount", "price")
+
+  @Before
+  def setUp: Unit = {
+    typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT)
+    rexBuilder = new RexBuilder(typeFactory)
+    allFieldTypes = List(VARCHAR, BIGINT, INTEGER, DOUBLE).map(typeFactory.createSqlType(_))
+  }
+
+  @Test
+  def testExtractRefInputFields: Unit = {
+    val usedFields = extractRefInputFields(buildRexProgram)
+    Assert.assertArrayEquals(usedFields, Array(2, 3, 1))
+  }
+
+  @Test
+  def testRewriteRexProgram: Unit = {
+    val originRexProgram = buildRexProgram
+    Assert.assertTrue(extractExprStrList(originRexProgram).sameElements(Array(
+      "$0",
+      "$1",
+      "$2",
+      "$3",
+      "*($t2, $t3)",
+      "100",
+      "<($t4, $t5)",
+      "6",
+      ">($t1, $t7)",
+      "AND($t6, $t8)")))
+    // use amount, id, price fields to create a new RexProgram
+    val usedFields = Array(2, 3, 1)
+    val types = usedFields.map(allFieldTypes(_)).toList.asJava
+    val names = usedFields.map(allFieldNames(_)).toList.asJava
+    val inputRowType = typeFactory.createStructType(types, names)
+    val newRexProgram = rewriteRexProgram(originRexProgram, inputRowType, usedFields, rexBuilder)
+    Assert.assertTrue(extractExprStrList(newRexProgram).sameElements(Array(
+      "$0",
+      "$1",
+      "$2",
+      "*($t0, $t1)",
+      "100",
+      "<($t3, $t4)",
+      "6",
+      ">($t2, $t6)",
+      "AND($t5, $t7)")))
+  }
+
+  private def buildRexProgram: RexProgram = {
+    val types = allFieldTypes.asJava
+    val names = allFieldNames.asJava
+    val inputRowType = typeFactory.createStructType(types, names)
+    val builder = new RexProgramBuilder(inputRowType, rexBuilder)
+    val t0 = rexBuilder.makeInputRef(types.get(2), 2)
+    val t1 = rexBuilder.makeInputRef(types.get(1), 1)
+    val t2 = rexBuilder.makeInputRef(types.get(3), 3)
+    val t3 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.MULTIPLY, t0, t2))
+    val t4 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L))
+    val t5 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(6L))
+    // project: amount, amount * price
+    builder.addProject(t0, "amount")
+    builder.addProject(t3, "total")
+    // condition: amount * price < 100 and id > 6
+    val t6 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN, t3, t4))
+    val t7 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN, t1, t5))
+    val t8 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.AND, List(t6, t7).asJava))
+    builder.addCondition(t8)
+    builder.getProgram
+  }
+
+  /**
+    * extract all expression string list from input RexProgram expression lists
+    *
+    * @param rexProgram input RexProgram instance to analyze
+    * @return all expression string list of input RexProgram expression lists
+    */
+  private def extractExprStrList(rexProgram: RexProgram) = {
+    rexProgram.getExprList.asScala.map(_.toString)
+  }
+
+}


[2/3] flink git commit: [FLINK-5304] [table] Rename crossApply/outerApply to join/leftOuterJoin in Table API.

Posted by fh...@apache.org.
[FLINK-5304] [table] Rename crossApply/outerApply to join/leftOuterJoin in Table API.

This closes #2978.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da4af125
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da4af125
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da4af125

Branch: refs/heads/master
Commit: da4af1259ae750953ca2e7a3ecec342d9eb77bac
Parents: 270140a
Author: Jark Wu <wu...@alibaba-inc.com>
Authored: Fri Dec 9 10:31:15 2016 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Dec 13 14:13:17 2016 +0100

----------------------------------------------------------------------
 .../api/table/functions/TableFunction.scala     |  4 +-
 .../api/table/plan/nodes/FlinkCorrelate.scala   |  8 +--
 .../plan/nodes/dataset/DataSetCorrelate.scala   |  2 +-
 .../nodes/datastream/DataStreamCorrelate.scala  |  2 +-
 .../org/apache/flink/api/table/table.scala      | 70 +++++++++-----------
 .../sql/UserDefinedTableFunctionTest.scala      |  4 +-
 .../table/UserDefinedTableFunctionTest.scala    | 46 ++++++-------
 .../sql/UserDefinedTableFunctionTest.scala      |  4 +-
 .../table/UserDefinedTableFunctionTest.scala    | 66 +++++++++---------
 .../dataset/DataSetCorrelateITCase.scala        | 20 +++---
 .../datastream/DataStreamCorrelateITCase.scala  |  8 +--
 11 files changed, 115 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/da4af125/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
index 3a56efb..ca9aaf1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
@@ -64,11 +64,11 @@ import org.apache.flink.api.table.expressions.{Expression, TableFunctionCall}
   *
   *   // for Scala users
   *   val split = new Split()
-  *   table.crossApply(split('c) as ('s)).select('a, 's)
+  *   table.join(split('c) as ('s)).select('a, 's)
   *
   *   // for Java users
   *   tEnv.registerFunction("split", new Split())   // register table function first
-  *   table.crossApply("split(a) as (s)").select("a, s")
+  *   table.join("split(a) as (s)").select("a, s")
   *
   *   // for SQL users
   *   tEnv.registerFunction("split", new Split())   // register table function first

http://git-wip-us.apache.org/repos/asf/flink/blob/da4af125/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala
index 93a8f53..c058265 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala
@@ -33,7 +33,7 @@ import org.apache.flink.api.table.{TableConfig, TableException}
 import scala.collection.JavaConverters._
 
 /**
-  * cross/outer apply a user-defined table function
+  * Join a user-defined table function
   */
 trait FlinkCorrelate {
 
@@ -63,7 +63,7 @@ trait FlinkCorrelate {
        """.stripMargin
 
     if (joinType == SemiJoinType.INNER) {
-      // cross apply
+      // cross join
       body +=
         s"""
            |if (!iter.hasNext()) {
@@ -71,9 +71,9 @@ trait FlinkCorrelate {
            |}
         """.stripMargin
     } else if (joinType == SemiJoinType.LEFT) {
-      // outer apply
+      // left outer join
 
-      // in case of outer apply and the returned row of table function is empty,
+      // in case of a left outer join, if the table function returns an empty result,
       // fill all fields of row with null
       val input2NullExprs = input2AccessExprs.map { x =>
         GeneratedExpression(

http://git-wip-us.apache.org/repos/asf/flink/blob/da4af125/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala
index 3cddf8b..95eb15b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala
@@ -34,7 +34,7 @@ import org.apache.flink.api.table.plan.nodes.FlinkCorrelate
 import org.apache.flink.api.table.typeutils.TypeConverter._
 
 /**
-  * Flink RelNode which matches along with cross apply a user defined table function.
+  * Flink RelNode that joins a user-defined table function with its input.
   */
 class DataSetCorrelate(
     cluster: RelOptCluster,

http://git-wip-us.apache.org/repos/asf/flink/blob/da4af125/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala
index 028cb10..3bfa6e2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala
@@ -33,7 +33,7 @@ import org.apache.flink.api.table.typeutils.TypeConverter._
 import org.apache.flink.streaming.api.datastream.DataStream
 
 /**
-  * Flink RelNode which matches along with cross apply a user defined table function.
+  * Flink RelNode that joins a user-defined table function with its input.
   */
 class DataStreamCorrelate(
     cluster: RelOptCluster,

http://git-wip-us.apache.org/repos/asf/flink/blob/da4af125/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index b421c8e..b74ddb0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -611,11 +611,10 @@ class Table(
   }
 
   /**
-    * The Cross Apply operator returns rows from the outer table (table on the left of the
-    * operator) that produces matching values from the table-valued function (which is defined in
-    * the expression on the right side of the operator).
-    *
-    * The Cross Apply is equivalent to Inner Join, but it works with a table-valued function.
+    * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
+    * to an SQL cross join, but it works with a table function. It returns rows from the outer
+    * table (table on the left of the operator) for which the table function (defined in the
+    * expression on the right side of the operator) produces matching values.
     *
     * Example:
     *
@@ -627,19 +626,18 @@ class Table(
     *   }
     *
     *   val split = new MySplitUDTF()
-    *   table.crossApply(split('c) as ('s)).select('a,'b,'c,'s)
+    *   table.join(split('c) as ('s)).select('a,'b,'c,'s)
     * }}}
     */
-  def crossApply(udtf: Expression): Table = {
-    applyInternal(udtf, JoinType.INNER)
+  def join(udtf: Expression): Table = {
+    joinUdtfInternal(udtf, JoinType.INNER)
   }
 
   /**
-    * The Cross Apply operator returns rows from the outer table (table on the left of the
-    * operator) that produces matching values from the table-valued function (which is defined in
-    * the expression on the right side of the operator).
-    *
-    * The Cross Apply is equivalent to Inner Join, but it works with a table-valued function.
+    * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
+    * to an SQL cross join, but it works with a table function. It returns rows from the outer
+    * table (table on the left of the operator) for which the table function (defined in the
+    * expression on the right side of the operator) produces matching values.
     *
     * Example:
     *
@@ -653,20 +651,19 @@ class Table(
     *   TableFunction<String> split = new MySplitUDTF();
     *   tableEnv.registerFunction("split", split);
     *
-    *   table.crossApply("split(c) as (s)").select("a, b, c, s");
+    *   table.join("split(c) as (s)").select("a, b, c, s");
     * }}}
     */
-  def crossApply(udtf: String): Table = {
-    applyInternal(udtf, JoinType.INNER)
+  def join(udtf: String): Table = {
+    joinUdtfInternal(udtf, JoinType.INNER)
   }
 
   /**
-    * The Outer Apply operator returns all the rows from the outer table (table on the left of the
-    * Apply operator), and rows that do not match the condition from the table-valued function
-    * (which is defined in the expression on the right side of the operator).
-    * Rows with no matching condition are filled with null values.
-    *
-    * The Outer Apply is equivalent to Left Outer Join, but it works with a table-valued function.
+    * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
+    * to an SQL left outer join with ON TRUE, but it works with a table function. It returns all
+    * rows from the outer table (table on the left of the operator), including those for which
+    * the table function (defined in the expression on the right side of the operator) returns
+    * an empty result. Such rows are padded with null values.
     *
     * Example:
     *
@@ -678,20 +675,19 @@ class Table(
     *   }
     *
     *   val split = new MySplitUDTF()
-    *   table.outerApply(split('c) as ('s)).select('a,'b,'c,'s)
+    *   table.leftOuterJoin(split('c) as ('s)).select('a,'b,'c,'s)
     * }}}
     */
-  def outerApply(udtf: Expression): Table = {
-    applyInternal(udtf, JoinType.LEFT_OUTER)
+  def leftOuterJoin(udtf: Expression): Table = {
+    joinUdtfInternal(udtf, JoinType.LEFT_OUTER)
   }
 
   /**
-    * The Outer Apply operator returns all the rows from the outer table (table on the left of the
-    * Apply operator), and rows that do not match the condition from the table-valued function
-    * (which is defined in the expression on the right side of the operator).
-    * Rows with no matching condition are filled with null values.
-    *
-    * The Outer Apply is equivalent to Left Outer Join, but it works with a table-valued function.
+    * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
+    * to an SQL left outer join with ON TRUE, but it works with a table function. It returns all
+    * rows from the outer table (table on the left of the operator), including those for which
+    * the table function (defined in the expression on the right side of the operator) returns
+    * an empty result. Such rows are padded with null values.
     *
     * Example:
     *
@@ -705,19 +701,19 @@ class Table(
     *   TableFunction<String> split = new MySplitUDTF();
     *   tableEnv.registerFunction("split", split);
     *
-    *   table.outerApply("split(c) as (s)").select("a, b, c, s");
+    *   table.leftOuterJoin("split(c) as (s)").select("a, b, c, s");
     * }}}
     */
-  def outerApply(udtf: String): Table = {
-    applyInternal(udtf, JoinType.LEFT_OUTER)
+  def leftOuterJoin(udtf: String): Table = {
+    joinUdtfInternal(udtf, JoinType.LEFT_OUTER)
   }
 
-  private def applyInternal(udtfString: String, joinType: JoinType): Table = {
+  private def joinUdtfInternal(udtfString: String, joinType: JoinType): Table = {
     val udtf = ExpressionParser.parseExpression(udtfString)
-    applyInternal(udtf, joinType)
+    joinUdtfInternal(udtf, joinType)
   }
 
-  private def applyInternal(udtf: Expression, joinType: JoinType): Table = {
+  private def joinUdtfInternal(udtf: Expression, joinType: JoinType): Table = {
     var alias: Option[Seq[String]] = None
 
     // unwrap an Expression until we get a TableFunctionCall

http://git-wip-us.apache.org/repos/asf/flink/blob/da4af125/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UserDefinedTableFunctionTest.scala
index 1c505ba..245f117 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UserDefinedTableFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UserDefinedTableFunctionTest.scala
@@ -28,7 +28,7 @@ import org.junit.Test
 class UserDefinedTableFunctionTest extends TableTestBase {
 
   @Test
-  def testCrossApply(): Unit = {
+  def testCrossJoin(): Unit = {
     val util = batchTestUtil()
     val func1 = new TableFunc1
     util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
@@ -74,7 +74,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
   }
 
   @Test
-  def testOuterApply(): Unit = {
+  def testLeftOuterJoin(): Unit = {
     val util = batchTestUtil()
     val func1 = new TableFunc1
     util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)

http://git-wip-us.apache.org/repos/asf/flink/blob/da4af125/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala
index a9f3f7b..7e170d4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala
@@ -50,70 +50,70 @@ class UserDefinedTableFunctionTest extends TableTestBase {
     val in2 = javaTableEnv.fromDataSet(jDs).as("a, b, c")
     javaTableEnv.registerTable("MyTable", in2)
 
-    // test cross apply
+    // test cross join
     val func1 = new TableFunc1
     javaTableEnv.registerFunction("func1", func1)
-    var scalaTable = in1.crossApply(func1('c) as 's).select('c, 's)
-    var javaTable = in2.crossApply("func1(c).as(s)").select("c, s")
+    var scalaTable = in1.join(func1('c) as 's).select('c, 's)
+    var javaTable = in2.join("func1(c).as(s)").select("c, s")
     verifyTableEquals(scalaTable, javaTable)
 
-    // test outer apply
-    scalaTable = in1.outerApply(func1('c) as 's).select('c, 's)
-    javaTable = in2.outerApply("as(func1(c), s)").select("c, s")
+    // test left outer join
+    scalaTable = in1.leftOuterJoin(func1('c) as 's).select('c, 's)
+    javaTable = in2.leftOuterJoin("as(func1(c), s)").select("c, s")
     verifyTableEquals(scalaTable, javaTable)
 
     // test overloading
-    scalaTable = in1.crossApply(func1('c, "$") as 's).select('c, 's)
-    javaTable = in2.crossApply("func1(c, '$') as (s)").select("c, s")
+    scalaTable = in1.join(func1('c, "$") as 's).select('c, 's)
+    javaTable = in2.join("func1(c, '$') as (s)").select("c, s")
     verifyTableEquals(scalaTable, javaTable)
 
     // test custom result type
     val func2 = new TableFunc2
     javaTableEnv.registerFunction("func2", func2)
-    scalaTable = in1.crossApply(func2('c) as ('name, 'len)).select('c, 'name, 'len)
-    javaTable = in2.crossApply("func2(c).as(name, len)").select("c, name, len")
+    scalaTable = in1.join(func2('c) as ('name, 'len)).select('c, 'name, 'len)
+    javaTable = in2.join("func2(c).as(name, len)").select("c, name, len")
     verifyTableEquals(scalaTable, javaTable)
 
     // test hierarchy generic type
     val hierarchy = new HierarchyTableFunction
     javaTableEnv.registerFunction("hierarchy", hierarchy)
-    scalaTable = in1.crossApply(hierarchy('c) as ('name, 'adult, 'len))
+    scalaTable = in1.join(hierarchy('c) as ('name, 'adult, 'len))
       .select('c, 'name, 'len, 'adult)
-    javaTable = in2.crossApply("AS(hierarchy(c), name, adult, len)")
+    javaTable = in2.join("AS(hierarchy(c), name, adult, len)")
       .select("c, name, len, adult")
     verifyTableEquals(scalaTable, javaTable)
 
     // test pojo type
     val pojo = new PojoTableFunc
     javaTableEnv.registerFunction("pojo", pojo)
-    scalaTable = in1.crossApply(pojo('c))
+    scalaTable = in1.join(pojo('c))
       .select('c, 'name, 'age)
-    javaTable = in2.crossApply("pojo(c)")
+    javaTable = in2.join("pojo(c)")
       .select("c, name, age")
     verifyTableEquals(scalaTable, javaTable)
 
     // test with filter
-    scalaTable = in1.crossApply(func2('c) as ('name, 'len))
+    scalaTable = in1.join(func2('c) as ('name, 'len))
       .select('c, 'name, 'len).filter('len > 2)
-    javaTable = in2.crossApply("func2(c) as (name, len)")
+    javaTable = in2.join("func2(c) as (name, len)")
       .select("c, name, len").filter("len > 2")
     verifyTableEquals(scalaTable, javaTable)
 
     // test with scalar function
-    scalaTable = in1.crossApply(func1('c.substring(2)) as 's)
+    scalaTable = in1.join(func1('c.substring(2)) as 's)
       .select('a, 'c, 's)
-    javaTable = in2.crossApply("func1(substring(c, 2)) as (s)")
+    javaTable = in2.join("func1(substring(c, 2)) as (s)")
       .select("a, c, s")
     verifyTableEquals(scalaTable, javaTable)
   }
 
   @Test
-  def testCrossApply(): Unit = {
+  def testCrossJoin(): Unit = {
     val util = batchTestUtil()
     val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val function = util.addFunction("func1", new TableFunc1)
 
-    val result1 = table.crossApply(function('c) as 's).select('c, 's)
+    val result1 = table.join(function('c) as 's).select('c, 's)
 
     val expected1 = unaryNode(
       "DataSetCalc",
@@ -133,7 +133,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
 
     // test overloading
 
-    val result2 = table.crossApply(function('c, "$") as 's).select('c, 's)
+    val result2 = table.join(function('c, "$") as 's).select('c, 's)
 
     val expected2 = unaryNode(
       "DataSetCalc",
@@ -153,12 +153,12 @@ class UserDefinedTableFunctionTest extends TableTestBase {
   }
 
   @Test
-  def testOuterApply(): Unit = {
+  def testLeftOuterJoin(): Unit = {
     val util = batchTestUtil()
     val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val function = util.addFunction("func1", new TableFunc1)
 
-    val result = table.outerApply(function('c) as 's).select('c, 's)
+    val result = table.leftOuterJoin(function('c) as 's).select('c, 's)
 
     val expected = unaryNode(
       "DataSetCalc",

http://git-wip-us.apache.org/repos/asf/flink/blob/da4af125/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
index c2ded28..21629e4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
@@ -28,7 +28,7 @@ import org.junit.Test
 class UserDefinedTableFunctionTest extends TableTestBase {
 
   @Test
-  def testCrossApply(): Unit = {
+  def testCrossJoin(): Unit = {
     val util = streamTestUtil()
     val func1 = new TableFunc1
     util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
@@ -74,7 +74,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
   }
 
   @Test
-  def testOuterApply(): Unit = {
+  def testLeftOuterJoin(): Unit = {
     val util = streamTestUtil()
     val func1 = new TableFunc1
     util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)

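As a quick illustration of the renamed operators outside the test harness, a minimal sketch in the style of the tests above follows. The Split function, the sample rows, and the surrounding object are made up for illustration only; the import paths are assumed to match the package layout these test files use (e.g. org.apache.flink.api.table.Row) and may differ in other Flink versions.

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.{Row, TableEnvironment}
import org.apache.flink.api.table.functions.TableFunction

// Hypothetical table function: emits one row per '#'-separated token,
// and emits nothing for strings without a separator.
class Split extends TableFunction[String] {
  def eval(str: String): Unit = {
    if (str.contains("#")) {
      str.split("#").foreach(collect)
    }
  }
}

object TableFunctionJoinSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val in = env.fromElements((1, "Jack#22"), (2, "nosharp")).toTable(tableEnv).as('a, 'c)
    val split = new Split

    // former crossApply: "nosharp" produces no function rows and is dropped
    val crossed = in.join(split('c) as 's).select('c, 's)
    // former outerApply: "nosharp" is kept, with 's padded with null
    val outer = in.leftOuterJoin(split('c) as 's).select('c, 's)

    crossed.toDataSet[Row].print()
    outer.toDataSet[Row].print()
  }
}
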
http://git-wip-us.apache.org/repos/asf/flink/blob/da4af125/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala
index bc28d67..b45ae8e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala
@@ -53,59 +53,59 @@ class UserDefinedTableFunctionTest extends TableTestBase {
     val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
     val in2 = javaTableEnv.fromDataStream(jDs).as("a, b, c")
 
-    // test cross apply
+    // test cross join
     val func1 = new TableFunc1
     javaTableEnv.registerFunction("func1", func1)
-    var scalaTable = in1.crossApply(func1('c) as 's).select('c, 's)
-    var javaTable = in2.crossApply("func1(c).as(s)").select("c, s")
+    var scalaTable = in1.join(func1('c) as 's).select('c, 's)
+    var javaTable = in2.join("func1(c).as(s)").select("c, s")
     verifyTableEquals(scalaTable, javaTable)
 
-    // test outer apply
-    scalaTable = in1.outerApply(func1('c) as 's).select('c, 's)
-    javaTable = in2.outerApply("as(func1(c), s)").select("c, s")
+    // test left outer join
+    scalaTable = in1.leftOuterJoin(func1('c) as 's).select('c, 's)
+    javaTable = in2.leftOuterJoin("as(func1(c), s)").select("c, s")
     verifyTableEquals(scalaTable, javaTable)
 
     // test overloading
-    scalaTable = in1.crossApply(func1('c, "$") as 's).select('c, 's)
-    javaTable = in2.crossApply("func1(c, '$') as (s)").select("c, s")
+    scalaTable = in1.join(func1('c, "$") as 's).select('c, 's)
+    javaTable = in2.join("func1(c, '$') as (s)").select("c, s")
     verifyTableEquals(scalaTable, javaTable)
 
     // test custom result type
     val func2 = new TableFunc2
     javaTableEnv.registerFunction("func2", func2)
-    scalaTable = in1.crossApply(func2('c) as ('name, 'len)).select('c, 'name, 'len)
-    javaTable = in2.crossApply("func2(c).as(name, len)").select("c, name, len")
+    scalaTable = in1.join(func2('c) as ('name, 'len)).select('c, 'name, 'len)
+    javaTable = in2.join("func2(c).as(name, len)").select("c, name, len")
     verifyTableEquals(scalaTable, javaTable)
 
     // test hierarchy generic type
     val hierarchy = new HierarchyTableFunction
     javaTableEnv.registerFunction("hierarchy", hierarchy)
-    scalaTable = in1.crossApply(hierarchy('c) as ('name, 'adult, 'len))
+    scalaTable = in1.join(hierarchy('c) as ('name, 'adult, 'len))
       .select('c, 'name, 'len, 'adult)
-    javaTable = in2.crossApply("AS(hierarchy(c), name, adult, len)")
+    javaTable = in2.join("AS(hierarchy(c), name, adult, len)")
       .select("c, name, len, adult")
     verifyTableEquals(scalaTable, javaTable)
 
     // test pojo type
     val pojo = new PojoTableFunc
     javaTableEnv.registerFunction("pojo", pojo)
-    scalaTable = in1.crossApply(pojo('c))
+    scalaTable = in1.join(pojo('c))
       .select('c, 'name, 'age)
-    javaTable = in2.crossApply("pojo(c)")
+    javaTable = in2.join("pojo(c)")
       .select("c, name, age")
     verifyTableEquals(scalaTable, javaTable)
 
     // test with filter
-    scalaTable = in1.crossApply(func2('c) as ('name, 'len))
+    scalaTable = in1.join(func2('c) as ('name, 'len))
       .select('c, 'name, 'len).filter('len > 2)
-    javaTable = in2.crossApply("func2(c) as (name, len)")
+    javaTable = in2.join("func2(c) as (name, len)")
       .select("c, name, len").filter("len > 2")
     verifyTableEquals(scalaTable, javaTable)
 
     // test with scalar function
-    scalaTable = in1.crossApply(func1('c.substring(2)) as 's)
+    scalaTable = in1.join(func1('c.substring(2)) as 's)
       .select('a, 'c, 's)
-    javaTable = in2.crossApply("func1(substring(c, 2)) as (s)")
+    javaTable = in2.join("func1(substring(c, 2)) as (s)")
       .select("a, c, s")
     verifyTableEquals(scalaTable, javaTable)
 
@@ -115,7 +115,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
     expectExceptionThrown(
       javaTableEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
     expectExceptionThrown(
-      in1.crossApply(ObjectTableFunction('a, 1)),"Scala object")
+      in1.join(ObjectTableFunction('a, 1)), "Scala object")
 
   }
 
@@ -132,12 +132,12 @@ class UserDefinedTableFunctionTest extends TableTestBase {
     // Java table environment register
     expectExceptionThrown(tEnv.registerFunction("udtf", ObjectTableFunction), "Scala object")
     // Scala Table API directly call
-    expectExceptionThrown(t.crossApply(ObjectTableFunction('a, 1)), "Scala object")
+    expectExceptionThrown(t.join(ObjectTableFunction('a, 1)), "Scala object")
 
 
     //============ throw exception when table function is not registered =========
     // Java Table API call
-    expectExceptionThrown(t.crossApply("nonexist(a)"), "Undefined function: NONEXIST")
+    expectExceptionThrown(t.join("nonexist(a)"), "Undefined function: NONEXIST")
     // SQL API call
     expectExceptionThrown(
       util.tEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(nonexist(a))"),
@@ -148,7 +148,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
     util.addFunction("func0", Func0)
     // Java Table API call
     expectExceptionThrown(
-      t.crossApply("func0(a)"),
+      t.join("func0(a)"),
       "only accept expressions that define table functions",
       classOf[TableException])
     // SQL API call
@@ -162,7 +162,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
     // Java Table API call
     util.addFunction("func2", new TableFunc2)
     expectExceptionThrown(
-      t.crossApply("func2(c, c)"),
+      t.join("func2(c, c)"),
       "Given parameters of function 'FUNC2' do not match any signature")
     // SQL API call
     expectExceptionThrown(
@@ -171,12 +171,12 @@ class UserDefinedTableFunctionTest extends TableTestBase {
   }
 
   @Test
-  def testCrossApply(): Unit = {
+  def testCrossJoin(): Unit = {
     val util = streamTestUtil()
     val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val function = util.addFunction("func1", new TableFunc1)
 
-    val result1 = table.crossApply(function('c) as 's).select('c, 's)
+    val result1 = table.join(function('c) as 's).select('c, 's)
 
     val expected1 = unaryNode(
       "DataStreamCalc",
@@ -196,7 +196,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
 
     // test overloading
 
-    val result2 = table.crossApply(function('c, "$") as 's).select('c, 's)
+    val result2 = table.join(function('c, "$") as 's).select('c, 's)
 
     val expected2 = unaryNode(
       "DataStreamCalc",
@@ -216,12 +216,12 @@ class UserDefinedTableFunctionTest extends TableTestBase {
   }
 
   @Test
-  def testOuterApply(): Unit = {
+  def testLeftOuterJoin(): Unit = {
     val util = streamTestUtil()
     val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val function = util.addFunction("func1", new TableFunc1)
 
-    val result = table.outerApply(function('c) as 's).select('c, 's)
+    val result = table.leftOuterJoin(function('c) as 's).select('c, 's)
 
     val expected = unaryNode(
       "DataStreamCalc",
@@ -246,7 +246,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
     val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val function = util.addFunction("func2", new TableFunc2)
 
-    val result = table.crossApply(function('c) as ('name, 'len)).select('c, 'name, 'len)
+    val result = table.join(function('c) as ('name, 'len)).select('c, 'name, 'len)
 
     val expected = unaryNode(
       "DataStreamCalc",
@@ -272,7 +272,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
     val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val function = util.addFunction("hierarchy", new HierarchyTableFunction)
 
-    val result = table.crossApply(function('c) as ('name, 'adult, 'len))
+    val result = table.join(function('c) as ('name, 'adult, 'len))
 
     val expected = unaryNode(
       "DataStreamCorrelate",
@@ -294,7 +294,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
     val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val function = util.addFunction("pojo", new PojoTableFunc)
 
-    val result = table.crossApply(function('c))
+    val result = table.join(function('c))
 
     val expected = unaryNode(
       "DataStreamCorrelate",
@@ -317,7 +317,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
     val function = util.addFunction("func2", new TableFunc2)
 
     val result = table
-      .crossApply(function('c) as ('name, 'len))
+      .join(function('c) as ('name, 'len))
       .select('c, 'name, 'len)
       .filter('len > 2)
 
@@ -346,7 +346,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
     val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val function = util.addFunction("func1", new TableFunc1)
 
-    val result = table.crossApply(function('c.substring(2)) as 's)
+    val result = table.join(function('c.substring(2)) as 's)
 
     val expected = unaryNode(
         "DataStreamCorrelate",

http://git-wip-us.apache.org/repos/asf/flink/blob/da4af125/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/dataset/DataSetCorrelateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/dataset/DataSetCorrelateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/dataset/DataSetCorrelateITCase.scala
index cc551f9..32559f1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/dataset/DataSetCorrelateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/dataset/DataSetCorrelateITCase.scala
@@ -40,20 +40,20 @@ class DataSetCorrelateITCase(
   extends TableProgramsTestBase(mode, configMode) {
 
   @Test
-  def testCrossApply(): Unit = {
+  def testCrossJoin(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tableEnv = TableEnvironment.getTableEnvironment(env, config)
     val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
 
     val func1 = new TableFunc1
-    val result = in.crossApply(func1('c) as 's).select('c, 's).toDataSet[Row]
+    val result = in.join(func1('c) as 's).select('c, 's).toDataSet[Row]
     val results = result.collect()
     val expected = "Jack#22,Jack\n" + "Jack#22,22\n" + "John#19,John\n" + "John#19,19\n" +
       "Anna#44,Anna\n" + "Anna#44,44\n"
     TestBaseUtils.compareResultAsText(results.asJava, expected)
 
     // with overloading
-    val result2 = in.crossApply(func1('c, "$") as 's).select('c, 's).toDataSet[Row]
+    val result2 = in.join(func1('c, "$") as 's).select('c, 's).toDataSet[Row]
     val results2 = result2.collect()
     val expected2 = "Jack#22,$Jack\n" + "Jack#22,$22\n" + "John#19,$John\n" +
       "John#19,$19\n" + "Anna#44,$Anna\n" + "Anna#44,$44\n"
@@ -61,13 +61,13 @@ class DataSetCorrelateITCase(
   }
 
   @Test
-  def testOuterApply(): Unit = {
+  def testLeftOuterJoin(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tableEnv = TableEnvironment.getTableEnvironment(env, config)
     val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
 
     val func2 = new TableFunc2
-    val result = in.outerApply(func2('c) as ('s, 'l)).select('c, 's, 'l).toDataSet[Row]
+    val result = in.leftOuterJoin(func2('c) as ('s, 'l)).select('c, 's, 'l).toDataSet[Row]
     val results = result.collect()
     val expected = "Jack#22,Jack,4\n" + "Jack#22,22,2\n" + "John#19,John,4\n" +
       "John#19,19,2\n" + "Anna#44,Anna,4\n" + "Anna#44,44,2\n" + "nosharp,null,null"
@@ -82,7 +82,7 @@ class DataSetCorrelateITCase(
     val func0 = new TableFunc0
 
     val result = in
-      .crossApply(func0('c) as ('name, 'age))
+      .join(func0('c) as ('name, 'age))
       .select('c, 'name, 'age)
       .filter('age > 20)
       .toDataSet[Row]
@@ -100,7 +100,7 @@ class DataSetCorrelateITCase(
     val func2 = new TableFunc2
 
     val result = in
-      .crossApply(func2('c) as ('name, 'len))
+      .join(func2('c) as ('name, 'len))
       .select('c, 'name, 'len)
       .toDataSet[Row]
 
@@ -118,7 +118,7 @@ class DataSetCorrelateITCase(
 
     val hierarchy = new HierarchyTableFunction
     val result = in
-      .crossApply(hierarchy('c) as ('name, 'adult, 'len))
+      .join(hierarchy('c) as ('name, 'adult, 'len))
       .select('c, 'name, 'adult, 'len)
       .toDataSet[Row]
 
@@ -136,7 +136,7 @@ class DataSetCorrelateITCase(
 
     val pojo = new PojoTableFunc()
     val result = in
-      .crossApply(pojo('c))
+      .join(pojo('c))
       .select('c, 'name, 'age)
       .toDataSet[Row]
 
@@ -153,7 +153,7 @@ class DataSetCorrelateITCase(
     val func1 = new TableFunc1
 
     val result = in
-      .crossApply(func1('c.substring(2)) as 's)
+      .join(func1('c.substring(2)) as 's)
       .select('c, 's)
       .toDataSet[Row]
 

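The SQL path exercised by the error-handling tests above pairs with the same two flavors of join. A hedged sketch, reusing the hypothetical Split function and tableEnv from the earlier sketch; the registerTable/registerFunction calls and the derived-column alias AS T(s) are assumptions written in the style of these tests:

// Register the input table and the table function by name for SQL access.
tableEnv.registerTable("MyTable", in)
tableEnv.registerFunction("split", new Split)

// Comma (cross join) form corresponds to the Table API join (former crossApply).
val crossSql = tableEnv.sql(
  "SELECT c, s FROM MyTable, LATERAL TABLE(split(c)) AS T(s)")

// LEFT JOIN ... ON TRUE corresponds to leftOuterJoin (former outerApply).
val outerSql = tableEnv.sql(
  "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(split(c)) AS T(s) ON TRUE")
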
http://git-wip-us.apache.org/repos/asf/flink/blob/da4af125/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala
index c2c523a..70b0359 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala
@@ -32,7 +32,7 @@ import scala.collection.mutable
 class DataStreamCorrelateITCase extends StreamingMultipleProgramsTestBase {
 
   @Test
-  def testCrossApply(): Unit = {
+  def testCrossJoin(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.clear
@@ -41,7 +41,7 @@ class DataStreamCorrelateITCase extends StreamingMultipleProgramsTestBase {
     val func0 = new TableFunc0
 
     val result = t
-      .crossApply(func0('c) as('d, 'e))
+      .join(func0('c) as('d, 'e))
       .select('c, 'd, 'e)
       .toDataStream[Row]
 
@@ -53,7 +53,7 @@ class DataStreamCorrelateITCase extends StreamingMultipleProgramsTestBase {
   }
 
   @Test
-  def testOuterApply(): Unit = {
+  def testLeftOuterJoin(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.clear
@@ -62,7 +62,7 @@ class DataStreamCorrelateITCase extends StreamingMultipleProgramsTestBase {
     val func0 = new TableFunc0
 
     val result = t
-      .outerApply(func0('c) as('d, 'e))
+      .leftOuterJoin(func0('c) as('d, 'e))
       .select('c, 'd, 'e)
       .toDataStream[Row]