Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/08 20:32:58 UTC

[1/3] flink git commit: [FLINK-1429] [streaming] Scala programming guide update: intro & operators, minor fixes

Repository: flink
Updated Branches:
  refs/heads/master fb62f6be4 -> 7582390c1


[FLINK-1429] [streaming] Scala programming guide update: intro & operators, minor fixes

This closes #463


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7582390c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7582390c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7582390c

Branch: refs/heads/master
Commit: 7582390c13aa1d492c20debc1c882ef56ec39d8a
Parents: bd1b916
Author: mbalassi <mb...@apache.org>
Authored: Sat Mar 7 15:06:41 2015 +0100
Committer: Gábor Hermann <re...@gmail.com>
Committed: Sun Mar 8 20:24:41 2015 +0100

----------------------------------------------------------------------
 docs/programming_guide.md                       |   2 +-
 docs/streaming_guide.md                         | 596 +++++++++++++++----
 .../flink/streaming/api/scala/DataStream.scala  |  21 +-
 3 files changed, 473 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7582390c/docs/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/programming_guide.md b/docs/programming_guide.md
index 1f4e99e..ee1138c 100644
--- a/docs/programming_guide.md
+++ b/docs/programming_guide.md
@@ -58,7 +58,7 @@ public class WordCountExample {
     public static void main(String[] args) throws Exception {
         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-	    DataSet<String> text = env.fromElements(
+        DataSet<String> text = env.fromElements(
             "Who's there?",
             "I think I hear them. Stand, ho! Who's there?");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7582390c/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index 7a5835e..fed64f7 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -29,7 +29,7 @@ Introduction
 ------------
 
 
-Flink Streaming is an extension of the core Flink API for high-throughput, low-latency data stream processing. The system can connect to and process data streams from many data sources like RabbitMQ, Flume, Twitter, ZeroMQ and also from any user defined data source. Data streams can be transformed and modified using high-level functions similar to the ones provided by the batch processing API. Flink Streaming provides native support for iterative stream processing. The processed data can be pushed to different output types.
+Flink Streaming is an extension of the batch Flink API for high-throughput, low-latency data stream processing. The system can connect to and process data streams from many data sources like Apache Kafka, RabbitMQ, Apache Flume, Twitter and also from any user defined data source. Data streams can be transformed and modified using high-level functions similar to the ones provided by the batch processing API. Flink Streaming provides native support for iterative stream processing. The processed data can be pushed to different output types.
 
 Flink Streaming API
 -----------
@@ -38,15 +38,38 @@ The Streaming API is currently part of the *flink-staging* Maven project. All re
 
 Add the following dependency to your `pom.xml` to use the Flink Streaming.
 
-~~~xml
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight xml %}
 <dependency>
-    <groupId>org.apache.flink</groupId>
-    <artifactId>flink-streaming-core</artifactId>
-    <version>{{site.FLINK_VERSION_SHORT}}</version>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-core</artifactId>
+  <version>{{site.FLINK_VERSION_SHORT}}</version>
 </dependency>
-~~~
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients</artifactId>
+  <version>{{site.FLINK_VERSION_SHORT}}</version>
+</dependency>
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-scala</artifactId>
+  <version>{{site.FLINK_VERSION_SHORT}}</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients</artifactId>
+  <version>{{site.FLINK_VERSION_SHORT}}</version>
+</dependency>
+{% endhighlight %}
+</div>
+</div>
 
-Create a data stream flow with our Java API as described below. In order to create your own Flink Streaming program, we encourage you to start with the [skeleton](#program-skeleton) and gradually add your own [operations](#operations). The remaining sections act as references for additional operations and advanced features.
+Create a data stream flow with our Java or Scala API as described below. In order to create your own Flink Streaming program, we encourage you to start with the [skeleton](#program-skeleton) and gradually add your own [transformations](#transformations). The remaining sections act as references for additional transformations and advanced features.
 
 
 Example Program
@@ -54,7 +77,10 @@ Example Program
 
 The following program is a complete, working example of streaming WordCount. You can copy &amp; paste the code to run it locally.
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
 public class StreamingWordCount {
 
     public static void main(String[] args) {
@@ -82,7 +108,33 @@ public class StreamingWordCount {
     }
     
 }
-~~~
+{% endhighlight %}
+
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+object WordCount {
+  def main(args: Array[String]) {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val text = env.socketTextStream("localhost", 9999)
+
+    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
+      .map { (_, 1) }
+      .groupBy(0)
+      .sum(1)
+
+    counts.print
+
+    env.execute("Scala Socket Stream WordCount")
+  }
+}
+{% endhighlight %}
+</div>
+
+</div>
 
 To run the example program start the input stream with netcat first from a terminal:
 
@@ -97,7 +149,57 @@ The lines typed to this terminal are submitted as a source for your streaming jo
 Program Skeleton
 ----------------
 
-As presented in the [example](#example-program), a Flink Streaming program looks almost identical to a regular Flink program. Each stream processing program consists of the following parts:
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+As presented in the [example](#example-program), a Flink Streaming program looks almost identical to a regular Flink program. Each stream processing program consists of the following parts:
+
+1. Creating a `StreamExecutionEnvironment`,
+2. Connecting to data stream sources,
+3. Specifying transformations on the data streams,
+4. Specifying output for the processed data,
+5. Executing the program.
+
+As these steps are basically the same as in the batch API we will only note the important differences.
+For stream processing jobs, the user needs to obtain a `StreamExecutionEnvironment` in contrast with the [batch API](programming_guide.html) where one would need an `ExecutionEnvironment`. The process otherwise is essentially the same:
+
+{% highlight java %}
+StreamExecutionEnvironment.getExecutionEnvironment();
+StreamExecutionEnvironment.createLocalEnvironment(parallelism);
+StreamExecutionEnvironment.createRemoteEnvironment(…);
+{% endhighlight %}
+
+For connecting to data streams the `StreamExecutionEnvironment` has many different methods, from basic file sources to completely general user defined data sources. We will go into details in the [basics](#basics) section.
+
+{% highlight java %}
+env.socketTextStream(host, port);
+env.fromElements(elements…);
+{% endhighlight %}
+
+After defining the data stream sources the user can specify transformations on the data streams to create a new data stream. Different data streams can be also combined together for joint transformations which are being showcased in the [transformations](#transformations) section.
+
+{% highlight java %}
+dataStream.map(new Mapper()).reduce(new Reducer());
+{% endhighlight %}
+
+The processed data can be pushed to different outputs called sinks. The user can define their own sinks or use any predefined filesystem, message queue or database sink.
+
+{% highlight java %}
+dataStream.writeAsCsv(path);
+dataStream.print();
+{% endhighlight %}
+
+Once the complete program is specified `execute(programName)` is to be called on the `StreamExecutionEnvironment`. This will either execute on the local machine or submit the program for execution on a cluster, depending on the chosen execution environment.
+
+{% highlight java %}
+env.execute(programName);
+{% endhighlight %}
+
+</div>
+
+<div data-lang="scala" markdown="1">
+
+As presented in the [example](#example-program), a Flink Streaming program looks almost identical to a regular Flink program. Each stream processing program consists of the following parts:
 
 1. Creating a `StreamExecutionEnvironment`,
 2. Connecting to data stream sources,
@@ -105,40 +207,44 @@ As presented in the [example](#example-program), a Flink Streaming program looks
 4. Specifying output for the processed data,
 5. Executing the program.
 
-As these steps are basically the same as in the core API we will only note the important differences.
+As these steps are basically the same as in the batch API we will only note the important differences.
 For stream processing jobs, the user needs to obtain a `StreamExecutionEnvironment` in contrast with the batch API where one would need an `ExecutionEnvironment`. The process otherwise is essentially the same:
 
-~~~java 
-StreamExecutionEnvironment.getExecutionEnvironment()
+{% highlight scala %}
+StreamExecutionEnvironment.getExecutionEnvironment
 StreamExecutionEnvironment.createLocalEnvironment(parallelism)
 StreamExecutionEnvironment.createRemoteEnvironment(…)
-~~~
+{% endhighlight %}
 
 For connecting to data streams the `StreamExecutionEnvironment` has many different methods, from basic file sources to completely general user defined data sources. We will go into details in the [basics](#basics) section.
 
-~~~java
+{% highlight scala %}
 env.socketTextStream(host, port)
 env.fromElements(elements…)
-~~~
+{% endhighlight %}
 
-After defining the data stream sources, the user can specify transformations on the data streams to create a new data stream. Different data streams can be also combined together for joint transformations which are being showcased in the [operations](#operations) section.
+After defining the data stream sources the user can specify transformations on the data streams to create a new data stream. Different data streams can be also combined together for joint transformations which are being showcased in the [transformations](#transformations) section.
 
-~~~java
-dataStream.map(new Mapper()).reduce(new Reducer())
-~~~
+{% highlight scala %}
+dataStream.map(new Mapper).reduce(new Reducer)
+{% endhighlight %}
 
-The processed data can be pushed to different outputs called sinks. The user can define their own sinks or use any predefined filesystem or database sink.
+The processed data can be pushed to different outputs called sinks. The user can define their own sinks or use any predefined filesystem, message queue or database sink.
 
-~~~java
+{% highlight scala %}
 dataStream.writeAsCsv(path)
-dataStream.print()
-~~~
+dataStream.print
+{% endhighlight %}
 
 Once the complete program is specified `execute(programName)` is to be called on the `StreamExecutionEnvironment`. This will either execute on the local machine or submit the program for execution on a cluster, depending on the chosen execution environment.
 
-~~~java
+{% highlight scala %}
 env.execute(programName)
-~~~
+{% endhighlight %}
+
+</div>
+
+</div>
 
 [Back to top](#top)
 
@@ -147,9 +253,9 @@ Basics
 
 ### DataStream
 
-The `DataStream` is the basic abstraction provided by the Flink Streaming API. It represents a continuous stream of data of a certain type from either a data source or a transformed data stream. Operations will be applied on individual data points or windows of the `DataStream` based on the type of the operation. For example the map operator transforms each data point individually while window operations work on an interval of data points at the same time.
- 
-The operations may return different `DataStream` types allowing more elaborate transformations, for example the `groupBy(…)` method returns a `GroupedDataStream` which can be used for grouped operations such as aggregating by key.
+The `DataStream` is the basic data abstraction provided by the Flink Streaming API. It represents a continuous stream of data of a certain type from either a data source or a transformed data stream. You can apply transformations on either individual data points or windows of the `DataStream`. For example the map operator transforms each data point individually while window transformations work on intervals of data points at the same time.
+
+The transformations may return different `DataStream` types allowing more elaborate transformations, for example the `groupBy(…)` method returns a `GroupedDataStream` which can be used for grouped transformations such as aggregating by key.
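+For example (the `Doubler` mapper and the field positions below are only illustrative placeholders):
+
+{% highlight java %}
+dataStream.map(new Doubler());          // applied to each data point individually
+dataStream.window(Count.of(10)).sum(0); // applied to windows of data points
+dataStream.groupBy(0).sum(1);           // grouped aggregation via a GroupedDataStream
+{% endhighlight %}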
 
 ### Partitioning
 
@@ -161,8 +267,7 @@ Usage: `dataStream.forward()`
 Usage: `dataStream.shuffle()`
  * *Distribute*: Distribute partitioning directs the output data stream to the next operator in a round-robin fashion, achieving a balanced distribution.
 Usage: `dataStream.distribute()`
- * *Field/Key*: Field/Key partitioning partitions the output data stream based on the hash code of a selected key of the tuples. Data points with the same key are directed to the same operator instance. The user can define keys by field positions (for tuple and array types), field expressions (for Pojo types) and custom keys using the `KeySelector` interface. 
-Usage: `dataStream.partitionBy(keys)`
+ * *Field/Key*: Field/Key partitioning partitions the output data stream based on the hash code of a selected key of the tuples. Data points with the same key are directed to the same operator instance. This partitioning is applied when using the `groupBy` operator.
  * *Broadcast*: Broadcast partitioning sends the output data stream to all parallel instances of the next operator.
 Usage: `dataStream.broadcast()`
  * *Global*: All data points end up at the same operator instance. To achieve this use the parallelism setting of the corresponding operator.
@@ -174,9 +279,9 @@ The user is expected to connect to the outside world through the source and the
 
 #### Sources
 
-The user can connect to data streams by the different implementations of `SourceFunction` using `StreamExecutionEnvironment.addSource(SourceFunction)`. In contrast with other operators, DataStreamSources have a default operator parallelism of 1.
+The user can connect to data streams by the different implementations of `SourceFunction` using `StreamExecutionEnvironment.addSource(sourceFunction)`. In contrast with other operators, DataStreamSources have a default operator parallelism of 1.
 
-To create parallel sources the users source function needs to implement `ParallelSourceFunction` or extend `RichParallelSourceFunction` in which cases the source will have the parallelism of the environment. The degree of parallelism for ParallelSourceFunctions can be changed afterwards using `source.setParallelism(int dop)`.
+To create parallel sources the user's source function needs to implement `ParallelSourceFunction` or extend `RichParallelSourceFunction`, in which case the source will have the parallelism of the environment. The degree of parallelism for ParallelSourceFunctions can be changed afterwards using `source.setParallelism(dop)`.
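+For example, attaching a custom source and adjusting its parallelism could look like the following (`MySource` is only a placeholder for a user defined parallel source function):
+
+{% highlight java %}
+DataStream<String> stream = env.addSource(new MySource()).setParallelism(4);
+{% endhighlight %}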
 
 There are several predefined ones similar to the ones of the batch API and some streaming specific ones like:
 
@@ -202,91 +307,247 @@ The user can also implement arbitrary sink functionality by implementing the `Si
 
 [Back to top](#top)
 
-Operations
+Transformations
 ----------------
 
-Operations represent transformations on the `DataStream`. The user can chain and combine multiple operators on the data stream to produce the desired processing steps. Most of the operators work very similar to the core Flink API allowing developers to reason about `DataStream` the same way as they would about `DataSet`. At the same time there are operators that exploit the streaming nature of the data to allow advanced functionality.
+Transformations represent the users' business logic on the data stream. The user can chain and combine multiple operators on the data stream to produce the desired processing steps. Most of the operators work very similarly to the batch Flink API, allowing developers to reason about `DataStream` the same way as they would about `DataSet`. At the same time there are operators that exploit the streaming nature of the data to allow advanced functionality.
 
-### Basic operators
+### Basic transformations
 
-Basic operators can be seen as functions that transform each data element in the data stream.
- 
-#### Map
-The Map transformation applies a user-defined `MapFunction` on each element of a `DataStream`. It implements a one-to-one mapping, that is, exactly one element must be returned by the function.
-A map operator that doubles the values of the input stream:
+Basic transformations can be seen as functions that operate on records of the data stream.
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td><strong>Map</strong></td>
+      <td>
+        <p>Takes one element and produces one element. A map that doubles the values of the input stream:</p>
+{% highlight java %}
 dataStream.map(new MapFunction<Integer, Integer>() {
             @Override
             public Integer map(Integer value) throws Exception {
                 return 2 * value;
             }
-        })
-~~~
-
-#### FlatMap
-The FlatMap transformation applies a user-defined `FlatMapFunction` on each element of a `DataStream`. This variant of a map function can return arbitrary many result elements (including none) for each input element.
-A flatmap operator that splits sentences to words:
+        });
+{% endhighlight %}
+      </td>
+    </tr>
 
-~~~java
+    <tr>
+      <td><strong>FlatMap</strong></td>
+      <td>
+        <p>Takes one element and produces zero, one, or more elements. A flatmap that splits sentences to words:</p>
+{% highlight java %}
 dataStream.flatMap(new FlatMapFunction<String, String>() {
             @Override
-            public void flatMap(String value, Collector<String> out) throws Exception {
+            public void flatMap(String value, Collector<String> out) 
+                throws Exception {
                 for(String word: value.split(" ")){
                     out.collect(word);
                 }
             }
-        })
-~~~
-
-#### Filter
-The Filter transformation applies a user-defined `FilterFunction` on each element of a `DataStream` and retains only those elements for which the function returns true.
-A filter that filters out zero values:
+        });
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Filter</strong></td>
+      <td>
+        <p>Evaluates a boolean function for each element and retains those for which the function returns true.
+        <br/>
+        
+        <strong>IMPORTANT:</strong> The system assumes that the function does not modify the elements on which the predicate is applied. Violating this assumption
+        can lead to incorrect results.
 
-~~~java
+        <br/>
+        A filter that filters out zero values:
+        </p>
+{% highlight java %}
 dataStream.filter(new FilterFunction<Integer>() { 
             @Override
             public boolean filter(Integer value) throws Exception {
                 return value != 0;
             }
-        })
-~~~
-
-#### Reduce
-The Reduce transformation applies a user-defined `ReduceFunction` to all elements of a `DataStream`. The `ReduceFunction` subsequently combines pairs of elements into one element and outputs the current reduced value as a `DataStream`.
-A reducer that sums up the incoming stream:
+        });
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Reduce</strong></td>
+      <td>
+        <p>Combines a group of elements into a single element by repeatedly combining two elements
+        into one and emits the current state after every reduction. Reduce may be applied on a full, windowed or grouped data stream.
+        <br/>
+        
+        <strong>IMPORTANT:</strong> The streaming and the batch reduce functions have different semantics. A streaming reduce on a full or grouped data stream emits the current reduced value for every new element on a data stream. On a windowed data stream it works as a batch reduce: it produces at most one value.
+        <br/>
 
-~~~java
+         A reducer that sums up the incoming stream:</p>
+{% highlight java %}
 dataStream.reduce(new ReduceFunction<Integer>() {
             @Override
-            public Integer reduce(Integer value1, Integer value2) throws Exception {
-                return value1+value2;
+            public Integer reduce(Integer value1, Integer value2) 
+            throws Exception {
+                return value1 + value2;
             }
-        })
-~~~
+        });
+{% endhighlight %}
+      </td>
+    </tr>
 
-#### Merge
-Merges two or more `DataStream` outputs, creating a new DataStream containing all the elements from all the streams.
+    <tr>
+      <td><strong>Merge</strong></td>
+      <td>
+        <p>Merges two or more data streams, creating a new stream containing all the elements from all the streams.</p>
+{% highlight java %}
+dataStream.merge(otherStream1, otherStream2, …)
+{% endhighlight %}
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+----------
+
+The following transformations are available on data streams of Tuples:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+   <tr>
+      <td><strong>Project</strong></td>
+      <td>
+        <p>Selects a subset of fields from the tuples</p>
+{% highlight java %}
+DataStream<Tuple3<Integer, Double, String>> in = // [...]
+DataStream<Tuple2<String, Integer>> out = in.project(2,0);
+{% endhighlight %}
+      </td>
+    </tr>
+  </tbody>
+</table>
 
-~~~java
-dataStream.merge(otherStream1, otherStream2…)
-~~~
+</div>
+
+<div data-lang="scala" markdown="1">
+
+<br />
+
+
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+
+  <tbody>
+
+    <tr>
+      <td><strong>Map</strong></td>
+      <td>
+        <p>Takes one element and produces one element. A map that doubles the values of the input stream:</p>
+{% highlight scala %}
+dataStream.map{ x => x * 2 }
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>FlatMap</strong></td>
+      <td>
+        <p>Takes one element and produces zero, one, or more elements. A flatmap that splits sentences to words:</p>
+{% highlight scala %}
+data.flatMap { str => str.split(" ") }
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Filter</strong></td>
+      <td>
+        <p>Evaluates a boolean function for each element and retains those for which the function returns true.
+        <br/>
+        
+        <strong>IMPORTANT:</strong> The system assumes that the function does not modify the elements on which the predicate is applied. Violating this assumption
+        can lead to incorrect results.
+        <br/>
+        
+        A filter that filters out zero values:
+        </p>
+{% highlight scala %}
+dataStream.filter{ _ != 0 }
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Reduce</strong></td>
+        <td>
+        <p>Combines a group of elements into a single element by repeatedly combining two elements
+        into one and emits the current state after every reduction. Reduce may be applied on a full, windowed or grouped data stream.
+        <br/>
+        
+        <strong>IMPORTANT:</strong> The streaming and the batch reduce functions have different semantics. A streaming reduce on a full or grouped data stream emits the current reduced value for every new element on a data stream. On a windowed data stream it works as a batch reduce: it produces at most one value.
+        <br/>
+
+         A reducer that sums up the incoming stream:</p>
+{% highlight scala %}
+dataStream.reduce{ _ + _ }
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Merge</strong></td>
+      <td>
+        <p>Merges two or more data streams, creating a new stream containing all the elements from all the streams.</p>
+{% highlight scala %}
+dataStream.merge(otherStream1, otherStream2, …)
+{% endhighlight %}
+      </td>
+    </tr>
+
+  </tbody>
+
+
+</table>
+
+</div>
+
+</div>
 
 ### Grouped operators
 
 Some transformations require that the elements of a `DataStream` are grouped on some key. The user can create a `GroupedDataStream` by calling the `groupBy(key)` method of a non-grouped `DataStream`. 
 Keys can be of three types: field positions (applicable for tuple/array types), field expressions (applicable for POJO types), and KeySelector instances.
 
-The user can apply different reduce transformations on the obtained `GroupedDataStream`:
-
-#### Reduce on GroupedDataStream
-When the reduce operator is applied on a grouped data stream, the user-defined `ReduceFunction` will combine subsequent pairs of elements having the same key value. The combined results are sent to the output stream.
-
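+For example, a rolling reduce applied per key, where the key is given either as a tuple field position or as a field expression (the `word` field and the `Summer` reduce function are illustrative placeholders):
+
+{% highlight java %}
+dataStream.groupBy(0).reduce(new Summer());      // group tuples by their first field
+dataStream.groupBy("word").reduce(new Summer()); // group POJOs by a field expression
+{% endhighlight %}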
 ### Aggregations
 
-The Flink Streaming API supports different types of pre-defined aggregation operators similarly to the core API.
+The Flink Streaming API supports different types of pre-defined aggregation operators similar to the ones in the batch API.
 
-Types of aggregations: `sum(field)`, `min(field)`, `max(field)`, `minBy(field, first)`, `maxBy(field, first)`
+Types of aggregations: `sum(field)`, `min(field)`, `max(field)`, `minBy(field, first)`, `maxBy(field, first)`.
 
 With `sum`, `min`, and `max` for every incoming tuple the selected field is replaced with the current aggregated value. Fields can be selected using either field positions or field expressions (similarly to grouping).
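+For example, on a stream of `(key, value)` tuples (the field positions are illustrative):
+
+{% highlight java %}
+dataStream.groupBy(0).sum(1);         // rolling sum of field 1 per key
+dataStream.groupBy(0).min(1);         // replaces field 1 with the current minimum per key
+dataStream.groupBy(0).minBy(1, true); // emits the element holding the current minimal field 1
+{% endhighlight %}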
 
@@ -296,7 +557,7 @@ There is also an option to apply user defined aggregations with the usage of the
 
 ### Window operators
 
-Flink streaming provides very flexible windowing semantics to create arbitrary windows (also referred to as discretizations or slices) of the data streams and apply reduce, map or aggregation operations on the windows acquired. Windowing can be used for instance to create rolling aggregations of the most recent N elements, where N could be defined by Time, Count or any arbitrary user defined measure.
+Flink streaming provides very flexible windowing semantics to create arbitrary windows (also referred to as discretizations or slices) of the data streams and apply reduce, map or aggregation transformations on the windows acquired. Windowing can be used for instance to create rolling aggregations of the most recent N elements, where N could be defined by Time, Count or any arbitrary user defined measure.
 
 The user can control the size (eviction) of the windows and the frequency of reduction or aggregation calls (trigger) on them in an intuitive API (some examples):
 
@@ -304,7 +565,7 @@ The user can control the size (eviction) of the windows and the frequency of red
  * `dataStream.window(…).every(…).mapWindow(…).flatten()`
  * `dataStream.window(…).every(…).groupBy(…).aggregate(…).getDiscretizedStream()`
 
-The core abstraction of the Windowing semantics is the `WindowedDataStream` and the `StreamWindow`. The `WindowedDataStream` is created when we call the `.window(…)` method of the DataStream and represents the windowed discretisation of the underlying stream. The user can think about it simply as a `DataStream<StreamWindow<T>>` where additional API functions are supplied to provide efficient transformations of individual windows. 
+The core abstraction of the Windowing semantics is the `WindowedDataStream` and the `StreamWindow`. The `WindowedDataStream` is created when we call the `window(…)` method of the DataStream and represents the windowed discretisation of the underlying stream. The user can think about it simply as a `DataStream<StreamWindow<T>>` where additional API functions are supplied to provide efficient transformations of individual windows. 
 
 The result of a window transformation is again a `WindowedDataStream` which can also be used to further transform the resulting windows. In this sense, window transformations define mapping from stream windows to stream windows.
 
@@ -316,17 +577,35 @@ The user has different ways of using the a result of a window operation:
 
 The next example would create windows that hold elements of the last 5 seconds, and the user defined transformation would be executed on the windows every second (sliding the window by 1 second):
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+dataStream.window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(1, TimeUnit.SECONDS));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
 dataStream.window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(1, TimeUnit.SECONDS))
-~~~
+{% endhighlight %}
+</div>
+</div>
 
 This approach is often referred to as policy based windowing. Different policies (count, time, etc.) can be mixed as well, for example to downsample our stream, a window that takes the latest 100 elements of the stream every minute is created as follows:
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+dataStream.window(Count.of(100)).every(Time.of(1, TimeUnit.MINUTES));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
 dataStream.window(Count.of(100)).every(Time.of(1, TimeUnit.MINUTES))
-~~~
+{% endhighlight %}
+</div>
+</div>
 
-The user can also omit the `.every(…)` call which results in a tumbling window emptying the window after every transformation call.
+The user can also omit the `every(…)` call which results in a tumbling window emptying the window after every transformation call.
 
 Several predefined policies are provided in the API, including delta-based, count-based and time-based policies. These can be accessed through the static methods provided by the `PolicyHelper` classes:
 
@@ -339,17 +618,28 @@ For detailed description of these policies please refer to the [Javadocs](http:/
 #### Policy based windowing
 The policy based windowing is a highly flexible way to specify stream discretisation also called windowing semantics. Two types of policies are used for such a specification:
 
- * `TriggerPolicy` defines when to trigger the reduce UDF on the current window and emit the result. In the API it completes a window statement such as: `.window(…).every(…)`, while the triggering policy is passed within `every`. 
+ * `TriggerPolicy` defines when to trigger the reduce UDF on the current window and emit the result. In the API it completes a window statement such as: `window(…).every(…)`, while the triggering policy is passed within `every`. 
 
 Several predefined policies are provided in the API, including delta-based, punctuation based, count-based and time-based policies. Policies are in general UDFs and can implement any custom behaviour.
 
- * `EvictionPolicy` defines the length of a window as a means of a predicate for evicting tuples when they are no longer needed. In the API this can be defined by the `.window(…)` operation on a stream. There are mostly the same predefined policy types provided as for trigger policies.
+ * `EvictionPolicy` defines the length of a window as a means of a predicate for evicting tuples when they are no longer needed. In the API this can be defined by the `window(…)` operation on a stream. There are mostly the same predefined policy types provided as for trigger policies.
 
 In addition to the `dataStream.window(…).every(…)` style users can specifically pass the trigger and eviction policies during the window call:
 
-~~~java
-dataStream.window(TriggerPolicy, EvictionPolicy)
-~~~
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+dataStream.window(triggerPolicy, evictionPolicy);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+dataStream.window(triggerPolicy, evictionPolicy)
+{% endhighlight %}
+</div>
+
+</div>
 
 By default most triggers can only trigger when a new element arrives. This might not be suitable for all the use-cases, especially when time based windowing is applied. To also provide triggering between elements so called active policies can be used. The predefined time-based policies are already implemented in such a way and can hold as an example for user defined active policy implementations. 
 
@@ -360,49 +650,105 @@ The `WindowedDataStream<T>.reduceWindow(ReduceFunction<T>)` transformation calls
 
 The following is an example for a window reduce that sums the elements in the last minute with 10 seconds slide interval:
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
 dataStream.window(Time.of(1, TimeUnit.MINUTES)).every(Time.of(10,TimeUnit.SECONDS)).sum(field);
-~~~
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+dataStream.window(Time.of(1, TimeUnit.MINUTES)).every(Time.of(10,TimeUnit.SECONDS)).sum(field)
+{% endhighlight %}
+</div>
+
+</div>
+
 
 #### Map on windowed data streams
 The `WindowedDataStream<T>.mapWindow(WindowMapFunction<T,O>)` transformation calls  `mapWindow(…)` for each `StreamWindow` in the discretised stream providing access to all elements in the window through the iterable interface. At each function call the output `StreamWindow<O>` will consist of all the elements collected to the collector. This allows a straightforward way of mapping one stream window to another.
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+windowedDataStream.mapWindow(windowMapFunction);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
 windowedDataStream.mapWindow(windowMapFunction)
-~~~
+{% endhighlight %}
+</div>
+
+</div>
 
-#### Grouped operations on windowed data streams
-Calling the `.groupBy(…)` method on a windowed stream groups the elements by the given fields inside the stream windows. The window sizes (evictions) and slide sizes (triggers) will be calculated on the whole stream (in a global fashion), but the user defined functions will be applied on a per group basis. This means that for a call `windowedStream.groupBy(…).reduceWindow(…)` will transform each window into another window consisting of as many elements as groups, with the reduced values per key. Similarly the `mapWindow` transformation is applied per group as well.
+#### Grouped transformations on windowed data streams
+Calling the `groupBy(…)` method on a windowed stream groups the elements by the given fields inside the stream windows. The window sizes (evictions) and slide sizes (triggers) will be calculated on the whole stream (in a global fashion), but the user defined functions will be applied on a per group basis. This means that a call to `windowedStream.groupBy(…).reduceWindow(…)` will transform each window into another window consisting of as many elements as groups, with the reduced values per key. Similarly the `mapWindow` transformation is applied per group as well.
 
-The user can also create discretisation on a per group basis calling `.window(…).every(…)` on an already grouped data stream. This will apply the discretisation logic independently for each key.
+The user can also create discretisation on a per group basis calling `window(…).every(…)` on an already grouped data stream. This will apply the discretisation logic independently for each key.
 
 To highlight the differences let us look at two examples.
 
 To get the maximal value for each key on the last 100 elements (global) we use the first approach:
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
 dataStream.window(Count.of(100)).every(…).groupBy(groupingField).max(field);
-~~~
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+dataStream.window(Count.of(100)).every(…).groupBy(groupingField).max(field)
+{% endhighlight %}
+</div>
+
+</div>
 
 Using this approach we took the last 100 elements, divided them into groups by key and then applied the aggregation. To create fixed size windows for every key we need to reverse the order of the groupBy call. So to take the max for the last 100 elements in each group:
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
 dataStream.groupBy(groupingField).window(Count.of(100)).every(…).max(field);
-~~~
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+dataStream.groupBy(groupingField).window(Count.of(100)).every(…).max(field)
+{% endhighlight %}
+</div>
+
+</div>
 
 This will create separate windows for different keys and apply the trigger and eviction policies on a per group basis.
 
 #### Applying multiple transformations on a window
 Using the `WindowedDataStream` abstraction we can apply several transformations one after another on the discretised streams without having to re-discretise it:
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+dataStream.window(Count.of(1000)).groupBy(firstKey).mapWindow(…)
+    .groupBy(secondKey).reduceWindow(…).flatten();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
 dataStream.window(Count.of(1000)).groupBy(firstKey).mapWindow(…)
     .groupBy(secondKey).reduceWindow(…).flatten()
-~~~
+{% endhighlight %}
+</div>
+</div>
 
 The above call would create global windows of 1000 elements, group them by the first key and then apply a mapWindow transformation. The resulting windowed stream will then be grouped by the second key and further reduced. The results of the reduce transformation are then flattened.
 
-Notice here that we only defined the window size once at the beginning of the transformation. This means that anything that happens afterwards (`.groupBy(firstKey).mapWindow(…).groupBy(secondKey).reduceWindow(…)`) happens inside the 1000 element windows. Of course the mapWindow might reduce the number of elements but the key idea is that each transformation still corresponds to the same 1000 elements in the original stream.
+Notice that here we only defined the window size once at the beginning of the transformation. This means that anything that happens afterwards (`groupBy(firstKey).mapWindow(…).groupBy(secondKey).reduceWindow(…)`) happens inside the 1000 element windows. Of course the mapWindow might reduce the number of elements but the key idea is that each transformation still corresponds to the same 1000 elements in the original stream.
 
 #### Global vs local discretisation
 By default all window discretisation calls (`dataStream.window(…)`) define global windows meaning that a global window of count 100 will contain the last 100 elements arrived at the discretisation operator in order. In most cases (except for Time) this means that the operator doing the actual discretisation needs to have a degree of parallelism of 1 to be able to correctly execute the discretisation logic.
@@ -532,11 +878,9 @@ dataStream1.connect(dataStream2)
 val dataStream1 : DataStream[Int] = ...
 val dataStream2 : DataStream[String] = ...
 
-dataStream2.flatMap((str : String) => str.split(" "))
-
 (dataStream1 connect dataStream2)
   .flatMap(
-    (num : Int) => num.toString,
+    (num : Int) => List(num.toString),
     (str : String) => str.split(" ")
   )
 {% endhighlight %}
@@ -549,8 +893,8 @@ The windowReduce operator applies a user defined `CoWindowFunction` to time alig
 #### Reduce on ConnectedDataStream
 The Reduce operator for the `ConnectedDataStream` applies a simple reduce transformation on the joined data streams and then maps the reduced elements to a common output type.
 
-<div class="codetabs" markdown="1">
 ### Output splitting
+<div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
 Most data stream operators support directed outputs (output splitting), meaning that different output elements are sent only to specific outputs. The outputs are referenced by their name given at the point of receiving:
@@ -600,8 +944,8 @@ Most data stream operators support directed outputs (output splitting), meaning
 val split = someDataStream.split(
   (num: Int) =>
     (num % 2) match {
-      case 0 => "even"
-      case 1 => "odd"
+      case 0 => List("even")
+      case 1 => List("odd")
     }
 )
 
@@ -620,11 +964,11 @@ Every output will be emitted to the selected outputs exactly once, even if you a
 
 </div>
 
-<div class="codetabs" markdown="1">
 ### Iterations
+<div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
-The Flink Streaming API supports implementing iterative stream processing dataflows similarly to the core Flink API. Iterative streaming programs also implement a step function and embed it into an `IterativeDataStream`.
-Unlike in the core API the user does not define the maximum number of iterations, but at the tail of each iteration part of the output is streamed forward to the next operator and part is streamed back to the iteration head. The user controls the output of the iteration tail using [output splitting](#output-splitting) or [filters](#filter).
+The Flink Streaming API supports implementing iterative stream processing dataflows similarly to the batch Flink API. Iterative streaming programs also implement a step function and embed it into an `IterativeDataStream`.
+Unlike in the batch API the user does not define the maximum number of iterations, but at the tail of each iteration part of the output is streamed forward to the next operator and part is streamed back to the iteration head. The user controls the output of the iteration tail using [output splitting](#output-splitting) or [filters](#filter).
 To start an iterative part of the program the user defines the iteration starting point:
 
 {% highlight java %}
@@ -637,7 +981,7 @@ The operator applied on the iteration starting point is the head of the iteratio
 DataStream<Integer> head = iteration.map(new IterationHead());
 {% endhighlight %}
 
-To close an iteration and define the iteration tail, the user calls `.closeWith(iterationTail)` method of the `IterativeDataStream`. This iteration tail (the DataStream given to the `closeWith` function) will be fed back to the iteration head. A common pattern is to use [filters](#filter) to separate the output of the iteration from the feedback-stream.
+To close an iteration and define the iteration tail, the user calls `closeWith(iterationTail)` method of the `IterativeDataStream`. This iteration tail (the DataStream given to the `closeWith` function) will be fed back to the iteration head. A common pattern is to use [filters](#filter) to separate the output of the iteration from the feedback-stream.
 
 {% highlight java %}
 DataStream<Integer> tail = head.map(new IterationTail());
@@ -646,17 +990,17 @@ iteration.closeWith(tail.filter(isFeedback));
 
 DataStream<Integer> output = tail.filter(isOutput);
 
-output.map(…).project(…)…
+output.map(…).project(…);
 {% endhighlight %}
 
 In this case all values passing the `isFeedback` filter will be fed back to the iteration head, and the values passing the `isOutput` filter will produce the output of the iteration that can be transformed further (here with a `map` and a `projection`) outside the iteration.
 
-Because iterative streaming programs do not have a set number of iterations for each data element, the streaming program has no information on the end of its input. From this it follows that iterative streaming programs run until the user manually stops the program. While this is acceptable under normal circumstances a method is provided to allow iterative programs to shut down automatically if no input received by the iteration head for a predefined number of milliseconds.
+Because iterative streaming programs do not have a set number of iterations for each data element, the streaming program has no information on the end of its input. As a consequence iterative streaming programs run until the user manually stops the program. While this is acceptable under normal circumstances, a method is provided to allow iterative programs to shut down automatically if no input is received by the iteration head for a predefined number of milliseconds.
 To use this functionality the user needs to add the maxWaitTimeMillis parameter to the `dataStream.iterate(…)` call to control the max wait time. 
 </div>
 <div data-lang="scala" markdown="1">
-The Flink Streaming API supports implementing iterative stream processing dataflows similarly to the core Flink API. Iterative streaming programs also implement a step function and embed it into an `IterativeDataStream`.
-Unlike in the core API the user does not define the maximum number of iterations, but at the tail of each iteration part of the output is streamed forward to the next operator and part is streamed back to the iteration head. The user controls the output of the iteration tail by defining a step function that return two DataStreams: a feedback and an output. The first one is the output that will be fed back to the start of the iteration and the second is the output stream of the iterative part.
+The Flink Streaming API supports implementing iterative stream processing dataflows similarly to the batch Flink API. Iterative streaming programs also implement a step function and embed it into an `IterativeDataStream`.
+Unlike in the batch API the user does not define the maximum number of iterations, but at the tail of each iteration part of the output is streamed forward to the next operator and part is streamed back to the iteration head. The user controls the output of the iteration tail by defining a step function that returns two DataStreams: a feedback and an output. The first one is the output that will be fed back to the start of the iteration and the second is the output stream of the iterative part.
 
 A common pattern is to use [filters](#filter) to separate the output from the feedback-stream. In this case all values passing the `isFeedback` filter will be fed back to the iteration head, and the values passing the `isOutput` filter will produce the output of the iteration that can be transformed further (here with a `map` and a `projection`) outside the iteration.
 
@@ -667,17 +1011,17 @@ val iteratedStream = someDataStream.iterate(maxWaitTime) {
     val tail = head.map(iterationTail)
     (tail.filter(isFeedback), tail.filter(isOutput))
   }
-}.map(…).project(…)…
+}.map(…).project(…)
 {% endhighlight %}
 
-Because iterative streaming programs do not have a set number of iterations for each data element, the streaming program has no information on the end of its input. From this it follows that iterative streaming programs run until the user manually stops the program. While this is acceptable under normal circumstances a method is provided to allow iterative programs to shut down automatically if no input received by the iteration head for a predefined number of milliseconds.
+Because iterative streaming programs do not have a set number of iterations for each data element, the streaming program has no information on the end of its input. As a consequence iterative streaming programs run until the user manually stops the program. While this is acceptable under normal circumstances, a method is provided to allow iterative programs to shut down automatically if no input is received by the iteration head for a predefined number of milliseconds.
 To use this functionality the user needs to add the maxWaitTimeMillis parameter to the `dataStream.iterate(…)` call to control the max wait time. 
 </div>
 
 </div>
 
 ### Rich functions
-The usage of rich functions are essentially the same as in the core Flink API. All transformations that take as argument a user-defined function can instead take a rich function as argument:
+The [usage](programming_guide.html#rich-functions) of rich functions is essentially the same as in the batch Flink API. All transformations that take as argument a user-defined function can instead take a rich function as argument:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -765,12 +1109,12 @@ Operator Settings
 
 ### Parallelism
 
-Setting parallelism for operators works exactly the same way as in the core Flink API. The user can control the number of parallel instances created for each operator by calling the `operator.setParallelism(dop)` method.
+Setting parallelism for operators works exactly the same way as in the batch Flink API. The user can control the number of parallel instances created for each operator by calling the `operator.setParallelism(dop)` method.
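+For instance (the `MyMapper` function is just a placeholder for a user defined map function):
+
+{% highlight java %}
+dataStream.map(new MyMapper()).setParallelism(4);
+{% endhighlight %}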
 
 ### Buffer timeout
 
 By default data points are not transferred on the network one-by-one, which would cause unnecessary network traffic, but are buffered in the output buffers. The size of the output buffers can be set in the Flink config files. While this method is good for optimizing throughput, it can cause latency issues when the incoming stream is not fast enough.
-To tackle this issue the user can call `env.setBufferTimeout(timeoutMillis)` on the execution environment (or on individual operators) to set a maximum wait time for the buffers to fill up. After this time the buffers are flushed automatically even if they are not full. The default value for this timeout is 100ms which should be appropriate for most use-cases. 
+To tackle this issue the user can call `env.setBufferTimeout(timeoutMillis)` on the execution environment (or on individual operators) to set a maximum wait time for the buffers to fill up. After this time the buffers are flushed automatically even if they are not full. The default value for this timeout is 100 ms which should be appropriate for most use-cases. 
 
 Usage:
 
@@ -794,7 +1138,7 @@ env.genereateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis)
 </div>
 
 To maximise the throughput the user can call `setBufferTimeout(-1)` which will remove the timeout and buffers will only be flushed when they are full.
-To minimise latency, set the timeout to a value close to 0 (fro example 5 or 10 ms). Theoretically a buffer timeout of 0 will cause all outputs to be flushed when produced, but this setting should be avoided because it can cause severe performance degradation.
+To minimise latency, set the timeout to a value close to 0 (for example 5 or 10 ms). Theoretically a buffer timeout of 0 will cause all outputs to be flushed when produced, but this setting should be avoided because it can cause severe performance degradation.
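+For example (reusing the `myMap` function from the usage example above; the concrete values are only illustrative):
+
+{% highlight java %}
+env.setBufferTimeout(timeoutMillis);        // job-wide: flush buffers after at most timeoutMillis, even if not full
+env.setBufferTimeout(-1);                   // maximise throughput: flush only when buffers are full
+dataStream.map(myMap).setBufferTimeout(5);  // minimise latency for this operator: flush roughly every 5 ms
+{% endhighlight %}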
 
 
 [Back to top](#top)

http://git-wip-us.apache.org/repos/asf/flink/blob/7582390c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index b673f25..15467bb 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -221,8 +221,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    *
    */
-  def iterate[R](maxWaitTimeMillis:Long = 0)(stepFunction: DataStream[T] => (DataStream[T], DataStream[R]))
-        : DataStream[R] = {
+  def iterate[R](maxWaitTimeMillis:Long = 0)
+                (stepFunction: DataStream[T] => (DataStream[T], DataStream[R])) : DataStream[R] = {
     val iterativeStream = javaStream.iterate(maxWaitTimeMillis)
 
     val (feedback, output) = stepFunction(new DataStream[T](iterativeStream))
@@ -495,23 +495,6 @@ class DataStream[T](javaStream: JavaStream[T]) {
    */
   def split(selector: OutputSelector[T]): SplitDataStream[T] = javaStream.split(selector)
 
-//  /**
-//   * Creates a new SplitDataStream that contains only the elements satisfying the
-//   *  given output selector predicate.
-//   */
-//  def split(fun: T => String): SplitDataStream[T] = {
-//    if (fun == null) {
-//      throw new NullPointerException("OutputSelector must not be null.")
-//    }
-//    val selector = new OutputSelector[T] {
-//      val cleanFun = clean(fun)
-//      def select(in: T): java.lang.Iterable[String] = {
-//        List(cleanFun(in))
-//      }
-//    }
-//    split(selector)
-//  }
-
   /**
    * Creates a new SplitDataStream that contains only the elements satisfying the
    *  given output selector predicate.


[2/3] flink git commit: Fixed simple typos

Posted by gy...@apache.org.
Fixed simple typos


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bd1b916f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bd1b916f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bd1b916f

Branch: refs/heads/master
Commit: bd1b916f945b07fb921a4226bde1e7ca59fabab5
Parents: 8c9ab85
Author: Akshay Dixit <ak...@gmail.com>
Authored: Sat Mar 7 23:33:15 2015 +0530
Committer: Gábor Hermann <re...@gmail.com>
Committed: Sun Mar 8 20:24:41 2015 +0100

----------------------------------------------------------------------
 docs/programming_guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bd1b916f/docs/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/programming_guide.md b/docs/programming_guide.md
index c6c5bd1..1f4e99e 100644
--- a/docs/programming_guide.md
+++ b/docs/programming_guide.md
@@ -1710,7 +1710,7 @@ Generic:
 val env  = ExecutionEnvironment.getExecutionEnvironment
 
 // read text file from local files system
-val localLiens = env.readTextFile("file:///path/to/my/textfile")
+val localLines = env.readTextFile("file:///path/to/my/textfile")
 
 // read text file from a HDFS running at nnHost:nnPort
 val hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile")


[3/3] flink git commit: [FLINK-1429] [streaming] Scala documentation and minor Scala API features

Posted by gy...@apache.org.
[FLINK-1429] [streaming] Scala documentation and minor Scala API features


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c9ab85e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c9ab85e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c9ab85e

Branch: refs/heads/master
Commit: 8c9ab85e7e62a1a4ea58e09d3790bddd74d25832
Parents: fb62f6b
Author: Gábor Hermann <re...@gmail.com>
Authored: Sun Mar 8 16:41:54 2015 +0100
Committer: Gábor Hermann <re...@gmail.com>
Committed: Sun Mar 8 20:24:41 2015 +0100

----------------------------------------------------------------------
 docs/programming_guide.md                       |   2 +-
 docs/streaming_guide.md                         | 399 +++++++++++++++----
 .../api/scala/ConnectedDataStream.scala         |  35 +-
 .../flink/streaming/api/scala/DataStream.scala  |  50 ++-
 .../streaming/api/scala/SplitDataStream.scala   |   2 -
 5 files changed, 390 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8c9ab85e/docs/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/programming_guide.md b/docs/programming_guide.md
index efedc1b..c6c5bd1 100644
--- a/docs/programming_guide.md
+++ b/docs/programming_guide.md
@@ -2222,7 +2222,7 @@ val initial = env.fromElements(0)
 val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
   val result = iterationInput.map { i => 
     val x = Math.random()
-    val y = Math.randon()
+    val y = Math.random()
     i + (if (x * x + y * y < 1) 1 else 0)
   }
   result

http://git-wip-us.apache.org/repos/asf/flink/blob/8c9ab85e/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index 0fb7dac..7a5835e 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -414,7 +414,7 @@ For example `dataStream.window(Count.of(100)).maxBy(field)` would create global
 
 ### Temporal database style operators
 
-While database style operators like joins (on key) and crosses are hard to define properly on data streams, a straight forward implementation is to apply these operators on windows of the data streams. 
+While database style operators like joins (on key) and crosses are hard to define properly on data streams, a straightforward implementation is to apply these operators on windows of the data streams. 
 
 Currently join and cross operators are supported on time windows.
 
@@ -422,31 +422,53 @@ The Join transformation produces a new Tuple DataStream with two fields. Each tu
 
 The following code shows a default Join transformation using field position keys:
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
 dataStream1.join(dataStream2)
     .onWindow(windowing_params)
     .where(key_in_first)
     .equalTo(key_in_second);
-~~~
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+dataStream1.join(dataStream2)
+    .onWindow(windowing_params)
+    .where(key_in_first)
+    .equalTo(key_in_second)
+{% endhighlight %}
+</div>
+</div>
 
-The Cross transformation combines two DataStreams into one DataStreams. It builds all pairwise combinations of the elements of both input DataStreams in the current window, i.e., it builds a temporal Cartesian product.
+The Cross transformation combines two DataStreams into one DataStream. It builds all pairwise combinations of the elements of both input DataStreams in the current window, i.e., it builds a temporal Cartesian product.
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
 dataStream1.cross(dataStream2).onWindow(windowing_params);
-~~~
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+dataStream1 cross dataStream2 onWindow (windowing_params)
+{% endhighlight %}
+</div>
+</div>
 
-Please note that this is currently not integrated with the windowing semantics, integration is work in progress.
 
 ### Co operators
 
-Co operators allow the users to jointly transform two `DataStreams` of different types providing a simple way to jointly manipulate a shared state. It is designed to support joint stream transformations where merging is not appropriate due to different data types or in case the user needs explicit tracking of the joined stream origin.
-Co operators can be applied to `ConnectedDataStreams` which represent two `DataStreams` of possibly different types. A `ConnectedDataStream` can be created by calling the `connect(otherDataStream)` method of a `DataStream`. Please note that the two connected `DataStreams` can also be merged data streams.
+Co operators allow users to jointly transform two DataStreams of different types, providing a simple way to manipulate a shared state. They are designed to support joint stream transformations where merging is not appropriate due to different data types, or in case the user needs explicit tracking of the joined stream origin.
+Co operators can be applied to ConnectedDataStreams which represent two DataStreams of possibly different types. A ConnectedDataStream can be created by calling the `connect(otherDataStream)` method of a DataStream. Please note that the two connected DataStreams can also be merged data streams.
 
 #### Map on ConnectedDataStream
 Applies a CoMap transformation on two separate DataStreams, mapping them to a common output type. The transformation calls a `CoMapFunction.map1()` for each element of the first input and `CoMapFunction.map2()` for each element of the second input. Each CoMapFunction call returns exactly one element.
 A CoMap operator that outputs true if an Integer value is received and false if a String value is received:
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
 DataStream<Integer> dataStream1 = ...
 DataStream<String> dataStream2 = ...
         
@@ -463,29 +485,63 @@ dataStream1.connect(dataStream2)
                 return false;
             }
         })
-~~~
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val dataStream1 : DataStream[Int] = ...
+val dataStream2 : DataStream[String] = ...
+
+(dataStream1 connect dataStream2)
+  .map(
+    (_ : Int) => true,
+    (_ : String) => false
+  )
+{% endhighlight %}
+</div>
+</div>
 
 #### FlatMap on ConnectedDataStream
 The FlatMap operator for the `ConnectedDataStream` works similarly to CoMap, but instead of returning exactly one element after each map call the user can output arbitrarily many values using the Collector interface. 
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
 DataStream<Integer> dataStream1 = ...
 DataStream<String> dataStream2 = ...
         
 dataStream1.connect(dataStream2)
-    .flatMap(new CoFlatMapFunction<Integer, String, Boolean>() {
+    .flatMap(new CoFlatMapFunction<Integer, String, String>() {
 
             @Override
-            public void flatMap1(Integer value, Collector<Boolean> out) {
-                out.collect(true);
+            public void flatMap1(Integer value, Collector<String> out) {
+                out.collect(value.toString());
             }
 
             @Override
-            public void flatMap2(String value, Collector<Boolean> out) {
-                out.collect(false);
+            public void flatMap2(String value, Collector<String> out) {
+                for (String word: value.split(" ")) {
+                  out.collect(word);
+                }
             }
         })
-~~~
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val dataStream1 : DataStream[Int] = ...
+val dataStream2 : DataStream[String] = ...
+
+dataStream2.flatMap((str : String) => str.split(" "))
+
+(dataStream1 connect dataStream2)
+  .flatMap(
+    (num : Int) => num.toString,
+    (str : String) => str.split(" ")
+  )
+{% endhighlight %}
+</div>
+</div>
 
 #### WindowReduce on ConnectedDataStream
 The windowReduce operator applies a user-defined `CoWindowFunction` to time-aligned windows of the two data streams and returns zero or more elements of an arbitrary type. The user can define the window and slide intervals and can also implement custom timestamps to be used for calculating windows.
@@ -493,31 +549,32 @@ The windowReduce operator applies a user defined `CoWindowFunction` to time alig
 #### Reduce on ConnectedDataStream
 The Reduce operator for the `ConnectedDataStream` applies a simple reduce transformation on the joined data streams and then maps the reduced elements to a common output type.
 
+<div class="codetabs" markdown="1">
 ### Output splitting
+<div data-lang="java" markdown="1">
 
 Most data stream operators support directed outputs (output splitting), meaning that different output elements are sent only to specific outputs. The outputs are referenced by their name given at the point of receiving:
 
-~~~java
+{% highlight java %}
 SplitDataStream<Integer> split = someDataStream.split(outputSelector);
 DataStream<Integer> even = split.select("even");
 DataStream<Integer> odd = split.select("odd");
-~~~
-
-In the above example the data stream named ‘even’ will only contain elements that are directed to the output named “even”. The user can of course further transform these new stream by for example squaring only the even elements.
+{% endhighlight %}
+In the above example the data stream named “even” will only contain elements that are directed to the output named “even”. The user can of course further transform these new streams, for example by squaring only the even elements.
 
 Data streams only receive the elements directed to selected output names. The user can also select multiple output names by `splitStream.select(“output1”, “output2”, …)`. It is common that a stream listens to all the outputs, so `split.selectAll()` provides this functionality without having to select all names.
 
 The outputs of an operator are directed by implementing a selector function (implementing the `OutputSelector` interface):
 
-~~~java
+{% highlight java %}
 Iterable<String> select(OUT value);
-~~~
+{% endhighlight %}
 
 The data is sent to all the outputs returned in the iterable (referenced by their name). This way the direction of the outputs can be determined by the value of the data sent. 
 
 For example to split even and odd numbers:
 
-~~~java
+{% highlight java %}
 @Override
 Iterable<String> select(Integer value) {
 
@@ -531,91 +588,177 @@ Iterable<String> select(Integer value) {
 
     return outputs;
 }
-~~~
+{% endhighlight %}
 
 Every output will be emitted to the selected outputs exactly once, even if you add the same output names more than once.
+</div>
+<div data-lang="scala" markdown="1">
 
+Most data stream operators support directed outputs (output splitting), meaning that different output elements are sent only to specific outputs. The outputs are referenced by their name given at the point of receiving:
+
+{% highlight scala %}
+val split = someDataStream.split(
+  (num: Int) =>
+    (num % 2) match {
+      case 0 => "even"
+      case 1 => "odd"
+    }
+)
+
+val even = split select "even" 
+val odd = split select "odd"
+{% endhighlight %}
+
+In the above example the data stream named “even” will only contain elements that are directed to the output named “even”. The user can of course further transform these new streams, for example by squaring only the even elements.
+
+Data streams only receive the elements directed to selected output names. The user can also select multiple output names by `splitStream.select(“output1”, “output2”, …)`. It is common that a stream listens to all the outputs, so `split.selectAll` provides this functionality without having to select all names.
+
+The outputs of an operator are directed by implementing a function that returns the output names for the value. The data is sent to all the outputs returned by the function (referenced by their name). This way the direction of the outputs can be determined by the value of the data sent.
+
+Every output will be emitted to the selected outputs exactly once, even if you add the same output names more than once.
+</div>
+
+</div>
+
+<div class="codetabs" markdown="1">
 ### Iterations
+<div data-lang="java" markdown="1">
 The Flink Streaming API supports implementing iterative stream processing dataflows similarly to the core Flink API. Iterative streaming programs also implement a step function and embed it into an `IterativeDataStream`.
-Unlike in the core API the user does not define the maximum number of iterations, but at the tail of each iteration part of the output is streamed forward to the next operator and part is streamed back to the iteration head. The user controls the output of the iteration tail using [output splitting](#output-splitting).
+Unlike in the core API, the user does not define the maximum number of iterations; instead, at the tail of each iteration part of the output is streamed forward to the next operator and part is streamed back to the iteration head. The user controls the output of the iteration tail using [output splitting](#output-splitting) or [filters](#filter).
 To start an iterative part of the program the user defines the iteration starting point:
 
-~~~java
+{% highlight java %}
 IterativeDataStream<Integer> iteration = source.iterate(maxWaitTimeMillis);
-~~~
+{% endhighlight %}
+
 The operator applied on the iteration starting point is the head of the iteration, where data is fed back from the iteration tail.
 
-~~~java
+{% highlight java %}
 DataStream<Integer> head = iteration.map(new IterationHead());
-~~~
+{% endhighlight %}
 
-To close an iteration and define the iteration tail, the user calls `.closeWith(iterationTail)` method of the `IterativeDataStream`.
+To close an iteration and define the iteration tail, the user calls the `.closeWith(iterationTail)` method of the `IterativeDataStream`. This iteration tail (the DataStream given to the `closeWith` function) will be fed back to the iteration head. A common pattern is to use [filters](#filter) to separate the output of the iteration from the feedback stream.
 
-A common pattern is to use output splitting:
+{% highlight java %}
+DataStream<Integer> tail = head.map(new IterationTail());
 
-~~~java
-SplitDataStream<..> tailOperator = head.map(new IterationTail()).split(outputSelector);
-iteration.closeWith(tailOperator.select("iterate"));
-~~~ 
+iteration.closeWith(tail.filter(isFeedback));
+
+DataStream<Integer> output = tail.filter(isOutput);
 
-In these case all output directed to the “iterate” edge would be fed back to the iteration head.
+output.map(…).project(…)…
+{% endhighlight %}
+
+In this case all values passing the `isFeedback` filter will be fed back to the iteration head, and the values passing the `isOutput` filter will produce the output of the iteration that can be transformed further (here with a `map` and a `projection`) outside the iteration.
 
 Because iterative streaming programs do not have a set number of iterations for each data element, the streaming program has no information on the end of its input. From this it follows that iterative streaming programs run until the user manually stops the program. While this is acceptable under normal circumstances, a method is provided to allow iterative programs to shut down automatically if no input is received by the iteration head for a predefined number of milliseconds.
 To use this functionality the user needs to add the maxWaitTimeMillis parameter to the `dataStream.iterate(…)` call to control the max wait time. 
+</div>
+<div data-lang="scala" markdown="1">
+The Flink Streaming API supports implementing iterative stream processing dataflows similarly to the core Flink API. Iterative streaming programs also implement a step function and embed it into an `IterativeDataStream`.
+Unlike in the core API, the user does not define the maximum number of iterations; instead, at the tail of each iteration part of the output is streamed forward to the next operator and part is streamed back to the iteration head. The user controls the output of the iteration tail by defining a step function that returns two DataStreams: a feedback and an output. The first one is the output that will be fed back to the start of the iteration and the second is the output stream of the iterative part.
+
+A common pattern is to use [filters](#filter) to separate the output from the feedback stream. In this case all values passing the `isFeedback` filter will be fed back to the iteration head, and the values passing the `isOutput` filter will produce the output of the iteration that can be transformed further (here with a `map` and a `projection`) outside the iteration.
+
+{% highlight scala %}
+val iteratedStream = someDataStream.iterate(maxWaitTime) {
+  iteration => {
+    val head = iteration.map(iterationHead)
+    val tail = head.map(iterationTail)
+    (tail.filter(isFeedback), tail.filter(isOutput))
+  }
+}.map(…).project(…)…
+{% endhighlight %}
+
+Because iterative streaming programs do not have a set number of iterations for each data element, the streaming program has no information on the end of its input. From this it follows that iterative streaming programs run until the user manually stops the program. While this is acceptable under normal circumstances, a method is provided to allow iterative programs to shut down automatically if no input is received by the iteration head for a predefined number of milliseconds.
+To use this functionality the user needs to add the maxWaitTimeMillis parameter to the `dataStream.iterate(…)` call to control the max wait time. 
+</div>
+
+</div>
 
 ### Rich functions
 The usage of rich functions is essentially the same as in the core Flink API. All transformations that take as argument a user-defined function can instead take a rich function as argument:
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
 dataStream.map(new RichMapFunction<Integer, String>() {
-  public String map(Integer value) { return value.toString(); }
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        /* initialization of function */
+    }
+
+    @Override
+    public String map(Integer value) { return value.toString(); }
 });
-~~~
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+dataStream map
+  new RichMapFunction[Int, String] {
+    override def open(config: Configuration) = {
+      /* initialization of function */
+    }
+    override def map(value: Int): String = value.toString
+  }
+{% endhighlight %}
+</div>
+</div>
 
-Rich functions provide, in addition to the user-defined function (`map()`, `reduce()`, etc), the `open()` and `close()` methods for initialization and finalization. (In contrast to the core API, the streaming API currently does not support the  `getRuntimeContext()` and `setRuntimeContext()` methods.)
+Rich functions provide, in addition to the user-defined function (`map()`, `reduce()`, etc), the `open()` and `close()` methods for initialization and finalization.
 
 [Back to top](#top)
 
-### Lambda expressions with Java 8
+Lambda expressions with Java 8
+------------
 
 For more concise code one can rely on one of the main features of Java 8, lambda expressions. The following program has similar functionality to the one provided in the [example](#example-program) section, while showcasing the usage of lambda expressions.
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java8" markdown="1">
+{% highlight java %}
 public class StreamingWordCount {
     public static void main(String[] args) throws Exception {
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-	    DataStream<String> text = env.fromElements(
+        DataStream<String> text = env.fromElements(
                 "Who's there?",
                 "I think I hear them. Stand, ho! Who's there?");
 
             DataStream<Tuple2<String, Integer>> counts = 
-		// normalize and split each line
-		text.map(line -> line.toLowerCase().split("\\W+"))
-		// convert splitted line in pairs (2-tuples) containing: (word,1)
-		.flatMap((String[] tokens, Collector<Tuple2<String, Integer>> out) -> {
-		// emit the pairs with non-zero-length words
-			Arrays.stream(tokens)
-				.filter(t -> t.length() > 0)
-				.forEach(t -> out.collect(new Tuple2<>(t, 1)));
-		})
-		// group by the tuple field "0" and sum up tuple field "1"
-		.groupBy(0)
-		.sum(1);
+        // normalize and split each line
+        text.map(line -> line.toLowerCase().split("\\W+"))
+        // convert the split line into pairs (2-tuples) containing: (word,1)
+        .flatMap((String[] tokens, Collector<Tuple2<String, Integer>> out) -> {
+        // emit the pairs with non-zero-length words
+            Arrays.stream(tokens)
+                .filter(t -> t.length() > 0)
+                .forEach(t -> out.collect(new Tuple2<>(t, 1)));
+        })
+        // group by the tuple field "0" and sum up tuple field "1"
+        .groupBy(0)
+        .sum(1);
 
         counts.print();
 
         env.execute("Streaming WordCount");
     }
 }
-~~~
+{% endhighlight %}
+</div>
+</div>
 
 For a detailed Java 8 Guide please refer to the [Java 8 Programming Guide](java8_programming_guide.html). Operators specific to streaming, such as output splitting, also support this usage. [Output splitting](#output-splitting) can be rewritten as follows:
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java8" markdown="1">
+{% highlight java %}
 SplitDataStream<Integer> split = someDataStream
     .split(x -> Arrays.asList(String.valueOf(x % 2)));
-~~~
+{% endhighlight %}
+</div>
+</div>
 
 Operator Settings
 ----------------
@@ -631,12 +774,24 @@ To tackle this issue the user can call `env.setBufferTimeout(timeoutMillis)` on
 
 Usage:
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
 LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
 env.setBufferTimeout(timeoutMillis);
 
 env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
-~~~
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.createLocalEnvironment()
+env.setBufferTimeout(timeoutMillis)
+
+env.generateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis)
+{% endhighlight %}
+</div>
+</div>
 
 To maximise the throughput the user can call `setBufferTimeout(-1)` which will remove the timeout and buffers will only be flushed when they are full.
 To minimise latency, set the timeout to a value close to 0 (for example 5 or 10 ms). Theoretically a buffer timeout of 0 will cause all outputs to be flushed when produced, but this setting should be avoided because it can cause severe performance degradation.
@@ -676,11 +831,22 @@ The followings have to be provided for the `KafkaSource(…)` constructor in ord
 
 Example:
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
 DataStream<String> stream = env
-	.addSource(new KafkaSource<String>("localhost:2181", "group", "test",new SimpleStringSchema()))
+	.addSource(new KafkaSource<String>("localhost:2181", "group", "test", new SimpleStringSchema()))
 	.print();
-~~~
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val stream = env
+    .addSource(new KafkaSource[String]("localhost:2181", "group", "test", new SimpleStringSchema))
+    .print
+{% endhighlight %}
+</div>
+</div>
 
 #### Kafka Sink
 A class providing an interface for sending data to Kafka. 
@@ -693,10 +859,18 @@ The followings have to be provided for the `KafkaSink()` constructor in order:
 
 Example: 
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
 stream.addSink(new KafkaSink<String, String>("test", "localhost:9092", new SimpleStringSchema()));
-~~~
-
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+stream.addSink(new KafkaSink[String, String]("test", "localhost:9092", new SimpleStringSchema))
+{% endhighlight %}
+</div>
+</div>
 
 More about Kafka can be found [here](https://kafka.apache.org/documentation.html).
 
@@ -720,11 +894,22 @@ The followings have to be provided for the `FlumeSource(…)` constructor in ord
 
 Example:
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
 DataStream<String> stream = env
 	.addSource(new FlumeSource<String>("localhost", 41414, new SimpleStringSchema()))
 	.print();
-~~~
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val stream = env
+    .addSource(new FlumeSource[String]("localhost", 41414, new SimpleStringSchema))
+    .print
+{% endhighlight %}
+</div>
+</div>
 
 #### Flume Sink
 A class providing an interface for sending data to Flume. 
@@ -737,9 +922,18 @@ The followings have to be provided for the `FlumeSink(…)` constructor in order
 
 Example: 
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
 stream.addSink(new FlumeSink<String>("localhost", 42424, new StringToByteSerializer()));
-~~~
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+stream.addSink(new FlumeSink[String]("localhost", 42424, new StringToByteSerializer))
+{% endhighlight %}
+</div>
+</div>
 
 ##### Configuration file<a name="config_file"></a>
 An example of a configuration file:
@@ -793,11 +987,22 @@ The followings have to be provided for the `RMQSource(…)` constructor in order
 
 Example:
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
 DataStream<String> stream = env
-	.addSource(new RMQSource<String>("localhost", "hello", new SimpleStringSchema()))
-	.print();
-~~~
+	.addSource(new RMQSource<String>("localhost", "hello", new SimpleStringSchema()))
+	.print();
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val stream = env
+    .addSource(new RMQSource[String]("localhost", "hello", new SimpleStringSchema))
+    .print
+{% endhighlight %}
+</div>
+</div>
 
 #### RabbitMQ Sink
 A class providing an interface for sending data to RabbitMQ. 
@@ -810,10 +1015,18 @@ The followings have to be provided for the `RMQSink(…)` constructor in order:
 
 Example: 
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
 stream.addSink(new RMQSink<String>("localhost", "hello", new StringToByteSerializer()));
-~~~
-
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+stream.addSink(new RMQSink[String]("localhost", "hello", new StringToByteSerializer))
+{% endhighlight %}
+</div>
+</div>
 
 More about RabbitMQ can be found [here](http://www.rabbitmq.com/).
 
@@ -855,16 +1068,34 @@ Both constructors expect a `String authPath` argument determining the location o
 #### Usage
 In contrast to other connectors the `TwitterSource` depends on no additional services. For example the following code should run gracefully:
 
-~~~java
-DataStream<String> streamSource = env.AddSource(new TwitterSource("/PATH/TO/myFile.properties"));
-~~~
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<String> streamSource = env.addSource(new TwitterSource("/PATH/TO/myFile.properties"));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val streamSource = env.addSource(new TwitterSource("/PATH/TO/myFile.properties"))
+{% endhighlight %}
+</div>
+</div>
 
 The `TwitterSource` emits strings containing JSON code. 
 To retrieve information from the JSON code you can add a FlatMap or a Map function handling JSON code. For example, use the `JSONParseFlatMap` abstract class provided among the examples. `JSONParseFlatMap` is an extension of the `FlatMapFunction` and has a
 
-~~~java
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
 String getField(String jsonText, String field);
-~~~
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+getField(jsonText : String, field : String) : String
+{% endhighlight %}
+</div>
+</div>
 
 function which can be used to acquire the value of a given field. 
 
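A hypothetical sketch of using this helper from Scala; the `[String, String]` type parameters of `JSONParseFlatMap` and the "text" field name are assumptions for illustration only:

~~~scala
streamSource.flatMap(new JSONParseFlatMap[String, String] {
  // collect the value of the (assumed) "text" field of each incoming JSON string
  override def flatMap(value: String, out: Collector[String]): Unit = {
    out.collect(getField(value, "text"))
  }
})
~~~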

http://git-wip-us.apache.org/repos/asf/flink/blob/8c9ab85e/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
index d60e796..9363236 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
@@ -20,18 +20,17 @@ package org.apache.flink.streaming.api.scala
 
 import java.util
 
-import scala.collection.JavaConversions.asScalaBuffer
-import scala.reflect.ClassTag
-
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.api.datastream.{ConnectedDataStream => JavaCStream}
-import org.apache.flink.streaming.api.function.co.{ CoFlatMapFunction, CoMapFunction, CoReduceFunction, CoWindowFunction }
-import org.apache.flink.streaming.api.invokable.operator.co.{ CoFlatMapInvokable, CoMapInvokable, CoReduceInvokable }
+import org.apache.flink.streaming.api.function.co.{CoFlatMapFunction, CoMapFunction, CoReduceFunction, CoWindowFunction}
+import org.apache.flink.streaming.api.invokable.operator.co.{CoFlatMapInvokable, CoMapInvokable, CoReduceInvokable}
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
 import org.apache.flink.util.Collector
 
+import scala.collection.JavaConversions.asScalaBuffer
+import scala.reflect.ClassTag
+
 class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
 
   /**
@@ -130,6 +129,30 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
   }
 
   /**
+   * Applies a CoFlatMap transformation on a {@link ConnectedDataStream} and
+   * maps the output to a common type. The transformation calls `fun1` for
+   * each element of the first input and `fun2` for each element of the
+   * second input. Each CoFlatMapFunction call returns any number of
+   * elements, including none.
+   *
+   * @param fun1 The flatMap function applied to the first input
+   * @param fun2 The flatMap function applied to the second input
+   * @return The transformed {@link DataStream}
+   */
+  def flatMap[R: TypeInformation: ClassTag](fun1: IN1 => TraversableOnce[R],
+      fun2: IN2 => TraversableOnce[R]): DataStream[R] = {
+    if (fun1 == null || fun2 == null) {
+      throw new NullPointerException("FlatMap functions must not be null.")
+    }
+    val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
+      val cleanFun1 = clean(fun1)
+      val cleanFun2 = clean(fun2)
+      def flatMap1(value: IN1, out: Collector[R]) = { cleanFun1(value) foreach out.collect }
+      def flatMap2(value: IN2, out: Collector[R]) = { cleanFun2(value) foreach out.collect }
+    }
+    flatMap(flatMapper)
+  }
+
+  /**
    * GroupBy operation for connected data stream. Groups the elements of
    * input1 and input2 according to keyPosition1 and keyPosition2. Used for
    * applying function on grouped data streams for example

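For reference, a minimal usage sketch of the functional flatMap overload added above; the stream names and literals are illustrative only:

~~~scala
val numbers: DataStream[Int] = ...
val sentences: DataStream[String] = ...

// emit one String per Int and one String per word of each sentence,
// merged into a single DataStream[String]
val words: DataStream[String] = (numbers connect sentences).flatMap(
  (n: Int) => Seq(n.toString),
  (s: String) => s.split(" ")
)
~~~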
http://git-wip-us.apache.org/repos/asf/flink/blob/8c9ab85e/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index d4df1d6..b673f25 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.scala
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream,
   SingleOutputStreamOperator, GroupedDataStream}
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.functions.MapFunction
@@ -183,7 +184,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   /**
    * Initiates an iterative part of the program that creates a loop by feeding
    * back data streams. To create a streaming iteration the user needs to define
-   * a transformation that creates two DataStreams.The first one one is the output
+   * a transformation that creates two DataStreams. The first one is the output
    * that will be fed back to the start of the iteration and the second is the output
    * stream of the iterative part.
    * <p>
@@ -198,8 +199,30 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    *
    */
-  def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R]),  
-        maxWaitTimeMillis:Long = 0): DataStream[R] = {
+  def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R])): DataStream[R] = {
+    iterate(0)(stepFunction)
+  }
+
+  /**
+   * Initiates an iterative part of the program that creates a loop by feeding
+   * back data streams. To create a streaming iteration the user needs to define
+   * a transformation that creates two DataStreams. The first one is the output
+   * that will be fed back to the start of the iteration and the second is the output
+   * stream of the iterative part.
+   * <p>
+   * stepfunction: initialStream => (feedback, output)
+   * <p>
+   * A common pattern is to use output splitting to create feedback and output DataStream.
+   * Please refer to the .split(...) method of the DataStream
+   * <p>
+   * By default a DataStream with iteration will never terminate, but the user
+   * can use the maxWaitTime parameter to set a max waiting time for the iteration head.
+   * If no data is received in the set time the stream terminates.
+   *
+   *
+   */
+  def iterate[R](maxWaitTimeMillis:Long = 0)(stepFunction: DataStream[T] => (DataStream[T], DataStream[R]))
+        : DataStream[R] = {
     val iterativeStream = javaStream.iterate(maxWaitTimeMillis)
 
     val (feedback, output) = stepFunction(new DataStream[T](iterativeStream))
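
For reference, an illustrative sketch of how the two iterate variants added above are meant to be called; the source stream, the step logic and the filter predicates are placeholders:

~~~scala
val source: DataStream[Long] = ...

// variant without a max wait time: the iteration head waits for feedback indefinitely
val result = source.iterate { input =>
  val step = input.map(_ + 1)
  (step.filter(_ % 2 == 0), step.filter(_ % 2 == 1))
}

// curried variant: the iteration shuts down if the head receives no data for 5000 ms
val resultWithTimeout = source.iterate(5000) { input =>
  val step = input.map(_ + 1)
  (step.filter(_ % 2 == 0), step.filter(_ % 2 == 1))
}
~~~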
@@ -472,18 +495,35 @@ class DataStream[T](javaStream: JavaStream[T]) {
    */
   def split(selector: OutputSelector[T]): SplitDataStream[T] = javaStream.split(selector)
 
+//  /**
+//   * Creates a new SplitDataStream that contains only the elements satisfying the
+//   *  given output selector predicate.
+//   */
+//  def split(fun: T => String): SplitDataStream[T] = {
+//    if (fun == null) {
+//      throw new NullPointerException("OutputSelector must not be null.")
+//    }
+//    val selector = new OutputSelector[T] {
+//      val cleanFun = clean(fun)
+//      def select(in: T): java.lang.Iterable[String] = {
+//        List(cleanFun(in))
+//      }
+//    }
+//    split(selector)
+//  }
+
   /**
    * Creates a new SplitDataStream that contains only the elements satisfying the
    *  given output selector predicate.
    */
-  def split(fun: T => String): SplitDataStream[T] = {
+  def split(fun: T => TraversableOnce[String]): SplitDataStream[T] = {
     if (fun == null) {
       throw new NullPointerException("OutputSelector must not be null.")
     }
     val selector = new OutputSelector[T] {
       val cleanFun = clean(fun)
       def select(in: T): java.lang.Iterable[String] = {
-        List(cleanFun(in))
+        cleanFun(in).toIterable.asJava
       }
     }
     split(selector)

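A short illustrative sketch of the updated split signature in use; the predicate and the output names are placeholders:

~~~scala
val numbers: DataStream[Int] = ...

// route every element to "all" and additionally to "even" or "odd"
val split = numbers.split(
  (n: Int) => if (n % 2 == 0) Seq("even", "all") else Seq("odd", "all")
)

val even = split select "even"
val all = split select "all"
~~~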
http://git-wip-us.apache.org/repos/asf/flink/blob/8c9ab85e/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
index 9e33f80..105d2c1 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
@@ -26,8 +26,6 @@ import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaS
  * {@link #select} function. To apply a transformation on the whole output simply call
  * the appropriate method on this stream.
  *
- * @param <OUT>
- *            The type of the output.
  */
 class SplitDataStream[T](javaStream: SplitJavaStream[T]) extends DataStream[T](javaStream){