You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/06/12 15:33:07 UTC
[2/4] flink git commit: [FLINK-2207] Fix TableAPI conversion
documentation and further renamings for consistency.
[FLINK-2207] Fix TableAPI conversion documentation and further renamings for consistency.
This closes #829
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af0fee51
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af0fee51
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af0fee51
Branch: refs/heads/release-0.9
Commit: af0fee512bde4a5dc5c08a3cc17da788a06cd113
Parents: e513be7
Author: Fabian Hueske <fh...@apache.org>
Authored: Fri Jun 12 11:36:03 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Jun 12 14:27:03 2015 +0200
----------------------------------------------------------------------
docs/libs/table.md | 8 ++++----
.../flink/api/scala/table/TableConversions.scala | 4 ++--
.../main/scala/org/apache/flink/api/table/Table.scala | 2 +-
.../apache/flink/examples/scala/PageRankTable.scala | 2 +-
.../flink/examples/scala/StreamingTableFilter.scala | 2 +-
.../flink/api/scala/table/test/FilterITCase.scala | 6 +++---
.../flink/api/scala/table/test/JoinITCase.scala | 14 +++++++-------
7 files changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/af0fee51/docs/libs/table.md
----------------------------------------------------------------------
diff --git a/docs/libs/table.md b/docs/libs/table.md
index bcd2cb1..829c9cf 100644
--- a/docs/libs/table.md
+++ b/docs/libs/table.md
@@ -52,7 +52,7 @@ import org.apache.flink.api.scala.table._
case class WC(word: String, count: Int)
val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
val expr = input.toTable
-val result = expr.groupBy('word).select('word, 'count.sum as 'count).toSet[WC]
+val result = expr.groupBy('word).select('word, 'count.sum as 'count).toDataSet[WC]
{% endhighlight %}
The expression DSL uses Scala symbols to refer to field names and we use code generation to
@@ -69,7 +69,7 @@ case class MyResult(a: String, d: Int)
val input1 = env.fromElements(...).toTable('a, 'b)
val input2 = env.fromElements(...).toTable('c, 'd)
-val joined = input1.join(input2).where("b = a && d > 42").select("a, d").toSet[MyResult]
+val joined = input1.join(input2).where("b = a && d > 42").select("a, d").toDataSet[MyResult]
{% endhighlight %}
Notice, how a DataSet can be converted to a Table by using `as` and specifying new
@@ -108,14 +108,14 @@ DataSet<WC> input = env.fromElements(
new WC("Ciao", 1),
new WC("Hello", 1));
-Table table = tableEnv.toTable(input);
+Table table = tableEnv.fromDataSet(input);
Table filtered = table
.groupBy("word")
.select("word.count as count, word")
.filter("count = 2");
-DataSet<WC> result = tableEnv.toSet(filtered, WC.class);
+DataSet<WC> result = tableEnv.toDataSet(filtered, WC.class);
{% endhighlight %}
When using Java, the embedded DSL for specifying expressions cannot be used. Only String expressions
http://git-wip-us.apache.org/repos/asf/flink/blob/af0fee51/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
index b9c0a5e..4f2172e 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
@@ -33,14 +33,14 @@ class TableConversions(table: Table) {
/**
* Converts the [[Table]] to a [[DataSet]].
*/
- def toSet[T: TypeInformation]: DataSet[T] = {
+ def toDataSet[T: TypeInformation]: DataSet[T] = {
new ScalaBatchTranslator().translate[T](table.operation)
}
/**
* Converts the [[Table]] to a [[DataStream]].
*/
- def toStream[T: TypeInformation]: DataStream[T] = {
+ def toDataStream[T: TypeInformation]: DataStream[T] = {
new ScalaStreamingTranslator().translate[T](table.operation)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/af0fee51/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
index 83d5239..fdb125b 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
@@ -39,7 +39,7 @@ import org.apache.flink.api.table.plan._
* val table = set.toTable('a, 'b)
* ...
* val table2 = ...
- * val set = table2.toSet[MyType]
+ * val set = table2.toDataSet[MyType]
* }}}
*/
case class Table(private[flink] val operation: PlanNode) {
http://git-wip-us.apache.org/repos/asf/flink/blob/af0fee51/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
index 7a26e0e..dda6265 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
@@ -101,7 +101,7 @@ object PageRankTable {
val newRanks = currentRanks.toTable
// distribute ranks to target pages
.join(adjacencyLists).where('pageId === 'sourceId)
- .select('rank, 'targetIds).toSet[RankOutput]
+ .select('rank, 'targetIds).toDataSet[RankOutput]
.flatMap {
(in, out: Collector[(Long, Double)]) =>
val targets = in.targetIds
http://git-wip-us.apache.org/repos/asf/flink/blob/af0fee51/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
index 4aa5653..63dddc9 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
@@ -42,7 +42,7 @@ object StreamingTableFilter {
val cars = genCarStream().toTable
.filter('carId === 0)
.select('carId, 'speed, 'distance + 1000 as 'distance, 'time % 5 as 'time)
- .toStream[CarEvent]
+ .toDataStream[CarEvent]
cars.print()
http://git-wip-us.apache.org/repos/asf/flink/blob/af0fee51/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
index bc51a7e..75cd728 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
@@ -61,7 +61,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
val filterDs = ds.filter( Literal(false) )
- filterDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ filterDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
env.execute()
expected = "\n"
}
@@ -76,7 +76,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
val filterDs = ds.filter( Literal(true) )
- filterDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ filterDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
env.execute()
expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
"how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
@@ -109,7 +109,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
val filterDs = ds.filter( 'a % 2 === 0 )
- filterDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ filterDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
env.execute()
expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
"Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
http://git-wip-us.apache.org/repos/asf/flink/blob/af0fee51/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
index b3baa56..8c3d1ca 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
@@ -57,7 +57,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
val joinDs = ds1.join(ds2).where('b === 'e).select('c, 'g)
- joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
env.execute()
expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
}
@@ -70,7 +70,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
val joinDs = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g)
- joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
env.execute()
expected = "Hi,Hallo\n"
}
@@ -83,7 +83,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
val joinDs = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g)
- joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
env.execute()
expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
"Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
@@ -97,7 +97,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
val joinDs = ds1.join(ds2).where('foo === 'e).select('c, 'g)
- joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
env.execute()
expected = ""
}
@@ -110,7 +110,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
val joinDs = ds1.join(ds2).where('a === 'g).select('c, 'g)
- joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
env.execute()
expected = ""
}
@@ -123,7 +123,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
val joinDs = ds1.join(ds2).where('a === 'd).select('c, 'g)
- joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
env.execute()
expected = ""
}
@@ -136,7 +136,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
val joinDs = ds1.join(ds2).where('a === 'd).select('g.count)
- joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
env.execute()
expected = "6"
}