Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:29:34 UTC

[52/60] git commit: Rewrite Java API Guide as Unified Programming Guide

Rewrite Java API Guide as Unified Programming Guide

This now covers both Java and Scala.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/97d630d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/97d630d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/97d630d5

Branch: refs/heads/master
Commit: 97d630d5dbf8239818722880b933a582e3223055
Parents: feade05
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Sep 19 11:24:49 2014 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 22 09:59:59 2014 +0200

----------------------------------------------------------------------
 docs/iterations.md        |    2 +-
 docs/programming_guide.md | 1121 +++++++++++++++++++++++++++++++++++-----
 docs/scala_api_guide.md   | 1042 -------------------------------------
 3 files changed, 1000 insertions(+), 1165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/97d630d5/docs/iterations.md
----------------------------------------------------------------------
diff --git a/docs/iterations.md b/docs/iterations.md
index cb1a1fb..315b9f0 100644
--- a/docs/iterations.md
+++ b/docs/iterations.md
@@ -9,7 +9,7 @@ Iterative algorithms occur in many domains of data analysis, such as *machine le
 
 Flink programs implement iterative algorithms by defining a **step function** and embedding it into a special iteration operator. There are two  variants of this operator: **Iterate** and **Delta Iterate**. Both operators repeatedly invoke the step function on the current iteration state until a certain termination condition is reached.
 
-Here, we provide background on both operator variants and outline their usage. The [programming guides](java_api_guide.html) explain how to implement the operators in both [Scala](scala_api_guide.html) and [Java](java_api_guide.html#iterations). We also provide a **vertex-centric graph processing API** called [Spargel](spargel_guide.html).
+Here, we provide background on both operator variants and outline their usage. The [programming guide](programming_guide.html) explains how to implement the operators in both Scala and Java. We also provide a **vertex-centric graph processing API** called [Spargel](spargel_guide.html).
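+
+For example, a minimal sketch of the **Iterate** operator in the Scala API (assuming an
+`ExecutionEnvironment` named `env`; see the programming guide for complete, runnable examples):
+
+{% highlight scala %}
+val initial: DataSet[Int] = env.fromElements(0)
+
+// the step function is invoked on the current state, here at most 10 times
+val result = initial.iterate(10) { state =>
+  state.map { _ + 1 }
+}
+{% endhighlight %}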
 
 The following table provides an overview of both operators:
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/97d630d5/docs/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/programming_guide.md b/docs/programming_guide.md
index 79a4820..883e769 100644
--- a/docs/programming_guide.md
+++ b/docs/programming_guide.md
@@ -11,7 +11,7 @@ title: "Flink Programming Guide"
 Introduction
 ------------
 
-Analysis programs in Flink are regular Java programs that implement transformations on data sets
+Analysis programs in Flink are regular programs that implement transformations on data sets
 (e.g., filtering, mapping, joining, grouping). The data sets are initially created from certain
 sources (e.g., by reading files, or from collections). Results are returned via sinks, which may for
 example write the data to (distributed) files, or to standard output (for example the command line
@@ -19,16 +19,18 @@ terminal). Flink programs run in a variety of contexts, standalone, or embedded
 The execution can happen in a local JVM, or on clusters of many machines.
 
 In order to create your own Flink program, we encourage you to start with the
-[program skeleton](#skeleton) and gradually add your own [transformations](#transformations).
-The remaining sections act as references for additional operations and advanced features.
+[program skeleton](#program-skeleton) and gradually add your own
+[transformations](#transformations). The remaining sections act as references for additional
+operations and advanced features.
 
 
 Example Program
 ---------------
 
 The following program is a complete, working example of WordCount. You can copy & paste the code
-to run it locally. You only have to include Flink's Java API library into your project (see Section
-[Linking with Flink](#linking)) and specify the imports. Then you are ready to go!
+to run it locally. You only have to include the correct Flink library in your project
+(see Section [Linking with Flink](#linking-with-flink)) and specify the imports. Then you are ready
+to go!
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -71,8 +73,9 @@ object WordCount {
   def main(args: Array[String]) {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val text = env.fromElements("Who's there?",
-            "I think I hear them. Stand, ho! Who's there?")
+    val text = env.fromElements(
+      "Who's there?",
+      "I think I hear them. Stand, ho! Who's there?")
 
     val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
       .map { (_, 1) }
@@ -95,22 +98,38 @@ object WordCount {
 Linking with Flink
 ------------------
 
-To write programs with Flink, you need to include Flink's Java API library in your project.
+To write programs with Flink, you need to include the Flink library corresponding to
+your programming language in your project.
 
-The simplest way to do this is to use the [quickstart scripts](java_api_quickstart.html). They
+The simplest way to do this is to use one of the quickstart scripts: either for
+[Java](java_api_quickstart.html) or for [Scala](scala_api_quickstart.html). They
 create a blank project from a template (a Maven Archetype), which sets up everything for you. To
 manually create the project, you can use the archetype and create a project by calling:
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight bash %}
 mvn archetype:generate \
     -DarchetypeGroupId=org.apache.flink \
     -DarchetypeArtifactId=flink-quickstart-java \
     -DarchetypeVersion={{site.FLINK_VERSION_STABLE }}
 {% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight bash %}
+mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.flink \
+    -DarchetypeArtifactId=flink-quickstart-scala \
+    -DarchetypeVersion={{site.FLINK_VERSION_STABLE }}
+{% endhighlight %}
+</div>
+</div>
 
 If you want to add Flink to an existing Maven project, add the following entry to your
 *dependencies* section in the *pom.xml* file of your project:
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight xml %}
 <dependency>
   <groupId>org.apache.flink</groupId>
@@ -123,6 +142,22 @@ If you want to add Flink to an existing Maven project, add the following entry t
   <version>{{site.FLINK_VERSION_STABLE }}</version>
 </dependency>
 {% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-scala</artifactId>
+  <version>{{site.FLINK_VERSION_STABLE }}</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients</artifactId>
+  <version>{{site.FLINK_VERSION_STABLE }}</version>
+</dependency>
+{% endhighlight %}
+</div>
+</div>
 
 If you are using Flink together with Hadoop, the version of the dependency may vary depending on the
 version of Hadoop (or more specifically, HDFS) that you want to use Flink with. Please refer to the
@@ -141,6 +176,9 @@ file and [run it on a cluster](cluster_execution.html), you can skip that depend
 Program Skeleton
 ----------------
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
 As we already saw in the example, Flink programs look like regular Java
 programs with a `main()` method. Each program consists of the same basic parts:
 
@@ -189,14 +227,14 @@ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 DataSet<String> text = env.readTextFile("file:///path/to/file");
 {% endhighlight %}
 
-This will give you a `DataSet` on which you can then apply transformations. For
+This will give you a DataSet on which you can then apply transformations. For
 more information on data sources and input formats, please refer to
 [Data Sources](#data_sources).
 
-Once you have a `DataSet` you can apply transformations to create a new
-`DataSet` which you can then write to a file, transform again, or
-combine with other `DataSet`s. You apply transformations by calling
-methods on `DataSet` with your own custom transformation function. For example,
+Once you have a DataSet you can apply transformations to create a new
+DataSet which you can then write to a file, transform again, or
+combine with other DataSets. You apply transformations by calling
+methods on DataSet with your own custom transformation function. For example,
 a map transformation looks like this:
 
 {% highlight java %}
@@ -210,12 +248,12 @@ DataSet<Integer> tokenized = text.map(new MapFunction<String, Integer>() {
 });
 {% endhighlight %}
 
-This will create a new `DataSet` by converting every String in the original
+This will create a new DataSet by converting every String in the original
 set to an Integer. For more information and a list of all the transformations,
 please refer to [Transformations](#transformations).
 
-Once you have a `DataSet` that needs to be written to disk you call one
-of these methods on `DataSet`:
+Once you have a DataSet that needs to be written to disk you call one
+of these methods on DataSet:
 
 {% highlight java %}
 writeAsText(String path)
@@ -226,14 +264,112 @@ print()
 {% endhighlight %}
 
 The last method is only useful for developing/debugging on a local machine,
-it will output the contents of the `DataSet` to standard output. (Note that in
+it will output the contents of the DataSet to standard output. (Note that in
+a cluster, the result goes to the standard out stream of the cluster nodes and ends
+up in the *.out* files of the workers).
+The first two do as the name suggests, the third one can be used to specify a
+custom data output format. Please refer
+to [Data Sinks](#data_sinks) for more information on writing to files and also
+about custom data output formats.
+
+Once you have specified the complete program you need to call `execute` on
+the `ExecutionEnvironment`. This will either execute on your local
+machine or submit your program for execution on a cluster, depending on
+how you created the execution environment.
+
+</div>
+<div data-lang="scala" markdown="1">
+
+As we already saw in the example, Flink programs look like regular Scala
+programs with a `main()` method. Each program consists of the same basic parts:
+
+1. Obtain an `ExecutionEnvironment`,
+2. Load/create the initial data,
+3. Specify transformations on this data,
+4. Specify where to put the results of your computations, and
+5. Execute your program.
+
+We will now give an overview of each of those steps but please refer to the respective sections for
+more details. Note that all core classes of the Scala API are found in the package 
+{% gh_link /flink-scala/src/main/scala/org/apache/flink/api/scala "org.apache.flink.api.scala" %}.
+
+
+The `ExecutionEnvironment` is the basis for all Flink programs. You can
+obtain one using these static methods on class `ExecutionEnvironment`:
+
+{% highlight scala %}
+def getExecutionEnvironment
+
+def createLocalEnvironment(degreeOfParallelism: Int = Runtime.getRuntime.availableProcessors())
+
+def createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
+def createRemoteEnvironment(host: String, port: Int, degreeOfParallelism: Int, jarFiles: String*)
+{% endhighlight %}
+
+Typically, you only need to use `getExecutionEnvironment()`, since this
+will do the right thing depending on the context: if you are executing
+your program inside an IDE or as a regular Scala program it will create
+a local environment that will execute your program on your local machine. If
+you created a JAR file from your program, and invoke it through the [command line](cli.html)
+or the [web interface](web_client.html),
+the Flink cluster manager will
+execute your main method and `getExecutionEnvironment()` will return
+an execution environment for executing your program on a cluster.
+
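+For example, a remote environment can be created explicitly as follows; host, port and the path
+to the program JAR are placeholders in this sketch:
+
+{% highlight scala %}
+val remoteEnv = ExecutionEnvironment.createRemoteEnvironment(
+  "jobManagerHost", 6123, "/path/to/program.jar")
+{% endhighlight %}
+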
+For specifying data sources the execution environment has several methods
+to read from files: you can just read them line by line,
+as CSV files, or using completely custom data input formats. To just read
+a text file as a sequence of lines, you can use:
+
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val text = env.readTextFile("file:///path/to/file")
+{% endhighlight %}
+
+This will give you a DataSet on which you can then apply transformations. For
+more information on data sources and input formats, please refer to
+[Data Sources](#data_sources).
+
+Once you have a DataSet you can apply transformations to create a new
+DataSet which you can then write to a file, transform again, or
+combine with other DataSets. You apply transformations by calling
+methods on DataSet with your own custom transformation function. For example,
+a map transformation looks like this:
+
+{% highlight scala %}
+val input: DataSet[String] = ...
+
+val mapped = input.map { x => x.toInt }
+{% endhighlight %}
+
+This will create a new DataSet by converting every String in the original
+set to an Integer. For more information and a list of all the transformations,
+please refer to [Transformations](#transformations).
+
+Once you have a DataSet that needs to be written to disk you can call one
+of these methods on DataSet:
+
+{% highlight scala %}
+def writeAsText(path: String, writeMode: WriteMode = WriteMode.NO_OVERWRITE)
+def writeAsCsv(
+    filePath: String,
+    rowDelimiter: String = "\n",
+    fieldDelimiter: String = ',',
+    writeMode: WriteMode = WriteMode.NO_OVERWRITE)
+def write(outputFormat: FileOutputFormat[T],
+    path: String,
+    writeMode: WriteMode = WriteMode.NO_OVERWRITE)
+
+def print()
+{% endhighlight %}
+
+The last method is only useful for developing/debugging on a local machine,
+it will output the contents of the DataSet to standard output. (Note that in
 a cluster, the result goes to the standard out stream of the cluster nodes and ends
 up in the *.out* files of the workers).
 The first two do as the name suggests, the third one can be used to specify a
-custom data output format. Keep in mind, that these calls do not actually
-write to a file yet. Only when your program is completely specified and you
-call the `execute` method on your `ExecutionEnvironment` are all the
-transformations executed and is data written to disk. Please refer
+custom data output format. Please refer
 to [Data Sinks](#data_sinks) for more information on writing to files and also
 about custom data output formats.
 
@@ -242,6 +378,10 @@ the `ExecutionEnvironment`. This will either execute on your local
 machine or submit your program for execution on a cluster, depending on
 how you created the execution environment.
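+
+Putting the steps together, a minimal end-to-end sketch could look like this (file paths and the
+job name are placeholders):
+
+{% highlight scala %}
+import org.apache.flink.api.scala._
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val text = env.readTextFile("file:///path/to/file")
+
+// a transformation: map every line to its length
+val lengths = text.map { _.length }
+
+// a sink: write the result as text
+lengths.writeAsText("file:///path/to/result")
+
+// trigger program execution
+env.execute("Minimal Skeleton")
+{% endhighlight %}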
 
+</div>
+</div>
+
+
 [Back to top](#top)
 
 
@@ -267,9 +407,14 @@ Data transformations transform one or more DataSets into a new DataSet. Programs
 multiple transformations into sophisticated assemblies.
 
 This section gives a brief overview of the available transformations. The [transformations
-documentation](java_api_transformations.html) has full description of all transformations with
+documentation](dataset_transformations.html) has a full description of all transformations with
 examples.
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -396,7 +541,7 @@ DataSet<Tuple3<Integer, String, Double>> output = input.sum(0).andMin(2);
         Joins two data sets by creating all pairs of elements that are equal on their keys.
         Optionally uses a JoinFunction to turn the pair of elements into a single element, or a
         FlatJoinFunction to turn the pair of elements into arbitrarily many (including none)
-        elements. See <a href="#defining-keys">keys</a> on how to define join keys.
+        elements. See <a href="#specifying-keys">keys</a> on how to define join keys.
 {% highlight java %}
 result = input1.join(input2)
                .where(0)       // key of the first input (tuple field 0)
@@ -410,7 +555,7 @@ result = input1.join(input2)
       <td>
         <p>The two-dimensional variant of the reduce operation. Groups each input on one or more
         fields and then joins the groups. The transformation function is called per pair of groups.
-        See <a href="#defining-keys">keys</a> on how to define coGroup keys.</p>
+        See <a href="#specifying-keys">keys</a> on how to define coGroup keys.</p>
 {% highlight java %}
 data1.coGroup(data2)
      .where(0)
@@ -477,16 +622,170 @@ DataSet<Tuple2<String, Integer>> out = in.project(2,0).types(String.class, Integ
   </tbody>
 </table>
 
-The [parallelism](#parallelism) of a transformation can be defined by `setParallelism(int)`.
+</div>
+<div data-lang="scala" markdown="1">
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td><strong>Map</strong></td>
+      <td>
+        <p>Takes one element and produces one element.</p>
+{% highlight scala %}
+data.map { x => x.toInt }
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>FlatMap</strong></td>
+      <td>
+        <p>Takes one element and produces zero, one, or more elements. </p>
+{% highlight scala %}
+data.flatMap { str => str.split(" ") }
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>MapPartition</strong></td>
+      <td>
+        <p>Transforms a parallel partition in a single function call. The function gets the partition
+        as a `TraversableOnce` and can produce an arbitrary number of result values. The number of
+        elements in each partition depends on the degree-of-parallelism and previous operations.</p>
+{% highlight scala %}
+data.mapPartition { in => in map { (_, 1) } }
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Filter</strong></td>
+      <td>
+        <p>Evaluates a boolean function for each element and retains those for which the function
+        returns true.</p>
+{% highlight scala %}
+data.filter { _ > 1000 }
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Reduce</strong></td>
+      <td>
+        <p>Combines a group of elements into a single element by repeatedly combining two elements
+        into one. Reduce may be applied on a full data set, or on a grouped data set.</p>
+{% highlight scala %}
+data.reduce { _ + _ }
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>ReduceGroup</strong></td>
+      <td>
+        <p>Combines a group of elements into one or more elements. ReduceGroup may be applied on a
+        full data set, or on a grouped data set.</p>
+{% highlight scala %}
+data.reduceGroup { elements => elements.sum }
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Aggregate</strong></td>
+      <td>
+        <p>Aggregates a group of values into a single value. Aggregation functions can be thought of
+        as built-in reduce functions. Aggregate may be applied on a full data set, or on a grouped
+        data set.</p>
+{% highlight scala %}
+val input: DataSet[(Int, String, Double)] = // [...]
+val output: DataSet[(Int, String, Double)] = input.aggregate(SUM, 0).aggregate(MIN, 2)
+{% endhighlight %}
+  <p>You can also use short-hand syntax for minimum, maximum, and sum aggregations.</p>
+{% highlight scala %}
+val input: DataSet[(Int, String, Double)] = // [...]
+val output: DataSet[(Int, String, Double)] = input.sum(0).min(2)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Join</strong></td>
+      <td>
+        Joins two data sets by creating all pairs of elements that are equal on their keys.
+        Optionally uses a JoinFunction to turn the pair of elements into a single element, or a
+        FlatJoinFunction to turn the pair of elements into arbitrarily many (including none)
+        elements. See <a href="#specifying-keys">keys</a> on how to define join keys.
+{% highlight scala %}
+// In this case tuple fields are used as keys. "0" is the join field on the first tuple
+// "1" is the join field on the second tuple.
+val result = input1.join(input2).where(0).equalTo(1)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>CoGroup</strong></td>
+      <td>
+        <p>The two-dimensional variant of the reduce operation. Groups each input on one or more
+        fields and then joins the groups. The transformation function is called per pair of groups.
+        See <a href="#specifying-keys">keys</a> on how to define coGroup keys.</p>
+{% highlight scala %}
+data1.coGroup(data2).where(0).equalTo(1)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Cross</strong></td>
+      <td>
+        <p>Builds the Cartesian product (cross product) of two inputs, creating all pairs of
+        elements. Optionally uses a CrossFunction to turn the pair of elements into a single
+        element</p>
+{% highlight scala %}
+val data1: DataSet[Int] = // [...]
+val data2: DataSet[String] = // [...]
+val result: DataSet[(Int, String)] = data1.cross(data2)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Union</strong></td>
+      <td>
+        <p>Produces the union of two data sets.</p>
+{% highlight scala %}
+data.union(data2)
+{% endhighlight %}
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+</div>
+
+The [parallelism](#parallelism) of a transformation can be defined by `setParallelism(int)` while
 `name(String)` assigns a custom name to a transformation which is helpful for debugging. The same is
 possible for [Data Sources](#data_sources) and [Data Sinks](#data_sinks).
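+
+As a sketch, assuming the Scala transformations expose the same chainable `name` and
+`setParallelism` setters:
+
+{% highlight scala %}
+val input: DataSet[String] = // [...]
+val mapped = input
+  .map { x => x.toInt }
+  .name("to-int mapper")   // custom name, helpful for debugging
+  .setParallelism(4)       // run this transformation with a parallelism of 4
+{% endhighlight %}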
 
 [Back to Top](#top)
 
 
-Defining Keys
+Specifying Keys
 -------------
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
 Some transformations (join, coGroup) require that a key is defined on
 its argument DataSets, and other transformations (Reduce, GroupReduce,
 Aggregate) allow that the DataSet is grouped on a key before they are
@@ -515,8 +814,8 @@ DataSet<Tuple3<Integer,String,Long> grouped = input
 {% endhighlight %}
 
 The data set is grouped on the first field of the tuples (the one of
-Integer type). The GroupReduceFunction will thus receive groups with
-the same value of the first field.
+Integer type). The GroupReduceFunction will thus receive groups of tuples with
+the same value in the first field.
 
 {% highlight java %}
 DataSet<Tuple3<Integer,String,Long>> input = // [...]
@@ -570,14 +869,100 @@ DataSet<Tuple2<String, Double>>
                             });
 {% endhighlight %}
 
+</div>
+
+<div data-lang="scala" markdown="1">
+Some transformations (join, coGroup) require that a key is defined on
+its argument DataSets, and other transformations (Reduce, GroupReduce,
+Aggregate) allow that the DataSet is grouped on a key before they are
+applied.
+
+A DataSet is grouped as
+{% highlight scala %}
+val input: DataSet[...] = // [...]
+val reduced = input
+  .groupBy(/*define key here*/)
+  .reduceGroup(/*do something*/)
+{% endhighlight %}
+
+The data model of Flink is not based on key-value pairs. Therefore,
+you do not need to physically pack the data set types into keys and
+values. Keys are "virtual": they are defined as functions over the
+actual data to guide the grouping operator.
+
+The simplest case is grouping a data set of Case Classes on one or more
+of its fields:
+{% highlight scala %}
+case class WordCount(docId: Int, word: String, count: Int)
+val input: DataSet[WordCount] = // [...]
+val grouped = input
+  .groupBy("word")
+  .reduceGroup(/*do something*/)
+{% endhighlight %}
+
+The data set is grouped on the second field of the Case Class (the one of
+String type). The group reduce function will thus receive groups of elements with
+the same value in the second field.
+
+{% highlight scala %}
+val input: DataSet[WordCount] = // [...]
+val grouped = input
+  .groupBy("docId", "word")
+  .reduceGroup(/*do something*/);
+{% endhighlight %}
+
+Here the DataSet is grouped on the composite key consisting of the first and the
+second fields, therefore the group reduce function will receive groups
+with the same value in both fields.
+
+As a special case, fields of Tuple DataSets can also be specified by (zero-based) index:
+{% highlight scala %}
+val input: DataSet[(Int, String, Int)] = // [...]
+val grouped = input
+  .groupBy(0, 1)
+  .reduceGroup(/*do something*/);
+{% endhighlight %}
+
+For DataSets that don't contain Case Classes or Tuples, key definition is done via a "key selector"
+function, which takes as argument one dataset element and must return a key of an
+arbitrary data type. For example:
+{% highlight scala %}
+// some ordinary object
+class WC {
+  var word: String = _
+  var count: Int = _
+}
+val words: DataSet[WC] = // [...]
+val counts: DataSet[WC] = words groupBy { _.word } reduce { /*do something*/}
+{% endhighlight %}
+
+Remember that keys are not only used for grouping, but also joining and matching data sets:
+{% highlight scala %}
+// some object
+case class Rating(name: String, category: String, points: Int)
+val ratings: DataSet[Rating] = // [...]
+val weights: DataSet[(String, Double)] = // [...]
+
+// Tuples are also Case Classes in Scala, so we could use this:
+val weightedRatings = ratings.join(weights).where("category").equalTo("_1")
+
+// Or This:
+val weightedRatings2 = ratings.join(weights).where("category").equalTo(0)
+{% endhighlight %}
+</div>
+</div>
+
+
 [Back to top](#top)
 
 
-Functions
----------
+Passing Functions to Flink
+--------------------------
 
-You can define a user-defined function and pass it to the DataSet
-transformations in several ways:
+Operations require user-defined functions. This section lists several ways of specifying them.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 
 #### Implementing an interface
 
@@ -618,18 +1003,23 @@ data.reduce((i1,i2) -> i1 + i2);
 
 All transformations that take as argument a user-defined function can
 instead take as argument a *rich* function. For example, instead of
+
 {% highlight java %}
 class MyMapFunction implements MapFunction<String, Integer> {
   public Integer map(String value) { return Integer.parseInt(value); }
 }
 {% endhighlight %}
+
 you can write
+
 {% highlight java %}
 class MyMapFunction extends RichMapFunction<String, Integer> {
   public Integer map(String value) { return Integer.parseInt(value); }
 }
 {% endhighlight %}
+
 and pass the function as usual to a `map` transformation:
+
 {% highlight java %}
 data.map(new MyMapFunction());
 {% endhighlight %}
@@ -641,6 +1031,59 @@ data.map (new RichMapFunction<String, Integer>() {
 });
 {% endhighlight %}
 
+</div>
+<div data-lang="scala" markdown="1">
+
+
+#### Lambda Functions
+
+As already seen in previous examples all operations accept lambda functions for describing
+the operation:
+{% highlight scala %}
+val data: DataSet[String] = // [...]
+data.filter { _.startsWith("http://") }
+{% endhighlight %}
+
+{% highlight scala %}
+val data: DataSet[Int] = // [...]
+data.reduce { (i1,i2) => i1 + i2 }
+// or
+data.reduce { _ + _ }
+{% endhighlight %}
+
+#### Rich functions
+
+All transformations that take as argument a lambda function can
+instead take as argument a *rich* function. For example, instead of
+
+{% highlight scala %}
+data.map { x => x.toInt }
+{% endhighlight %}
+
+you can write
+
+{% highlight scala %}
+class MyMapFunction extends RichMapFunction[String, Int] {
+  def map(in: String): Int = { in.toInt }
+}
+{% endhighlight %}
+
+and pass the function to a `map` transformation:
+
+{% highlight scala %}
+data.map(new MyMapFunction())
+{% endhighlight %}
+
+Rich functions can also be defined as an anonymous class:
+{% highlight scala %}
+data.map(new RichMapFunction[String, Int] {
+  def map(in: String): Int = { in.toInt }
+})
+{% endhighlight %}
+</div>
+
+</div>
+
 Rich functions provide, in addition to the user-defined function (map,
 reduce, etc), four methods: `open`, `close`, `getRuntimeContext`, and
 `setRuntimeContext`. These are useful for creating and finalizing
@@ -653,7 +1096,7 @@ on iterations (see [Iterations](#iterations)).
 In particular for the `reduceGroup` transformation, using a rich
 function is the only way to define an optional `combine` function. See
 the
-[transformations documentation](java_api_transformations.html)
+[transformations documentation](dataset_transformations.html)
 for a complete example.
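+
+As a sketch, the lifecycle methods mentioned above can be used like this in Scala (the class name
+and the setup/cleanup comments are only illustrative):
+
+{% highlight scala %}
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.configuration.Configuration
+
+class ParseWithSetup extends RichMapFunction[String, Int] {
+
+  override def open(parameters: Configuration): Unit = {
+    // one-time setup per parallel task instance, e.g. opening a connection
+  }
+
+  def map(in: String): Int = in.toInt
+
+  override def close(): Unit = {
+    // cleanup when the task is done
+  }
+}
+{% endhighlight %}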
 
 [Back to top](#top)
@@ -662,30 +1105,30 @@ for a complete example.
 Data Types
 ----------
 
-The Java API is strongly typed: All data sets and transformations accept typed elements. This
-catches type errors very early and supports safe refactoring of programs. The API supports various
-different data types for the input and output of operators. Both `DataSet` and functions like
-`MapFunction`, `ReduceFunction`, etc. are parameterized with data types using Java generics in order
-to ensure type-safety.
+Flink places some restrictions on the type of elements that are used in DataSets and as results
+of transformations. The reason for this is that the system analyzes the types to determine
+efficient execution strategies.
 
-There are four different categories of data types, which are treated slightly different:
+There are four different categories of data types, which are treated slightly differently when it
+comes to [specifying keys](#specifying-keys):
 
-1. **Regular Types**
-2. **Tuples**
+1. **General Types**
+2. **Tuples**/**Case Classes**
 3. **Values**
 4. **Hadoop Writables**
 
 
-#### Regular Types
+#### General Types
 
-Out of the box, the Java API supports all common basic Java types: `Byte`, `Short`, `Integer`,
-`Long`, `Float`, `Double`, `Boolean`, `Character`, `String`.
+Out of the box, Flink supports all primitive types of your programming language of choice.
 
-Furthermore, you can use the vast majority of custom Java classes. Restrictions apply to classes
+Furthermore, you can use the vast majority of custom classes. Restrictions apply to classes
 containing fields that cannot be serialized, like File pointers, I/O streams, or other native
 resources. Classes that follow the Java Beans conventions work well in general. The following
 defines a simple example class to illustrate how you can use custom classes:
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 public class WordWithCount {
 
@@ -700,11 +1143,23 @@ public class WordWithCount {
     }
 }
 {% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class WordWithCount(val word: String, val count: Int) {
+    def this() {
+      this(null, -1)
+    }
+}
+{% endhighlight %}
+</div>
+</div>
 
-You can use all of those types to parameterize `DataSet` and function implementations, e.g.
-`DataSet<String>` for a `String` data set or `MapFunction<String, Integer>` for a mapper from
-`String` to `Integer`.
+You can use all of those types to parameterize DataSet and function implementations, e.g.
+`DataSet` or a `MapFunction`.
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 // using a basic data type
 DataSet<String> numbers = env.fromElements("1", "2");
@@ -728,11 +1183,37 @@ wordCounts.map(new MapFunction<WordCount, Integer>() {
     }
 });
 {% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// using a primitive data type
+// Note that the type ascription "DataSet[String]" can be omitted in Scala
+// it is just given to clarify the type of numbers
+val numbers: DataSet[String] = env.fromElements("1", "2")
+
+numbers.map(new MapFunction[String, Int]() {
+    def map(in: String): Int = {
+      in.toInt
+    }
+})
+
+// using a custom class
+val wordCounts = env.fromElements(
+  new WordCount("hello", 1),
+  new WordCount("world", 2))
+
+wordCounts.map { _.count }
+{% endhighlight %}
+</div>
+</div>
+
 
 When working with operators that require a Key for grouping or matching records
-you need to implement a `KeySelector` for your custom type (see
-[Defining Keys](#defining-keys)).
+you need to implement a key selector function for your custom type (see
+[Specifying Keys](#specifying-keys)).
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 wordCounts.groupBy(new KeySelector<WordCount, String>() {
     public String getKey(WordCount v) {
@@ -740,8 +1221,19 @@ wordCounts.groupBy(new KeySelector<WordCount, String>() {
     }
 }).reduce(new MyReduceFunction());
 {% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+wordCounts groupBy { _.word } reduce(new MyReduceFunction())
+{% endhighlight %}
+</div>
+</div>
+
 
-#### Tuples
+#### Tuples/Case Classes
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 
 You can use the `Tuple` classes for composite types. Tuples contain a fix number of fields of
 various types. The Java API provides classes from `Tuple1` up to `Tuple25`. Every field of a tuple
@@ -779,21 +1271,46 @@ the field positions. See this
 {% gh_link /flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery3.java "example" %} for an
 illustration how to make use of that mechanism.
 
+</div>
+<div data-lang="scala" markdown="1">
+
+Flink has special support for Scala's Case Classes and Tuples. When working with an operator
+that requires a key for grouping or matching records, this key can be specified using tuple field
+positions or field names:
+
+{% highlight scala %}
+case class WordCount(word: String, count: Int)
+val input = env.fromElements(
+    WordCount("hello", 1),
+    WordCount("world", 2))
+
+input.groupBy("word").reduce(...)
+
+val input2 = env.fromElements(("hello", 1), ("world", 2))
+
+input2.groupBy(0, 1).reduce(...)
+{% endhighlight %}
+
+Both variants allow specifying more than one key field name or key field position. See
+[specifying keys](#specifying-keys) for more details.
+
+</div>
+</div>
 
 #### Values
 
-*Value types describe their serialization and deserialization manually. Instead of going through a
-*general purpose serialization framework, they provide custom code for those operations by means
-*implementing the `org.apache.flinktypes.Value` interface with the methods `read` and `write`. Using
-*a Value type is reasonable when general purpose serialization would be highly inefficient. An
-*example would be a data type that implements a sparse vector of elements as an array. Knowing that
-*the array is mostly zero, one can use a special encoding for the non-zero elements, while the
-*general purpose serialization would simply write all array elements.
+*Value* types describe their serialization and deserialization manually. Instead of going through a
+general purpose serialization framework, they provide custom code for those operations by means of
+implementing the `org.apache.flink.types.Value` interface with the methods `read` and `write`. Using
+a Value type is reasonable when general purpose serialization would be highly inefficient. An
+example would be a data type that implements a sparse vector of elements as an array. Knowing that
+the array is mostly zero, one can use a special encoding for the non-zero elements, while the
+general purpose serialization would simply write all array elements.
 
 The `org.apache.flink.types.CopyableValue` interface supports manual internal cloning logic in a
 similar way.
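+
+As a sketch, a sparse-vector Value type could look like this in Scala (the class and field names
+are illustrative and not part of Flink):
+
+{% highlight scala %}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.types.Value
+
+class SparseVector(var indices: Array[Int], var values: Array[Double]) extends Value {
+
+  // Value types need a public nullary constructor so that the runtime can instantiate them
+  def this() = this(Array.empty, Array.empty)
+
+  override def write(out: DataOutputView): Unit = {
+    // only the non-zero entries are serialized
+    out.writeInt(indices.length)
+    for (i <- indices.indices) {
+      out.writeInt(indices(i))
+      out.writeDouble(values(i))
+    }
+  }
+
+  override def read(in: DataInputView): Unit = {
+    val n = in.readInt()
+    indices = Array.ofDim[Int](n)
+    values = Array.ofDim[Double](n)
+    for (i <- 0 until n) {
+      indices(i) = in.readInt()
+      values(i) = in.readDouble()
+    }
+  }
+}
+{% endhighlight %}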
 
-Flink comes with pre-defined Value types that correspond to Java's basic data types. (`ByteValue`,
+Flink comes with pre-defined Value types that correspond to basic data types. (`ByteValue`,
 `ShortValue`, `IntValue`, `LongValue`, `FloatValue`, `DoubleValue`, `StringValue`, `CharValue`,
 `BooleanValue`). These Value types act as mutable variants of the basic data types: Their value can
 be altered, allowing programmers to reuse objects and take pressure off the garbage collector.
@@ -807,7 +1324,9 @@ defined in the `write()`and `readFields()` methods will be used for serializatio
 
 #### Type Erasure & Type Inference
 
-The Java compiler throws away much of the generic type information after the compilation. This is
+*Note: This Section is only relevant for Java.*
+
+The Java compiler throws away much of the generic type information after compilation. This is
 known as *type erasure* in Java. It means that at runtime, an instance of an object does not know
 its generic type any more. For example, instances of `DataSet<String>` and `DataSet<Long>` look the
 same to the JVM.
@@ -832,8 +1351,92 @@ usually be inferred by the result types of the previous operations.
 [Back to top](#top)
 
 
-Data Sources
-------------
+Data Sources
+------------
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+Data sources create the initial data sets, such as from files or from Java collections. The general
+mechanism of creating data sets is abstracted behind an
+{% gh_link /flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java "InputFormat"%}.
+Flink comes
+with several built-in formats to create data sets from common file formats. Many of them have
+shortcut methods on the *ExecutionEnvironment*.
+
+File-based:
+
+- `readTextFile(path)` / `TextInputFormat` - Reads files line wise and returns them as Strings.
+- `readTextFileWithValue(path)` / `TextValueInputFormat` - Reads files line wise and returns them as
+  StringValues. StringValues are mutable strings.
+- `readCsvFile(path)` / `CsvInputFormat` - Parses files of comma (or another char) delimited fields.
+  Returns a DataSet of tuples. Supports the basic Java types and their Value counterparts as field
+  types.
+
+Collection-based:
+
+- `fromCollection(Collection)` - Creates a data set from a Java java.util.Collection. All elements
+  in the collection must be of the same type.
+- `fromCollection(Iterator, Class)` - Creates a data set from an iterator. The class specifies the
+  data type of the elements returned by the iterator.
+- `fromElements(T ...)` - Creates a data set from the given sequence of objects. All objects must be
+  of the same type.
+- `fromParallelCollection(SplittableIterator, Class)` - Creates a data set from an iterator, in
+  parallel. The class specifies the data type of the elements returned by the iterator.
+- `generateSequence(from, to)` - Generates the sequence of numbers in the given interval, in
+  parallel.
+
+Generic:
+
+- `readFile(inputFormat, path)` / `FileInputFormat` - Accepts a file input format.
+- `createInput(inputFormat)` / `InputFormat` - Accepts a generic input format.
+
+**Examples**
+
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+// read text file from local files system
+DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");
+
+// read text file from a HDFS running at nnHost:nnPort
+DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");
+
+// read a CSV file with three fields
+DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
+	                       .types(Integer.class, String.class, Double.class);
+
+// read a CSV file with five fields, taking only two of them
+DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
+                               .includeFields("10010")  // take the first and the fourth field
+	                       .types(String.class, Double.class);
+
+// create a set from some given elements
+DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar");
+
+// generate a number sequence
+DataSet<Long> numbers = env.generateSequence(1, 10000000);
+
+// Read data from a relational database using the JDBC input format
+DataSet<Tuple2<String, Integer>> dbData =
+    env.createInput(
+      // create and configure input format
+      JDBCInputFormat.buildJDBCInputFormat()
+                     .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+                     .setDBUrl("jdbc:derby:memory:persons")
+                     .setQuery("select name, age from persons")
+                     .finish(),
+      // specify type information for DataSet
+      new TupleTypeInfo(Tuple2.class, STRING_TYPE_INFO, INT_TYPE_INFO)
+    );
+
+// Note: Flink's program compiler needs to infer the data types of the data items which are returned
+// by an InputFormat. If this information cannot be automatically inferred, it is necessary to
+// manually provide the type information as shown in the examples above.
+{% endhighlight %}
+
+</div>
+<div data-lang="scala" markdown="1">
 
 Data sources create the initial data sets, such as from files or from Java collections. The general
 mechanism of creating data sets is abstracted behind an
@@ -853,69 +1456,64 @@ File-based:
 
 Collection-based:
 
-- `fromCollection(Collection)` - Creates a data set from the Java Java.util.Collection. All elements
+- `fromCollection(Seq)` - Creates a data set from a Seq. All elements
   in the collection must be of the same type.
-- `fromCollection(Iterator, Class)` - Creates a data set from an iterator. The class specifies the
+- `fromCollection(Iterator)` - Creates a data set from an Iterator. The class specifies the
   data type of the elements returned by the iterator.
-- `fromElements(T ...)` - Creates a data set from the given sequence of objects. All objects must be
-  of the same type.
-- `fromParallelCollection(SplittableIterator, Class)` - Creates a data set from an iterator, in
+- `fromElements(elements: _*)` - Creates a data set from the given sequence of objects. All objects
+  must be of the same type.
+- `fromParallelCollection(SplittableIterator)` - Creates a data set from an iterator, in
   parallel. The class specifies the data type of the elements returned by the iterator.
 - `generateSequence(from, to)` - Generates the sequence of numbers in the given interval, in
   parallel.
 
 Generic:
 
-- `createInput(path)` / `InputFormat` - Accepts a generic input format.
+- `readFile(inputFormat, path)` / `FileInputFormat` - Accepts a file input format.
+- `createInput(inputFormat)` / `InputFormat` - Accepts a generic input format.
 
 **Examples**
 
-{% highlight java %}
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+{% highlight scala %}
+val env  = ExecutionEnvironment.getExecutionEnvironment
 
 // read text file from local files system
-DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");
+val localLines = env.readTextFile("file:///path/to/my/textfile")
 
 // read text file from a HDFS running at nnHost:nnPort
-DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");
+val hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile")
 
 // read a CSV file with three fields
-DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
-	                       .types(Integer.class, String.class, Double.class);
+val csvInput = env.readCsvFile[(Int, String, Double)]("hdfs:///the/CSV/file")
 
 // read a CSV file with five fields, taking only two of them
-DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
-                               .includeFields("10010")  // take the first and the fourth fild
-	                       .types(String.class, Double.class);
+val csvInput = env.readCsvFile[(String, Double)](
+  "hdfs:///the/CSV/file",
+  includedFields = Array(0, 3)) // take the first and the fourth field
+
+// CSV input can also be used with Case Classes
+case class MyInput(str: String, dbl: Double)
+val csvInput = env.readCsvFile[MyInput](
+  "hdfs:///the/CSV/file",
+  includedFields = Array(0, 3)) // take the first and the fourth field
 
 // create a set from some given elements
-DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar");
+val values = env.fromElements("Foo", "bar", "foobar", "fubar")
 
 // generate a number sequence
-DataSet<Long> numbers = env.generateSequence(1, 10000000);
-
-// Read data from a relational database using the JDBC input format
-DataSet<Tuple2<String, Integer> dbData = 
-    env.createInput(
-      // create and configure input format
-      JDBCInputFormat.buildJDBCInputFormat()
-                     .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
-                     .setDBUrl("jdbc:derby:memory:persons")
-                     .setQuery("select name, age from persons")
-                     .finish(),
-      // specify type information for DataSet
-      new TupleTypeInfo(Tuple2.class, STRING_TYPE_INFO, INT_TYPE_INFO)
-    );
-
-// Note: Flink's program compiler needs to infer the data types of the data items which are returned
-// by an InputFormat. If this information cannot be automatically inferred, it is necessary to
-// manually provide the type information as shown in the examples above. {% endhighlight %}
+val numbers = env.generateSequence(1, 10000000);
+{% endhighlight %}
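+
+The generic entry points accept an explicit input format. A minimal sketch, with the plain text
+input format standing in for any custom `FileInputFormat` (reusing `env` from above):
+
+{% highlight scala %}
+import org.apache.flink.api.java.io.TextInputFormat
+import org.apache.flink.core.fs.Path
+
+val path = "file:///path/to/my/textfile"
+val lines = env.readFile(new TextInputFormat(new Path(path)), path)
+{% endhighlight %}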
+</div>
+</div>
 
 [Back to top](#top)
 
 Data Sinks
 ----------
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
 Data sinks consume DataSets and are used to store or return them. Data sink operations are described
 using an
 {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormat.java "OutputFormat" %}.
@@ -926,7 +1524,7 @@ DataSet:
   obtained by calling the *toString()* method of each element.
 - `writeAsFormattedText()` / `TextOutputFormat` - Write elements line-wise as Strings. The Strings
   are obtained by calling a user-defined *format()* method for each element.
-- `writeAsCsv` / `CsvOutputFormat` - Writes tuples as comma-separated value files. Row and field
+- `writeAsCsv(...)` / `CsvOutputFormat` - Writes tuples as comma-separated value files. Row and field
   delimiters are configurable. The value for each field comes from the *toString()* method of the objects.
 - `print()` / `printToErr()` - Prints the *toString()* value of each element on the
   standard out / standard error stream.
@@ -987,8 +1585,61 @@ myResult.output(
     );
 {% endhighlight %}
 
-[Back to top](#top)
+</div>
+<div data-lang="scala" markdown="1">
+Data sinks consume DataSets and are used to store or return them. Data sink operations are described
+using an
+{% gh_link /flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormat.java "OutputFormat" %}.
+Flink comes with a variety of built-in output formats that are encapsulated behind operations on the
+DataSet:
+
+- `writeAsText()` / `TextOutputFormat` - Writes elements line-wise as Strings. The Strings are
+  obtained by calling the *toString()* method of each element.
+- `writeAsCsv(...)` / `CsvOutputFormat` - Writes tuples as comma-separated value files. Row and field
+  delimiters are configurable. The value for each field comes from the *toString()* method of the objects.
+- `print()` / `printToErr()` - Prints the *toString()* value of each element on the
+  standard out / standard error stream.
+- `write()` / `FileOutputFormat` - Method and base class for custom file outputs. Supports
+  custom object-to-bytes conversion.
+- `output()`/ `OutputFormat` - Most generic output method, for data sinks that are not file based
+  (such as storing the result in a database).
+
+A DataSet can be input to multiple operations. Programs can write or print a data set and at the
+same time run additional transformations on them.
+
+**Examples**
+
+Standard data sink methods:
+
+{% highlight scala %}
+// text data 
+val textData: DataSet[String] = // [...]
+
+// write DataSet to a file on the local file system
+textData.writeAsText("file:///my/result/on/localFS")
+
+// write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
+textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS")
+
+// write DataSet to a file and overwrite the file if it exists
+textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE)
+
+// tuples as lines with pipe as the separator "a|b|c"
+val values: DataSet[(String, Int, Double)] = // [...]
+values.writeAsCsv("file:///path/to/the/result/file", "\n", "|")
+
+// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
+values.writeAsText("file:///path/to/the/result/file");
+
+// this writes values as strings using a user-defined formatting
+values.map { tuple => tuple._1 + " - " + tuple._2 }
+  .writeAsText("file:///path/to/the/result/file")
+{% endhighlight %}
+
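+As noted above, the same DataSet can feed several sinks and further transformations at once; a
+minimal sketch (paths are placeholders):
+
+{% highlight scala %}
+val results: DataSet[(String, Int)] = // [...]
+
+// write the data set and keep transforming it at the same time
+results.writeAsCsv("file:///path/to/csv/output")
+val nonZero = results.filter { _._2 != 0 }
+{% endhighlight %}
+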
+</div>
+</div>
 
+[Back to top](#top)
 
 Debugging
 ---------
@@ -1008,6 +1659,9 @@ start the LocalEnvironement from an IDE, you can set breakpoint in your code and
 program.
 
 A LocalEnvironment is created and used as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
 
@@ -1015,18 +1669,32 @@ DataSet<String> lines = env.readTextFile(pathToTextFile);
 // build your program
 
 env.execute();
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
 
+{% highlight scala %}
+val env = ExecutionEnvironment.createLocalEnvironment()
+
+val lines = env.readTextFile(pathToTextFile)
+// build your program
+
+env.execute();
 {% endhighlight %}
+</div>
+</div>
 
 ### Collection Data Sources and Sinks
 
-Providing input for an analysis program and checking its output is cumbersome done by creating input
-files and reading output files. Flink features special data sources and sinks which are backed by
-Java collections to ease testing. Once a program has been tested, the sources and sinks can be
+Providing input for an analysis program and checking its output is cumbersome when done by creating
+input files and reading output files. Flink features special data sources and sinks which are backed
+by Java collections to ease testing. Once a program has been tested, the sources and sinks can be
 easily replaced by sources and sinks that read from / write to external data stores such as HDFS.
 
 Collection data sources can be used as follows:
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
 
@@ -1042,10 +1710,6 @@ Iterator<Long> longIt = ...
 DataSet<Long> myLongs = env.fromCollection(longIt, Long.class);
 {% endhighlight %}
 
-**Note: Currently, the collection data source requires that data types and iterators implement
-**`Serializable`. Furthermore, collection data sources can not be executed in parallel (degree of
-**parallelism = 1).
-
 A collection data sink is specified as follows:
 
 {% highlight java %}
@@ -1057,8 +1721,30 @@ myResult.output(new LocalCollectionOutputFormat(outData));
 
 **Note:** Currently, the collection data sink is restricted to local execution, as a debugging tool.
 
-[Back to top](#top)
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.createLocalEnvironment()
+
+// Create a DataSet from a list of elements
+val myInts = env.fromElements(1, 2, 3, 4, 5)
 
+// Create a DataSet from any Collection
+val data: Seq[(String, Int)] = ...
+val myTuples = env.fromCollection(data)
+
+// Create a DataSet from an Iterator
+val longIt: Iterator[Long] = ...
+val myLongs = env.fromCollection(longIt)
+{% endhighlight %}
+</div>
+</div>
+
+**Note:** Currently, the collection data source requires that data types and iterators implement
+`Serializable`. Furthermore, collection data sources can not be executed in parallel (degree of
+parallelism = 1).
+
+[Back to top](#top)
 
 Iteration Operators
 -------------------
@@ -1071,9 +1757,12 @@ into the next iteration. There are two types of iterations in Flink: **BulkItera
 This section provides quick examples on how to use both operators. Check out the [Introduction to
 Iterations](iterations.html) page for a more detailed introduction.
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
 #### Bulk Iterations
 
-To create a BulkIteration call the `iterate(int)` method of the `DataSet` the iteration should start
+To create a BulkIteration call the `iterate(int)` method of the DataSet the iteration should start
 at. This will return an `IterativeDataSet`, which can be transformed with the regular operators. The
 single argument to the iterate call specifies the maximum number of iterations.
 
@@ -1173,6 +1862,98 @@ iteration.closeWith(deltas, nextWorkset)
 	.writeAsCsv(outputPath);
 {% endhighlight %}
 
+</div>
+<div data-lang="scala" markdown="1">
+#### Bulk Iterations
+
+To create a BulkIteration call the `iterate(int)` method of the DataSet the iteration should start
+at and also specify a step function. The step function gets the input DataSet for the current
+iteration and must return a new DataSet. The parameter of the iterate call is the maximum number
+of iterations after which to stop.
+
+There is also the `iterateWithTermination(int)` function that accepts a step function that
+returns two DataSets: The result of the iteration step and a termination criterion. The iterations
+are stopped once the termination criterion DataSet is empty.
+
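+A minimal sketch of this second variant (the step and the threshold are only illustrative) stops
+as soon as no value below 10 remains:
+
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val initial = env.fromElements(0)
+
+val result = initial.iterateWithTermination(100) { current =>
+  val next = current.map { _ + 1 }
+  // termination criterion: keep iterating only while values below 10 exist
+  val criterion = next.filter { _ < 10 }
+  (next, criterion)
+}
+{% endhighlight %}
+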
+The following example iteratively estimates the number Pi. The goal is to count the number of random
+points, which fall into the unit circle. In each iteration, a random point is picked. If this point
+lies inside the unit circle, we increment the count. Pi is then estimated as the resulting count
+divided by the number of iterations multiplied by 4.
+
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+// Create initial DataSet
+val initial = env.fromElements(0)
+
+val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
+  val result = iterationInput.map { i => 
+    val x = Math.random()
+    val y = Math.random()
+    i + (if (x * x + y * y < 1) 1 else 0)
+  }
+  result
+}
+
+val result = count map { c => c / 10000.0 * 4 }
+
+result.print()
+
+env.execute("Iterative Pi Example");
+{% endhighlight %}
+
+You can also check out the
+{% gh_link /flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala "K-Means example" %},
+which uses a BulkIteration to cluster a set of unlabeled points.
+
+#### Delta Iterations
+
+Delta iterations exploit the fact that certain algorithms do not change every data point of the
+solution in each iteration.
+
+In addition to the partial solution that is fed back (called workset) in every iteration, delta
+iterations maintain state across iterations (called solution set), which can be updated through
+deltas. The result of the iterative computation is the state after the last iteration. Please refer
+to the [Introduction to Iterations](iterations.html) for an overview of the basic principle of delta
+iterations.
+
+Defining a DeltaIteration is similar to defining a BulkIteration. For delta iterations, two data
+sets form the input to each iteration (workset and solution set), and two data sets are produced as
+the result (new workset, solution set delta) in each iteration.
+
+To create a DeltaIteration call the `iterateDelta(initialWorkset, maxIterations, key)` on the
+initial solution set. The step function takes two parameters: (solutionSet, workset), and must
+return two values: (solutionSetDelta, newWorkset).
+
+Below is an example for the syntax of a delta iteration
+
+{% highlight scala %}
+// read the initial data sets
+val initialSolutionSet: DataSet[(Long, Double)] = // [...]
+
+val initialWorkset: DataSet[(Long, Double)] = // [...]
+
+val maxIterations = 100
+val keyPosition = 0
+
+val result = initialSolutionSet.iterateDelta(initialWorkset, maxIterations, Array(keyPosition)) {
+  (solution, workset) =>
+    val candidateUpdates = workset.groupBy(1).reduceGroup(new ComputeCandidateChanges())
+    val deltas = candidateUpdates.join(solution).where(0).equalTo(0)(new CompareChangesToCurrent())
+
+    val nextWorkset = deltas.filter(new FilterByThreshold())
+
+    (deltas, nextWorkset)
+}
+
+result.writeAsCsv(outputPath)
+
+env.execute()
+{% endhighlight %}
+
+</div>
+</div>
+
 [Back to top](#top)
 
 
@@ -1186,9 +1967,11 @@ allow the system to reason about reusing sort orders or partitions across multip
 semantic annotations may eventually save the program from unnecessary data shuffling or unnecessary
 sorts.
 
-Semantic annotations can be attached to functions through Java annotations, or passed as arguments
+Semantic annotations can be attached to functions through annotations, or passed as arguments
 when invoking a function on a DataSet. The following example illustrates that:
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 @ConstantFields("1")
 public class DivideFirstbyTwo implements MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
@@ -1199,6 +1982,20 @@ public class DivideFirstbyTwo implements MapFunction<Tuple2<Integer, Integer>, T
   }
 }
 {% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+@ConstantFields("1")
+class DivideFirstbyTwo extends MapFunction[(Int, Int), (Int, Int)]{
+   def map(input: (Int, Int): (Int, Int) = {
+    value.f0 /= 2;
+    (input._1 / 2, input._2)
+  }
+}
+{% endhighlight %}
+
+</div>
+</div>
 
 The following annotations are currently available:
 
@@ -1220,8 +2017,8 @@ The following annotations are currently available:
 * `@ConstantFieldsSecondExcept`: Declares that all fields of the second input are constant, except
   for the specified fields. Applicable to functions with a two input data sets.
 
-*(Note: The system currently evaluated annotations only on Tuple data types. 
-his will be extended in the next versions)*
+*(Note: The system currently evaluates annotations only on Tuple DataSets. This will be extended in
+future versions.)*
 
 **Note**: It is important to be conservative when providing annotations. Only annotate fields
 when they are always constant for every call to the function. Otherwise the system has incorrect
@@ -1236,12 +2033,14 @@ Broadcast Variables
 
 Broadcast variables allow you to make a data set available to all parallel instances of an
 operation, in addition to the regular input of the operation. This is useful for auxiliary data
-sets, or data-dependent parameterization. The data set will then be accessible at the operator as an
-`Collection<T>`.
+sets, or data-dependent parameterization. The data set will then be accessible at the operator as a
+Collection.
 
 - **Broadcast**: broadcast sets are registered by name via `withBroadcastSet(DataSet, String)`, and
 - **Access**: accessible via `getRuntimeContext().getBroadcastVariable(String)` at the target operator.
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 // 1. The DataSet to be broadcasted
 DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);
@@ -1266,6 +2065,34 @@ data.map(new MapFunction<String, String>() {
 Make sure that the names (`broadcastSetName` in the previous example) match when registering and
 accessing broadcasted data sets. For a complete example program, have a look at
 {% gh_link /flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/clustering/KMeans.java#L96 "KMeans Algorithm" %}.
+</div>
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+import scala.collection.JavaConverters._
+
+// 1. The DataSet to be broadcasted
+val toBroadcast = env.fromElements(1, 2, 3)
+
+val data = env.fromElements("a", "b")
+
+data.map(new RichMapFunction[String, String]() {
+    var broadcastSet: Traversable[String] = null
+
+    override def open(config: Configuration): Unit = {
+      // 3. Access the broadcasted DataSet as a Collection
+      broadcastSet = getRuntimeContext().getBroadcastVariable[String]("broadcastSetName").asScala
+    }
+
+    def map(in: String): String = {
+        ...
+    }
+}).withBroadcastSet(toBroadcast, "broadcastSetName") // 2. Broadcast the DataSet
+{% endhighlight %}
+
+Make sure that the names (`broadcastSetName` in the previous example) match when registering and
+accessing broadcasted data sets. For a complete example program, have a look at
+{% gh_link /flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala#L96 "KMeans Algorithm" %}.
+</div>
+</div>
 
 **Note**: As the content of broadcast variables is kept in-memory on each node, it should not become
 too large. For simpler things like scalar values you can simply make parameters part of the closure
@@ -1291,8 +2118,8 @@ interface. If the Flink program is invoked differently than through these interf
 environment will act like a local environment.
 
 To package the program, simply export all involved classes as a JAR file. The JAR file's manifest
-must point to the class that contains the program's *entry point* (the class with the `public void
-main(String[])` method). The simplest way to do this is by putting the *main-class* entry into the
+must point to the class that contains the program's *entry point* (the class with the public
+`main` method). The simplest way to do this is by putting the *main-class* entry into the
 manifest (such as `main-class: org.apache.flinkexample.MyProgram`). The *main-class* attribute is
 the same one that is used by the Java Virtual Machine to find the main method when executing a JAR
 file through the command `java -jar pathToTheJarFile`. Most IDEs offer to include that attribute
@@ -1301,11 +2128,11 @@ automatically when exporting JAR files.
 
 #### Packaging Programs through Plans
 
-Additionally, the Java API supports packaging programs as *Plans*. This method resembles the way
-that the *Scala API* packages programs. Instead of defining a progam in the main method and calling
+Additionally, we support packaging programs as *Plans*. Instead of defining a program in the main
+method and calling
 `execute()` on the environment, plan packaging returns the *Program Plan*, which is a description of
 the program's data flow. To do that, the program must implement the
-`org.apache.flinkapi.common.Program` interface, defining the `getPlan(String...)` method. The
+`org.apache.flink.api.common.Program` interface, defining the `getPlan(String...)` method. The
 strings passed to that method are the command line arguments. The program's plan can be created from
 the environment via the `ExecutionEnvironment#createProgramPlan()` method. When packaging the
 program's plan, the JAR manifest must point to the class implementing the
@@ -1339,8 +2166,8 @@ which is available after the job ended.
 
 The most straightforward accumulator is a **counter**: You can increment it using the
 ```Accumulator.add(V value)``` method. At the end of the job Flink will sum up (merge) all partial
-results and send the result to the client. Since accumulators are very easy to use, they can be
-useful during debugging or if you quickly want to find out more about your data.
+results and send the result to the client. Accumulators are useful during debugging or if you
+quickly want to find out more about your data.
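+
+As a rough sketch in Scala (the accumulator name `num-lines` and the input data are made up for
+illustration), a counter can be registered inside a rich function and incremented per record:
+
+{% highlight scala %}
+import org.apache.flink.api.common.accumulators.IntCounter
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val data = env.fromElements("To", "be", "or", "not", "to", "be")
+
+data.map(new RichMapFunction[String, Int]() {
+  // partial counts are merged across all parallel instances when the job ends
+  private val numLines = new IntCounter()
+
+  override def open(parameters: Configuration): Unit = {
+    // register the accumulator under the chosen name
+    getRuntimeContext().addAccumulator("num-lines", numLines)
+  }
+
+  def map(in: String): Int = {
+    numLines.add(1)
+    in.length
+  }
+}).print()
+
+// the merged value is afterwards available from the JobExecutionResult returned by
+// execute(), looked up under the same name via getAccumulatorResult("num-lines")
+env.execute("Accumulator Sketch")
+{% endhighlight %}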
 
 Flink currently has the following **built-in accumulators**. Each of them implements the
 {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java "Accumulator" %}
@@ -1431,6 +2258,8 @@ The parallelism of an individual operator, data source, or data sink can be defi
 [WordCount](#example) example program can be set to `5` as follows:
 
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -1443,6 +2272,22 @@ wordCounts.print();
 
 env.execute("Word Count Example");
 {% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val text = [...]
+val wordCounts = text
+    .flatMap{ _.split(" ") map { (_, 1) } }
+    .groupBy(0)
+    .sum(1).setParallelism(5)
+wordCounts.print()
+
+env.execute("Word Count Example")
+{% endhighlight %}
+</div>
+</div>
 
 ### Execution Environment Level
 
@@ -1452,10 +2297,12 @@ it executes. Execution environment parallelism can be overwritten by explicitly
 parallelism of an operator.
 
 The default parallelism of an execution environment can be specified by calling the
-`setDefaultLocalParallelism()` method. To execute all operators, data sources, and data sinks of the
+`setDegreeOfParallelism()` method. To execute all operators, data sources, and data sinks of the
 [WordCount](#example) example program with a parallelism of `3`, set the default parallelism of the
 execution environment as follows:
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 env.setDegreeOfParallelism(3);
@@ -1466,6 +2313,23 @@ wordCounts.print();
 
 env.execute("Word Count Example");
 {% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+env.setDegreeOfParallelism(3)
+
+val text = [...]
+val wordCounts = text
+    .flatMap{ _.split(" ") map { (_, 1) } }
+    .groupBy(0)
+    .sum(1)
+wordCounts.print()
+
+env.execute("Word Count Example")
+{% endhighlight %}
+</div>
+</div>
 
 ### System Level
 
@@ -1484,13 +2348,15 @@ useful to know how exactly Flink will execute your program.
 
 __Plan Visualization Tool__
 
-Flink 0.5 comes packaged with a visualization tool for execution plans. The HTML document containing
+Flink comes packaged with a visualization tool for execution plans. The HTML document containing
 the visualizer is located under ```tools/planVisualizer.html```. It takes a JSON representation of
 the job execution plan and visualizes it as a graph with complete annotations of execution
 strategies.
 
 The following code shows how to print the execution plan JSON from your program:
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -1498,6 +2364,17 @@ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 System.out.println(env.getExecutionPlan());
 {% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+...
+
+println(env.getExecutionPlan())
+{% endhighlight %}
+</div>
+</div>
 
 
 To visualize the execution plan, do the following: