You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/06/12 14:24:27 UTC

flink git commit: [FLINK-2207] Fix TableAPI conversion documentation and further renamings for consistency.

Repository: flink
Updated Branches:
  refs/heads/master e45c5dc56 -> e4b569505


[FLINK-2207] Fix TableAPI conversion documentation and further renamings for consistency.

This closes #829


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e4b56950
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e4b56950
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e4b56950

Branch: refs/heads/master
Commit: e4b569505ba1c4169d39e24c9dc4ff0ca03bf992
Parents: e45c5dc
Author: Fabian Hueske <fh...@apache.org>
Authored: Fri Jun 12 11:36:03 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Jun 12 14:20:46 2015 +0200

----------------------------------------------------------------------
 docs/libs/table.md                                    |  8 ++++----
 .../flink/api/scala/table/TableConversions.scala      |  4 ++--
 .../main/scala/org/apache/flink/api/table/Table.scala |  2 +-
 .../apache/flink/examples/scala/PageRankTable.scala   |  2 +-
 .../flink/examples/scala/StreamingTableFilter.scala   |  2 +-
 .../flink/api/scala/table/test/FilterITCase.scala     |  6 +++---
 .../flink/api/scala/table/test/JoinITCase.scala       | 14 +++++++-------
 7 files changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e4b56950/docs/libs/table.md
----------------------------------------------------------------------
diff --git a/docs/libs/table.md b/docs/libs/table.md
index bcd2cb1..829c9cf 100644
--- a/docs/libs/table.md
+++ b/docs/libs/table.md
@@ -52,7 +52,7 @@ import org.apache.flink.api.scala.table._
 case class WC(word: String, count: Int)
 val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
 val expr = input.toTable
-val result = expr.groupBy('word).select('word, 'count.sum as 'count).toSet[WC]
+val result = expr.groupBy('word).select('word, 'count.sum as 'count).toDataSet[WC]
 {% endhighlight %}
 
 The expression DSL uses Scala symbols to refer to field names and we use code generation to
@@ -69,7 +69,7 @@ case class MyResult(a: String, d: Int)
 
 val input1 = env.fromElements(...).toTable('a, 'b)
 val input2 = env.fromElements(...).toTable('c, 'd)
-val joined = input1.join(input2).where("b = a && d > 42").select("a, d").toSet[MyResult]
+val joined = input1.join(input2).where("b = a && d > 42").select("a, d").toDataSet[MyResult]
 {% endhighlight %}
 
 Notice, how a DataSet can be converted to a Table by using `as` and specifying new
@@ -108,14 +108,14 @@ DataSet<WC> input = env.fromElements(
         new WC("Ciao", 1),
         new WC("Hello", 1));
 
-Table table = tableEnv.toTable(input);
+Table table = tableEnv.fromDataSet(input);
 
 Table filtered = table
         .groupBy("word")
         .select("word.count as count, word")
         .filter("count = 2");
 
-DataSet<WC> result = tableEnv.toSet(filtered, WC.class);
+DataSet<WC> result = tableEnv.toDataSet(filtered, WC.class);
 {% endhighlight %}
 
 When using Java, the embedded DSL for specifying expressions cannot be used. Only String expressions

http://git-wip-us.apache.org/repos/asf/flink/blob/e4b56950/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
index b9c0a5e..4f2172e 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
@@ -33,14 +33,14 @@ class TableConversions(table: Table) {
   /**
    * Converts the [[Table]] to a [[DataSet]].
    */
-  def toSet[T: TypeInformation]: DataSet[T] = {
+  def toDataSet[T: TypeInformation]: DataSet[T] = {
      new ScalaBatchTranslator().translate[T](table.operation)
   }
 
   /**
    * Converts the [[Table]] to a [[DataStream]].
    */
-  def toStream[T: TypeInformation]: DataStream[T] = {
+  def toDataStream[T: TypeInformation]: DataStream[T] = {
     new ScalaStreamingTranslator().translate[T](table.operation)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e4b56950/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
index 83d5239..fdb125b 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
@@ -39,7 +39,7 @@ import org.apache.flink.api.table.plan._
  *   val table = set.toTable('a, 'b)
  *   ...
  *   val table2 = ...
- *   val set = table2.toSet[MyType]
+ *   val set = table2.toDataSet[MyType]
  * }}}
  */
 case class Table(private[flink] val operation: PlanNode) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e4b56950/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
index 7a26e0e..dda6265 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
@@ -101,7 +101,7 @@ object PageRankTable {
         val newRanks = currentRanks.toTable
           // distribute ranks to target pages
           .join(adjacencyLists).where('pageId === 'sourceId)
-          .select('rank, 'targetIds).toSet[RankOutput]
+          .select('rank, 'targetIds).toDataSet[RankOutput]
           .flatMap {
             (in, out: Collector[(Long, Double)]) =>
               val targets = in.targetIds

http://git-wip-us.apache.org/repos/asf/flink/blob/e4b56950/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
index 4aa5653..63dddc9 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
@@ -42,7 +42,7 @@ object StreamingTableFilter {
     val cars = genCarStream().toTable
       .filter('carId === 0)
       .select('carId, 'speed, 'distance + 1000 as 'distance, 'time % 5 as 'time)
-      .toStream[CarEvent]
+      .toDataStream[CarEvent]
 
     cars.print()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e4b56950/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
index bc51a7e..75cd728 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
@@ -61,7 +61,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
 
     val filterDs = ds.filter( Literal(false) )
 
-    filterDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    filterDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
     expected = "\n"
   }
@@ -76,7 +76,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
 
     val filterDs = ds.filter( Literal(true) )
 
-    filterDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    filterDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
     expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
       "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
@@ -109,7 +109,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
 
     val filterDs = ds.filter( 'a % 2 === 0 )
 
-    filterDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    filterDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
     expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
       "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +

http://git-wip-us.apache.org/repos/asf/flink/blob/e4b56950/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
index b3baa56..8c3d1ca 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
@@ -57,7 +57,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
 
     val joinDs = ds1.join(ds2).where('b === 'e).select('c, 'g)
 
-    joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
     expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
   }
@@ -70,7 +70,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
 
     val joinDs = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g)
 
-    joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
     expected = "Hi,Hallo\n"
   }
@@ -83,7 +83,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
 
     val joinDs = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g)
 
-    joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
     expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
       "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
@@ -97,7 +97,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
 
     val joinDs = ds1.join(ds2).where('foo === 'e).select('c, 'g)
 
-    joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
     expected = ""
   }
@@ -110,7 +110,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
 
     val joinDs = ds1.join(ds2).where('a === 'g).select('c, 'g)
 
-    joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
     expected = ""
   }
@@ -123,7 +123,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
 
     val joinDs = ds1.join(ds2).where('a === 'd).select('c, 'g)
 
-    joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
     expected = ""
   }
@@ -136,7 +136,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
 
     val joinDs = ds1.join(ds2).where('a === 'd).select('g.count)
 
-    joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
     expected = "6"
   }