Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:29:29 UTC

[47/60] [doc] Switch parser to kramdown, normalize Headings

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4ddc3f72/docs/java_api_transformations.md
----------------------------------------------------------------------
diff --git a/docs/java_api_transformations.md b/docs/java_api_transformations.md
deleted file mode 100644
index c1dcc19..0000000
--- a/docs/java_api_transformations.md
+++ /dev/null
@@ -1,677 +0,0 @@
----
-title: "Java API Transformations"
----
-
-<section id="top">
-DataSet Transformations
------------------------
-
-This document gives a deep-dive into the available transformations on DataSets. For a general introduction to the
-Flink Java API, please refer to the [API guide](java_api_guide.html).
-
-
-### Map
-
-The Map transformation applies a user-defined `MapFunction` on each element of a DataSet.
-It implements a one-to-one mapping, that is, exactly one element must be returned by
-the function.
-
-The following code transforms a `DataSet` of Integer pairs into a `DataSet` of Integers:
-
-```java
-// MapFunction that adds two integer values
-public class IntAdder implements MapFunction<Tuple2<Integer, Integer>, Integer> {
-  @Override
-  public Integer map(Tuple2<Integer, Integer> in) {
-    return in.f0 + in.f1;
-  }
-}
-
-// [...]
-DataSet<Tuple2<Integer, Integer>> intPairs = // [...]
-DataSet<Integer> intSums = intPairs.map(new IntAdder());
-```
-
-### FlatMap
-
-The FlatMap transformation applies a user-defined `FlatMapFunction` on each element of a `DataSet`.
-This variant of a map function can return arbitrarily many result elements (including none) for each input element.
-
-The following code transforms a `DataSet` of text lines into a `DataSet` of words:
-
-```java
-// FlatMapFunction that splits a String on non-word characters and emits all String tokens.
-public class Tokenizer implements FlatMapFunction<String, String> {
-  @Override
-  public void flatMap(String value, Collector<String> out) {
-    for (String token : value.split("\\W")) {
-      out.collect(token);
-    }
-  }
-}
-
-// [...]
-DataSet<String> textLines = // [...]
-DataSet<String> words = textLines.flatMap(new Tokenizer());
-```
-
-### MapPartition
-
-The MapPartition function transforms a parallel partition in a single function call. The function gets the partition as an `Iterable` stream and
-can produce an arbitrary number of result values. The number of elements in each partition depends on the degree-of-parallelism
-and previous operations.
-
-The following code transforms a `DataSet` of text lines into a `DataSet` of counts per partition:
-
-```java
-public class PartitionCounter implements MapPartitionFunction<String, Long> {
-
-  public void mapPartition(Iterable<String> values, Collector<Long> out) {
-    long c = 0;
-    for (String s : values) {
-      c++;
-    }
-    out.collect(c);
-  }
-}
-
-// [...]
-DataSet<String> textLines = // [...]
-DataSet<Long> counts = textLines.mapPartition(new PartitionCounter());
-```
-
-### Filter
-
-The Filter transformation applies a user-defined `FilterFunction` on each element of a `DataSet` and retains only those elements for which the function returns `true`.
-
-The following code removes all Integers smaller than zero from a `DataSet`:
-
-```java
-// FilterFunction that filters out all Integers smaller than zero.
-public class NaturalNumberFilter implements FilterFunction<Integer> {
-  @Override
-  public boolean filter(Integer number) {
-    return number >= 0;
-  }
-}
-
-// [...]
-DataSet<Integer> intNumbers = // [...]
-DataSet<Integer> naturalNumbers = intNumbers.filter(new NaturalNumberFilter());
-```
-
-### Project (Tuple DataSets only)
-
-The Project transformation removes or moves `Tuple` fields of a `Tuple` `DataSet`.
-The `project(int...)` method selects `Tuple` fields that should be retained by their index and defines their order in the output `Tuple`.
-The `types(Class<?> ...)` method must give the types of the output `Tuple` fields.
-
-Projections do not require the definition of a user function.
-
-The following code shows different ways to apply a Project transformation on a `DataSet`:
-
-```java
-DataSet<Tuple3<Integer, Double, String>> in = // [...]
-// converts Tuple3<Integer, Double, String> into Tuple2<String, Integer>
-DataSet<Tuple2<String, Integer>> out = in.project(2,0).types(String.class, Integer.class);
-```
-
-### Transformations on grouped DataSet
-
-The reduce operations can operate on grouped data sets. Specifying the key to
-be used for grouping can be done in two ways:
-
-- a `KeySelector` function or
-- one or more field position keys (`Tuple` `DataSet` only).
-
-Please look at the reduce examples to see how the grouping keys are specified.
-
-### Reduce on grouped DataSet
-
-A Reduce transformation that is applied on a grouped `DataSet` reduces each group to a single element using a user-defined `ReduceFunction`.
-For each group of input elements, a `ReduceFunction` successively combines pairs of elements into one element until only a single element for each group remains.
-
-#### Reduce on DataSet grouped by KeySelector Function
-
-A `KeySelector` function extracts a key value from each element of a `DataSet`. The extracted key value is used to group the `DataSet`.
-The following code shows how to group a POJO `DataSet` using a `KeySelector` function and to reduce it with a `ReduceFunction`.
-
-```java
-// some ordinary POJO
-public class WC {
-  public String word;
-  public int count;
-  // [...]
-}
-
-// ReduceFunction that sums Integer attributes of a POJO
-public class WordCounter implements ReduceFunction<WC> {
-  @Override
-  public WC reduce(WC in1, WC in2) {
-    return new WC(in1.word, in1.count + in2.count);
-  }
-}
-
-// [...]
-DataSet<WC> words = // [...]
-DataSet<WC> wordCounts = words
-                         // DataSet grouping with inline-defined KeySelector function
-                         .groupBy(
-                           new KeySelector<WC, String>() {
-                             public String getKey(WC wc) { return wc.word; }
-                           })
-                         // apply ReduceFunction on grouped DataSet
-                         .reduce(new WordCounter());
-```
-
-#### Reduce on DataSet grouped by Field Position Keys (Tuple DataSets only)
-
-Field position keys specify one or more fields of a `Tuple` `DataSet` that are used as grouping keys.
-The following code shows how to use field position keys and apply a `ReduceFunction`.
-
-```java
-DataSet<Tuple3<String, Integer, Double>> tuples = // [...]
-DataSet<Tuple3<String, Integer, Double>> reducedTuples =
-                                         tuples
-                                         // group DataSet on first and second field of Tuple
-                                         .groupBy(0,1)
-                                         // apply ReduceFunction on grouped DataSet
-                                         .reduce(new MyTupleReducer());
-```
-
-### GroupReduce on grouped DataSet
-
-A GroupReduce transformation that is applied on a grouped `DataSet` calls a user-defined `GroupReduceFunction` for each group. The difference
-between this and `Reduce` is that the user-defined function gets the whole group at once.
-The function is invoked with an Iterable over all elements of a group and can return an arbitrary number of result elements using the collector.
-
-#### GroupReduce on DataSet grouped by Field Position Keys (Tuple DataSets only)
-
-The following code shows how duplicate strings can be removed from a `DataSet` grouped by Integer.
-
-```java
-public class DistinctReduce
-         implements GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
-
-  @Override
-  public void reduce(Iterable<Tuple2<Integer, String>> in, Collector<Tuple2<Integer, String>> out) {
-
-    Set<String> uniqStrings = new HashSet<String>();
-    Integer key = null;
-  
-    // add all strings of the group to the set
-    for (Tuple2<Integer, String> t : in) {
-      key = t.f0;
-      uniqStrings.add(t.f1);
-    }
-
-    // emit all unique strings.
-    for (String s : uniqStrings) {
-      out.collect(new Tuple2<Integer, String>(key, s));
-    }
-  }
-}
-
-// [...]
-DataSet<Tuple2<Integer, String>> input = // [...]
-DataSet<Tuple2<Integer, String>> output = input
-                           .groupBy(0)            // group DataSet by the first tuple field
-                           .reduceGroup(new DistinctReduce());  // apply GroupReduceFunction
-```
-
-#### GroupReduce on DataSet grouped by KeySelector Function
-
-Works analogously to `KeySelector` functions in Reduce transformations.
-
-#### GroupReduce on sorted groups (Tuple DataSets only)
-
-A `GroupReduceFunction` accesses the elements of a group using an Iterable. Optionally, the Iterable can hand out the elements of a group in a specified order. In many cases this can help to reduce the complexity of a user-defined `GroupReduceFunction` and improve its efficiency.
-Right now, this feature is only available for DataSets of Tuples.
-
-The following code shows another example of how to remove duplicate Strings in a `DataSet` grouped by an Integer and sorted by String.
-
-```java
-// GroupReduceFunction that removes consecutive identical elements
-public class DistinctReduce
-         implements GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
-
-  @Override
-  public void reduce(Iterable<Tuple2<Integer, String>> in, Collector<Tuple2<Integer, String>> out) {
-    Integer key = null;
-    String comp = null;
-
-    for (Tuple2<Integer, String> t : in) {
-      key = t.f0;
-      String next = t.f1;
-
-      // check if strings are different
-      if (comp == null || !next.equals(comp)) {
-        out.collect(new Tuple2<Integer, String>(key, next));
-        comp = next;
-      }
-    }
-  }
-}
-
-// [...]
-DataSet<Tuple2<Integer, String>> input = // [...]
-DataSet<Tuple2<Integer, String>> output = input
-                         .groupBy(0)                         // group DataSet by first field
-                         .sortGroup(1, Order.ASCENDING)      // sort groups on second tuple field
-                         .reduceGroup(new DistinctReduce());
-```
-
-**Note:** A GroupSort often comes for free if the grouping is established using a sort-based execution strategy of an operator before the reduce operation.
-
-#### Combinable GroupReduceFunctions
-
-In contrast to a `ReduceFunction`, a `GroupReduceFunction` is not
-necessarily combinable. In order to make a `GroupReduceFunction`
-combinable, you need to use the `RichGroupReduceFunction` variant,
-implement (override) the `combine()` method, and annotate the
-`GroupReduceFunction` with the `@Combinable` annotation as shown here:
-
-```java
-// Combinable GroupReduceFunction that computes two sums.
-// Note that we use the RichGroupReduceFunction because it defines the combine method
-@Combinable
-public class MyCombinableGroupReducer
-         extends RichGroupReduceFunction<Tuple3<String, Integer, Double>,
-                                     Tuple3<String, Integer, Double>> {
-  @Override
-  public void reduce(Iterable<Tuple3<String, Integer, Double>> in,
-                     Collector<Tuple3<String, Integer, Double>> out) {
-
-    String key = null;
-    int intSum = 0;
-    double doubleSum = 0.0;
-
-    for (Tuple3<String, Integer, Double> curr : in) {
-      key = curr.f0;
-      intSum += curr.f1;
-      doubleSum += curr.f2;
-    }
-    // emit a tuple with both sums
-    out.collect(new Tuple3<String, Integer, Double>(key, intSum, doubleSum));
-  }
-
-  @Override
-  public void combine(Iterable<Tuple3<String, Integer, Double>> in,
-                      Collector<Tuple3<String, Integer, Double>> out) {
-    // in some cases combine() calls can simply be forwarded to reduce().
-    this.reduce(in, out);
-  }
-}
-```
-
-### Aggregate on grouped Tuple DataSet
-
-There are some common aggregation operations that are frequently used. The Aggregate transformation provides the following built-in aggregation functions:
-
-- Sum,
-- Min, and
-- Max.
-
-The Aggregate transformation can only be applied on a `Tuple` `DataSet` and supports only field position keys for grouping.
-
-The following code shows how to apply an Aggregation transformation on a `DataSet` grouped by field position keys:
-
-```java
-DataSet<Tuple3<Integer, String, Double>> input = // [...]
-DataSet<Tuple3<Integer, String, Double>> output = input
-                                   .groupBy(1)        // group DataSet on second field
-                                   .aggregate(SUM, 0) // compute sum of the first field
-                                   .and(MIN, 2);      // compute minimum of the third field
-```
-
-To apply multiple aggregations on a DataSet, use the `.and()` function after the first aggregate: `.aggregate(SUM, 0).and(MIN, 2)` produces the sum of field 0 and the minimum of field 2 of the original DataSet.
-In contrast, `.aggregate(SUM, 0).aggregate(MIN, 2)` applies an aggregation on an aggregation. In the given example, it would produce the minimum of field 2 after calculating the sum of field 0 grouped by field 1.
-
-**Note:** The set of aggregation functions will be extended in the future.
-
-### Reduce on full DataSet
-
-The Reduce transformation applies a user-defined `ReduceFunction` to all elements of a `DataSet`.
-The `ReduceFunction` successively combines pairs of elements into one element until only a single element remains.
-
-The following code shows how to sum all elements of an Integer `DataSet`:
-
-```java
-// ReduceFunction that sums Integers
-public class IntSummer implements ReduceFunction<Integer> {
-  @Override
-  public Integer reduce(Integer num1, Integer num2) {
-    return num1 + num2;
-  }
-}
-
-// [...]
-DataSet<Integer> intNumbers = // [...]
-DataSet<Integer> sum = intNumbers.reduce(new IntSummer());
-```
-
-Reducing a full `DataSet` using the Reduce transformation implies that the final Reduce operation cannot be done in parallel. However, a `ReduceFunction` is automatically combinable such that a Reduce transformation does not limit scalability for most use cases.
-
-### GroupReduce on full DataSet
-
-The GroupReduce transformation applies a user-defined `GroupReduceFunction` on all elements of a `DataSet`.
-A `GroupReduceFunction` can iterate over all elements of a `DataSet` and return an arbitrary number of result elements.
-
-The following example shows how to apply a GroupReduce transformation on a full `DataSet`:
-
-```java
-DataSet<Integer> input = // [...]
-// apply a (preferably combinable) GroupReduceFunction to a DataSet
-DataSet<Double> output = input.reduceGroup(new MyGroupReducer());
-```
-
-**Note:** A GroupReduce transformation on a full `DataSet` cannot be done in parallel if the `GroupReduceFunction` is not combinable. Therefore, this can be a very compute-intensive operation. See the paragraph on "Combinable `GroupReduceFunction`s" above to learn how to implement a combinable `GroupReduceFunction`.
-
-### Aggregate on full Tuple DataSet
-
-There are some common aggregation operations that are frequently used. The Aggregate transformation provides the following built-in aggregation functions:
-
-- Sum,
-- Min, and
-- Max.
-
-The Aggregate transformation can only be applied on a `Tuple` `DataSet`.
-
-The following code shows how to apply an Aggregation transformation on a full `DataSet`:
-
-```java
-DataSet<Tuple2<Integer, Double>> input = // [...]
-DataSet<Tuple2<Integer, Double>> output = input
-                                     .aggregate(SUM, 0)    // compute sum of the first field
-                                     .and(MIN, 1);    // compute minimum of the second field
-```
-
-**Note:** Extending the set of supported aggregation functions is on our roadmap.
-
-### Join
-
-The Join transformation joins two `DataSet`s into one `DataSet`. The elements of both `DataSet`s are joined on one or more keys which can be specified using
-
-- a `KeySelector` function or
-- one or more field position keys (`Tuple` `DataSet` only).
-
-There are a few different ways to perform a Join transformation which are shown in the following.
-
-#### Default Join (Join into Tuple2)
-
-The default Join transformation produces a new `Tuple` `DataSet` with two fields. Each tuple holds a joined element of the first input `DataSet` in the first tuple field and a matching element of the second input `DataSet` in the second field.
-
-The following code shows a default Join transformation using field position keys:
-
-```java
-DataSet<Tuple2<Integer, String>> input1 = // [...]
-DataSet<Tuple2<Double, Integer>> input2 = // [...]
-// result dataset is typed as Tuple2
-DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Double, Integer>>>
-            result = input1.join(input2)
-                           .where(0)       // key of the first input
-                           .equalTo(1);    // key of the second input
-```
-
-#### Join with JoinFunction
-
-A Join transformation can also call a user-defined `JoinFunction` to process joining tuples.
-A `JoinFunction` receives one element of the first input `DataSet` and one element of the second input `DataSet` and returns exactly one element.
-
-The following code joins a `DataSet` of custom Java objects with a `Tuple` `DataSet` using `KeySelector` functions and shows how to call a user-defined `JoinFunction`:
-
-```java
-// some POJO
-public class Rating {
-  public String name;
-  public String category;
-  public int points;
-}
-
-// Join function that joins a custom POJO with a Tuple
-public class PointWeighter
-         implements JoinFunction<Rating, Tuple2<String, Double>, Tuple2<String, Double>> {
-
-  @Override
-  public Tuple2<String, Double> join(Rating rating, Tuple2<String, Double> weight) {
-    // multiply the points by the weight and construct a new output tuple
-    return new Tuple2<String, Double>(rating.name, rating.points * weight.f1);
-  }
-}
-
-DataSet<Rating> ratings = // [...]
-DataSet<Tuple2<String, Double>> weights = // [...]
-DataSet<Tuple2<String, Double>>
-            weightedRatings =
-            ratings.join(weights)
-
-                   // key of the first input
-                   .where(new KeySelector<Rating, String>() {
-                            public String getKey(Rating r) { return r.category; }
-                          })
-
-                   // key of the second input
-                   .equalTo(new KeySelector<Tuple2<String, Double>, String>() {
-                              public String getKey(Tuple2<String, Double> t) { return t.f0; }
-                            })
-
-                   // applying the JoinFunction on joining pairs
-                   .with(new PointWeighter());
-```
-
-#### Join with FlatJoinFunction
-
-Analogous to Map and FlatMap, a FlatJoin function behaves in the same
-way as a JoinFunction, but instead of returning one element, it can
-return (collect) zero, one, or more elements.
-{% highlight java %}
-public class PointWeighter
-         implements FlatJoinFunction<Rating, Tuple2<String, Double>, Tuple2<String, Double>> {
-  @Override
-  public void join(Rating rating, Tuple2<String, Double> weight,
-	  Collector<Tuple2<String, Double>> out) {
-	if (weight.f1 > 0.1) {
-		out.collect(new Tuple2<String, Double>(rating.name, rating.points * weight.f1));
-	}
-  }
-}
-
-DataSet<Tuple2<String, Double>>
-            weightedRatings =
-            ratings.join(weights) // [...]
-{% endhighlight %}
-
-#### Join with Projection
-
-A Join transformation can construct result tuples using a projection as shown here:
-
-```java
-DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
-DataSet<Tuple2<Integer, Double>> input2 = // [...]
-DataSet<Tuple4<Integer, String, Double, Byte>>
-            result =
-            input1.join(input2)
-                  // key definition on first DataSet using a field position key
-                  .where(0)
-                  // key definition of second DataSet using a field position key
-                  .equalTo(0)
-                  // select and reorder fields of matching tuples
-                  .projectFirst(0,2).projectSecond(1).projectFirst(1)
-                  .types(Integer.class, String.class, Double.class, Byte.class);
-```
-
-`projectFirst(int...)` and `projectSecond(int...)` select the fields of the first and second joined input that should be assembled into an output `Tuple`. The order of indexes defines the order of fields in the output tuple.
-The join projection also works for non-`Tuple` `DataSet`s. In this case, `projectFirst()` or `projectSecond()` must be called without arguments to add a joined element to the output `Tuple`.
-
-#### Join with DataSet Size Hint
-
-In order to guide the optimizer to pick the right execution strategy, you can hint the size of a `DataSet` to join as shown here:
-
-```java
-DataSet<Tuple2<Integer, String>> input1 = // [...]
-DataSet<Tuple2<Integer, String>> input2 = // [...]
-
-DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
-            result1 =
-            // hint that the second DataSet is very small
-            input1.joinWithTiny(input2)
-                  .where(0)
-                  .equalTo(0);
-
-DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
-            result2 =
-            // hint that the second DataSet is very large
-            input1.joinWithHuge(input2)
-                  .where(0)
-                  .equalTo(0);
-```
-
-### Cross
-
-The Cross transformation combines two `DataSet`s into one `DataSet`. It builds all pairwise combinations of the elements of both input `DataSet`s, i.e., it builds a Cartesian product.
-The Cross transformation either calls a user-defined `CrossFunction` on each pair of elements or applies a projection. Both modes are shown in the following.
-
-**Note:** Cross is potentially a *very* compute-intensive operation which can challenge even large compute clusters!
-
-#### Cross with User-Defined Function
-
-A Cross transformation can call a user-defined `CrossFunction`. A `CrossFunction` receives one element of the first input and one element of the second input and returns exactly one result element.
-
-The following code shows how to apply a Cross transformation on two `DataSet`s using a `CrossFunction`:
-
-```java
-public class Coord {
-  public int id;
-  public int x;
-  public int y;
-}
-
-// CrossFunction computes the Euclidean distance between two Coord objects.
-public class EuclideanDistComputer
-         implements CrossFunction<Coord, Coord, Tuple3<Integer, Integer, Double>> {
-
-  @Override
-  public Tuple3<Integer, Integer, Double> cross(Coord c1, Coord c2) {
-    // compute Euclidean distance of coordinates
-    double dist = Math.sqrt(Math.pow(c1.x - c2.x, 2) + Math.pow(c1.y - c2.y, 2));
-    return new Tuple3<Integer, Integer, Double>(c1.id, c2.id, dist);
-  }
-}
-
-DataSet<Coord> coords1 = // [...]
-DataSet<Coord> coords2 = // [...]
-DataSet<Tuple3<Integer, Integer, Double>>
-            distances =
-            coords1.cross(coords2)
-                   // apply CrossFunction
-                   .with(new EuclideanDistComputer());
-```
-
-#### Cross with Projection
-
-A Cross transformation can also construct result tuples using a projection as shown here:
-
-```java
-DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
-DataSet<Tuple2<Integer, Double>> input2 = // [...]
-DataSet<Tuple4<Integer, Byte, Integer, Double>>
-            result =
-            input1.cross(input2)
-                  // select and reorder fields of matching tuples
-                  .projectSecond(0).projectFirst(1,0).projectSecond(1)
-                  .types(Integer.class, Byte.class, Integer.class, Double.class);
-```
-
-The field selection in a Cross projection works the same way as in the projection of Join results.
-
-#### Cross with DataSet Size Hint
-
-In order to guide the optimizer to pick the right execution strategy, you can hint the size of a `DataSet` to cross as shown here:
-
-```java
-DataSet<Tuple2<Integer, String>> input1 = // [...]
-DataSet<Tuple2<Integer, String>> input2 = // [...]
-
-DataSet<Tuple4<Integer, String, Integer, String>>
-            udfResult =
-                  // hint that the second DataSet is very small
-            input1.crossWithTiny(input2)
-                  // apply any Cross function (or projection)
-                  .with(new MyCrosser());
-
-DataSet<Tuple3<Integer, String, String>>
-            projectResult =
-                  // hint that the second DataSet is very large
-            input1.crossWithHuge(input2)
-                  // apply a projection (or any Cross function)
-                  .projectFirst(0,1).projectSecond(1).types(Integer.class, String.class, String.class);
-```
-
-### CoGroup
-
-The CoGroup transformation jointly processes groups of two `DataSet`s. Both `DataSet`s are grouped on a defined key and groups of both `DataSet`s that share the same key are handed together to a user-defined `CoGroupFunction`. If for a specific key only one `DataSet` has a group, the `CoGroupFunction` is called with this group and an empty group.
-A `CoGroupFunction` can separately iterate over the elements of both groups and return an arbitrary number of result elements.
-
-Similar to Reduce, GroupReduce, and Join, keys can be defined using
-
-- a `KeySelector` function or
-- one or more field position keys (`Tuple` `DataSet` only).
-
-#### CoGroup on DataSets grouped by Field Position Keys (Tuple DataSets only)
-
-```java
-// Some CoGroupFunction definition
-class MyCoGrouper
-         implements CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Double>, Double> {
-
-  @Override
-  public void coGroup(Iterable<Tuple2<String, Integer>> iVals,
-                      Iterable<Tuple2<String, Double>> dVals,
-                      Collector<Double> out) {
-
-    Set<Integer> ints = new HashSet<Integer>();
-
-    // add all Integer values in group to set
-    for (Tuple2<String, Integer> val : iVals) {
-      ints.add(val.f1);
-    }
-
-    // multiply each Double value with each unique Integer value of the group
-    for (Tuple2<String, Double> val : dVals) {
-      for (Integer i : ints) {
-        out.collect(val.f1 * i);
-      }
-    }
-  }
-}
-
-// [...]
-DataSet<Tuple2<String, Integer>> iVals = // [...]
-DataSet<Tuple2<String, Double>> dVals = // [...]
-DataSet<Double> output = iVals.coGroup(dVals)
-                         // group first DataSet on first tuple field
-                         .where(0)
-                         // group second DataSet on first tuple field
-                         .equalTo(0)
-                         // apply CoGroup function on each pair of groups
-                         .with(new MyCoGrouper());
-```
-
-#### CoGroup on DataSets grouped by Key Selector Function
-
-Works analogously to key selector functions in Join transformations.
-
-### Union
-
-Produces the union of two `DataSet`s, which have to be of the same type. A union of more than two `DataSet`s can be implemented with multiple union calls, as shown here:
-
-```java
-DataSet<Tuple2<String, Integer>> vals1 = // [...]
-DataSet<Tuple2<String, Integer>> vals2 = // [...]
-DataSet<Tuple2<String, Integer>> vals3 = // [...]
-DataSet<Tuple2<String, Integer>> unioned = vals1.union(vals2)
-                    .union(vals3);
-```
-
-
-[Back to top](#top)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4ddc3f72/docs/js/codetabs.js
----------------------------------------------------------------------
diff --git a/docs/js/codetabs.js b/docs/js/codetabs.js
new file mode 100644
index 0000000..878aa32
--- /dev/null
+++ b/docs/js/codetabs.js
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* Note: This file is originally from the Apache Spark project. */
+
+/* Custom JavaScript code in the MarkDown docs */
+
+// Enable language-specific code tabs
+function codeTabs() {
+  var counter = 0;
+  var langImages = {
+    "scala": "img/scala-sm.png",
+    "python": "img/python-sm.png",
+    "java": "img/java-sm.png"
+  };
+  $("div.codetabs").each(function() {
+    $(this).addClass("tab-content");
+
+    // Insert the tab bar
+    var tabBar = $('<ul class="nav nav-tabs" data-tabs="tabs"></ul>');
+    $(this).before(tabBar);
+
+    // Add each code sample to the tab bar:
+    var codeSamples = $(this).children("div");
+    codeSamples.each(function() {
+      $(this).addClass("tab-pane");
+      var lang = $(this).data("lang");
+      var image = $(this).data("image");
+      var notabs = $(this).data("notabs");
+      var capitalizedLang = lang.substr(0, 1).toUpperCase() + lang.substr(1);
+      var id = "tab_" + lang + "_" + counter;
+      $(this).attr("id", id);
+      if (image != null && langImages[lang]) {
+        var buttonLabel = "<img src='" +langImages[lang] + "' alt='" + capitalizedLang + "' />";
+      } else if (notabs == null) {
+        var buttonLabel = "<b>" + capitalizedLang + "</b>";
+      } else {
+        var buttonLabel = "";
+      }
+      tabBar.append(
+        '<li><a class="tab_' + lang + '" href="#' + id + '">' + buttonLabel + '</a></li>'
+      );
+    });
+
+    codeSamples.first().addClass("active");
+    tabBar.children("li").first().addClass("active");
+    counter++;
+  });
+  $("ul.nav-tabs a").click(function (e) {
+    // Toggling a tab should switch all tabs corresponding to the same language
+    // while retaining the scroll position
+    e.preventDefault();
+    var scrollOffset = $(this).offset().top - $(document).scrollTop();
+    $("." + $(this).attr('class')).tab('show');
+    $(document).scrollTop($(this).offset().top - scrollOffset);
+  });
+}
+
+function makeCollapsable(elt, accordionClass, accordionBodyId, title) {
+  $(elt).addClass("accordion-inner");
+  $(elt).wrap('<div class="accordion ' + accordionClass + '"></div>');
+  $(elt).wrap('<div class="accordion-group"></div>');
+  $(elt).wrap('<div id="' + accordionBodyId + '" class="accordion-body collapse"></div>');
+  $(elt).parent().before(
+    '<div class="accordion-heading">' +
+      '<a class="accordion-toggle" data-toggle="collapse" href="#' + accordionBodyId + '">' +
+             title +
+      '</a>' +
+    '</div>'
+  );
+}
+
+// Enable "view solution" sections (for exercises)
+function viewSolution() {
+  var counter = 0;
+  $("div.solution").each(function() {
+    var id = "solution_" + counter;
+    makeCollapsable(this, "", id,
+      '<i class="icon-ok-sign" style="text-decoration: none; color: #0088cc">' +
+      '</i>' + "View Solution");
+    counter++;
+  });
+}
+
+// A script to fix internal hash links because we have an overlapping top bar.
+// Based on https://github.com/twitter/bootstrap/issues/193#issuecomment-2281510
+function maybeScrollToHash() {
+  console.log("HERE");
+  if (window.location.hash && $(window.location.hash).length) {
+    console.log("HERE2", $(window.location.hash), $(window.location.hash).offset().top);
+    var newTop = $(window.location.hash).offset().top - 57;
+    $(window).scrollTop(newTop);
+  }
+}
+
+$(function() {
+  codeTabs();
+  viewSolution();
+
+  $(window).bind('hashchange', function() {
+    maybeScrollToHash();
+  });
+
+  // Scroll now too in case we had opened the page on a hash, but wait a bit because some browsers
+  // will try to do *their* initial scroll after running the onReady handler.
+  $(window).load(function() { setTimeout(function() { maybeScrollToHash(); }, 25); }); 
+});

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4ddc3f72/docs/local_execution.md
----------------------------------------------------------------------
diff --git a/docs/local_execution.md b/docs/local_execution.md
index 4a126bb..48d9ae7 100644
--- a/docs/local_execution.md
+++ b/docs/local_execution.md
@@ -2,7 +2,7 @@
 title:  "Local Execution"
 ---
 
-# Local Execution/Debugging
+## Local Execution/Debugging
 
 Flink can run on a single machine, even in a single Java Virtual Machine. This allows users to test and debug Flink programs locally. This section gives an overview of the local execution mechanisms.
 
@@ -17,19 +17,19 @@ The `JobExecutionResult` object, which is returned after the execution finished,
 *Note:* The local execution environments do not start any web frontend to monitor the execution.
 
 
-# Maven Dependency
+## Maven Dependency
 
 If you are developing your program in a Maven project, you have to add the `flink-clients` module using this dependency:
 
-```xml
+~~~xml
 <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-clients</artifactId>
   <version>{{site.FLINK_VERSION_STABLE}}</version>
 </dependency>
-```
+~~~
 
-# Local Environment
+## Local Environment
 
 The `LocalEnvironment` is a handle to local execution for Flink programs. Use it to run a program within a local JVM - standalone or embedded in other programs.
 
@@ -37,7 +37,7 @@ The local environment is instantiated via the method `ExecutionEnvironment.creat
 
 In most cases, calling `ExecutionEnvironment.getExecutionEnvironment()` is the better choice. That method returns a `LocalEnvironment` when the program is started locally (outside the command line interface), and a pre-configured environment for cluster execution when the program is invoked by the [command line interface](cli.html).
 
-```java
+~~~java
 public static void main(String[] args) throws Exception {
     ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
 
@@ -53,16 +53,16 @@ public static void main(String[] args) throws Exception {
 
     env.execute();
 }
-```
+~~~
 
 
-# Local Executor
+## Local Executor
 
 The *LocalExecutor* is similar to the local environment, but it takes a *Plan* object, which describes the program as a single executable unit. The *LocalExecutor* is typically used with the Scala API. 
 
 The following code shows how you would use the `LocalExecutor` with the WordCount example for Scala programs:
 
-```scala
+~~~scala
 public static void main(String[] args) throws Exception {
     val input = TextFile("hdfs://path/to/file")
 
@@ -74,15 +74,15 @@ public static void main(String[] args) throws Exception {
     val plan = new ScalaPlan(Seq(output), "Word Count")
     LocalExecutor.executePlan(plan);
 }
-```
+~~~
 
 
-# LocalDistributedExecutor
+## LocalDistributedExecutor
 
 Flink also offers a `LocalDistributedExecutor` which starts multiple TaskManagers within one JVM. The standard `LocalExecutor` starts one JobManager and one TaskManager in one JVM.
 With the `LocalDistributedExecutor` you can define the number of TaskManagers to start. This is useful for debugging network-related code and is more of a developer tool than a user tool.
 
-```java
+~~~java
 public static void main(String[] args) throws Exception {
     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -101,6 +101,6 @@ public static void main(String[] args) throws Exception {
     lde.startNephele(2); // start two TaskManagers
     lde.run(p);
 }
-```
+~~~
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4ddc3f72/docs/local_setup.md
----------------------------------------------------------------------
diff --git a/docs/local_setup.md b/docs/local_setup.md
index d12401b..58fd847 100644
--- a/docs/local_setup.md
+++ b/docs/local_setup.md
@@ -2,62 +2,64 @@
 title:  "Local Setup"
 ---
 
+* This will be replaced by the TOC
+{:toc}
+
 This documentation is intended to provide instructions on how to run Flink locally on a single machine.
 
-# Download
+## Download
 
 Go to the [downloads page]({{site.baseurl}}/downloads/) and get the ready-to-run package. If you want to interact with Hadoop (e.g. HDFS or HBase), make sure to pick the Flink package **matching your Hadoop version**. When in doubt, or if you plan to work only with the local file system, pick the package for Hadoop 1.2.x.
 
-# Requirements
+## Requirements
 
-Flink runs on **Linux**, **Mac OS X** and **Windows**. The only requirement for a local setup is **Java 1.6.x** or higher. The following manual assumes a *UNIX-like environment*, for Windows see [Flink on Windows](#windows).
+Flink runs on **Linux**, **Mac OS X** and **Windows**. The only requirement for a local setup is **Java 1.6.x** or higher. The following manual assumes a *UNIX-like environment*, for Windows see [Flink on Windows](#flink-on-windows).
 
 You can check the correct installation of Java by issuing the following command:
 
-```bash
+~~~bash
 java -version
-```
+~~~
 
 The command should output something comparable to the following:
 
-```bash
+~~~bash
 java version "1.6.0_22"
 Java(TM) SE Runtime Environment (build 1.6.0_22-b04)
 Java HotSpot(TM) 64-Bit Server VM (build 17.1-b03, mixed mode)
-```
+~~~
 
-# Configuration
+## Configuration
 
 **For local mode Flink is ready to go out of the box and you don't need to change the default configuration.**
 
 The out-of-the-box configuration uses your default Java installation. To override the Java runtime, set the environment variable `JAVA_HOME` or the configuration key `env.java.home` in `conf/flink-conf.yaml`. Consult the [configuration page](config.html) for further details about configuring Flink.
 
-# Starting Flink
+## Starting Flink
 
 **You are now ready to start Flink.** Unpack the downloaded archive and change to the newly created `flink` directory. There you can start Flink in local mode:
 
-```bash
+~~~bash
 $ tar xzf flink-*.tgz
 $ cd flink
 $ bin/start-local.sh
 Starting job manager
-```
+~~~
 
 You can check that the system is running by checking the log files in the `log` directory:
 
-```bash
+~~~bash
 $ tail log/flink-*-jobmanager-*.log
 INFO ... - Initializing memory manager with 409 megabytes of memory
 INFO ... - Trying to load org.apache.flinknephele.jobmanager.scheduler.local.LocalScheduler as scheduler
 INFO ... - Setting up web info server, using web-root directory ...
 INFO ... - Web info server will display information about nephele job-manager on localhost, port 8081.
 INFO ... - Starting web info server for JobManager on port 8081
-```
+~~~
 
 The JobManager will also start a web frontend on port 8081, which you can check with your browser at `http://localhost:8081`.
 
-<section id="windows">
-# Flink on Windows
+## Flink on Windows
 
 If you want to run Flink on Windows, you need to download, unpack, and configure the Flink archive as mentioned above. After that, you can use either the **Windows Batch** file (`.bat`) or **Cygwin** to run the Flink JobManager.
 
@@ -67,13 +69,13 @@ To start Flink in local mode from the *Windows Batch*, open the command window,
 
 Note: The ``bin`` folder of your Java Runtime Environment must be included in the Windows ``%PATH%`` variable. Follow this [guide](http://www.java.com/en/download/help/path.xml) to add Java to the ``%PATH%`` variable.
 
-```bash
+~~~bash
 $ cd flink
 $ cd bin
 $ start-local.bat
 Starting Flink job manager. Webinterface by default on http://localhost:8081/.
 Do not close this batch window. Stop job manager by pressing Ctrl+C.
-```
+~~~
 
 After that, you need to open a second terminal to run jobs using `flink.bat`.
 
@@ -81,19 +83,19 @@ After that, you need to open a second terminal to run jobs using `flink.bat`.
 
 With *Cygwin* you need to start the Cygwin Terminal, navigate to your Flink directory and run the `start-local.sh` script:
 
-```bash
+~~~bash
 $ cd flink
 $ bin/start-local.sh
 Starting Nephele job manager
-```
+~~~
 
 ### Installing Flink from Git
 
 If you are installing Flink from the git repository and you are using the Windows git shell, Cygwin can produce a failure similar to this one:
 
-```bash
+~~~bash
 c:/flink/bin/start-local.sh: line 30: $'\r': command not found
-```
+~~~
 
 This error occurs, because git is automatically transforming UNIX line endings to Windows style line endings when running in Windows. The problem is, that Cygwin can only deal with UNIX style line endings. The solution is to adjust the Cygwin settings to deal with the correct line endings by following these three steps:
 
@@ -101,17 +103,17 @@ This error occurs, because git is automatically transforming UNIX line endings t
 
 2. Determine your home directory by entering
 
-```bash
+~~~bash
 cd; pwd
-```
+~~~
 
 It will return a path under the Cygwin root path.
 
 3.  Using Notepad, WordPad, or a different text editor, open the file `.bash_profile` in the home directory and append the following (if the file does not exist, you have to create it):
 
-```bash
+~~~bash
 export SHELLOPTS
 set -o igncr
-```
+~~~
 
 Save the file and open a new bash shell.
\ No newline at end of file