Posted to commits@flink.apache.org by rm...@apache.org on 2014/08/06 11:47:34 UTC

git commit: Reflect recent changes to Java API documentation

Repository: incubator-flink
Updated Branches:
  refs/heads/master ecb829ed4 -> 71cccd6d8


Reflect recent changes to Java API documentation

change text to String

This closes #87.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/71cccd6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/71cccd6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/71cccd6d

Branch: refs/heads/master
Commit: 71cccd6d88fcefd824960d312153e2df523beb4d
Parents: ecb829e
Author: Kostas Tzoumas <Ko...@gmail.com>
Authored: Thu Jul 31 21:21:35 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Aug 6 11:46:45 2014 +0200

----------------------------------------------------------------------
 docs/java_api_guide.md           | 214 +++++++++++++++++++++++++++-------
 docs/java_api_transformations.md |  29 ++++-
 2 files changed, 202 insertions(+), 41 deletions(-)
----------------------------------------------------------------------

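The first hunk of `docs/java_api_guide.md` below replaces `.aggregate(Aggregations.SUM, 1)` with the shorthand `.sum(1)`; both group the `(word, 1)` tuples on field 0 and add up field 1. As a rough, Flink-free illustration of what that WordCount pipeline computes, here is a plain-Java sketch using only `java.util`. `WordCountModel`, `WordAndCount`, and `splitLine` are hypothetical names, and the tokenization rule (lower-case, split on non-word characters) is an assumption, since `LineSplitter` itself is not shown in the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Flink-free model of: text.flatMap(new LineSplitter()).groupBy(0).sum(1)
class WordCountModel {

    // Stand-in for Tuple2<String, Integer>: field 0 = word, field 1 = count.
    static final class WordAndCount {
        final String word;
        final int count;

        WordAndCount(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }

    // Hypothetical LineSplitter behaviour (assumption): lower-case the line,
    // split on runs of non-word characters, emit (word, 1) per token.
    static List<WordAndCount> splitLine(String line) {
        List<WordAndCount> out = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (!token.isEmpty()) { // a leading separator yields an empty token
                out.add(new WordAndCount(token, 1));
            }
        }
        return out;
    }

    // groupBy(0) buckets on the word; sum(1) adds up the counts per bucket.
    static Map<String, Integer> wordCounts(List<String> lines) {
        Map<String, Integer> sums = new LinkedHashMap<>();
        for (String line : lines) {
            for (WordAndCount wc : splitLine(line)) {
                sums.merge(wc.word, wc.count, Integer::sum);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        List<String> text = Arrays.asList(
            "Who's there?",
            "I think I hear them. Stand, ho! Who's there?");
        System.out.println(wordCounts(text));
    }
}
```

In Flink the grouping and summing run in parallel over partitions; the single `LinkedHashMap` here only shows the per-key result, not how it is computed at scale.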

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/71cccd6d/docs/java_api_guide.md
----------------------------------------------------------------------
diff --git a/docs/java_api_guide.md b/docs/java_api_guide.md
index f22b2b7..76dd332 100644
--- a/docs/java_api_guide.md
+++ b/docs/java_api_guide.md
@@ -31,14 +31,14 @@ public class WordCountExample {
     public static void main(String[] args) throws Exception {
         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-        DataSet<String> text = env.fromElements(
+        DataSet<String> text = env.fromElements(
             "Who's there?",
             "I think I hear them. Stand, ho! Who's there?");
 
         DataSet<Tuple2<String, Integer>> wordCounts = text
             .flatMap(new LineSplitter())
             .groupBy(0)
-            .aggregate(Aggregations.SUM, 1);
+            .sum(1);
 
         wordCounts.print();
 
@@ -140,7 +140,7 @@ an execution environment for executing your program on a cluster.
 For specifying data sources the execution environment has several methods
 to read from files using various methods: you can just read them line by line,
 as CSV files, or using completely custom data input formats. To just read
-a text file as a sequence of lines, you could use:
+a text file as a sequence of lines, you can use:
 
 ```java
 final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -156,7 +156,7 @@ Once you have a `DataSet` you can apply transformations to create a new
 `DataSet` which you can then write to a file, transform again, or
 combine with other `DataSet`s. You apply transformations by calling
 methods on `DataSet` with your own custom transformation function. For example,
-map looks like this:
+a map transformation looks like this:
 
 ```java
 DataSet<String> input = ...;
@@ -262,7 +262,7 @@ data.flatMap(new FlatMapFunction<String, String>() {
     <tr>
       <td><strong>Filter</strong></td>
       <td>
-        <p>Evaluates a boolean function for each element and retains those for which the function returns *true*.</p>
+        <p>Evaluates a boolean function for each element and retains those for which the function returns true.</p>
 {% highlight java %}
 data.filter(new FilterFunction<Integer>() {
   public boolean filter(Integer value) { return value > 1000; }
@@ -307,45 +307,21 @@ data.reduceGroup(new GroupReduceFunction<Integer, Integer> {
       <td>
         <p>Aggregates a group of values into a single value. Aggregation functions can be thought of as built-in reduce functions. Aggregate may be applied on a full data set, or on a grouped data set.</p>
 {% highlight java %}
-DataSet<Tuple3<Integer, String, Double>> input = // [...]
-DataSet<Tuple3<Integer, String, Double>> output = input.aggregate(SUM, 0).and(MIN, 2);
-{% endhighlight %}
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>ReduceGroup</strong></td>
-      <td>
-        <p>Combines a group of elements into one or more elements. ReduceGroup may be applied on a full data set, or on a grouped data set.</p>
-{% highlight java %}
-data.reduceGroup(new GroupReduceFunction<Integer, Integer> {
-  public void reduceGroup(Iterable<Integer> values, Collector<Integer> out) {
-    int prefixSum = 0;
-    for (Integer i : values) {
-      prefixSum += i;
-      out.collect(prefixSum);
-    }
-  }
-});
-{% endhighlight %}
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>Aggregate</strong></td>
-      <td>
-        <p>Aggregates a group of values into a single value. Aggregation functions can be thought of as built-in reduce functions. Aggregate may be applied on a full data set, or on a grouped data set.</p>
-{% highlight java %}
-DataSet<Tuple3<Integer, String, Double>> input = // [...]
+DataSet<Tuple3<Integer, String, Double>> input = // [...]
 DataSet<Tuple3<Integer, String, Double>> output = input.aggregate(SUM, 0).and(MIN, 2);
 {% endhighlight %}
+        <p>You can also use shorthand syntax for minimum, maximum, and sum aggregations.</p>
+{% highlight java %}
+DataSet<Tuple3<Integer, String, Double>> input = // [...]
+DataSet<Tuple3<Integer, String, Double>> output = input.sum(0).andMin(2);
+{% endhighlight %}
       </td>
     </tr>
 
     </tr>
       <td><strong>Join</strong></td>
       <td>
-        <p>Joins two data sets by creating all pairs of elements that are equal on their keys. Optionally uses a JoinFunction to turn the pair of elements into a single element. See [keys](#keys) on how to define join keys.</p>
+        <p>Joins two data sets by creating all pairs of elements that are equal on their keys. Optionally uses a JoinFunction to turn the pair of elements into a single element, or a FlatJoinFunction to turn the pair of elements into arbitrarily many (including none) elements. See [keys](#keys) on how to define join keys.</p>
 {% highlight java %}
 result = input1.join(input2)
                .where(0)       // key of the first input (tuple field 0)
@@ -374,7 +350,7 @@ data1.coGroup(data2)
     <tr>
       <td><strong>Cross</strong></td>
       <td>
-        <p>Builds the cartesian product (cross product) of two inputs, creating all pairs of elements. Optionally uses a CrossFunction to turn the pair of elements into a single element</p>
+        <p>Builds the Cartesian product (cross product) of two inputs, creating all pairs of elements. Optionally uses a CrossFunction to turn the pair of elements into a single element.</p>
 {% highlight java %}
 DataSet<Integer> data1 = // [...]
 DataSet<String> data2 = // [...]
@@ -427,6 +403,88 @@ DataSet<Tuple2<String, Integer>> out = in.project(2,0).types(String.class, Integ
 Defining Keys
 -------------
 
+Some transformations (join, coGroup) require that a key be defined on
+their argument DataSets, and other transformations (Reduce, GroupReduce,
+Aggregate) allow the DataSet to be grouped on a key before they are
+applied.
+
+A DataSet is grouped as follows:
+{% highlight java %}
+DataSet<...> input = // [...]
+DataSet<...> reduced = input
+	.groupBy(/*define key here*/)
+	.reduceGroup(/*do something*/);
+{% endhighlight %}
+
+The data model of Flink is not based on key-value pairs. Therefore,
+you do not need to physically pack the data set types into keys and
+values. Keys are "virtual": they are defined as functions over the
+actual data to guide the grouping operator.
+
+The simplest case is grouping a data set of Tuples on one or more
+fields of the Tuple:
+{% highlight java %}
+DataSet<Tuple3<Integer,String,Long>> input = // [...]
+DataSet<Tuple3<Integer,String,Long>> grouped = input
+	.groupBy(0)
+	.reduceGroup(/*do something*/);
+{% endhighlight %}
+
+The data set is grouped on the first field of the tuples (the field of
+type Integer). The GroupReduceFunction will thus receive groups with
+the same value in the first field.
+
+{% highlight java %}
+DataSet<Tuple3<Integer,String,Long>> input = // [...]
+DataSet<Tuple3<Integer,String,Long>> grouped = input
+	.groupBy(0,1)
+	.reduceGroup(/*do something*/);
+{% endhighlight %}
+
+The data set is grouped on the composite key consisting of the first and the
+second fields; therefore, the GroupReduceFunction will receive groups
+with the same value for both fields.
+
+In general, key definition is done via a "key selector" function, which
+takes as argument one dataset element and returns a key of an
+arbitrary data type by performing an arbitrary computation on this
+element. For example:
+{% highlight java %}
+// some ordinary POJO
+public class WC {public String word; public int count;}
+DataSet<WC> words = // [...]
+DataSet<WC> wordCounts = words
+                         .groupBy(
+                           new KeySelector<WC, String>() {
+                             public String getKey(WC wc) { return wc.word; }
+                           })
+                         .reduce(/*do something*/);
+{% endhighlight %}
+
+Keys are used not only for grouping, but also for joining and matching data sets:
+{% highlight java %}
+// some POJO
+public class Rating {
+  public String name;
+  public String category;
+  public int points;
+}
+DataSet<Rating> ratings = // [...]
+DataSet<Tuple2<String, Double>> weights = // [...]
+DataSet<Tuple2<Rating, Tuple2<String, Double>>>
+            ratingsWithWeights =
+            ratings.join(weights)
+
+                   // key of the first input
+                   .where(new KeySelector<Rating, String>() {
+                            public String getKey(Rating r) { return r.category; }
+                          })
+
+                   // key of the second input
+                   .equalTo(new KeySelector<Tuple2<String, Double>, String>() {
+                              public String getKey(Tuple2<String, Double> t) { return t.f0; }
+                            });
+{% endhighlight %}
 
 [Back to top](#top)
 
@@ -435,6 +493,85 @@ Defining Keys
 Functions
 ---------
 
+You can define a user-defined function and pass it to the DataSet
+transformations in several ways:
+
+#### Implementing an interface
+
+The most basic way is to implement one of the provided interfaces:
+
+{% highlight java %}
+class MyMapFunction implements MapFunction<String, Integer> {
+  public Integer map(String value) { return Integer.parseInt(value); }
+}
+data.map(new MyMapFunction());
+{% endhighlight %}
+
+#### Anonymous classes
+
+You can pass a function as an anonymous class:
+{% highlight java %}
+data.map(new MapFunction<String, Integer>() {
+  public Integer map(String value) { return Integer.parseInt(value); }
+});
+{% endhighlight %}
+
+#### Java 8 Lambdas
+
+***Warning: Lambdas are currently only supported for filter and reduce
+   transformations***
+
+{% highlight java %}
+DataSet<String> data = // [...]
+data.filter(s -> s.startsWith("http://"));
+{% endhighlight %}
+
+{% highlight java %}
+DataSet<Integer> data = // [...]
+data.reduce((i1,i2) -> i1 + i2);
+{% endhighlight %}
+
+#### Rich functions
+
+All transformations that take a user-defined function as argument can
+instead take a *rich* function. For example, instead of
+{% highlight java %}
+class MyMapFunction implements MapFunction<String, Integer> {
+  public Integer map(String value) { return Integer.parseInt(value); }
+}
+{% endhighlight %}
+you can write
+{% highlight java %}
+class MyMapFunction extends RichMapFunction<String, Integer> {
+  public Integer map(String value) { return Integer.parseInt(value); }
+}
+{% endhighlight %}
+and pass the function as usual to a `map` transformation:
+{% highlight java %}
+data.map(new MyMapFunction());
+{% endhighlight %}
+
+Rich functions can also be defined as an anonymous class:
+{% highlight java %}
+data.map(new RichMapFunction<String, Integer>() {
+  public Integer map(String value) { return Integer.parseInt(value); }
+});
+{% endhighlight %}
+
+Rich functions provide, in addition to the user-defined function (map,
+reduce, etc.), four methods: `open`, `close`, `getRuntimeContext`, and
+`setRuntimeContext`. These are useful for creating and finalizing
+local state, for accessing broadcast variables (see
+[Broadcast Variables](#broadcast_variables)), for accessing runtime
+information such as accumulators and counters (see
+[Accumulators and Counters](#accumulators_counters)), and for accessing
+information about iterations (see [Iterations](#iterations)).
+
+For the `reduceGroup` transformation in particular, using a rich
+function is the only way to define an optional `combine` function. See
+the
+[transformations documentation]({{site.baseurl}}/java_api_transformations.html)
+for a complete example.
 
 [Back to top](#top)
 
@@ -502,7 +639,7 @@ wordCounts.map(new MapFunction<WordCount, Integer>() {
 
 When working with operators that require a Key for grouping or matching records
 you need to implement a `KeySelector` for your custom type (see
-[Section Data Transformations](#transformations)).
+[Section Defining Keys](#keys)).
 
 ```java
 wordCounts.groupBy(new KeySelector<WordCount, String>() {
@@ -1043,4 +1180,3 @@ The script to start the webinterface is located under ```bin/start-webclient.sh`
 You are able to specify program arguments in the textbox at the bottom of the page. Checking the plan visualization checkbox shows the execution plan before executing the actual program.
 
 [Back to top](#top)
-

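The "Defining Keys" hunk above describes keys as "virtual": a key selector computes the key from each element instead of the data being packed into key-value pairs. The sketch below models that idea in plain Java with no Flink dependency: `java.util.function.Function` stands in for `KeySelector`, and `BinaryOperator` stands in for a `ReduceFunction`. `KeySelectorModel` and `groupAndReduce` are hypothetical names, and the in-memory, single-machine grouping is an assumption made purely for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Flink-free model of: words.groupBy(keySelector).reduce(reduceFunction)
class KeySelectorModel {

    // Same shape as the WC POJO in the "Defining Keys" hunk.
    static class WC {
        public String word;
        public int count;

        WC(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }

    // The key is never stored in the element; it is recomputed by the
    // selector, and elements with equal keys are folded with the reducer.
    static <T, K> List<T> groupAndReduce(List<T> input,
                                         Function<T, K> keySelector,
                                         BinaryOperator<T> reducer) {
        Map<K, T> perKey = new LinkedHashMap<>();
        for (T element : input) {
            K key = keySelector.apply(element);
            perKey.merge(key, element, reducer);
        }
        return new ArrayList<>(perKey.values());
    }

    public static void main(String[] args) {
        List<WC> words = Arrays.asList(
            new WC("flink", 1), new WC("java", 1), new WC("flink", 2));
        List<WC> wordCounts = groupAndReduce(
            words,
            wc -> wc.word,                                // role of KeySelector.getKey
            (a, b) -> new WC(a.word, a.count + b.count)); // role of ReduceFunction.reduce
        for (WC wc : wordCounts) {
            System.out.println(wc.word + " " + wc.count);
        }
    }
}
```

A composite key such as `groupBy(0,1)` can be modelled the same way by returning, for example, `Arrays.<Object>asList(wc.word, wc.count)` from the selector, since `List` implements `equals` and `hashCode` by content.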
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/71cccd6d/docs/java_api_transformations.md
----------------------------------------------------------------------
diff --git a/docs/java_api_transformations.md b/docs/java_api_transformations.md
index bd5c45b..e4980de 100644
--- a/docs/java_api_transformations.md
+++ b/docs/java_api_transformations.md
@@ -241,7 +241,11 @@ DataSet<Double> output = input
 
 #### Combinable GroupReduceFunctions
 
-In contrast to a `ReduceFunction`, a `GroupReduceFunction` is not necessarily combinable. In order to make a `GroupReduceFunction` combinable, you need to implement (override) the `combine()` method and annotate the `GroupReduceFunction` with the `@Combinable` annotation as shown here:
+In contrast to a `ReduceFunction`, a `GroupReduceFunction` is not
+necessarily combinable. To make a `GroupReduceFunction` combinable,
+you need to extend the `RichGroupReduceFunction` variant, implement
+(override) the `combine()` method, and annotate the function class
+with the `@Combinable` annotation as shown here:
 
 ```java
 // Combinable GroupReduceFunction that computes two sums.
@@ -431,6 +435,28 @@ DataSet<Tuple2<String, Double>>
                    .with(new PointWeighter());
 ```
 
+#### Join with FlatJoinFunction
+
+Analogous to Map and FlatMap, a FlatJoinFunction behaves in the same
+way as a JoinFunction, but instead of returning one element, it can
+return (collect) zero, one, or more elements.
+```java
+public class PointWeighter
+         implements FlatJoinFunction<Rating, Tuple2<String, Double>, Tuple2<String, Double>> {
+  @Override
+  public void join(Rating rating, Tuple2<String, Double> weight,
+                   Collector<Tuple2<String, Double>> out) {
+    if (weight.f1 > 0.1) {
+      out.collect(new Tuple2<String, Double>(rating.name, rating.points * weight.f1));
+    }
+  }
+}
+
+DataSet<Tuple2<String, Double>>
+            weightedRatings =
+            ratings.join(weights) // [...]
+```
+
 #### Join with Projection
 
 A Join transformation can construct result tuples using a projection as shown here:
@@ -625,4 +651,3 @@ DataSet<Tuple2<String, Integer>> unioned = vals1.union(vals2)
 
 
 [Back to top](#top)
-
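To make the zero-one-or-many contract of `FlatJoinFunction` concrete, here is a Flink-free, plain-Java model of an equi-join whose per-pair function emits through a collector. The `Collector` and `FlatJoin` interfaces below are local stand-ins written for this sketch (not Flink's own `Collector` or `FlatJoinFunction`), `Weight` and `Scored` replace the `Tuple2<String, Double>` types, and the nested-loop join strategy is an assumption chosen for brevity; Flink chooses its own join strategy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Flink-free model of: ratings.join(weights).where(...).equalTo(...).with(flatJoinFn)
class FlatJoinModel {

    // Local stand-in for a collector: the function pushes 0..n results into it.
    interface Collector<T> {
        void collect(T record);
    }

    // Local stand-in for FlatJoinFunction<IN1, IN2, OUT>.
    interface FlatJoin<IN1, IN2, OUT> {
        void join(IN1 first, IN2 second, Collector<OUT> out);
    }

    static final class Rating {
        final String name;
        final String category;
        final int points;

        Rating(String name, String category, int points) {
            this.name = name;
            this.category = category;
            this.points = points;
        }
    }

    // Replaces Tuple2<String, Double> (category, weight) on the input side.
    static final class Weight {
        final String category;
        final double weight;

        Weight(String category, double weight) {
            this.category = category;
            this.weight = weight;
        }
    }

    // Replaces Tuple2<String, Double> (name, weighted points) on the output side.
    static final class Scored {
        final String name;
        final double score;

        Scored(String name, double score) {
            this.name = name;
            this.score = score;
        }
    }

    // Nested-loop equi-join: every pair with equal keys is handed to fn,
    // which may emit zero, one, or several results.
    static <L, R, K, O> List<O> flatJoin(List<L> left, List<R> right,
                                         Function<L, K> leftKey, Function<R, K> rightKey,
                                         FlatJoin<L, R, O> fn) {
        List<O> results = new ArrayList<>();
        for (L l : left) {
            K key = leftKey.apply(l);
            for (R r : right) {
                if (Objects.equals(key, rightKey.apply(r))) {
                    fn.join(l, r, results::add);
                }
            }
        }
        return results;
    }

    // Same filtering rule as the PointWeighter in the hunk: pairs whose
    // weight is not above 0.1 produce no output at all.
    static final FlatJoin<Rating, Weight, Scored> POINT_WEIGHTER = (rating, w, out) -> {
        if (w.weight > 0.1) {
            out.collect(new Scored(rating.name, rating.points * w.weight));
        }
    };

    public static void main(String[] args) {
        List<Rating> ratings = Arrays.asList(
            new Rating("alice", "books", 10), new Rating("bob", "music", 4));
        List<Weight> weights = Arrays.asList(
            new Weight("books", 0.5), new Weight("music", 0.05));
        for (Scored s : flatJoin(ratings, weights, r -> r.category, w -> w.category, POINT_WEIGHTER)) {
            System.out.println(s.name + " " + s.score);
        }
    }
}
```

With a weight of 0.05 for "music", `POINT_WEIGHTER` emits nothing for that pair, which a plain `JoinFunction` (exactly one output per pair) cannot express.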