You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/17 15:35:16 UTC
incubator-flink git commit: [FLINK-890] [docs] Adjust examples to
show reader methods and correct class-vs.-interface errors.
Repository: incubator-flink
Updated Branches:
refs/heads/master 42828f245 -> 7f8296ea3
[FLINK-890] [docs] Adjust examples to show reader methods and correct class-vs.-interface errors.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/7f8296ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/7f8296ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/7f8296ea
Branch: refs/heads/master
Commit: 7f8296ea3c39c41ff0e38f0c7e1c910485895fbb
Parents: 42828f2
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Nov 17 14:51:48 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 17 14:51:48 2014 +0100
----------------------------------------------------------------------
docs/examples.md | 62 +++++++++++++++++++++++++++++----------------------
1 file changed, 35 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7f8296ea/docs/examples.md
----------------------------------------------------------------------
diff --git a/docs/examples.md b/docs/examples.md
index 86f6fe0..7cae4bc 100644
--- a/docs/examples.md
+++ b/docs/examples.md
@@ -19,20 +19,21 @@ WordCount is the "Hello World" of Big Data processing systems. It computes the f
<div data-lang="java" markdown="1">
~~~java
-// get input data
-DataSet<String> text = getTextDataSet(env);
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+DataSet<String> text = env.readTextFile("/path/to/file");
DataSet<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.groupBy(0)
- .aggregate(Aggregations.SUM, 1);
+ .sum(1);
counts.writeAsCsv(outputPath, "\n", " ");
// User-defined functions
-public static final class Tokenizer extends FlatMapFunction<String, Tuple2<String, Integer>> {
+public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
@@ -58,7 +59,7 @@ The {% gh_link /flink-examples/flink-java-examples/src/main/java/org/apache/flin
val env = ExecutionEnvironment.getExecutionEnvironment
// get input data
-val text = getTextDataSet(env)
+val text = env.readTextFile("/path/to/file")
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
@@ -84,8 +85,13 @@ In this simple example, PageRank is implemented with a [bulk iteration](java_api
<div data-lang="java" markdown="1">
~~~java
-// get input data
-DataSet<Tuple2<Long, Double>> pagesWithRanks = getPagesWithRanksDataSet(env);
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+// read the pages and initial ranks by parsing a CSV file
+DataSet<Tuple2<Long, Double>> pagesWithRanks = env.readCsvFile(pagesInputPath)
+ .types(Long.class, Double.class);
+
+// the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
DataSet<Tuple2<Long, Long[]>> pageLinkLists = getLinksDataSet(env);
// set iterative data set
@@ -95,7 +101,7 @@ DataSet<Tuple2<Long, Double>> newRanks = iteration
// join pages with outgoing edges and distribute rank
.join(pageLinkLists).where(0).equalTo(0).flatMap(new JoinVertexWithEdgesMatch())
// collect and sum ranks
- .groupBy(0).aggregate(SUM, 1)
+ .groupBy(0).sum(1)
// apply dampening factor
.map(new Dampener(DAMPENING_FACTOR, numPages));
@@ -110,14 +116,14 @@ finalPageRanks.writeAsCsv(outputPath, "\n", " ");
// User-defined functions
public static final class JoinVertexWithEdgesMatch
- extends FlatMapFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>>,
+ implements FlatJoinFunction<Tuple2<Long, Double>, Tuple2<Long, Long[]>,
Tuple2<Long, Double>> {
@Override
- public void flatMap(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>> value,
+ public void join(Tuple2<Long, Double> page, Tuple2<Long, Long[]> adj,
Collector<Tuple2<Long, Double>> out) {
- Long[] neigbors = value.f1.f1;
- double rank = value.f0.f1;
+ Long[] neigbors = adj.f1;
+ double rank = page.f1;
double rankToDistribute = rank / ((double) neigbors.length);
for (int i = 0; i < neigbors.length; i++) {
@@ -126,7 +132,7 @@ public static final class JoinVertexWithEdgesMatch
}
}
-public static final class Dampener extends MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> {
+public static final class Dampener implements MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> {
private final double dampening, randomJump;
public Dampener(double dampening, double numVertices) {
@@ -142,7 +148,7 @@ public static final class Dampener extends MapFunction<Tuple2<Long,Double>, Tupl
}
public static final class EpsilonFilter
- extends FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {
+ implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {
@Override
public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
@@ -158,12 +164,19 @@ It requires the following parameters to run: `<pages input path>, <links input p
<div data-lang="scala" markdown="1">
~~~scala
+// User-defined types
+case class Link(sourceId: Long, targetId: Long)
+case class Page(pageId: Long, rank: Double)
+case class AdjacencyList(sourceId: Long, targetIds: Array[Long])
+
// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
-// read input data
-val pages = getPagesDataSet(env)
-val links = getLinksDataSet(env)
+// read the pages and initial ranks by parsing a CSV file
+val pages = env.readCsvFile[Page](pagesInputPath)
+
+// the links are read as (source-id, target-id) pairs, one Link per CSV line
+val links = env.readCsvFile[Link](linksInputPath)
// assign initial ranks to pages
val pagesWithRanks = pages.map(p => Page(p, 1.0 / numPages))
@@ -209,11 +222,6 @@ val result = finalRanks
// emit result
result.writeAsCsv(outputPath, "\n", " ")
-
-// User-defined types
-case class Link(sourceId: Long, targetId: Long)
-case class Page(pageId: Long, rank: Double)
-case class AdjacencyList(sourceId: Long, targetIds: Array[Long])
~~~
The {% gh_link /flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala "PageRank program" %} implements the above example.
@@ -268,7 +276,7 @@ result.writeAsCsv(outputPath, "\n", " ");
// User-defined functions
-public static final class DuplicateValue<T> extends MapFunction<T, Tuple2<T, T>> {
+public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {
@Override
public Tuple2<T, T> map(T vertex) {
@@ -277,7 +285,7 @@ public static final class DuplicateValue<T> extends MapFunction<T, Tuple2<T, T>>
}
public static final class UndirectEdge
- extends FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();
@Override
@@ -290,7 +298,7 @@ public static final class UndirectEdge
}
public static final class NeighborWithComponentIDJoin
- extends JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
@Override
public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
@@ -299,7 +307,7 @@ public static final class NeighborWithComponentIDJoin
}
public static final class ComponentIdFilter
- extends FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>,
+ implements FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>,
Tuple2<Long, Long>> {
@Override
@@ -461,4 +469,4 @@ CC = gcc
~~~bash
./dbgen -T o -s 1
-~~~
\ No newline at end of file
+~~~