Posted to commits@flink.apache.org by rm...@apache.org on 2014/08/06 11:47:34 UTC
git commit: Reflect recent changes to Java API documentation
Repository: incubator-flink
Updated Branches:
refs/heads/master ecb829ed4 -> 71cccd6d8
Reflect recent changes to Java API documentation
change text to String
This closes #87.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/71cccd6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/71cccd6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/71cccd6d
Branch: refs/heads/master
Commit: 71cccd6d88fcefd824960d312153e2df523beb4d
Parents: ecb829e
Author: Kostas Tzoumas <Ko...@gmail.com>
Authored: Thu Jul 31 21:21:35 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Aug 6 11:46:45 2014 +0200
----------------------------------------------------------------------
docs/java_api_guide.md | 214 +++++++++++++++++++++++++++-------
docs/java_api_transformations.md | 29 ++++-
2 files changed, 202 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/71cccd6d/docs/java_api_guide.md
----------------------------------------------------------------------
diff --git a/docs/java_api_guide.md b/docs/java_api_guide.md
index f22b2b7..76dd332 100644
--- a/docs/java_api_guide.md
+++ b/docs/java_api_guide.md
@@ -31,14 +31,14 @@ public class WordCountExample {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSet<String> text = env.fromElements(
+ DataSet<String> text = env.fromElements(
"Who's there?",
"I think I hear them. Stand, ho! Who's there?");
DataSet<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.groupBy(0)
- .aggregate(Aggregations.SUM, 1);
+ .sum(1);
wordCounts.print();
@@ -140,7 +140,7 @@ an execution environment for executing your program on a cluster.
For specifying data sources the execution environment has several methods
to read from files using various methods: you can just read them line by line,
as CSV files, or using completely custom data input formats. To just read
-a text file as a sequence of lines, you could use:
+a text file as a sequence of lines, you can use:
```java
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -156,7 +156,7 @@ Once you have a `DataSet` you can apply transformations to create a new
`DataSet` which you can then write to a file, transform again, or
combine with other `DataSet`s. You apply transformations by calling
methods on `DataSet` with your own custom transformation function. For example,
-map looks like this:
+a map transformation looks like this:
```java
DataSet<String> input = ...;
@@ -262,7 +262,7 @@ data.flatMap(new FlatMapFunction<String, String>() {
<tr>
<td><strong>Filter</strong></td>
<td>
- <p>Evaluates a boolean function for each element and retains those for which the function returns *true*.</p>
+ <p>Evaluates a boolean function for each element and retains those for which the function returns true.</p>
{% highlight java %}
data.filter(new FilterFunction<Integer>() {
public boolean filter(Integer value) { return value > 1000; }
@@ -307,45 +307,21 @@ data.reduceGroup(new GroupReduceFunction<Integer, Integer> {
<td>
<p>Aggregates a group of values into a single value. Aggregation functions can be thought of as built-in reduce functions. Aggregate may be applied on a full data set, or on a grouped data set.</p>
{% highlight java %}
-DataSet<Tuple3<Integer, String, Double>> input = // [...]
-DataSet<Tuple3<Integer, String, Double>> output = input.aggregate(SUM, 0).and(MIN, 2);
-{% endhighlight %}
- </td>
- </tr>
-
- <tr>
- <td><strong>ReduceGroup</strong></td>
- <td>
- <p>Combines a group of elements into one or more elements. ReduceGroup may be applied on a full data set, or on a grouped data set.</p>
-{% highlight java %}
-data.reduceGroup(new GroupReduceFunction<Integer, Integer> {
- public void reduceGroup(Iterable<Integer> values, Collector<Integer> out) {
- int prefixSum = 0;
- for (Integer i : values) {
- prefixSum += i;
- out.collect(prefixSum);
- }
- }
-});
-{% endhighlight %}
- </td>
- </tr>
-
- <tr>
- <td><strong>Aggregate</strong></td>
- <td>
- <p>Aggregates a group of values into a single value. Aggregation functions can be thought of as built-in reduce functions. Aggregate may be applied on a full data set, or on a grouped data set.</p>
-{% highlight java %}
-DataSet<Tuple3<Integer, String, Double>> input = // [...]
+DataSet<Tuple3<Integer, String, Double>> input = // [...]
DataSet<Tuple3<Integer, String, Double>> output = input.aggregate(SUM, 0).and(MIN, 2);
{% endhighlight %}
+ <p>You can also use short-hand syntax for minimum, maximum, and sum aggregations.</p>
+{% highlight java %}
+DataSet<Tuple3<Integer, String, Double>> input = // [...]
+DataSet<Tuple3<Integer, String, Double>> output = input.sum(0).andMin(2);
+{% endhighlight %}
</td>
</tr>
<tr>
<td><strong>Join</strong></td>
<td>
- <p>Joins two data sets by creating all pairs of elements that are equal on their keys. Optionally uses a JoinFunction to turn the pair of elements into a single element. See [keys](#keys) on how to define join keys.</p>
+ <p>Joins two data sets by creating all pairs of elements that are equal on their keys. Optionally uses a JoinFunction to turn the pair of elements into a single element, or a FlatJoinFunction to turn the pair of elements into arbitrarily many (including none) elements. See [keys](#keys) on how to define join keys.</p>
{% highlight java %}
result = input1.join(input2)
.where(0) // key of the first input (tuple field 0)
@@ -374,7 +350,7 @@ data1.coGroup(data2)
<tr>
<td><strong>Cross</strong></td>
<td>
- <p>Builds the cartesian product (cross product) of two inputs, creating all pairs of elements. Optionally uses a CrossFunction to turn the pair of elements into a single element</p>
+ <p>Builds the Cartesian product (cross product) of two inputs, creating all pairs of elements. Optionally uses a CrossFunction to turn the pair of elements into a single element</p>
{% highlight java %}
DataSet<Integer> data1 = // [...]
DataSet<String> data2 = // [...]
@@ -427,6 +403,88 @@ DataSet<Tuple2<String, Integer>> out = in.project(2,0).types(String.class, Integ
Defining Keys
-------------
+Some transformations (join, coGroup) require that a key be defined on
+their argument DataSets, and other transformations (Reduce, GroupReduce,
+Aggregate) allow the DataSet to be grouped on a key before they are
+applied.
+
+A DataSet is grouped as follows:
+{% highlight java %}
+DataSet<...> input = // [...]
+DataSet<...> reduced = input
+ .groupBy(/*define key here*/)
+ .reduceGroup(/*do something*/);
+{% endhighlight %}
+
+The data model of Flink is not based on key-value pairs. Therefore,
+you do not need to physically pack the data set types into keys and
+values. Keys are "virtual": they are defined as functions over the
+actual data to guide the grouping operator.
+
+The simplest case is grouping a data set of Tuples on one or more
+fields of the Tuple:
+{% highlight java %}
+DataSet<Tuple3<Integer,String,Long>> input = // [...]
+DataSet<Tuple3<Integer,String,Long>> grouped = input
+ .groupBy(0)
+ .reduceGroup(/*do something*/);
+{% endhighlight %}
+
+The data set is grouped on the first field of the tuples (the one of
+Integer type). The GroupReduceFunction will thus receive groups with
+the same value of the first field.
+
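For intuition only (Flink is not assumed on the classpath here), the effect of `groupBy(0)` can be sketched in plain Java: rows with the same first field land in the same group, which is what the GroupReduceFunction then receives.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupBySketch {
    // Rows modeled as int[] {key, value}; groupByFirst mimics groupBy(0).
    static Map<Integer, List<int[]>> groupByFirst(List<int[]> rows) {
        return rows.stream().collect(Collectors.groupingBy(r -> r[0]));
    }

    public static void main(String[] args) {
        List<int[]> rows = Arrays.asList(
                new int[]{1, 10}, new int[]{2, 20}, new int[]{1, 30});
        // Key 1 groups two rows; a GroupReduceFunction would see them together.
        System.out.println(groupByFirst(rows).get(1).size()); // 2
    }
}
```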
+{% highlight java %}
+DataSet<Tuple3<Integer,String,Long>> input = // [...]
+DataSet<Tuple3<Integer,String,Long>> grouped = input
+ .groupBy(0,1)
+ .reduceGroup(/*do something*/);
+{% endhighlight %}
+
+The data set is grouped on the composite key consisting of the first and the
+second fields, therefore the GroupReduceFunction will receive groups
+with the same value for both fields.
+
+In general, key definition is done via a "key selector" function, which
+takes as argument one dataset element and returns a key of an
+arbitrary data type by performing an arbitrary computation on this
+element. For example:
+{% highlight java %}
+// some ordinary POJO
+public class WC {public String word; public int count;}
+DataSet<WC> words = // [...]
+DataSet<WC> wordCounts = words
+ .groupBy(
+ new KeySelector<WC, String>() {
+ public String getKey(WC wc) { return wc.word; }
+ })
+ .reduce(/*do something*/);
+{% endhighlight %}
+
+Remember that keys are not only used for grouping, but also joining and matching data sets:
+{% highlight java %}
+// some POJO
+public class Rating {
+ public String name;
+ public String category;
+ public int points;
+}
+DataSet<Rating> ratings = // [...]
+DataSet<Tuple2<String, Double>> weights = // [...]
+DataSet<Tuple2<String, Double>>
+ weightedRatings =
+ ratings.join(weights)
+
+ // key of the first input
+ .where(new KeySelector<Rating, String>() {
+ public String getKey(Rating r) { return r.category; }
+ })
+
+ // key of the second input
+ .equalTo(new KeySelector<Tuple2<String, Double>, String>() {
+ public String getKey(Tuple2<String, Double> t) { return t.f0; }
+ });
+{% endhighlight %}
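For intuition (again without Flink on the classpath), the join above amounts to matching elements of the two sets whose keys are equal; a plain-Java hash-join sketch with stand-in types:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class HashJoinSketch {
    // Stand-in for the Rating POJO from the text.
    static class Rating {
        final String name, category;
        final int points;
        Rating(String name, String category, int points) {
            this.name = name; this.category = category; this.points = points;
        }
    }

    // Join ratings with (category -> weight) entries on the category key,
    // emitting (name, points * weight), as the JoinFunction would.
    static List<SimpleEntry<String, Double>> join(List<Rating> ratings,
                                                  Map<String, Double> weights) {
        List<SimpleEntry<String, Double>> out = new ArrayList<>();
        for (Rating r : ratings) {
            Double w = weights.get(r.category);
            if (w != null) {
                out.add(new SimpleEntry<>(r.name, r.points * w));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rating> ratings = List.of(new Rating("movieA", "drama", 4));
        // movieA matches the "drama" weight: points 4 * weight 0.5 = 2.0
        System.out.println(join(ratings, Map.of("drama", 0.5)));
    }
}
```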
[Back to top](#top)
@@ -435,6 +493,85 @@ Defining Keys
Functions
---------
+You can define a user-defined function and pass it to the DataSet
+transformations in several ways:
+
+#### Implementing an interface
+
+The most basic way is to implement one of the provided interfaces:
+
+{% highlight java %}
+class MyMapFunction implements MapFunction<String, Integer> {
+ public Integer map(String value) { return Integer.parseInt(value); }
+}
+data.map(new MyMapFunction());
+{% endhighlight %}
+
+#### Anonymous classes
+
+You can pass a function as an anonymous class:
+{% highlight java %}
+data.map(new MapFunction<String, Integer>() {
+ public Integer map(String value) { return Integer.parseInt(value); }
+});
+{% endhighlight %}
+
+#### Java 8 Lambdas
+
+***Warning: Lambdas are currently only supported for filter and reduce
+ transformations***
+
+{% highlight java %}
+DataSet<String> data = // [...]
+data.filter(s -> s.startsWith("http://"));
+{% endhighlight %}
+
+{% highlight java %}
+DataSet<Integer> data = // [...]
+data.reduce((i1,i2) -> i1 + i2);
+{% endhighlight %}
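The same lambda shapes can be exercised locally on plain Java 8 streams (no Flink required), which is a quick way to sanity-check filter and reduce logic before handing it to a DataSet:

```java
import java.util.Arrays;
import java.util.List;

public class LambdaSketch {
    public static void main(String[] args) {
        List<String> urls = Arrays.asList("http://flink.apache.org", "ftp://example.org");
        // Same predicate shape as data.filter(s -> s.startsWith("http://"))
        long kept = urls.stream().filter(s -> s.startsWith("http://")).count();
        System.out.println(kept); // 1

        List<Integer> nums = Arrays.asList(1, 2, 3);
        // Same reducer shape as data.reduce((i1, i2) -> i1 + i2)
        int sum = nums.stream().reduce(0, (i1, i2) -> i1 + i2);
        System.out.println(sum); // 6
    }
}
```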
+
+#### Rich functions
+
+All transformations that take as argument a user-defined function can
+instead take as argument a *rich* function. For example, instead of
+{% highlight java %}
+class MyMapFunction implements MapFunction<String, Integer> {
+ public Integer map(String value) { return Integer.parseInt(value); }
+}
+{% endhighlight %}
+you can write
+{% highlight java %}
+class MyMapFunction extends RichMapFunction<String, Integer> {
+ public Integer map(String value) { return Integer.parseInt(value); }
+}
+{% endhighlight %}
+and pass the function as usual to a `map` transformation:
+{% highlight java %}
+data.map(new MyMapFunction());
+{% endhighlight %}
+
+Rich functions can also be defined as an anonymous class:
+{% highlight java %}
+data.map(new RichMapFunction<String, Integer>() {
+ public Integer map(String value) { return Integer.parseInt(value); }
+});
+{% endhighlight %}
+
+Rich functions provide, in addition to the user-defined function (map,
+reduce, etc.), four methods: `open`, `close`, `getRuntimeContext`, and
+`setRuntimeContext`. These are useful for creating and finalizing
+local state, accessing broadcast variables (see
+[Broadcast Variables](#broadcast_variables)), accessing runtime
+information such as accumulators and counters (see
+[Accumulators and Counters](#accumulators_counters)), and accessing
+information on iterations (see [Iterations](#iterations)).
+
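As a rough illustration (Flink is not on the classpath here, so `RichMapFunction` is mimicked by a hypothetical stand-in class), the runtime drives a rich function's lifecycle roughly like this: `open` once before the first record, `map` per record, `close` once after the last.

```java
import java.util.ArrayList;
import java.util.List;

public class RichFunctionSketch {
    // Hypothetical stand-in for a RichMapFunction<String, Integer>.
    static class ParsingMapper {
        List<Integer> seen; // local state: created in open(), released in close()

        void open() { seen = new ArrayList<>(); }

        Integer map(String value) {
            Integer parsed = Integer.parseInt(value);
            seen.add(parsed);
            return parsed;
        }

        void close() { seen = null; }
    }

    // How a task would drive the lifecycle for its slice of the input.
    static List<Integer> run(List<String> input) {
        ParsingMapper fn = new ParsingMapper();
        fn.open();                         // once, before any record
        List<Integer> out = new ArrayList<>();
        for (String s : input) {
            out.add(fn.map(s));            // once per record
        }
        fn.close();                        // once, after the last record
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("1", "2", "3"))); // [1, 2, 3]
    }
}
```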
+In particular for the `reduceGroup` transformation, using a rich
+function is the only way to define an optional `combine` function. See
+the
+[transformations documentation]({{site.baseurl}}/java_api_transformations.html)
+for a complete example.
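The reason a `combine` pays off: partial results computed per partition can be merged without changing the final answer. A plain-Java sketch of that invariant for a sum (no Flink types involved):

```java
import java.util.Arrays;
import java.util.List;

public class CombineSketch {
    // The group-reduce logic: collapse a group of values into their sum.
    static int reduceGroup(List<Integer> values) {
        return values.stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        List<Integer> all = Arrays.asList(1, 2, 3, 4);

        // Reduce everything at once...
        int direct = reduceGroup(all);

        // ...or pre-combine per partition, then reduce the partial sums.
        int combined = reduceGroup(Arrays.asList(
                reduceGroup(all.subList(0, 2)),   // partition 1: 1 + 2
                reduceGroup(all.subList(2, 4)))); // partition 2: 3 + 4

        System.out.println(direct == combined); // true: sum is combinable
    }
}
```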
[Back to top](#top)
@@ -502,7 +639,7 @@ wordCounts.map(new MapFunction<WordCount, Integer>() {
When working with operators that require a Key for grouping or matching records
you need to implement a `KeySelector` for your custom type (see
-[Section Data Transformations](#transformations)).
+[Section Defining Keys](#keys)).
```java
wordCounts.groupBy(new KeySelector<WordCount, String>() {
@@ -1043,4 +1180,3 @@ The script to start the webinterface is located under ```bin/start-webclient.sh`
You are able to specify program arguments in the textbox at the bottom of the page. Checking the plan visualization checkbox shows the execution plan before executing the actual program.
[Back to top](#top)
-
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/71cccd6d/docs/java_api_transformations.md
----------------------------------------------------------------------
diff --git a/docs/java_api_transformations.md b/docs/java_api_transformations.md
index bd5c45b..e4980de 100644
--- a/docs/java_api_transformations.md
+++ b/docs/java_api_transformations.md
@@ -241,7 +241,11 @@ DataSet<Double> output = input
#### Combinable GroupReduceFunctions
-In contrast to a `ReduceFunction`, a `GroupReduceFunction` is not necessarily combinable. In order to make a `GroupReduceFunction` combinable, you need to implement (override) the `combine()` method and annotate the `GroupReduceFunction` with the `@Combinable` annotation as shown here:
+In contrast to a `ReduceFunction`, a `GroupReduceFunction` is not
+necessarily combinable. In order to make a `GroupReduceFunction`
+combinable, you need to use the `RichGroupReduceFunction` variant,
+implement (override) the `combine()` method, and annotate the
+`GroupReduceFunction` with the `@Combinable` annotation as shown here:
```java
// Combinable GroupReduceFunction that computes two sums.
@@ -431,6 +435,28 @@ DataSet<Tuple2<String, Double>>
.with(new PointWeighter());
```
+#### Join with FlatJoinFunction
+
+Analogous to Map and FlatMap, a FlatJoinFunction behaves the same
+way as a JoinFunction, but instead of returning one element, it can
+return (collect) zero, one, or more elements.
+{% highlight java %}
+public class PointWeighter
+ implements FlatJoinFunction<Rating, Tuple2<String, Double>, Tuple2<String, Double>> {
+ @Override
+ public void join(Rating rating, Tuple2<String, Double> weight,
+ Collector<Tuple2<String, Double>> out) {
+ if (weight.f1 > 0.1) {
+ out.collect(new Tuple2<String, Double>(rating.name, rating.points * weight.f1));
+ }
+ }
+}
+
+DataSet<Tuple2<String, Double>>
+ weightedRatings =
+ ratings.join(weights) // [...]
+{% endhighlight %}
+
#### Join with Projection
A Join transformation can construct result tuples using a projection as shown here:
@@ -625,4 +651,3 @@ DataSet<Tuple2<String, Integer>> unioned = vals1.union(vals2)
[Back to top](#top)
-