You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/17 15:35:16 UTC

incubator-flink git commit: [FLINK-890] [docs] Adjust examples to show reader methods and correct class-vs.-interface errors.

Repository: incubator-flink
Updated Branches:
  refs/heads/master 42828f245 -> 7f8296ea3


[FLINK-890] [docs] Adjust examples to show reader methods and correct class-vs.-interface errors.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/7f8296ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/7f8296ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/7f8296ea

Branch: refs/heads/master
Commit: 7f8296ea3c39c41ff0e38f0c7e1c910485895fbb
Parents: 42828f2
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Nov 17 14:51:48 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 17 14:51:48 2014 +0100

----------------------------------------------------------------------
 docs/examples.md | 62 +++++++++++++++++++++++++++++----------------------
 1 file changed, 35 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7f8296ea/docs/examples.md
----------------------------------------------------------------------
diff --git a/docs/examples.md b/docs/examples.md
index 86f6fe0..7cae4bc 100644
--- a/docs/examples.md
+++ b/docs/examples.md
@@ -19,20 +19,21 @@ WordCount is the "Hello World" of Big Data processing systems. It computes the f
 <div data-lang="java" markdown="1">
 
 ~~~java
-// get input data
-DataSet<String> text = getTextDataSet(env);
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+DataSet<String> text = env.readTextFile("/path/to/file"); 
 
 DataSet<Tuple2<String, Integer>> counts = 
         // split up the lines in pairs (2-tuples) containing: (word,1)
         text.flatMap(new Tokenizer())
         // group by the tuple field "0" and sum up tuple field "1"
         .groupBy(0)
-        .aggregate(Aggregations.SUM, 1);
+        .sum(1);
 
 counts.writeAsCsv(outputPath, "\n", " ");
 
 // User-defined functions
-public static final class Tokenizer extends FlatMapFunction<String, Tuple2<String, Integer>> {
+public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
 
     @Override
     public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
@@ -58,7 +59,7 @@ The {% gh_link /flink-examples/flink-java-examples/src/main/java/org/apache/flin
 val env = ExecutionEnvironment.getExecutionEnvironment
 
 // get input data
-val text = getTextDataSet(env)
+val text = env.readTextFile("/path/to/file")
 
 val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
   .map { (_, 1) }
@@ -84,8 +85,13 @@ In this simple example, PageRank is implemented with a [bulk iteration](java_api
 <div data-lang="java" markdown="1">
 
 ~~~java
-// get input data
-DataSet<Tuple2<Long, Double>> pagesWithRanks = getPagesWithRanksDataSet(env);
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+// read the pages and initial ranks by parsing a CSV file
+DataSet<Tuple2<Long, Double>> pagesWithRanks = env.readCsvFile(pagesInputPath)
+						   .types(Long.class, Double.class);
+
+// the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
 DataSet<Tuple2<Long, Long[]>> pageLinkLists = getLinksDataSet(env);
 
 // set iterative data set
@@ -95,7 +101,7 @@ DataSet<Tuple2<Long, Double>> newRanks = iteration
         // join pages with outgoing edges and distribute rank
         .join(pageLinkLists).where(0).equalTo(0).flatMap(new JoinVertexWithEdgesMatch())
         // collect and sum ranks
-        .groupBy(0).aggregate(SUM, 1)
+        .groupBy(0).sum(1)
         // apply dampening factor
         .map(new Dampener(DAMPENING_FACTOR, numPages));
 
@@ -110,14 +116,14 @@ finalPageRanks.writeAsCsv(outputPath, "\n", " ");
 // User-defined functions
 
 public static final class JoinVertexWithEdgesMatch 
-                    extends FlatMapFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>>, 
+                    implements FlatJoinFunction<Tuple2<Long, Double>, Tuple2<Long, Long[]>, 
                                             Tuple2<Long, Double>> {
 
     @Override
-    public void flatMap(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>> value, 
+    public void join(Tuple2<Long, Double> page, Tuple2<Long, Long[]> adj, 
                         Collector<Tuple2<Long, Double>> out) {
-        Long[] neigbors = value.f1.f1;
-        double rank = value.f0.f1;
+        Long[] neigbors = adj.f1;
+        double rank = page.f1;
         double rankToDistribute = rank / ((double) neigbors.length);
             
         for (int i = 0; i < neigbors.length; i++) {
@@ -126,7 +132,7 @@ public static final class JoinVertexWithEdgesMatch
     }
 }
 
-public static final class Dampener extends MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> {
+public static final class Dampener implements MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> {
     private final double dampening, randomJump;
 
     public Dampener(double dampening, double numVertices) {
@@ -142,7 +148,7 @@ public static final class Dampener extends MapFunction<Tuple2<Long,Double>, Tupl
 }
 
 public static final class EpsilonFilter 
-                    extends FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {
+                implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {
 
     @Override
     public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
@@ -158,12 +164,19 @@ It requires the following parameters to run: `<pages input path>, <links input p
 <div data-lang="scala" markdown="1">
 
 ~~~scala
+// User-defined types
+case class Link(sourceId: Long, targetId: Long)
+case class Page(pageId: Long, rank: Double)
+case class AdjacencyList(sourceId: Long, targetIds: Array[Long])
+
 // set up execution environment
 val env = ExecutionEnvironment.getExecutionEnvironment
 
-// read input data
-val pages = getPagesDataSet(env)
-val links = getLinksDataSet(env)
+// read the pages and initial ranks by parsing a CSV file
+val pages = env.readCsvFile[Page](pagesInputPath)
+
+// read the links as (source-id, target-id) pairs by parsing a CSV file
+val links = env.readCsvFile[Link](linksInputPath)
 
 // assign initial ranks to pages
 val pagesWithRanks = pages.map(p => Page(p, 1.0 / numPages))
@@ -209,11 +222,6 @@ val result = finalRanks
 
 // emit result
 result.writeAsCsv(outputPath, "\n", " ")
-
-// User-defined types
-case class Link(sourceId: Long, targetId: Long)
-case class Page(pageId: Long, rank: Double)
-case class AdjacencyList(sourceId: Long, targetIds: Array[Long])
 ~~~
 
 The {% gh_link /flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala "PageRank program" %} implements the above example.
@@ -268,7 +276,7 @@ result.writeAsCsv(outputPath, "\n", " ");
 
 // User-defined functions
 
-public static final class DuplicateValue<T> extends MapFunction<T, Tuple2<T, T>> {
+public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {
     
     @Override
     public Tuple2<T, T> map(T vertex) {
@@ -277,7 +285,7 @@ public static final class DuplicateValue<T> extends MapFunction<T, Tuple2<T, T>>
 }
 
 public static final class UndirectEdge 
-                    extends FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+                    implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
     Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();
     
     @Override
@@ -290,7 +298,7 @@ public static final class UndirectEdge
 }
 
 public static final class NeighborWithComponentIDJoin 
-                    extends JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+                implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
     @Override
     public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
@@ -299,7 +307,7 @@ public static final class NeighborWithComponentIDJoin
 }
 
 public static final class ComponentIdFilter 
-                    extends FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, 
+                    implements FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, 
                                             Tuple2<Long, Long>> {
 
     @Override
@@ -461,4 +469,4 @@ CC       = gcc
 
 ~~~bash
 ./dbgen -T o -s 1
-~~~
\ No newline at end of file
+~~~