You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/13 13:34:19 UTC
[2/3] flink git commit: [FLINK-5304] [table] Rename crossApply/outerApply to join/leftOuterJoin in Table API.
[FLINK-5304] [table] Rename crossApply/outerApply to join/leftOuterJoin in Table API.
This closes #2978.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da4af125
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da4af125
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da4af125
Branch: refs/heads/master
Commit: da4af1259ae750953ca2e7a3ecec342d9eb77bac
Parents: 270140a
Author: Jark Wu <wu...@alibaba-inc.com>
Authored: Fri Dec 9 10:31:15 2016 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Dec 13 14:13:17 2016 +0100
----------------------------------------------------------------------
.../api/table/functions/TableFunction.scala | 4 +-
.../api/table/plan/nodes/FlinkCorrelate.scala | 8 +--
.../plan/nodes/dataset/DataSetCorrelate.scala | 2 +-
.../nodes/datastream/DataStreamCorrelate.scala | 2 +-
.../org/apache/flink/api/table/table.scala | 70 +++++++++-----------
.../sql/UserDefinedTableFunctionTest.scala | 4 +-
.../table/UserDefinedTableFunctionTest.scala | 46 ++++++-------
.../sql/UserDefinedTableFunctionTest.scala | 4 +-
.../table/UserDefinedTableFunctionTest.scala | 66 +++++++++---------
.../dataset/DataSetCorrelateITCase.scala | 20 +++---
.../datastream/DataStreamCorrelateITCase.scala | 8 +--
11 files changed, 115 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/da4af125/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
index 3a56efb..ca9aaf1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
@@ -64,11 +64,11 @@ import org.apache.flink.api.table.expressions.{Expression, TableFunctionCall}
*
* // for Scala users
* val split = new Split()
- * table.crossApply(split('c) as ('s)).select('a, 's)
+ * table.join(split('c) as ('s)).select('a, 's)
*
* // for Java users
* tEnv.registerFunction("split", new Split()) // register table function first
- * table.crossApply("split(a) as (s)").select("a, s")
+ * table.join("split(a) as (s)").select("a, s")
*
* // for SQL users
* tEnv.registerFunction("split", new Split()) // register table function first
http://git-wip-us.apache.org/repos/asf/flink/blob/da4af125/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala
index 93a8f53..c058265 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala
@@ -33,7 +33,7 @@ import org.apache.flink.api.table.{TableConfig, TableException}
import scala.collection.JavaConverters._
/**
- * cross/outer apply a user-defined table function
+ * Join a user-defined table function
*/
trait FlinkCorrelate {
@@ -63,7 +63,7 @@ trait FlinkCorrelate {
""".stripMargin
if (joinType == SemiJoinType.INNER) {
- // cross apply
+ // cross join
body +=
s"""
|if (!iter.hasNext()) {
@@ -71,9 +71,9 @@ trait FlinkCorrelate {
|}
""".stripMargin
} else if (joinType == SemiJoinType.LEFT) {
- // outer apply
+ // left outer join
- // in case of outer apply and the returned row of table function is empty,
+ // in case of left outer join and the returned row of table function is empty,
// fill all fields of row with null
val input2NullExprs = input2AccessExprs.map { x =>
GeneratedExpression(
http://git-wip-us.apache.org/repos/asf/flink/blob/da4af125/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala
index 3cddf8b..95eb15b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala
@@ -34,7 +34,7 @@ import org.apache.flink.api.table.plan.nodes.FlinkCorrelate
import org.apache.flink.api.table.typeutils.TypeConverter._
/**
- * Flink RelNode which matches along with cross apply a user defined table function.
+ * Flink RelNode which matches along with join a user defined table function.
*/
class DataSetCorrelate(
cluster: RelOptCluster,
http://git-wip-us.apache.org/repos/asf/flink/blob/da4af125/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala
index 028cb10..3bfa6e2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala
@@ -33,7 +33,7 @@ import org.apache.flink.api.table.typeutils.TypeConverter._
import org.apache.flink.streaming.api.datastream.DataStream
/**
- * Flink RelNode which matches along with cross apply a user defined table function.
+ * Flink RelNode which matches along with join a user defined table function.
*/
class DataStreamCorrelate(
cluster: RelOptCluster,
http://git-wip-us.apache.org/repos/asf/flink/blob/da4af125/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index b421c8e..b74ddb0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -611,11 +611,10 @@ class Table(
}
/**
- * The Cross Apply operator returns rows from the outer table (table on the left of the
- * operator) that produces matching values from the table-valued function (which is defined in
- * the expression on the right side of the operator).
- *
- * The Cross Apply is equivalent to Inner Join, but it works with a table-valued function.
+ * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
+ * to an SQL cross join, but it works with a table function. It returns rows from the outer
+ * table (table on the left of the operator) that produces matching values from the table
+ * function (which is defined in the expression on the right side of the operator).
*
* Example:
*
@@ -627,19 +626,18 @@ class Table(
* }
*
* val split = new MySplitUDTF()
- * table.crossApply(split('c) as ('s)).select('a,'b,'c,'s)
+ * table.join(split('c) as ('s)).select('a,'b,'c,'s)
* }}}
*/
- def crossApply(udtf: Expression): Table = {
- applyInternal(udtf, JoinType.INNER)
+ def join(udtf: Expression): Table = {
+ joinUdtfInternal(udtf, JoinType.INNER)
}
/**
- * The Cross Apply operator returns rows from the outer table (table on the left of the
- * operator) that produces matching values from the table-valued function (which is defined in
- * the expression on the right side of the operator).
- *
- * The Cross Apply is equivalent to Inner Join, but it works with a table-valued function.
+ * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
+ * to an SQL cross join, but it works with a table function. It returns rows from the outer
+ * table (table on the left of the operator) that produces matching values from the table
+ * function (which is defined in the expression on the right side of the operator).
*
* Example:
*
@@ -653,20 +651,19 @@ class Table(
* TableFunction<String> split = new MySplitUDTF();
* tableEnv.registerFunction("split", split);
*
- * table.crossApply("split(c) as (s)").select("a, b, c, s");
+ * table.join("split(c) as (s)").select("a, b, c, s");
* }}}
*/
- def crossApply(udtf: String): Table = {
- applyInternal(udtf, JoinType.INNER)
+ def join(udtf: String): Table = {
+ joinUdtfInternal(udtf, JoinType.INNER)
}
/**
- * The Outer Apply operator returns all the rows from the outer table (table on the left of the
- * Apply operator), and rows that do not match the condition from the table-valued function
- * (which is defined in the expression on the right side of the operator).
- * Rows with no matching condition are filled with null values.
- *
- * The Outer Apply is equivalent to Left Outer Join, but it works with a table-valued function.
+ * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
+ * to an SQL left outer join with ON TRUE, but it works with a table function. It returns all
+ * the rows from the outer table (table on the left of the operator), and rows that do not match
+ * the condition from the table function (which is defined in the expression on the right
+ * side of the operator). Rows with no matching condition are filled with null values.
*
* Example:
*
@@ -678,20 +675,19 @@ class Table(
* }
*
* val split = new MySplitUDTF()
- * table.outerApply(split('c) as ('s)).select('a,'b,'c,'s)
+ * table.leftOuterJoin(split('c) as ('s)).select('a,'b,'c,'s)
* }}}
*/
- def outerApply(udtf: Expression): Table = {
- applyInternal(udtf, JoinType.LEFT_OUTER)
+ def leftOuterJoin(udtf: Expression): Table = {
+ joinUdtfInternal(udtf, JoinType.LEFT_OUTER)
}
/**
- * The Outer Apply operator returns all the rows from the outer table (table on the left of the
- * Apply operator), and rows that do not match the condition from the table-valued function
- * (which is defined in the expression on the right side of the operator).
- * Rows with no matching condition are filled with null values.
- *
- * The Outer Apply is equivalent to Left Outer Join, but it works with a table-valued function.
+ * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
+ * to an SQL left outer join with ON TRUE, but it works with a table function. It returns all
+ * the rows from the outer table (table on the left of the operator), and rows that do not match
+ * the condition from the table function (which is defined in the expression on the right
+ * side of the operator). Rows with no matching condition are filled with null values.
*
* Example:
*
@@ -705,19 +701,19 @@ class Table(
* TableFunction<String> split = new MySplitUDTF();
* tableEnv.registerFunction("split", split);
*
- * table.outerApply("split(c) as (s)").select("a, b, c, s");
+ * table.leftOuterJoin("split(c) as (s)").select("a, b, c, s");
* }}}
*/
- def outerApply(udtf: String): Table = {
- applyInternal(udtf, JoinType.LEFT_OUTER)
+ def leftOuterJoin(udtf: String): Table = {
+ joinUdtfInternal(udtf, JoinType.LEFT_OUTER)
}
- private def applyInternal(udtfString: String, joinType: JoinType): Table = {
+ private def joinUdtfInternal(udtfString: String, joinType: JoinType): Table = {
val udtf = ExpressionParser.parseExpression(udtfString)
- applyInternal(udtf, joinType)
+ joinUdtfInternal(udtf, joinType)
}
- private def applyInternal(udtf: Expression, joinType: JoinType): Table = {
+ private def joinUdtfInternal(udtf: Expression, joinType: JoinType): Table = {
var alias: Option[Seq[String]] = None
// unwrap an Expression until we get a TableFunctionCall
http://git-wip-us.apache.org/repos/asf/flink/blob/da4af125/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UserDefinedTableFunctionTest.scala
index 1c505ba..245f117 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UserDefinedTableFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UserDefinedTableFunctionTest.scala
@@ -28,7 +28,7 @@ import org.junit.Test
class UserDefinedTableFunctionTest extends TableTestBase {
@Test
- def testCrossApply(): Unit = {
+ def testCrossJoin(): Unit = {
val util = batchTestUtil()
val func1 = new TableFunc1
util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
@@ -74,7 +74,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
}
@Test
- def testOuterApply(): Unit = {
+ def testLeftOuterJoin(): Unit = {
val util = batchTestUtil()
val func1 = new TableFunc1
util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
http://git-wip-us.apache.org/repos/asf/flink/blob/da4af125/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala
index a9f3f7b..7e170d4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala
@@ -50,70 +50,70 @@ class UserDefinedTableFunctionTest extends TableTestBase {
val in2 = javaTableEnv.fromDataSet(jDs).as("a, b, c")
javaTableEnv.registerTable("MyTable", in2)
- // test cross apply
+ // test cross join
val func1 = new TableFunc1
javaTableEnv.registerFunction("func1", func1)
- var scalaTable = in1.crossApply(func1('c) as 's).select('c, 's)
- var javaTable = in2.crossApply("func1(c).as(s)").select("c, s")
+ var scalaTable = in1.join(func1('c) as 's).select('c, 's)
+ var javaTable = in2.join("func1(c).as(s)").select("c, s")
verifyTableEquals(scalaTable, javaTable)
- // test outer apply
- scalaTable = in1.outerApply(func1('c) as 's).select('c, 's)
- javaTable = in2.outerApply("as(func1(c), s)").select("c, s")
+ // test left outer join
+ scalaTable = in1.leftOuterJoin(func1('c) as 's).select('c, 's)
+ javaTable = in2.leftOuterJoin("as(func1(c), s)").select("c, s")
verifyTableEquals(scalaTable, javaTable)
// test overloading
- scalaTable = in1.crossApply(func1('c, "$") as 's).select('c, 's)
- javaTable = in2.crossApply("func1(c, '$') as (s)").select("c, s")
+ scalaTable = in1.join(func1('c, "$") as 's).select('c, 's)
+ javaTable = in2.join("func1(c, '$') as (s)").select("c, s")
verifyTableEquals(scalaTable, javaTable)
// test custom result type
val func2 = new TableFunc2
javaTableEnv.registerFunction("func2", func2)
- scalaTable = in1.crossApply(func2('c) as ('name, 'len)).select('c, 'name, 'len)
- javaTable = in2.crossApply("func2(c).as(name, len)").select("c, name, len")
+ scalaTable = in1.join(func2('c) as ('name, 'len)).select('c, 'name, 'len)
+ javaTable = in2.join("func2(c).as(name, len)").select("c, name, len")
verifyTableEquals(scalaTable, javaTable)
// test hierarchy generic type
val hierarchy = new HierarchyTableFunction
javaTableEnv.registerFunction("hierarchy", hierarchy)
- scalaTable = in1.crossApply(hierarchy('c) as ('name, 'adult, 'len))
+ scalaTable = in1.join(hierarchy('c) as ('name, 'adult, 'len))
.select('c, 'name, 'len, 'adult)
- javaTable = in2.crossApply("AS(hierarchy(c), name, adult, len)")
+ javaTable = in2.join("AS(hierarchy(c), name, adult, len)")
.select("c, name, len, adult")
verifyTableEquals(scalaTable, javaTable)
// test pojo type
val pojo = new PojoTableFunc
javaTableEnv.registerFunction("pojo", pojo)
- scalaTable = in1.crossApply(pojo('c))
+ scalaTable = in1.join(pojo('c))
.select('c, 'name, 'age)
- javaTable = in2.crossApply("pojo(c)")
+ javaTable = in2.join("pojo(c)")
.select("c, name, age")
verifyTableEquals(scalaTable, javaTable)
// test with filter
- scalaTable = in1.crossApply(func2('c) as ('name, 'len))
+ scalaTable = in1.join(func2('c) as ('name, 'len))
.select('c, 'name, 'len).filter('len > 2)
- javaTable = in2.crossApply("func2(c) as (name, len)")
+ javaTable = in2.join("func2(c) as (name, len)")
.select("c, name, len").filter("len > 2")
verifyTableEquals(scalaTable, javaTable)
// test with scalar function
- scalaTable = in1.crossApply(func1('c.substring(2)) as 's)
+ scalaTable = in1.join(func1('c.substring(2)) as 's)
.select('a, 'c, 's)
- javaTable = in2.crossApply("func1(substring(c, 2)) as (s)")
+ javaTable = in2.join("func1(substring(c, 2)) as (s)")
.select("a, c, s")
verifyTableEquals(scalaTable, javaTable)
}
@Test
- def testCrossApply(): Unit = {
+ def testCrossJoin(): Unit = {
val util = batchTestUtil()
val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
val function = util.addFunction("func1", new TableFunc1)
- val result1 = table.crossApply(function('c) as 's).select('c, 's)
+ val result1 = table.join(function('c) as 's).select('c, 's)
val expected1 = unaryNode(
"DataSetCalc",
@@ -133,7 +133,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
// test overloading
- val result2 = table.crossApply(function('c, "$") as 's).select('c, 's)
+ val result2 = table.join(function('c, "$") as 's).select('c, 's)
val expected2 = unaryNode(
"DataSetCalc",
@@ -153,12 +153,12 @@ class UserDefinedTableFunctionTest extends TableTestBase {
}
@Test
- def testOuterApply(): Unit = {
+ def testLeftOuterJoin(): Unit = {
val util = batchTestUtil()
val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
val function = util.addFunction("func1", new TableFunc1)
- val result = table.outerApply(function('c) as 's).select('c, 's)
+ val result = table.leftOuterJoin(function('c) as 's).select('c, 's)
val expected = unaryNode(
"DataSetCalc",
http://git-wip-us.apache.org/repos/asf/flink/blob/da4af125/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
index c2ded28..21629e4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
@@ -28,7 +28,7 @@ import org.junit.Test
class UserDefinedTableFunctionTest extends TableTestBase {
@Test
- def testCrossApply(): Unit = {
+ def testCrossJoin(): Unit = {
val util = streamTestUtil()
val func1 = new TableFunc1
util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
@@ -74,7 +74,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
}
@Test
- def testOuterApply(): Unit = {
+ def testLeftOuterJoin(): Unit = {
val util = streamTestUtil()
val func1 = new TableFunc1
util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
http://git-wip-us.apache.org/repos/asf/flink/blob/da4af125/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala
index bc28d67..b45ae8e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala
@@ -53,59 +53,59 @@ class UserDefinedTableFunctionTest extends TableTestBase {
val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
val in2 = javaTableEnv.fromDataStream(jDs).as("a, b, c")
- // test cross apply
+ // test cross join
val func1 = new TableFunc1
javaTableEnv.registerFunction("func1", func1)
- var scalaTable = in1.crossApply(func1('c) as 's).select('c, 's)
- var javaTable = in2.crossApply("func1(c).as(s)").select("c, s")
+ var scalaTable = in1.join(func1('c) as 's).select('c, 's)
+ var javaTable = in2.join("func1(c).as(s)").select("c, s")
verifyTableEquals(scalaTable, javaTable)
- // test outer apply
- scalaTable = in1.outerApply(func1('c) as 's).select('c, 's)
- javaTable = in2.outerApply("as(func1(c), s)").select("c, s")
+ // test left outer join
+ scalaTable = in1.leftOuterJoin(func1('c) as 's).select('c, 's)
+ javaTable = in2.leftOuterJoin("as(func1(c), s)").select("c, s")
verifyTableEquals(scalaTable, javaTable)
// test overloading
- scalaTable = in1.crossApply(func1('c, "$") as 's).select('c, 's)
- javaTable = in2.crossApply("func1(c, '$') as (s)").select("c, s")
+ scalaTable = in1.join(func1('c, "$") as 's).select('c, 's)
+ javaTable = in2.join("func1(c, '$') as (s)").select("c, s")
verifyTableEquals(scalaTable, javaTable)
// test custom result type
val func2 = new TableFunc2
javaTableEnv.registerFunction("func2", func2)
- scalaTable = in1.crossApply(func2('c) as ('name, 'len)).select('c, 'name, 'len)
- javaTable = in2.crossApply("func2(c).as(name, len)").select("c, name, len")
+ scalaTable = in1.join(func2('c) as ('name, 'len)).select('c, 'name, 'len)
+ javaTable = in2.join("func2(c).as(name, len)").select("c, name, len")
verifyTableEquals(scalaTable, javaTable)
// test hierarchy generic type
val hierarchy = new HierarchyTableFunction
javaTableEnv.registerFunction("hierarchy", hierarchy)
- scalaTable = in1.crossApply(hierarchy('c) as ('name, 'adult, 'len))
+ scalaTable = in1.join(hierarchy('c) as ('name, 'adult, 'len))
.select('c, 'name, 'len, 'adult)
- javaTable = in2.crossApply("AS(hierarchy(c), name, adult, len)")
+ javaTable = in2.join("AS(hierarchy(c), name, adult, len)")
.select("c, name, len, adult")
verifyTableEquals(scalaTable, javaTable)
// test pojo type
val pojo = new PojoTableFunc
javaTableEnv.registerFunction("pojo", pojo)
- scalaTable = in1.crossApply(pojo('c))
+ scalaTable = in1.join(pojo('c))
.select('c, 'name, 'age)
- javaTable = in2.crossApply("pojo(c)")
+ javaTable = in2.join("pojo(c)")
.select("c, name, age")
verifyTableEquals(scalaTable, javaTable)
// test with filter
- scalaTable = in1.crossApply(func2('c) as ('name, 'len))
+ scalaTable = in1.join(func2('c) as ('name, 'len))
.select('c, 'name, 'len).filter('len > 2)
- javaTable = in2.crossApply("func2(c) as (name, len)")
+ javaTable = in2.join("func2(c) as (name, len)")
.select("c, name, len").filter("len > 2")
verifyTableEquals(scalaTable, javaTable)
// test with scalar function
- scalaTable = in1.crossApply(func1('c.substring(2)) as 's)
+ scalaTable = in1.join(func1('c.substring(2)) as 's)
.select('a, 'c, 's)
- javaTable = in2.crossApply("func1(substring(c, 2)) as (s)")
+ javaTable = in2.join("func1(substring(c, 2)) as (s)")
.select("a, c, s")
verifyTableEquals(scalaTable, javaTable)
@@ -115,7 +115,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
expectExceptionThrown(
javaTableEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
expectExceptionThrown(
- in1.crossApply(ObjectTableFunction('a, 1)),"Scala object")
+ in1.join(ObjectTableFunction('a, 1)), "Scala object")
}
@@ -132,12 +132,12 @@ class UserDefinedTableFunctionTest extends TableTestBase {
// Java table environment register
expectExceptionThrown(tEnv.registerFunction("udtf", ObjectTableFunction), "Scala object")
// Scala Table API directly call
- expectExceptionThrown(t.crossApply(ObjectTableFunction('a, 1)), "Scala object")
+ expectExceptionThrown(t.join(ObjectTableFunction('a, 1)), "Scala object")
//============ throw exception when table function is not registered =========
// Java Table API call
- expectExceptionThrown(t.crossApply("nonexist(a)"), "Undefined function: NONEXIST")
+ expectExceptionThrown(t.join("nonexist(a)"), "Undefined function: NONEXIST")
// SQL API call
expectExceptionThrown(
util.tEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(nonexist(a))"),
@@ -148,7 +148,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
util.addFunction("func0", Func0)
// Java Table API call
expectExceptionThrown(
- t.crossApply("func0(a)"),
+ t.join("func0(a)"),
"only accept expressions that define table functions",
classOf[TableException])
// SQL API call
@@ -162,7 +162,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
// Java Table API call
util.addFunction("func2", new TableFunc2)
expectExceptionThrown(
- t.crossApply("func2(c, c)"),
+ t.join("func2(c, c)"),
"Given parameters of function 'FUNC2' do not match any signature")
// SQL API call
expectExceptionThrown(
@@ -171,12 +171,12 @@ class UserDefinedTableFunctionTest extends TableTestBase {
}
@Test
- def testCrossApply(): Unit = {
+ def testCrossJoin(): Unit = {
val util = streamTestUtil()
val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
val function = util.addFunction("func1", new TableFunc1)
- val result1 = table.crossApply(function('c) as 's).select('c, 's)
+ val result1 = table.join(function('c) as 's).select('c, 's)
val expected1 = unaryNode(
"DataStreamCalc",
@@ -196,7 +196,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
// test overloading
- val result2 = table.crossApply(function('c, "$") as 's).select('c, 's)
+ val result2 = table.join(function('c, "$") as 's).select('c, 's)
val expected2 = unaryNode(
"DataStreamCalc",
@@ -216,12 +216,12 @@ class UserDefinedTableFunctionTest extends TableTestBase {
}
@Test
- def testOuterApply(): Unit = {
+ def testLeftOuterJoin(): Unit = {
val util = streamTestUtil()
val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
val function = util.addFunction("func1", new TableFunc1)
- val result = table.outerApply(function('c) as 's).select('c, 's)
+ val result = table.leftOuterJoin(function('c) as 's).select('c, 's)
val expected = unaryNode(
"DataStreamCalc",
@@ -246,7 +246,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
val function = util.addFunction("func2", new TableFunc2)
- val result = table.crossApply(function('c) as ('name, 'len)).select('c, 'name, 'len)
+ val result = table.join(function('c) as ('name, 'len)).select('c, 'name, 'len)
val expected = unaryNode(
"DataStreamCalc",
@@ -272,7 +272,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
val function = util.addFunction("hierarchy", new HierarchyTableFunction)
- val result = table.crossApply(function('c) as ('name, 'adult, 'len))
+ val result = table.join(function('c) as ('name, 'adult, 'len))
val expected = unaryNode(
"DataStreamCorrelate",
@@ -294,7 +294,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
val function = util.addFunction("pojo", new PojoTableFunc)
- val result = table.crossApply(function('c))
+ val result = table.join(function('c))
val expected = unaryNode(
"DataStreamCorrelate",
@@ -317,7 +317,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
val function = util.addFunction("func2", new TableFunc2)
val result = table
- .crossApply(function('c) as ('name, 'len))
+ .join(function('c) as ('name, 'len))
.select('c, 'name, 'len)
.filter('len > 2)
@@ -346,7 +346,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
val function = util.addFunction("func1", new TableFunc1)
- val result = table.crossApply(function('c.substring(2)) as 's)
+ val result = table.join(function('c.substring(2)) as 's)
val expected = unaryNode(
"DataStreamCorrelate",
http://git-wip-us.apache.org/repos/asf/flink/blob/da4af125/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/dataset/DataSetCorrelateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/dataset/DataSetCorrelateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/dataset/DataSetCorrelateITCase.scala
index cc551f9..32559f1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/dataset/DataSetCorrelateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/dataset/DataSetCorrelateITCase.scala
@@ -40,20 +40,20 @@ class DataSetCorrelateITCase(
extends TableProgramsTestBase(mode, configMode) {
@Test
- def testCrossApply(): Unit = {
+ def testCrossJoin(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env, config)
val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
val func1 = new TableFunc1
- val result = in.crossApply(func1('c) as 's).select('c, 's).toDataSet[Row]
+ val result = in.join(func1('c) as 's).select('c, 's).toDataSet[Row]
val results = result.collect()
val expected = "Jack#22,Jack\n" + "Jack#22,22\n" + "John#19,John\n" + "John#19,19\n" +
"Anna#44,Anna\n" + "Anna#44,44\n"
TestBaseUtils.compareResultAsText(results.asJava, expected)
// with overloading
- val result2 = in.crossApply(func1('c, "$") as 's).select('c, 's).toDataSet[Row]
+ val result2 = in.join(func1('c, "$") as 's).select('c, 's).toDataSet[Row]
val results2 = result2.collect()
val expected2 = "Jack#22,$Jack\n" + "Jack#22,$22\n" + "John#19,$John\n" +
"John#19,$19\n" + "Anna#44,$Anna\n" + "Anna#44,$44\n"
@@ -61,13 +61,13 @@ class DataSetCorrelateITCase(
}
@Test
- def testOuterApply(): Unit = {
+ def testLeftOuterJoin(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env, config)
val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
val func2 = new TableFunc2
- val result = in.outerApply(func2('c) as ('s, 'l)).select('c, 's, 'l).toDataSet[Row]
+ val result = in.leftOuterJoin(func2('c) as ('s, 'l)).select('c, 's, 'l).toDataSet[Row]
val results = result.collect()
val expected = "Jack#22,Jack,4\n" + "Jack#22,22,2\n" + "John#19,John,4\n" +
"John#19,19,2\n" + "Anna#44,Anna,4\n" + "Anna#44,44,2\n" + "nosharp,null,null"
@@ -82,7 +82,7 @@ class DataSetCorrelateITCase(
val func0 = new TableFunc0
val result = in
- .crossApply(func0('c) as ('name, 'age))
+ .join(func0('c) as ('name, 'age))
.select('c, 'name, 'age)
.filter('age > 20)
.toDataSet[Row]
@@ -100,7 +100,7 @@ class DataSetCorrelateITCase(
val func2 = new TableFunc2
val result = in
- .crossApply(func2('c) as ('name, 'len))
+ .join(func2('c) as ('name, 'len))
.select('c, 'name, 'len)
.toDataSet[Row]
@@ -118,7 +118,7 @@ class DataSetCorrelateITCase(
val hierarchy = new HierarchyTableFunction
val result = in
- .crossApply(hierarchy('c) as ('name, 'adult, 'len))
+ .join(hierarchy('c) as ('name, 'adult, 'len))
.select('c, 'name, 'adult, 'len)
.toDataSet[Row]
@@ -136,7 +136,7 @@ class DataSetCorrelateITCase(
val pojo = new PojoTableFunc()
val result = in
- .crossApply(pojo('c))
+ .join(pojo('c))
.select('c, 'name, 'age)
.toDataSet[Row]
@@ -153,7 +153,7 @@ class DataSetCorrelateITCase(
val func1 = new TableFunc1
val result = in
- .crossApply(func1('c.substring(2)) as 's)
+ .join(func1('c.substring(2)) as 's)
.select('c, 's)
.toDataSet[Row]
http://git-wip-us.apache.org/repos/asf/flink/blob/da4af125/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala
index c2c523a..70b0359 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala
@@ -32,7 +32,7 @@ import scala.collection.mutable
class DataStreamCorrelateITCase extends StreamingMultipleProgramsTestBase {
@Test
- def testCrossApply(): Unit = {
+ def testCrossJoin(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
StreamITCase.clear
@@ -41,7 +41,7 @@ class DataStreamCorrelateITCase extends StreamingMultipleProgramsTestBase {
val func0 = new TableFunc0
val result = t
- .crossApply(func0('c) as('d, 'e))
+ .join(func0('c) as('d, 'e))
.select('c, 'd, 'e)
.toDataStream[Row]
@@ -53,7 +53,7 @@ class DataStreamCorrelateITCase extends StreamingMultipleProgramsTestBase {
}
@Test
- def testOuterApply(): Unit = {
+ def testLeftOuterJoin(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
StreamITCase.clear
@@ -62,7 +62,7 @@ class DataStreamCorrelateITCase extends StreamingMultipleProgramsTestBase {
val func0 = new TableFunc0
val result = t
- .outerApply(func0('c) as('d, 'e))
+ .leftOuterJoin(func0('c) as('d, 'e))
.select('c, 'd, 'e)
.toDataStream[Row]