Posted to commits@flink.apache.org by uc...@apache.org on 2015/04/22 16:17:26 UTC

[29/30] flink git commit: [docs] Change doc layout

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ee90cc/docs/apis/dataset_transformations.md
----------------------------------------------------------------------
diff --git a/docs/apis/dataset_transformations.md b/docs/apis/dataset_transformations.md
new file mode 100644
index 0000000..a829b1f
--- /dev/null
+++ b/docs/apis/dataset_transformations.md
@@ -0,0 +1,1689 @@
+---
+title: "DataSet Transformations"
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+This document gives a deep-dive into the available transformations on DataSets. For a general introduction to the
+Flink Java API, please refer to the [Programming Guide](programming_guide.html).
+
+* This will be replaced by the TOC
+{:toc}
+
+### Map
+
+The Map transformation applies a user-defined map function on each element of a DataSet.
+It implements a one-to-one mapping, that is, exactly one element must be returned by
+the function.
+
+The following code transforms a DataSet of Integer pairs into a DataSet of Integers:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+// MapFunction that adds two integer values
+public class IntAdder implements MapFunction<Tuple2<Integer, Integer>, Integer> {
+  @Override
+  public Integer map(Tuple2<Integer, Integer> in) {
+    return in.f0 + in.f1;
+  }
+}
+
+// [...]
+DataSet<Tuple2<Integer, Integer>> intPairs = // [...]
+DataSet<Integer> intSums = intPairs.map(new IntAdder());
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val intPairs: DataSet[(Int, Int)] = // [...]
+val intSums = intPairs.map { pair => pair._1 + pair._2 }
+~~~
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ intSums = intPairs.map(lambda x: sum(x), INT)
+~~~
+
+</div>
+</div>
+
+### FlatMap
+
+The FlatMap transformation applies a user-defined flat-map function on each element of a DataSet.
+This variant of a map function can return arbitrarily many result elements (including none) for each input element.
+
+The following code transforms a DataSet of text lines into a DataSet of words:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+// FlatMapFunction that tokenizes a String by whitespace characters and emits all String tokens.
+public class Tokenizer implements FlatMapFunction<String, String> {
+  @Override
+  public void flatMap(String value, Collector<String> out) {
+    for (String token : value.split("\\W")) {
+      out.collect(token);
+    }
+  }
+}
+
+// [...]
+DataSet<String> textLines = // [...]
+DataSet<String> words = textLines.flatMap(new Tokenizer());
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val textLines: DataSet[String] = // [...]
+val words = textLines.flatMap { _.split(" ") }
+~~~
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ words = lines.flat_map(lambda x, c: x.split(), STRING)
+~~~
+
+</div>
+</div>
+
+### MapPartition
+
+MapPartition transforms a parallel partition in a single function call. The map-partition function
+gets the partition as Iterable and can produce an arbitrary number of result values. The number of elements in each partition depends on the degree-of-parallelism
+and previous operations.
+
+The following code transforms a DataSet of text lines into a DataSet of counts per partition:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+public class PartitionCounter implements MapPartitionFunction<String, Long> {
+
+  public void mapPartition(Iterable<String> values, Collector<Long> out) {
+    long c = 0;
+    for (String s : values) {
+      c++;
+    }
+    out.collect(c);
+  }
+}
+
+// [...]
+DataSet<String> textLines = // [...]
+DataSet<Long> counts = textLines.mapPartition(new PartitionCounter());
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val textLines: DataSet[String] = // [...]
+// Some is required because the return value must be a Collection.
+// There is an implicit conversion from Option to a Collection.
+val counts = textLines.mapPartition { in => Some(in.size) }
+~~~
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ counts = lines.map_partition(lambda x,c: [sum(1 for _ in x)], INT)
+~~~
+
+</div>
+</div>
+
+### Filter
+
+The Filter transformation applies a user-defined filter function on each element of a DataSet and retains only those elements for which the function returns `true`.
+
+The following code removes all Integers smaller than zero from a DataSet:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+// FilterFunction that filters out all Integers smaller than zero.
+public class NaturalNumberFilter implements FilterFunction<Integer> {
+  @Override
+  public boolean filter(Integer number) {
+    return number >= 0;
+  }
+}
+
+// [...]
+DataSet<Integer> intNumbers = // [...]
+DataSet<Integer> naturalNumbers = intNumbers.filter(new NaturalNumberFilter());
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val intNumbers: DataSet[Int] = // [...]
+val naturalNumbers = intNumbers.filter { _ >= 0 }
+~~~
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ naturalNumbers = intNumbers.filter(lambda x: x >= 0)
+~~~
+
+</div>
+</div>
+
+**IMPORTANT:** The system assumes that the function does not modify the elements on which the predicate is applied. Violating this assumption
+can lead to incorrect results.
+
+### Project (Tuple DataSets only) (Java/Python API Only)
+
+The Project transformation removes or moves Tuple fields of a Tuple DataSet.
+The `project(int...)` method selects Tuple fields that should be retained by their index and defines their order in the output Tuple.
+
+Projections do not require the definition of a user function.
+
+The following code shows different ways to apply a Project transformation on a DataSet:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+DataSet<Tuple3<Integer, Double, String>> in = // [...]
+// converts Tuple3<Integer, Double, String> into Tuple2<String, Integer>
+DataSet<Tuple2<String, Integer>> out = in.project(2,0);
+~~~
+
+#### Projection with Type Hint
+
+Note that the Java compiler cannot infer the return type of the `project` operator. This can cause a problem if you call another operator on the result of a `project` operator, such as:
+
+~~~java
+DataSet<Tuple5<String,String,String,String,String>> ds = ....
+DataSet<Tuple1<String>> ds2 = ds.project(0).distinct(0);
+~~~
+
+This problem can be overcome by hinting the return type of the `project` operator like this:
+
+~~~java
+DataSet<Tuple1<String>> ds2 = ds.<Tuple1<String>>project(0).distinct(0);
+~~~
+
+### Transformations on Grouped DataSet
+
+The reduce operations can operate on grouped data sets. Specifying the key to
+be used for grouping can be done in many ways:
+
+- key expressions
+- a key-selector function
+- one or more field position keys (Tuple DataSet only)
+- Case Class fields (Case Classes only)
+
+Please look at the reduce examples to see how the grouping keys are specified.
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+out = in.project(2,0)
+~~~
+
+### Transformations on Grouped DataSet
+
+The reduce operations can operate on grouped data sets. Specifying the key to
+be used for grouping can be done using one or more field position keys (Tuple DataSet only).
+
+Please look at the reduce examples to see how the grouping keys are specified.
+
+</div>
+</div>
+
+### Reduce on Grouped DataSet
+
+A Reduce transformation that is applied on a grouped DataSet reduces each group to a single
+element using a user-defined reduce function.
+For each group of input elements, a reduce function successively combines pairs of elements into one
+element until only a single element for each group remains.
+
+#### Reduce on DataSet Grouped by KeySelector Function
+
+A key-selector function extracts a key value from each element of a DataSet. The extracted key
+value is used to group the DataSet.
+The following code shows how to group a POJO DataSet using a key-selector function and to reduce it
+with a reduce function.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+// some ordinary POJO
+public class WC {
+  public String word;
+  public int count;
+  // [...]
+}
+
+// ReduceFunction that sums Integer attributes of a POJO
+public class WordCounter implements ReduceFunction<WC> {
+  @Override
+  public WC reduce(WC in1, WC in2) {
+    return new WC(in1.word, in1.count + in2.count);
+  }
+}
+
+// [...]
+DataSet<WC> words = // [...]
+DataSet<WC> wordCounts = words
+                         // DataSet grouping on field "word"
+                         .groupBy("word")
+                         // apply ReduceFunction on grouped DataSet
+                         .reduce(new WordCounter());
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+// some ordinary POJO
+class WC(val word: String, val count: Int) {
+  def this() {
+    this(null, -1)
+  }
+  // [...]
+}
+
+val words: DataSet[WC] = // [...]
+val wordCounts = words.groupBy { _.word } reduce {
+  (w1, w2) => new WC(w1.word, w1.count + w2.count)
+}
+~~~
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+Not supported.
+~~~
+</div>
+</div>
+
+#### Reduce on DataSet Grouped by Field Position Keys (Tuple DataSets only)
+
+Field position keys specify one or more fields of a Tuple DataSet that are used as grouping keys.
+The following code shows how to use field position keys and apply a reduce function
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+DataSet<Tuple3<String, Integer, Double>> tuples = // [...]
+DataSet<Tuple3<String, Integer, Double>> reducedTuples =
+                                         tuples
+                                         // group DataSet on first and second field of Tuple
+                                         .groupBy(0,1)
+                                         // apply ReduceFunction on grouped DataSet
+                                         .reduce(new MyTupleReducer());
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val tuples: DataSet[(String, Int, Double)] = // [...]
+// group on the first and second Tuple field
+val reducedTuples = tuples.groupBy(0, 1).reduce { ... }
+~~~
+
+
+#### Reduce on DataSet Grouped by Case Class Fields
+
+When using Case Classes you can also specify the grouping key using the names of the fields:
+
+~~~scala
+case class MyClass(a: String, b: Int, c: Double)
+val tuples: DataSet[MyClass] = // [...]
+// group on the first and second field
+val reducedTuples = tuples.groupBy("a", "b").reduce { ... }
+~~~
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ reducedTuples = tuples.group_by(0, 1).reduce( ... )
+~~~
+
+</div>
+</div>
+
+### GroupReduce on Grouped DataSet
+
+A GroupReduce transformation that is applied on a grouped DataSet calls a user-defined
+group-reduce function for each group. The difference
+between this and *Reduce* is that the user defined function gets the whole group at once.
+The function is invoked with an Iterable over all elements of a group and can return an arbitrary
+number of result elements.
+
+#### GroupReduce on DataSet Grouped by Field Position Keys (Tuple DataSets only)
+
+The following code shows how duplicate strings can be removed from a DataSet grouped by Integer.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+public class DistinctReduce
+         implements GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
+
+  @Override
+  public void reduce(Iterable<Tuple2<Integer, String>> in, Collector<Tuple2<Integer, String>> out) {
+
+    Set<String> uniqStrings = new HashSet<String>();
+    Integer key = null;
+
+    // add all strings of the group to the set
+    for (Tuple2<Integer, String> t : in) {
+      key = t.f0;
+      uniqStrings.add(t.f1);
+    }
+
+    // emit all unique strings.
+    for (String s : uniqStrings) {
+      out.collect(new Tuple2<Integer, String>(key, s));
+    }
+  }
+}
+
+// [...]
+DataSet<Tuple2<Integer, String>> input = // [...]
+DataSet<Tuple2<Integer, String>> output = input
+                           .groupBy(0)            // group DataSet by the first tuple field
+                           .reduceGroup(new DistinctReduce());  // apply GroupReduceFunction
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val input: DataSet[(Int, String)] = // [...]
+val output = input.groupBy(0).reduceGroup {
+      (in, out: Collector[(Int, String)]) =>
+        in.toSet foreach (out.collect)
+    }
+~~~
+
+#### GroupReduce on DataSet Grouped by Case Class Fields
+
+Works analogous to grouping by Case Class fields in *Reduce* transformations.
+
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ class DistinctReduce(GroupReduceFunction):
+   def reduce(self, iterator, collector):
+     dic = dict()
+     for value in iterator:
+       dic[value[1]] = 1
+     for key in dic.keys():
+       collector.collect(key)
+
+ output = data.group_by(0).reduce_group(DistinctReduce(), STRING)
+~~~
+
+
+</div>
+</div>
+
+#### GroupReduce on DataSet Grouped by KeySelector Function
+
+Works analogous to key-selector functions in *Reduce* transformations.
+
+#### GroupReduce on sorted groups
+
+A group-reduce function accesses the elements of a group using an Iterable. Optionally, the Iterable can hand out the elements of a group in a specified order. In many cases this can help to reduce the complexity of a user-defined
+group-reduce function and improve its efficiency.
+
+The following code shows another example of how to remove duplicate Strings in a DataSet grouped by an Integer and sorted by String.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+// GroupReduceFunction that removes consecutive identical elements
+public class DistinctReduce
+         implements GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
+
+  @Override
+  public void reduce(Iterable<Tuple2<Integer, String>> in, Collector<Tuple2<Integer, String>> out) {
+    Integer key = null;
+    String comp = null;
+
+    for (Tuple2<Integer, String> t : in) {
+      key = t.f0;
+      String next = t.f1;
+
+      // check if strings are different
+      if (comp == null || !next.equals(comp)) {
+        out.collect(new Tuple2<Integer, String>(key, next));
+        comp = next;
+      }
+    }
+  }
+}
+
+// [...]
+DataSet<Tuple2<Integer, String>> input = // [...]
+DataSet<Tuple2<Integer, String>> output = input
+                         .groupBy(0)                         // group DataSet by first field
+                         .sortGroup(1, Order.ASCENDING)      // sort groups on second tuple field
+                         .reduceGroup(new DistinctReduce());
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val input: DataSet[(Int, String)] = // [...]
+val output = input.groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup {
+      (in, out: Collector[(Int, String)]) =>
+        var prev: (Int, String) = null
+        for (t <- in) {
+          if (prev == null || prev != t) {
+            out.collect(t)
+            prev = t
+          }
+        }
+    }
+
+~~~
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ class DistinctReduce(GroupReduceFunction):
+   def reduce(self, iterator, collector):
+     dic = dict()
+     for value in iterator:
+       dic[value[1]] = 1
+     for key in dic.keys():
+       collector.collect(key)
+
+ output = data.group_by(0).sort_group(1, Order.ASCENDING).reduce_group(DistinctReduce(), STRING)
+~~~
+
+
+</div>
+</div>
+
+**Note:** A GroupSort often comes for free if the grouping is established using a sort-based execution strategy of an operator before the reduce operation.
+
+#### Combinable GroupReduceFunctions
+
+In contrast to a reduce function, a group-reduce function is not
+necessarily combinable. In order to make a group-reduce function
+combinable, you need to use the `RichGroupReduceFunction` variant,
+implement (override) the `combine()` method, and annotate the
+`RichGroupReduceFunction` with the `@Combinable` annotation as shown here:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+// Combinable GroupReduceFunction that computes two sums.
+// Note that we use the RichGroupReduceFunction because it defines the combine method
+@Combinable
+public class MyCombinableGroupReducer
+         extends RichGroupReduceFunction<Tuple3<String, Integer, Double>,
+                                     Tuple3<String, Integer, Double>> {
+  @Override
+  public void reduce(Iterable<Tuple3<String, Integer, Double>> in,
+                     Collector<Tuple3<String, Integer, Double>> out) {
+
+    String key = null;
+    int intSum = 0;
+    double doubleSum = 0.0;
+
+    for (Tuple3<String, Integer, Double> curr : in) {
+      key = curr.f0;
+      intSum += curr.f1;
+      doubleSum += curr.f2;
+    }
+    // emit a tuple with both sums
+    out.collect(new Tuple3<String, Integer, Double>(key, intSum, doubleSum));
+  }
+
+  @Override
+  public void combine(Iterable<Tuple3<String, Integer, Double>> in,
+                      Collector<Tuple3<String, Integer, Double>> out) {
+    // in some cases combine() calls can simply be forwarded to reduce().
+    this.reduce(in, out);
+  }
+}
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+
+// Combinable GroupReduceFunction that computes two sums.
+// Note that we use the RichGroupReduceFunction because it defines the combine method
+@Combinable
+class MyCombinableGroupReducer
+  extends RichGroupReduceFunction[(String, Int, Double), (String, Int, Double)] {
+
+  def reduce(
+      in: java.lang.Iterable[(String, Int, Double)],
+      out: Collector[(String, Int, Double)]): Unit = {
+
+    var key: String = null
+    var intSum = 0
+    var doubleSum = 0.0
+
+    for (curr <- in) {
+      key = curr._1
+      intSum += curr._2
+      doubleSum += curr._3
+    }
+    // emit a tuple with both sums
+    out.collect((key, intSum, doubleSum))
+  }
+
+  def combine(
+      in: java.lang.Iterable[(String, Int, Double)],
+      out: Collector[(String, Int, Double)]): Unit = {
+    // in some cases combine() calls can simply be forwarded to reduce().
+    this.reduce(in, out)
+  }
+}
+~~~
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ class GroupReduce(GroupReduceFunction):
+   def reduce(self, iterator, collector):
+     key, int_sum, float_sum = iterator.next()
+     for value in iterator:
+       int_sum += value[1]
+       float_sum += value[2]
+     collector.collect((key, int_sum, float_sum))
+   # in some cases combine() calls can simply be forwarded to reduce().
+   def combine(self, iterator, collector):
+     return self.reduce(iterator, collector)
+
+data.reduce_group(GroupReduce(), (STRING, INT, FLOAT), combinable=True)
+~~~
+
+</div>
+</div>
+
+### GroupCombine on a Grouped DataSet
+
+The GroupCombine transformation is the generalized form of the combine step in
+the Combinable GroupReduceFunction. It is generalized in the sense that it
+allows combining of input type `I` to an arbitrary output type `O`. In contrast,
+the combine step in the GroupReduce only allows combining from input type `I` to
+output type `I`. This is because the reduce step in the GroupReduceFunction
+expects input type `I`.
+
+In some applications, it is desirable to combine a DataSet into an intermediate
+format before performing additional transformations (e.g. to reduce data
+size). This can be achieved with a GroupCombine transformation at very little
+cost.
+
+**Note:** The GroupCombine on a Grouped DataSet is performed in memory with a
+  greedy strategy which may not process all data at once but in multiple
+  steps. It is also performed on the individual partitions without a data
+  exchange like in a GroupReduce transformation. This may lead to partial
+  results.
+
+The following example demonstrates the use of a GroupCombine transformation for
+an alternative WordCount implementation.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+DataSet<String> input = [..] // The words received as input
+
+DataSet<Tuple2<String, Integer>> combinedWords = input
+    // group identical words
+    .groupBy(0)
+    // combine each group into a (word, partial count) tuple
+    .combineGroup(new GroupCombineFunction<String, Tuple2<String, Integer>>() {
+
+    public void combine(Iterable<String> words, Collector<Tuple2<String, Integer>> out) { // combine
+        String key = null;
+        int count = 0;
+        for (String word : words) {
+            key = word;
+            count++;
+        }
+        // emit a partial count for the words seen on this partition
+        out.collect(new Tuple2<String, Integer>(key, count));
+    }
+});
+
+DataSet<Tuple2<String, Integer>> output = combinedWords
+    // group by the words again
+    .groupBy(0)
+    // group reduce with full data exchange
+    .reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
+
+    public void reduce(Iterable<Tuple2<String, Integer>> words, Collector<Tuple2<String, Integer>> out) {
+        String key = null;
+        int count = 0;
+        for (Tuple2<String, Integer> word : words) {
+            key = word.f0;
+            count += word.f1;
+        }
+        // emit the total count for the word
+        out.collect(new Tuple2<String, Integer>(key, count));
+    }
+});
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val input: DataSet[String] = [..] // The words received as input
+
+val combinedWords: DataSet[(String, Int)] = input
+  // group identical words and combine each group into a (word, partial count) tuple
+  .groupBy(0)
+  .combineGroup {
+    (words, out: Collector[(String, Int)]) =>
+        var key: String = null
+        var count = 0
+        for (word <- words) {
+            key = word
+            count += 1
+        }
+        // emit a partial count for the words seen on this partition
+        out.collect((key, count))
+}
+
+val output: DataSet[(String, Int)] = combinedWords
+  // group by the words again and reduce with full data exchange
+  .groupBy(0)
+  .reduceGroup {
+    (words, out: Collector[(String, Int)]) =>
+        var key: String = null
+        var count = 0
+        for ((word, partialCount) <- words) {
+            key = word
+            count += partialCount
+        }
+        // emit the total count for the word
+        out.collect((key, count))
+}
+
+~~~
+
+</div>
+</div>
+
+The above alternative WordCount implementation demonstrates how the GroupCombine
+combines words before performing the GroupReduce transformation. The above
+example is just a proof of concept. Note how the combine step changes the type
+of the DataSet, which would normally require an additional Map transformation
+before executing the GroupReduce.
+
+### Aggregate on Grouped Tuple DataSet
+
+There are some common aggregation operations that are frequently used. The Aggregate transformation provides the following built-in aggregation functions:
+
+- Sum,
+- Min, and
+- Max.
+
+The Aggregate transformation can only be applied on a Tuple DataSet and supports only field position keys for grouping.
+
+The following code shows how to apply an Aggregation transformation on a DataSet grouped by field position keys:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+DataSet<Tuple3<Integer, String, Double>> input = // [...]
+DataSet<Tuple3<Integer, String, Double>> output = input
+                                   .groupBy(1)        // group DataSet on second field
+                                   .aggregate(SUM, 0) // compute sum of the first field
+                                   .and(MIN, 2);      // compute minimum of the third field
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val input: DataSet[(Int, String, Double)] = // [...]
+val output = input.groupBy(1).aggregate(SUM, 0).and(MIN, 2)
+~~~
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+Not supported.
+~~~
+
+</div>
+</div>
+
+To apply multiple aggregations on a DataSet, it is necessary to use the `.and()` function after the first aggregate; that is, `.aggregate(SUM, 0).and(MIN, 2)` produces the sum of field 0 and the minimum of field 2 of the original DataSet.
+In contrast, `.aggregate(SUM, 0).aggregate(MIN, 2)` will apply an aggregation on an aggregation. In the given example, it would produce the minimum of field 2 after calculating the sum of field 0 grouped by field 1.
+
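+As a minimal sketch of this difference (reusing the example above; variable names are illustrative), the two call chains below are not equivalent:
+
+~~~java
+DataSet<Tuple3<Integer, String, Double>> input = // [...]
+
+// sum of field 0 and minimum of field 2, both computed per group of field 1
+DataSet<Tuple3<Integer, String, Double>> multiAgg = input
+                                   .groupBy(1)
+                                   .aggregate(SUM, 0)
+                                   .and(MIN, 2);
+
+// first the sum of field 0 per group of field 1, then the minimum of field 2
+// computed over the result of that first aggregation
+DataSet<Tuple3<Integer, String, Double>> nestedAgg = input
+                                   .groupBy(1)
+                                   .aggregate(SUM, 0)
+                                   .aggregate(MIN, 2);
+~~~
+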
+**Note:** The set of aggregation functions will be extended in the future.
+
+### Reduce on full DataSet
+
+The Reduce transformation applies a user-defined reduce function to all elements of a DataSet.
+The reduce function successively combines pairs of elements into one element until only a single element remains.
+
+The following code shows how to sum all elements of an Integer DataSet:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+// ReduceFunction that sums Integers
+public class IntSummer implements ReduceFunction<Integer> {
+  @Override
+  public Integer reduce(Integer num1, Integer num2) {
+    return num1 + num2;
+  }
+}
+
+// [...]
+DataSet<Integer> intNumbers = // [...]
+DataSet<Integer> sum = intNumbers.reduce(new IntSummer());
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val intNumbers = env.fromElements(1,2,3)
+val sum = intNumbers.reduce (_ + _)
+~~~
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ intNumbers = env.from_elements(1,2,3)
+ sum = intNumbers.reduce(lambda x,y: x + y)
+~~~
+
+</div>
+</div>
+
+Reducing a full DataSet using the Reduce transformation implies that the final Reduce operation cannot be done in parallel. However, a reduce function is automatically combinable such that a Reduce transformation does not limit scalability for most use cases.
+
+### GroupReduce on full DataSet
+
+The GroupReduce transformation applies a user-defined group-reduce function on all elements of a DataSet.
+A group-reduce function can iterate over all elements of the DataSet and return an arbitrary number of result elements.
+
+The following example shows how to apply a GroupReduce transformation on a full DataSet:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+DataSet<Integer> input = // [...]
+// apply a (preferably combinable) GroupReduceFunction to a DataSet
+DataSet<Double> output = input.reduceGroup(new MyGroupReducer());
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val input: DataSet[Int] = // [...]
+val output = input.reduceGroup(new MyGroupReducer())
+~~~
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ output = data.reduce_group(MyGroupReducer(), ... )
+~~~
+
+</div>
+</div>
+
+**Note:** A GroupReduce transformation on a full DataSet cannot be done in parallel if the
+group-reduce function is not combinable. Therefore, this can be a very compute intensive operation.
+See the paragraph on "Combinable GroupReduceFunctions" above to learn how to implement a
+combinable group-reduce function.
+
+### GroupCombine on a full DataSet
+
+The GroupCombine on a full DataSet works similar to the GroupCombine on a
+grouped DataSet. The data is partitioned on all nodes and then combined in a
+greedy fashion (i.e. only data fitting into memory is combined at once).
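+
+As a minimal sketch (assuming a `GroupCombineFunction<String, Tuple2<String, Integer>>` like the word-count combiner shown above, here called `MyCombiner`), the combine function is applied directly to the un-grouped DataSet:
+
+~~~java
+DataSet<String> input = // [...]
+// combine each partition independently; the result may still contain
+// several partial counts for the same word
+DataSet<Tuple2<String, Integer>> partialCounts = input.combineGroup(new MyCombiner());
+~~~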
+
+### Aggregate on full Tuple DataSet
+
+There are some common aggregation operations that are frequently used. The Aggregate transformation
+provides the following built-in aggregation functions:
+
+- Sum,
+- Min, and
+- Max.
+
+The Aggregate transformation can only be applied on a Tuple DataSet.
+
+The following code shows how to apply an Aggregation transformation on a full DataSet:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+DataSet<Tuple2<Integer, Double>> input = // [...]
+DataSet<Tuple2<Integer, Double>> output = input
+                                     .aggregate(SUM, 0)    // compute sum of the first field
+                                     .and(MIN, 1);    // compute minimum of the second field
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val input: DataSet[(Int, String, Double)] = // [...]
+val output = input.aggregate(SUM, 0).and(MIN, 2)
+
+~~~
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+Not supported.
+~~~
+
+</div>
+</div>
+
+**Note:** Extending the set of supported aggregation functions is on our roadmap.
+
+### Join
+
+The Join transformation joins two DataSets into one DataSet. The elements of both DataSets are joined on one or more keys which can be specified using
+
+- a key expression
+- a key-selector function
+- one or more field position keys (Tuple DataSet only).
+- Case Class Fields
+
+There are a few different ways to perform a Join transformation which are shown in the following.
+
+#### Default Join (Join into Tuple2)
+
+The default Join transformation produces a new Tuple DataSet with two fields. Each tuple holds a joined element of the first input DataSet in the first tuple field and a matching element of the second input DataSet in the second field.
+
+The following code shows a default Join transformation; the Java example uses key expressions, while the Scala and Python examples use field position keys:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+public static class User { public String name; public int zip; }
+public static class Store { public Manager mgr; public int zip; }
+DataSet<User> input1 = // [...]
+DataSet<Store> input2 = // [...]
+// result dataset is typed as Tuple2
+DataSet<Tuple2<User, Store>>
+            result = input1.join(input2)
+                           .where("zip")       // key of the first input (users)
+                           .equalTo("zip");    // key of the second input (stores)
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val input1: DataSet[(Int, String)] = // [...]
+val input2: DataSet[(Double, Int)] = // [...]
+val result = input1.join(input2).where(0).equalTo(1)
+~~~
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ result = input1.join(input2).where(0).equal_to(1)
+~~~
+
+</div>
+</div>
+
+#### Join with Join-Function
+
+A Join transformation can also call a user-defined join function to process joining tuples.
+A join function receives one element of the first input DataSet and one element of the second input DataSet and returns exactly one element.
+
+The following code performs a join of a DataSet of custom Java objects with a Tuple DataSet using key expressions and shows how to use a user-defined join function:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+// some POJO
+public class Rating {
+  public String name;
+  public String category;
+  public int points;
+}
+
+// Join function that joins a custom POJO with a Tuple
+public class PointWeighter
+         implements JoinFunction<Rating, Tuple2<String, Double>, Tuple2<String, Double>> {
+
+  @Override
+  public Tuple2<String, Double> join(Rating rating, Tuple2<String, Double> weight) {
+    // multiply the points and rating and construct a new output tuple
+    return new Tuple2<String, Double>(rating.name, rating.points * weight.f1);
+  }
+}
+
+DataSet<Rating> ratings = // [...]
+DataSet<Tuple2<String, Double>> weights = // [...]
+DataSet<Tuple2<String, Double>>
+            weightedRatings =
+            ratings.join(weights)
+
+                   // key of the first input
+                   .where("category")
+
+                   // key of the second input
+                   .equalTo("f0")
+
+                   // applying the JoinFunction on joining pairs
+                   .with(new PointWeighter());
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+case class Rating(name: String, category: String, points: Int)
+
+val ratings: DataSet[Rating] = // [...]
+val weights: DataSet[(String, Double)] = // [...]
+
+val weightedRatings = ratings.join(weights).where("category").equalTo(0) {
+  (rating, weight) => (rating.name, rating.points * weight._2)
+}
+~~~
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ class PointWeighter(JoinFunction):
+   def join(self, rating, weight):
+     return (rating[0], rating[1] * weight[1])
+
+ weightedRatings = \
+   ratings.join(weights).where(0).equal_to(0). \
+   using(PointWeighter(), (STRING, FLOAT))
+~~~
+
+</div>
+</div>
+
+#### Join with Flat-Join Function
+
+Analogous to Map and FlatMap, a FlatJoin behaves in the same
+way as a Join, but instead of returning one element, it can
+return (collect) zero, one, or more elements.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+public class PointWeighter
+         implements FlatJoinFunction<Rating, Tuple2<String, Double>, Tuple2<String, Double>> {
+  @Override
+  public void join(Rating rating, Tuple2<String, Double> weight,
+	  Collector<Tuple2<String, Double>> out) {
+	if (weight.f1 > 0.1) {
+		out.collect(new Tuple2<String, Double>(rating.name, rating.points * weight.f1));
+	}
+  }
+}
+
+DataSet<Tuple2<String, Double>>
+            weightedRatings =
+            ratings.join(weights) // [...]
+~~~
+
+#### Join with Projection (Java/Python Only)
+
+A Join transformation can construct result tuples using a projection as shown here:
+
+~~~java
+DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
+DataSet<Tuple2<Integer, Double>> input2 = // [...]
+DataSet<Tuple4<Integer, String, Double, Byte>>
+            result =
+            input1.join(input2)
+                  // key definition on first DataSet using a field position key
+                  .where(0)
+                  // key definition of second DataSet using a field position key
+                  .equalTo(0)
+                  // select and reorder fields of matching tuples
+                  .projectFirst(0,2).projectSecond(1).projectFirst(1);
+~~~
+
+`projectFirst(int...)` and `projectSecond(int...)` select the fields of the first and second joined input that should be assembled into an output Tuple. The order of indexes defines the order of fields in the output tuple.
+The join projection also works for non-Tuple DataSets. In this case, `projectFirst()` or `projectSecond()` must be called without arguments to add a joined element to the output Tuple.
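+
+For example, the following sketch (the POJO type and variable names are illustrative) adds a whole non-Tuple element of the first input to the output Tuple by calling `projectFirst()` without arguments:
+
+~~~java
+DataSet<Rating> ratings = // [...] non-Tuple input
+DataSet<Tuple2<String, Double>> weights = // [...]
+DataSet<Tuple2<Rating, Double>>
+            projected =
+            ratings.join(weights)
+                   .where("category")
+                   .equalTo(0)
+                   // add the whole Rating element, then the second field of the Tuple input
+                   .projectFirst().projectSecond(1);
+~~~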
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+case class Rating(name: String, category: String, points: Int)
+
+val ratings: DataSet[Rating] = // [...]
+val weights: DataSet[(String, Double)] = // [...]
+
+val weightedRatings = ratings.join(weights).where("category").equalTo(0) {
+  (rating, weight, out: Collector[(String, Double)]) =>
+    if (weight._2 > 0.1) out.collect((rating.name, rating.points * weight._2))
+}
+
+~~~
+
+</div>
+<div data-lang="python" markdown="1">
+
+#### Join with Projection (Java/Python Only)
+
+A Join transformation can construct result tuples using a projection as shown here:
+
+~~~python
+ result = input1.join(input2).where(0).equal_to(0) \
+  .project_first(0,2).project_second(1).project_first(1)
+~~~
+
+`project_first(int...)` and `project_second(int...)` select the fields of the first and second joined input that should be assembled into an output Tuple. The order of indexes defines the order of fields in the output tuple.
+The join projection works also for non-Tuple DataSets. In this case, `project_first()` or `project_second()` must be called without arguments to add a joined element to the output Tuple.
+
+</div>
+</div>
+
+#### Join with DataSet Size Hint
+
+In order to guide the optimizer to pick the right execution strategy, you can hint the size of a DataSet to join as shown here:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+DataSet<Tuple2<Integer, String>> input1 = // [...]
+DataSet<Tuple2<Integer, String>> input2 = // [...]
+
+DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
+            result1 =
+            // hint that the second DataSet is very small
+            input1.joinWithTiny(input2)
+                  .where(0)
+                  .equalTo(0);
+
+DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
+            result2 =
+            // hint that the second DataSet is very large
+            input1.joinWithHuge(input2)
+                  .where(0)
+                  .equalTo(0);
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val input1: DataSet[(Int, String)] = // [...]
+val input2: DataSet[(Int, String)] = // [...]
+
+// hint that the second DataSet is very small
+val result1 = input1.joinWithTiny(input2).where(0).equalTo(0)
+
+// hint that the second DataSet is very large
+val result2 = input1.joinWithHuge(input2).where(0).equalTo(0)
+
+~~~
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+
+ #hint that the second DataSet is very small
+ result1 = input1.join_with_tiny(input2).where(0).equal_to(0)
+
+ #hint that the second DataSet is very large
+ result2 = input1.join_with_huge(input2).where(0).equal_to(0)
+
+~~~
+
+</div>
+</div>
+
+#### Join Algorithm Hints
+
+The Flink runtime can execute joins in various ways. Each possible way outperforms the others under
+different circumstances. The system tries to pick a reasonable way automatically, but allows you
+to manually pick a strategy, in case you want to enforce a specific way of executing the join.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+DataSet<SomeType> input1 = // [...]
+DataSet<AnotherType> input2 = // [...]
+
+DataSet<Tuple2<SomeType, AnotherType>> result =
+      input1.join(input2, BROADCAST_HASH_FIRST)
+            .where("id").equalTo("key");
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val input1: DataSet[SomeType] = // [...]
+val input2: DataSet[AnotherType] = // [...]
+
+// hint that the first DataSet is very small
+val result1 = input1.join(input2, BROADCAST_HASH_FIRST).where("id").equalTo("key")
+
+~~~
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+Not supported.
+~~~
+
+</div>
+</div>
+
+The following hints are available:
+
+* OPTIMIZER_CHOOSES: Equivalent to not giving a hint at all, leaves the choice to the system.
+
+* BROADCAST_HASH_FIRST: Broadcasts the first input and builds a hash table from it, which is
+  probed by the second input. A good strategy if the first input is very small.
+
+* BROADCAST_HASH_SECOND: Broadcasts the second input and builds a hash table from it, which is
+  probed by the first input. A good strategy if the second input is very small.
+
+* REPARTITION_HASH_FIRST: The system partitions (shuffles) each input (unless the input is already
+  partitioned) and builds a hash table from the first input. This strategy is good if the first
+  input is smaller than the second, but both inputs are still large.
+  *Note:* This is the default fallback strategy that the system uses if no size estimates can be made
+  and no pre-existing partitionings and sort orders can be re-used.
+
+* REPARTITION_HASH_SECOND: The system partitions (shuffles) each input (unless the input is already
+  partitioned) and builds a hash table from the second input. This strategy is good if the second
+  input is smaller than the first, but both inputs are still large.
+
+* REPARTITION_SORT_MERGE: The system partitions (shuffles) each input (unless the input is already
+  partitioned) and sorts each input (unless it is already sorted). The inputs are joined by
+  a streamed merge of the sorted inputs. This strategy is good if one or both of the inputs are
+  already sorted.
+
+
+### Cross
+
+The Cross transformation combines two DataSets into one DataSet. It builds all pairwise combinations of the elements of both input DataSets, i.e., it builds a Cartesian product.
+The Cross transformation either calls a user-defined cross function on each pair of elements or outputs a Tuple2. Both modes are shown in the following.
+
+**Note:** Cross is potentially a *very* compute-intensive operation which can challenge even large compute clusters!
+
+#### Cross with User-Defined Function
+
+A Cross transformation can call a user-defined cross function. A cross function receives one element of the first input and one element of the second input and returns exactly one result element.
+
+The following code shows how to apply a Cross transformation on two DataSets using a cross function:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+public class Coord {
+  public int id;
+  public int x;
+  public int y;
+}
+
+// CrossFunction computes the Euclidean distance between two Coord objects.
+public class EuclideanDistComputer
+         implements CrossFunction<Coord, Coord, Tuple3<Integer, Integer, Double>> {
+
+  @Override
+  public Tuple3<Integer, Integer, Double> cross(Coord c1, Coord c2) {
+    // compute Euclidean distance of coordinates
+    double dist = sqrt(pow(c1.x - c2.x, 2) + pow(c1.y - c2.y, 2));
+    return new Tuple3<Integer, Integer, Double>(c1.id, c2.id, dist);
+  }
+}
+
+DataSet<Coord> coords1 = // [...]
+DataSet<Coord> coords2 = // [...]
+DataSet<Tuple3<Integer, Integer, Double>>
+            distances =
+            coords1.cross(coords2)
+                   // apply CrossFunction
+                   .with(new EuclideanDistComputer());
+~~~
+
+#### Cross with Projection
+
+A Cross transformation can also construct result tuples using a projection as shown here:
+
+~~~java
+DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
+DataSet<Tuple2<Integer, Double>> input2 = // [...]
+DataSet<Tuple4<Integer, Byte, Integer, Double>>
+            result =
+            input1.cross(input2)
+                  // select and reorder fields of matching tuples
+                  .projectSecond(0).projectFirst(1,0).projectSecond(1);
+~~~
+
+The field selection in a Cross projection works the same way as in the projection of Join results.
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+case class Coord(id: Int, x: Int, y: Int)
+
+val coords1: DataSet[Coord] = // [...]
+val coords2: DataSet[Coord] = // [...]
+
+val distances = coords1.cross(coords2) {
+  (c1, c2) =>
+    val dist = sqrt(pow(c1.x - c2.x, 2) + pow(c1.y - c2.y, 2))
+    (c1.id, c2.id, dist)
+}
+~~~
+
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ class Euclid(CrossFunction):
+   def cross(self, c1, c2):
+     return (c1[0], c2[0], sqrt(pow(c1[1] - c2[1], 2) + pow(c1[2] - c2[2], 2)))
+
+ distances = coords1.cross(coords2).using(Euclid(), (INT,INT,FLOAT))
+~~~
+
+#### Cross with Projection
+
+A Cross transformation can also construct result tuples using a projection as shown here:
+
+~~~python
+result = input1.cross(input2).project_first(1,0).project_second(0,1)
+~~~
+
+The field selection in a Cross projection works the same way as in the projection of Join results.
+
+</div>
+</div>
+
+#### Cross with DataSet Size Hint
+
+In order to guide the optimizer to pick the right execution strategy, you can hint the size of a DataSet to cross as shown here:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+DataSet<Tuple2<Integer, String>> input1 = // [...]
+DataSet<Tuple2<Integer, String>> input2 = // [...]
+
+DataSet<Tuple4<Integer, String, Integer, String>>
+            udfResult =
+                  // hint that the second DataSet is very small
+            input1.crossWithTiny(input2)
+                  // apply any Cross function (or projection)
+                  .with(new MyCrosser());
+
+DataSet<Tuple3<Integer, Integer, String>>
+            projectResult =
+                  // hint that the second DataSet is very large
+            input1.crossWithHuge(input2)
+                  // apply a projection (or any Cross function)
+                  .projectFirst(0,1).projectSecond(1);
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val input1: DataSet[(Int, String)] = // [...]
+val input2: DataSet[(Int, String)] = // [...]
+
+// hint that the second DataSet is very small
+val result1 = input1.crossWithTiny(input2)
+
+// hint that the second DataSet is very large
+val result2 = input1.crossWithHuge(input2)
+
+~~~
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ #hint that the second DataSet is very small
+ result1 = input1.cross_with_tiny(input2)
+
+ #hint that the second DataSet is very large
+ result2 = input1.cross_with_huge(input2)
+
+~~~
+
+</div>
+</div>
+
+### CoGroup
+
+The CoGroup transformation jointly processes groups of two DataSets. Both DataSets are grouped on a defined key and groups of both DataSets that share the same key are handed together to a user-defined co-group function. If for a specific key only one DataSet has a group, the co-group function is called with this group and an empty group.
+A co-group function can separately iterate over the elements of both groups and return an arbitrary number of result elements.
+
+Similar to Reduce, GroupReduce, and Join, keys can be defined using the different key-selection methods.
+
+#### CoGroup on DataSets
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+The example shows how to group by Field Position Keys (Tuple DataSets only). You can do the same with POJO types and key expressions.
+
+~~~java
+// Some CoGroupFunction definition
+class MyCoGrouper
+         implements CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Double>, Double> {
+
+  @Override
+  public void coGroup(Iterable<Tuple2<String, Integer>> iVals,
+                      Iterable<Tuple2<String, Double>> dVals,
+                      Collector<Double> out) {
+
+    Set<Integer> ints = new HashSet<Integer>();
+
+    // add all Integer values in group to set
+    for (Tuple2<String, Integer> val : iVals) {
+      ints.add(val.f1);
+    }
+
+    // multiply each Double value with each unique Integer value of the group
+    for (Tuple2<String, Double> val : dVals) {
+      for (Integer i : ints) {
+        out.collect(val.f1 * i);
+      }
+    }
+  }
+}
+
+// [...]
+DataSet<Tuple2<String, Integer>> iVals = // [...]
+DataSet<Tuple2<String, Double>> dVals = // [...]
+DataSet<Double> output = iVals.coGroup(dVals)
+                         // group first DataSet on first tuple field
+                         .where(0)
+                         // group second DataSet on first tuple field
+                         .equalTo(0)
+                         // apply CoGroup function on each pair of groups
+                         .with(new MyCoGrouper());
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val iVals: DataSet[(String, Int)] = // [...]
+val dVals: DataSet[(String, Double)] = // [...]
+
+val output = iVals.coGroup(dVals).where(0).equalTo(0) {
+  (iVals, dVals, out: Collector[Double]) =>
+    val ints = iVals map { _._2 } toSet
+
+    for (dVal <- dVals) {
+      for (i <- ints) {
+        out.collect(dVal._2 * i)
+      }
+    }
+}
+~~~
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ class CoGroup(CoGroupFunction):
+   def co_group(self, ivals, dvals, collector):
+     ints = dict()
+     # add all Integer values in group to set
+     for value in ivals:
+       ints[value[1]] = 1
+     # multiply each Double value with each unique Integer values of group
+     for value in dvals:
+       for i in ints.keys():
+         collector.collect(value[1] * i)
+
+
+ output = ivals.co_group(dvals).where(0).equal_to(0).using(CoGroup(), DOUBLE)
+~~~
+
+</div>
+</div>
+
+
+### Union
+
+Produces the union of two DataSets, which have to be of the same type. A union of more than two DataSets can be implemented with multiple union calls, as shown here:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+DataSet<Tuple2<String, Integer>> vals1 = // [...]
+DataSet<Tuple2<String, Integer>> vals2 = // [...]
+DataSet<Tuple2<String, Integer>> vals3 = // [...]
+DataSet<Tuple2<String, Integer>> unioned = vals1.union(vals2).union(vals3);
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val vals1: DataSet[(String, Int)] = // [...]
+val vals2: DataSet[(String, Int)] = // [...]
+val vals3: DataSet[(String, Int)] = // [...]
+
+val unioned = vals1.union(vals2).union(vals3)
+~~~
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ unioned = vals1.union(vals2).union(vals3)
+~~~
+
+</div>
+</div>
+
+### Rebalance
+Evenly rebalances the parallel partitions of a DataSet to eliminate data skew.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+DataSet<String> in = // [...]
+// rebalance DataSet and apply a Map transformation.
+DataSet<Tuple2<String, String>> out = in.rebalance()
+                                        .map(new Mapper());
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val in: DataSet[String] = // [...]
+// rebalance DataSet and apply a Map transformation.
+val out = in.rebalance().map { ... }
+~~~
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+Not supported.
+~~~
+
+</div>
+</div>
+
+
+### Hash-Partition
+
+Hash-partitions a DataSet on a given key.
+Keys can be specified as key expressions or field position keys (see [Reduce examples](#reduce-on-grouped-dataset) for how to specify keys).
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+DataSet<Tuple2<String, Integer>> in = // [...]
+// hash-partition DataSet by String value and apply a MapPartition transformation.
+DataSet<Tuple2<String, String>> out = in.partitionByHash(0)
+                                        .mapPartition(new PartitionMapper());
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val in: DataSet[(String, Int)] = // [...]
+// hash-partition DataSet by String value and apply a MapPartition transformation.
+val out = in.partitionByHash(0).mapPartition { ... }
+~~~
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+Not supported.
+~~~
+
+</div>
+</div>
+
+### Sort Partition
+
+Locally sorts all partitions of a DataSet on a specified field in a specified order.
+Fields can be specified as field expressions or field positions (see [Reduce examples](#reduce-on-grouped-dataset) for how to specify keys).
+Partitions can be sorted on multiple fields by chaining `sortPartition()` calls.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+DataSet<Tuple2<String, Integer>> in = // [...]
+// Locally sort partitions in ascending order on the second String field and
+// in descending order on the first String field.
+// Apply a MapPartition transformation on the sorted partitions.
+DataSet<Tuple2<String, String>> out = in.sortPartition(1, Order.ASCENDING)
+                                          .sortPartition(0, Order.DESCENDING)
+                                        .mapPartition(new PartitionMapper());
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val in: DataSet[(String, Int)] = // [...]
+// Locally sort partitions in ascending order on the second String field and
+// in descending order on the first String field.
+// Apply a MapPartition transformation on the sorted partitions.
+val out = in.sortPartition(1, Order.ASCENDING)
+              .sortPartition(0, Order.DESCENDING)
+            .mapPartition { ... }
+~~~
+
+</div>
+</div>
+
+### First-n
+
+Returns the first n (arbitrary) elements of a DataSet. First-n can be applied on a regular DataSet, a grouped DataSet, or a grouped-sorted DataSet. Grouping keys can be specified as key-selector functions or field position keys (see [Reduce examples](#reduce-on-grouped-dataset) for how to specify keys).
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+DataSet<Tuple2<String, Integer>> in = // [...]
+// Return the first five (arbitrary) elements of the DataSet
+DataSet<Tuple2<String, Integer>> out1 = in.first(5);
+
+// Return the first two (arbitrary) elements of each String group
+DataSet<Tuple2<String, Integer>> out2 = in.groupBy(0)
+                                          .first(2);
+
+// Return the first three elements of each String group ordered by the Integer field
+DataSet<Tuple2<String, Integer>> out3 = in.groupBy(0)
+                                          .sortGroup(1, Order.ASCENDING)
+                                          .first(3);
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val in: DataSet[(String, Int)] = // [...]
+// Return the first five (arbitrary) elements of the DataSet
+val out1 = in.first(5)
+
+// Return the first two (arbitrary) elements of each String group
+val out2 = in.groupBy(0).first(2)
+
+// Return the first three elements of each String group ordered by the Integer field
+val out3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3)
+~~~
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+Not supported.
+~~~
+
+</div>
+</div>

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ee90cc/docs/apis/example_connectors.md
----------------------------------------------------------------------
diff --git a/docs/apis/example_connectors.md b/docs/apis/example_connectors.md
new file mode 100644
index 0000000..bdfa8f9
--- /dev/null
+++ b/docs/apis/example_connectors.md
@@ -0,0 +1,205 @@
+---
+title:  "Connecting to other systems"
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## Reading from file systems
+
+Flink has built-in support for the following file systems:
+
+| Filesystem        | Since           | Scheme  | Notes |
+| ------------- |-------------| -----| ------ |
+| Hadoop Distributed File System (HDFS)  | 0.2 | `hdfs://`| All HDFS versions are supported |
+| Amazon S3    |  0.2 | `s3://` |   |
+| MapR file system      | 0.7-incubating      |  `maprfs://` | The user has to manually place the required jar files in the `lib/` dir |
+| Tachyon   |  0.9 | `tachyon://` | Support through Hadoop file system implementation (see below) |
+
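+For example, a file in any of these file systems can be used as a data source by passing a path with the corresponding scheme (host, port, and path below are placeholders):
+
+~~~java
+// read a text file from HDFS; the URI scheme selects the file system implementation
+DataSet<String> hdfsLines = env.readTextFile("hdfs://namenode:50040/path/to/file");
+~~~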
+
+
+### Using Hadoop file systems with Apache Flink
+
+Apache Flink allows users to use any file system implementing the `org.apache.hadoop.fs.FileSystem`
+interface. Hadoop ships adapters for FTP, [Hftp](http://hadoop.apache.org/docs/r1.2.1/hftp.html), and others.
+
+Flink has integrated testcases to validate the integration with [Tachyon](http://tachyon-project.org/).
+Other file systems we have tested the integration with are the
+[Google Cloud Storage Connector for Hadoop](https://cloud.google.com/hadoop/google-cloud-storage-connector) and [XtreemFS](http://www.xtreemfs.org/).
+
+In order to use a Hadoop file system with Flink, make sure that the `flink-conf.yaml` has the
+`fs.hdfs.hadoopconf` property set to the Hadoop configuration directory.
+In addition to that, the Hadoop configuration (in that directory) needs to have an entry for each supported file system.
+For example, for Tachyon support, there must be the following entry in the `core-site.xml` file:
+
+~~~xml
+<property>
+  <name>fs.tachyon.impl</name>
+  <value>tachyon.hadoop.TFS</value>
+</property>
+~~~
+
+Also, the required classes for using the file system need to be placed in the `lib/` folder of the Flink installation (on all machines running Flink). If putting the files into the directory is not possible, Flink also respects the `HADOOP_CLASSPATH` environment variable to add Hadoop jar files to the classpath.
+
+
+## Connecting to other systems using Input / Output Format wrappers for Hadoop
+
+Apache Flink allows users to access many different systems as data sources or sinks.
+The system is designed for very easy extensibility. Similar to Apache Hadoop, Flink has the concept
+of so-called `InputFormat`s and `OutputFormat`s.
+
+One implementation of these `InputFormat`s is the `HadoopInputFormat`. This is a wrapper that allows
+users to use all existing Hadoop input formats with Flink.
+
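+As a sketch (assuming the `flink-hadoop-compatibility` dependency and the Hadoop classes `org.apache.hadoop.mapreduce.lib.input.TextInputFormat`, `LongWritable`, and `Text` are on the classpath, and using a placeholder input path), wrapping Hadoop's `TextInputFormat` could look like this:
+
+~~~java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+// wrap Hadoop's TextInputFormat (keys are byte offsets, values are text lines)
+Job job = Job.getInstance();
+HadoopInputFormat<LongWritable, Text> hadoopIF =
+    new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, job);
+FileInputFormat.addInputPath(job, new Path("/path/to/input"));
+
+// use the wrapped Hadoop input format like any other Flink data source
+DataSet<Tuple2<LongWritable, Text>> hadoopData = env.createInput(hadoopIF);
+~~~
+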
+This section shows some examples for connecting Flink to other systems.
+[Read more about Hadoop compatibility in Flink](hadoop_compatibility.html).
+
+## Avro support in Flink
+
+Flink has extensive built-in support for [Apache Avro](http://avro.apache.org/). This allows Flink to easily read from Avro files.
+Also, the serialization framework of Flink is able to handle classes generated from Avro schemas.
+
+In order to read data from an Avro file, you have to specify an `AvroInputFormat`.
+
+**Example**:
+
+~~~java
+AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+DataSet<User> usersDS = env.createInput(users);
+~~~
+
+Note that `User` is a POJO generated by Avro. Flink also allows you to perform string-based key selection on these POJOs. For example:
+
+~~~java
+usersDS.groupBy("name")
+~~~
+
+
+Note that using the `GenericData.Record` type is possible with Flink, but not recommended. Since the record contains the full schema, it is very data intensive and thus probably slow to use.
+
+Flink's POJO field selection also works with POJOs generated from Avro. However, this is only possible if the field types are written correctly to the generated class. If a field is of type `Object`, you cannot use the field as a join or grouping key.
+Specifying a field in Avro like this `{"name": "type_double_test", "type": "double"},` works fine; however, specifying it as a UNION type with only one field (`{"name": "type_double_test", "type": ["double"]},`) will generate a field of type `Object`. Note that specifying nullable types (`{"name": "type_double_test", "type": ["null", "double"]},`) is possible!
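+
+As a hedged illustration (assuming the generated `User` class contains the `type_double_test` field from the schema snippet above as a plain `double`), such a field can be used directly as a grouping key:
+
+~~~java
+// works: the field was generated with the concrete type double
+usersDS.groupBy("type_double_test").first(1);
+
+// a field generated as java.lang.Object (e.g. from the single-element union ["double"])
+// could not be used here as a grouping or join key
+~~~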
+
+
+
+### Access Microsoft Azure Table Storage
+
+_Note: This example works starting from Flink 0.6-incubating_
+
+This example uses the `HadoopInputFormat` wrapper with an existing Hadoop input format implementation to access [Azure's Table Storage](https://azure.microsoft.com/en-us/documentation/articles/storage-introduction/).
+
+1. Download and compile the `azure-tables-hadoop` project. The input format developed by the project is not yet available in Maven Central; therefore, we have to build the project ourselves.
+Execute the following commands:
+
+   ~~~bash
+   git clone https://github.com/mooso/azure-tables-hadoop.git
+   cd azure-tables-hadoop
+   mvn clean install
+   ~~~
+
+2. Set up a new Flink project using the quickstarts:
+
+   ~~~bash
+   curl http://flink.apache.org/q/quickstart.sh | bash
+   ~~~
+
+3. Add the following dependencies (in the `<dependencies>` section) to your `pom.xml` file:
+
+   ~~~xml
+   <dependency>
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-hadoop-compatibility</artifactId>
+       <version>{{site.version}}</version>
+   </dependency>
+   <dependency>
+     <groupId>com.microsoft.hadoop</groupId>
+     <artifactId>microsoft-hadoop-azure</artifactId>
+     <version>0.0.4</version>
+   </dependency>
+   ~~~
+
+   `flink-hadoop-compatibility` is a Flink package that provides the Hadoop input format wrappers.
+   `microsoft-hadoop-azure` adds the project we built above to our project.
+
+The project is now prepared for starting to code. We recommend importing the project into an IDE, such as Eclipse or IntelliJ (import it as a Maven project!).
+Browse to the code of the `Job.java` file. It is an empty skeleton for a Flink job.
+
+Paste the following code into it:
+
+~~~java
+import java.util.Map;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import com.microsoft.hadoop.azure.AzureTableConfiguration;
+import com.microsoft.hadoop.azure.AzureTableInputFormat;
+import com.microsoft.hadoop.azure.WritableEntity;
+import com.microsoft.windowsazure.storage.table.EntityProperty;
+
+public class AzureTableExample {
+
+  public static void main(String[] args) throws Exception {
+    // set up the execution environment
+    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+    
+    // create an AzureTableInputFormat, using a Hadoop input format wrapper
+    HadoopInputFormat<Text, WritableEntity> hdIf = new HadoopInputFormat<Text, WritableEntity>(new AzureTableInputFormat(), Text.class, WritableEntity.class, new Job());
+
+    // set the Account URI, something like: https://apacheflink.table.core.windows.net
+    hdIf.getConfiguration().set(AzureTableConfiguration.Keys.ACCOUNT_URI.getKey(), "TODO"); 
+    // set the secret storage key here
+    hdIf.getConfiguration().set(AzureTableConfiguration.Keys.STORAGE_KEY.getKey(), "TODO");
+    // set the table name here
+    hdIf.getConfiguration().set(AzureTableConfiguration.Keys.TABLE_NAME.getKey(), "TODO");
+    
+    DataSet<Tuple2<Text, WritableEntity>> input = env.createInput(hdIf);
+    // a small example of how to use the data in a mapper
+    DataSet<String> fin = input.map(new MapFunction<Tuple2<Text,WritableEntity>, String>() {
+      @Override
+      public String map(Tuple2<Text, WritableEntity> arg0) throws Exception {
+        System.err.println("--------------------------------\nKey = "+arg0.f0);
+        WritableEntity we = arg0.f1;
+
+        for(Map.Entry<String, EntityProperty> prop : we.getProperties().entrySet()) {
+          System.err.println("key="+prop.getKey() + " ; value (asString)="+prop.getValue().getValueAsString());
+        }
+
+        return arg0.f0.toString();
+      }
+    });
+
+    // emit result (this works only locally)
+    fin.print();
+
+    // execute program
+    env.execute("Azure Example");
+  }
+}
+~~~
+
+The example shows how to access an Azure table and turn its data into Flink's `DataSet` (more specifically, the type of the set is `DataSet<Tuple2<Text, WritableEntity>>`). With the `DataSet`, you can apply all known transformations.
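+
+For example, a small sketch that builds on the `input` DataSet from the job above and counts the retrieved entities per key could look like this:
+
+~~~java
+DataSet<Tuple2<String, Integer>> countsPerKey = input
+    .map(new MapFunction<Tuple2<Text, WritableEntity>, Tuple2<String, Integer>>() {
+      @Override
+      public Tuple2<String, Integer> map(Tuple2<Text, WritableEntity> entity) {
+        // use the key of the table entity and a count of 1
+        return new Tuple2<String, Integer>(entity.f0.toString(), 1);
+      }
+    })
+    .groupBy(0)
+    .sum(1);
+~~~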
+
+## Access MongoDB
+
+This [GitHub repository documents how to use MongoDB with Apache Flink (starting from 0.7-incubating)](https://github.com/okkam-it/flink-mongodb-test).
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ee90cc/docs/apis/examples.md
----------------------------------------------------------------------
diff --git a/docs/apis/examples.md b/docs/apis/examples.md
new file mode 100644
index 0000000..29d1e1c
--- /dev/null
+++ b/docs/apis/examples.md
@@ -0,0 +1,490 @@
+---
+title:  "Bundled Examples"
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+The following example programs showcase different applications of Flink 
+from simple word counting to graph algorithms. The code samples illustrate the 
+use of [Flink's API](programming_guide.html). 
+
+The full source code of the following and more examples can be found in the __flink-java-examples__
+or __flink-scala-examples__ module.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Word Count
+WordCount is the "Hello World" of Big Data processing systems. It computes the frequency of words in a text collection. The algorithm works in two steps: First, the texts are splits the text to individual words. Second, the words are grouped and counted.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+DataSet<String> text = env.readTextFile("/path/to/file"); 
+
+DataSet<Tuple2<String, Integer>> counts = 
+        // split up the lines in pairs (2-tuples) containing: (word,1)
+        text.flatMap(new Tokenizer())
+        // group by the tuple field "0" and sum up tuple field "1"
+        .groupBy(0)
+        .sum(1);
+
+counts.writeAsCsv(outputPath, "\n", " ");
+
+// User-defined functions
+public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+
+    @Override
+    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+        // normalize and split the line
+        String[] tokens = value.toLowerCase().split("\\W+");
+        
+        // emit the pairs
+        for (String token : tokens) {
+            if (token.length() > 0) {
+                out.collect(new Tuple2<String, Integer>(token, 1));
+            }   
+        }
+    }
+}
+~~~
+
+The {% gh_link /flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java  "WordCount example" %} implements the above described algorithm with input parameters: `<text input path>, <output path>`. As test data, any text file will do.
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+// get input data
+val text = env.readTextFile("/path/to/file")
+
+val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
+  .map { (_, 1) }
+  .groupBy(0)
+  .sum(1)
+
+counts.writeAsCsv(outputPath, "\n", " ")
+~~~
+
+The {% gh_link /flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala  "WordCount example" %} implements the above described algorithm with input parameters: `<text input path>, <output path>`. As test data, any text file will do.
+
+
+</div>
+</div>
+
+## Page Rank
+
+The PageRank algorithm computes the "importance" of pages in a graph defined by links, which point from one page to another page. It is an iterative graph algorithm, which means that it repeatedly applies the same computation. In each iteration, each page distributes its current rank over all its neighbors and computes its new rank as a taxed sum of the ranks it received from its neighbors. The PageRank algorithm was popularized by the Google search engine, which uses the importance of web pages to rank the results of search queries.
+
+In this simple example, PageRank is implemented with a [bulk iteration](iterations.html) and a fixed number of iterations.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+// read the pages and initial ranks by parsing a CSV file
+DataSet<Tuple2<Long, Double>> pagesWithRanks = env.readCsvFile(pagesInputPath)
+                                                  .types(Long.class, Double.class);
+
+// the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
+DataSet<Tuple2<Long, Long[]>> pageLinkLists = getLinksDataSet(env);
+
+// set iterative data set
+IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations);
+
+DataSet<Tuple2<Long, Double>> newRanks = iteration
+        // join pages with outgoing edges and distribute rank
+        .join(pageLinkLists).where(0).equalTo(0).flatMap(new JoinVertexWithEdgesMatch())
+        // collect and sum ranks
+        .groupBy(0).sum(1)
+        // apply dampening factor
+        .map(new Dampener(DAMPENING_FACTOR, numPages));
+
+DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
+        newRanks, 
+        newRanks.join(iteration).where(0).equalTo(0)
+        // termination condition
+        .filter(new EpsilonFilter()));
+
+finalPageRanks.writeAsCsv(outputPath, "\n", " ");
+
+// User-defined functions
+
+public static final class JoinVertexWithEdgesMatch 
+                    implements FlatJoinFunction<Tuple2<Long, Double>, Tuple2<Long, Long[]>, 
+                                            Tuple2<Long, Double>> {
+
+    @Override
+    public void join(Tuple2<Long, Double> page, Tuple2<Long, Long[]> adj,
+                        Collector<Tuple2<Long, Double>> out) {
+        Long[] neighbors = adj.f1;
+        double rank = page.f1;
+        double rankToDistribute = rank / ((double) neighbors.length);
+
+        for (int i = 0; i < neighbors.length; i++) {
+            out.collect(new Tuple2<Long, Double>(neighbors[i], rankToDistribute));
+        }
+        }
+    }
+}
+
+public static final class Dampener implements MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> {
+    private final double dampening, randomJump;
+
+    public Dampener(double dampening, double numVertices) {
+        this.dampening = dampening;
+        this.randomJump = (1 - dampening) / numVertices;
+    }
+
+    @Override
+    public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
+        value.f1 = (value.f1 * dampening) + randomJump;
+        return value;
+    }
+}
+
+public static final class EpsilonFilter 
+                implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {
+
+    @Override
+    public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
+        return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
+    }
+}
+~~~
+
+The {% gh_link /flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java "PageRank program" %} implements the above example.
+It requires the following parameters to run: `<pages input path>, <links input path>, <output path>, <num pages>, <num iterations>`.
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+// User-defined types
+case class Link(sourceId: Long, targetId: Long)
+case class Page(pageId: Long, rank: Double)
+case class AdjacencyList(sourceId: Long, targetIds: Array[Long])
+
+// set up execution environment
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+// read the pages and initial ranks by parsing a CSV file
+val pages = env.readCsvFile[Page](pagesInputPath)
+
+// the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
+val links = env.readCsvFile[Link](linksInputPath)
+
+// assign initial ranks to pages
+val pagesWithRanks = pages.map(p => Page(p.pageId, 1.0 / numPages))
+
+// build adjacency list from link input
+val adjacencyLists = links
+  // initialize lists
+  .map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
+  // concatenate lists
+  .groupBy("sourceId").reduce {
+  (l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
+  }
+
+// start iteration
+val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
+  currentRanks =>
+    val newRanks = currentRanks
+      // distribute ranks to target pages
+      .join(adjacencyLists).where("pageId").equalTo("sourceId") {
+        (page, adjacent, out: Collector[Page]) =>
+        for (targetId <- adjacent.targetIds) {
+          out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
+        }
+      }
+      // collect ranks and sum them up
+      .groupBy("pageId").aggregate(SUM, "rank")
+      // apply dampening factor
+      .map { p =>
+        Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages))
+      }
+
+    // terminate if no rank update was significant
+    val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
+      (current, next, out: Collector[Int]) =>
+        // check for significant update
+        if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
+    }
+
+    (newRanks, termination)
+}
+
+val result = finalRanks
+
+// emit result
+result.writeAsCsv(outputPath, "\n", " ")
+~~~
+
+The {% gh_link /flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala "PageRank program" %} implements the above example.
+It requires the following parameters to run: `<pages input path>, <links input path>, <output path>, <num pages>, <num iterations>`.
+</div>
+</div>
+
+Input files are plain text files and must be formatted as follows:
+- Pages are represented by a (long) ID and separated by new-line characters.
+    * For example `"1\n2\n12\n42\n63\n"` gives five pages with IDs 1, 2, 12, 42, and 63.
+- Links are represented as pairs of page IDs which are separated by space characters. Links are separated by new-line characters:
+    * For example `"1 2\n2 12\n1 12\n42 63\n"` gives four (directed) links (1)->(2), (2)->(12), (1)->(12), and (42)->(63).
+
+For this simple implementation it is required that each page has at least one incoming and one outgoing link (a page can point to itself).
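+
+As a sketch (the concrete helper methods in the bundled programs may differ), the links input described above could be read with Flink's CSV reader by using a space as field delimiter, assuming an `ExecutionEnvironment` `env` and a `linksInputPath` as in the programs above:
+
+~~~java
+// read the links file: one "sourceId targetId" pair per line
+DataSet<Tuple2<Long, Long>> links = env.readCsvFile(linksInputPath)
+    .fieldDelimiter(' ')
+    .lineDelimiter("\n")
+    .types(Long.class, Long.class);
+~~~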
+
+## Connected Components
+
+The Connected Components algorithm identifies the connected parts of a larger graph by assigning all vertices in the same connected part the same component ID. Similar to PageRank, Connected Components is an iterative algorithm. In each step, each vertex propagates its current component ID to all its neighbors. A vertex accepts the component ID from a neighbor, if it is smaller than its own component ID.
+
+This implementation uses a [delta iteration](iterations.html): Vertices that have not changed their component ID do not participate in the next step. This yields much better performance, because the later iterations typically deal only with a few outlier vertices.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+// read vertex and edge data
+DataSet<Long> vertices = getVertexDataSet(env);
+DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env).flatMap(new UndirectEdge());
+
+// assign the initial component IDs (equal to the vertex ID)
+DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());
+        
+// open a delta iteration
+DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
+        verticesWithInitialId.iterateDelta(verticesWithInitialId, maxIterations, 0);
+
+// apply the step logic: 
+DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset()
+        // join with the edges
+        .join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
+        // select the minimum neighbor component ID
+        .groupBy(0).aggregate(Aggregations.MIN, 1)
+        // update if the component ID of the candidate is smaller
+        .join(iteration.getSolutionSet()).where(0).equalTo(0)
+        .flatMap(new ComponentIdFilter());
+
+// close the delta iteration (delta and new workset are identical)
+DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);
+
+// emit result
+result.writeAsCsv(outputPath, "\n", " ");
+
+// User-defined functions
+
+public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {
+    
+    @Override
+    public Tuple2<T, T> map(T vertex) {
+        return new Tuple2<T, T>(vertex, vertex);
+    }
+}
+
+public static final class UndirectEdge 
+                    implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+    Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();
+    
+    @Override
+    public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
+        invertedEdge.f0 = edge.f1;
+        invertedEdge.f1 = edge.f0;
+        out.collect(edge);
+        out.collect(invertedEdge);
+    }
+}
+
+public static final class NeighborWithComponentIDJoin 
+                implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+    @Override
+    public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
+        return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
+    }
+}
+
+public static final class ComponentIdFilter 
+                    implements FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, 
+                                            Tuple2<Long, Long>> {
+
+    @Override
+    public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value, 
+                        Collector<Tuple2<Long, Long>> out) {
+        if (value.f0.f1 < value.f1.f1) {
+            out.collect(value.f0);
+        }
+    }
+}
+~~~
+
+The {% gh_link /flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java "ConnectedComponents program" %} implements the above example. It requires the following parameters to run: `<vertex input path>, <edge input path>, <output path> <max num iterations>`.
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+// set up execution environment
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+// read vertex and edge data
+// assign the initial components (equal to the vertex id)
+val vertices = getVerticesDataSet(env).map { id => (id, id) }
+
+// undirected edges by emitting for each input edge the input edges itself and an inverted
+// version
+val edges = getEdgesDataSet(env).flatMap { edge => Seq(edge, (edge._2, edge._1)) }
+
+// open a delta iteration
+val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array(0)) {
+  (s, ws) =>
+
+    // apply the step logic: join with the edges
+    val allNeighbors = ws.join(edges).where(0).equalTo(0) { (vertex, edge) =>
+      (edge._2, vertex._2)
+    }
+
+    // select the minimum neighbor
+    val minNeighbors = allNeighbors.groupBy(0).min(1)
+
+    // update if the component of the candidate is smaller
+    val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) {
+      (newVertex, oldVertex, out: Collector[(Long, Long)]) =>
+        if (newVertex._2 < oldVertex._2) out.collect(newVertex)
+    }
+
+    // delta and new workset are identical
+    (updatedComponents, updatedComponents)
+}
+
+verticesWithComponents.writeAsCsv(outputPath, "\n", " ")
+    
+~~~
+
+The {% gh_link /flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala "ConnectedComponents program" %} implements the above example. It requires the following parameters to run: `<vertex input path>, <edge input path>, <output path> <max num iterations>`.
+</div>
+</div>
+
+Input files are plain text files and must be formatted as follows:
+- Vertices are represented by IDs and separated by new-line characters.
+    * For example `"1\n2\n12\n42\n63\n"` gives five vertices: (1), (2), (12), (42), and (63).
+- Edges are represented as pairs of vertex IDs, separated by space characters. Edges are separated by new-line characters:
+    * For example `"1 2\n2 12\n1 12\n42 63\n"` gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63).
+
+## Relational Query
+
+The Relational Query example assumes two tables, one with `orders` and the other with `lineitems`, as specified by the [TPC-H decision support benchmark](http://www.tpc.org/tpch/). TPC-H is a standard benchmark in the database industry. See below for instructions on how to generate the input data.
+
+The example implements the following SQL query.
+
+~~~sql
+SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue
+    FROM orders, lineitem
+WHERE l_orderkey = o_orderkey
+    AND o_orderstatus = 'F'
+    AND YEAR(o_orderdate) > 1993
+    AND o_orderpriority LIKE '5%'
+GROUP BY l_orderkey, o_shippriority;
+~~~
+
+The Flink program that implements the above query looks as follows.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+// get orders data set: (orderkey, orderstatus, orderdate, orderpriority, shippriority)
+DataSet<Tuple5<Integer, String, String, String, Integer>> orders = getOrdersDataSet(env);
+// get lineitem data set: (orderkey, extendedprice)
+DataSet<Tuple2<Integer, Double>> lineitems = getLineitemDataSet(env);
+
+// orders filtered by year: (orderkey, custkey)
+DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
+        // filter orders
+        orders.filter(
+            new FilterFunction<Tuple5<Integer, String, String, String, Integer>>() {
+                @Override
+                public boolean filter(Tuple5<Integer, String, String, String, Integer> t) {
+                    // status filter
+                    if(!t.f1.equals(STATUS_FILTER)) {
+                        return false;
+                    // year filter
+                    } else if(Integer.parseInt(t.f2.substring(0, 4)) <= YEAR_FILTER) {
+                        return false;
+                    // order priority filter
+                    } else if(!t.f3.startsWith(OPRIO_FILTER)) {
+                        return false;
+                    }
+                    return true;
+                }
+            })
+        // project fields out that are no longer required
+        .project(0,4).types(Integer.class, Integer.class);
+
+// join orders with lineitems: (orderkey, shippriority, extendedprice)
+DataSet<Tuple3<Integer, Integer, Double>> lineitemsOfOrders = 
+        ordersFilteredByYear.joinWithHuge(lineitems)
+                            .where(0).equalTo(0)
+                            .projectFirst(0,1).projectSecond(1)
+                            .types(Integer.class, Integer.class, Double.class);
+
+// extendedprice sums: (orderkey, shippriority, sum(extendedprice))
+DataSet<Tuple3<Integer, Integer, Double>> priceSums = 
+        // group by order and sum extendedprice
+        lineitemsOfOrders.groupBy(0,1).aggregate(Aggregations.SUM, 2);
+
+// emit result
+priceSums.writeAsCsv(outputPath);
+~~~
+
+The {% gh_link /flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/RelationalQuery.java "Relational Query program" %} implements the above query. It requires the following parameters to run: `<orders input path>, <lineitem input path>, <output path>`.
+
+</div>
+<div data-lang="scala" markdown="1">
+Coming soon...
+
+The {% gh_link /flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/RelationalQuery.scala "Relational Query program" %} implements the above query. It requires the following parameters to run: `<orders input path>, <lineitem input path>, <output path>`.
+
+</div>
+</div>
+
+The orders and lineitem files can be generated using the [TPC-H benchmark](http://www.tpc.org/tpch/) suite's data generator tool (DBGEN). 
+Take the following steps to generate arbitrarily large input files for the provided Flink programs:
+
+1.  Download and unpack DBGEN
+2.  Make a copy of *makefile.suite* called *Makefile* and perform the following changes:
+
+~~~bash
+DATABASE = DB2
+MACHINE  = LINUX
+WORKLOAD = TPCH
+CC       = gcc
+~~~
+
+3.  Build DBGEN using *make*
+4.  Generate lineitem and orders relations using dbgen. A scale factor
+    (-s) of 1 results in a generated data set with about 1 GB size.
+
+~~~bash
+./dbgen -T o -s 1
+~~~

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ee90cc/docs/apis/fig/LICENSE.txt
----------------------------------------------------------------------
diff --git a/docs/apis/fig/LICENSE.txt b/docs/apis/fig/LICENSE.txt
new file mode 100644
index 0000000..35b8673
--- /dev/null
+++ b/docs/apis/fig/LICENSE.txt
@@ -0,0 +1,17 @@
+All image files in the folder and its subfolders are
+licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ee90cc/docs/apis/fig/iterations_delta_iterate_operator.png
----------------------------------------------------------------------
diff --git a/docs/apis/fig/iterations_delta_iterate_operator.png b/docs/apis/fig/iterations_delta_iterate_operator.png
new file mode 100644
index 0000000..470485a
Binary files /dev/null and b/docs/apis/fig/iterations_delta_iterate_operator.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ee90cc/docs/apis/fig/iterations_delta_iterate_operator_example.png
----------------------------------------------------------------------
diff --git a/docs/apis/fig/iterations_delta_iterate_operator_example.png b/docs/apis/fig/iterations_delta_iterate_operator_example.png
new file mode 100644
index 0000000..15f2b54
Binary files /dev/null and b/docs/apis/fig/iterations_delta_iterate_operator_example.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ee90cc/docs/apis/fig/iterations_iterate_operator.png
----------------------------------------------------------------------
diff --git a/docs/apis/fig/iterations_iterate_operator.png b/docs/apis/fig/iterations_iterate_operator.png
new file mode 100644
index 0000000..aaf4158
Binary files /dev/null and b/docs/apis/fig/iterations_iterate_operator.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ee90cc/docs/apis/fig/iterations_iterate_operator_example.png
----------------------------------------------------------------------
diff --git a/docs/apis/fig/iterations_iterate_operator_example.png b/docs/apis/fig/iterations_iterate_operator_example.png
new file mode 100644
index 0000000..be4841c
Binary files /dev/null and b/docs/apis/fig/iterations_iterate_operator_example.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ee90cc/docs/apis/fig/iterations_supersteps.png
----------------------------------------------------------------------
diff --git a/docs/apis/fig/iterations_supersteps.png b/docs/apis/fig/iterations_supersteps.png
new file mode 100644
index 0000000..331dbc7
Binary files /dev/null and b/docs/apis/fig/iterations_supersteps.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ee90cc/docs/apis/fig/plan_visualizer.png
----------------------------------------------------------------------
diff --git a/docs/apis/fig/plan_visualizer.png b/docs/apis/fig/plan_visualizer.png
new file mode 100644
index 0000000..85b8c55
Binary files /dev/null and b/docs/apis/fig/plan_visualizer.png differ