Posted to commits@flink.apache.org by al...@apache.org on 2015/10/09 12:32:04 UTC

[2/3] flink git commit: [FLINK-2779] Update documentation to reflect new Stream/Window API

http://git-wip-us.apache.org/repos/asf/flink/blob/c9088a49/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index 3f5a98f..edba15f 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -1,6 +1,6 @@
 ---
-title: "Flink Stream Processing API"
-is_beta: true
+title: "Flink DataStream API Programming Guide"
+is_beta: false
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -23,76 +23,49 @@ under the License.
 
 <a href="#top"></a>
 
-Flink Streaming is a system for high-throughput, low-latency data stream processing. Flink Streaming natively supports [stateful computation](#stateful-computation), data-driven [windowing semantics](#window-operators) and [iterative](#iterations) stream processing. The system can connect to and process data streams from different data sources like file sources, web sockets, message queues (Apache Kafka, RabbitMQ, Twitter Streaming API …), and also from any user defined data sources. Data streams can be transformed and modified to create new data streams using high-level functions similar to the ones provided by the batch processing API.
+DataStream programs in Flink are regular programs that implement transformations on data streams
+(e.g., filtering, updating state, defining windows, aggregating). The data streams are initially created from various
+sources (e.g., message queues, socket streams, files). Results are returned via sinks, which may for
+example write the data to files or to standard output (e.g., the command line terminal).
+Flink programs run in a variety of contexts: standalone, or embedded in other programs.
+The execution can happen in a local JVM, or on clusters of many machines.
 
-* This will be replaced by the TOC
-{:toc}
-
-Flink Streaming API
------------
-
-The Streaming API is currently part of the *flink-staging* Maven project. All relevant classes are located in the *org.apache.flink.streaming* package.
+In order to create your own Flink DataStream program, we encourage you to start with the
+[program skeleton](#program-skeleton) and gradually add your own
+[transformations](#transformations). The remaining sections act as references for additional
+operations and advanced features.
 
-Add the following dependency to your `pom.xml` to use the Flink Streaming.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-streaming-core</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-clients</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-streaming-scala</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-clients</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-</div>
-</div>
 
-In order to create your own Flink Streaming program, we encourage you to start with the [skeleton](#program-skeleton) and gradually add your own [transformations](#transformations). The remaining sections act as references for additional transformations and advanced features.
+* This will be replaced by the TOC
+{:toc}
 
 
 Example Program
 ---------------
 
-The following program is a complete, working example of streaming WordCount, that incrementally counts the words coming from a web socket. You can copy &amp; paste the code to run it locally.
+The following program is a complete, working example of a streaming window word count application that counts the
+words coming from a web socket in 5 second windows. You can copy &amp; paste the code to run it locally.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
 {% highlight java %}
-public class StreamingWordCount {
+public class WindowWordCount {
 
-    public static void main(String[] args) {
+    public static void main(String[] args) throws Exception {
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        
+
         DataStream<Tuple2<String, Integer>> dataStream = env
                 .socketTextStream("localhost", 9999)
                 .flatMap(new Splitter())
-                .groupBy(0)
+                .keyBy(0)
+                .timeWindow(Time.of(5, TimeUnit.SECONDS))
                 .sum(1);
-        
+
         dataStream.print();
-        
-        env.execute("Socket Stream WordCount");
+
+        env.execute("Window WordCount");
     }
     
     public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@@ -112,7 +85,7 @@ public class StreamingWordCount {
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 
-object WordCount {
+object WindowWordCount {
   def main(args: Array[String]) {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
@@ -120,12 +93,13 @@ object WordCount {
 
     val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
       .map { (_, 1) }
-      .groupBy(0)
+      .keyBy(0)
+      .timeWindow(Time.of(5, TimeUnit.SECONDS))
       .sum(1)
 
     counts.print
 
-    env.execute("Scala Socket Stream WordCount")
+    env.execute("Window Stream WordCount")
   }
 }
 {% endhighlight %}
@@ -139,7 +113,82 @@ To run the example program, start the input stream with netcat first from a term
 nc -lk 9999
 ~~~
 
-The lines typed to this terminal will be the source data stream for your streaming job.
+Just type some words, hitting return after each one. These words will be the input to the
+word count program. If you want to see counts greater than 1, type the same word again and again within
+5 seconds (increase the window size from 5 seconds if you cannot type that fast &#9786;).
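+
+For example, typing the following lines into the netcat terminal within one 5 second window
+(a hypothetical session):
+
+~~~
+lorem ipsum
+ipsum ipsum
+~~~
+
+should make the program print counts along the lines of `(lorem,1)` and `(ipsum,3)`.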
+
+[Back to top](#top)
+
+
+Linking with Flink
+------------------
+
+To write programs with Flink, you need to include the Flink DataStream library corresponding to
+your programming language in your project.
+
+The simplest way to do this is to use one of the quickstart scripts: either for
+[Java]({{ site.baseurl }}/quickstart/java_api_quickstart.html) or for [Scala]({{ site.baseurl }}/quickstart/scala_api_quickstart.html). They
+create a blank project from a template (a Maven Archetype), which sets up everything for you. To
+manually create the project, you can use the archetype directly by calling:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight bash %}
+mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.flink \
+    -DarchetypeArtifactId=flink-quickstart-java \
+    -DarchetypeVersion={{site.version }}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight bash %}
+mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.flink \
+    -DarchetypeArtifactId=flink-quickstart-scala \
+    -DarchetypeVersion={{site.version }}
+{% endhighlight %}
+</div>
+</div>
+
+The archetypes work for stable releases and preview versions (`-SNAPSHOT`).
+
+If you want to add Flink to an existing Maven project, add the following entry to your
+*dependencies* section in the *pom.xml* file of your project:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-core</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-scala</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+</div>
+</div>
+
+In order to create your own Flink program, we encourage you to start with the
+[program skeleton](#program-skeleton) and gradually add your own
+[transformations](#transformations).
 
 [Back to top](#top)
 
@@ -149,7 +198,10 @@ Program Skeleton
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-As presented in the [example](#example-program), a Flink Streaming program looks almost identical to a regular Flink program. Each stream processing program consists of the following parts:
+<br />
+
+As presented in the [example](#example-program), Flink DataStream programs look like regular Java
+programs with a `main()` method. Each program consists of the same basic parts:
 
 1. Obtaining a `StreamExecutionEnvironment`,
 2. Connecting to data stream sources,
@@ -157,54 +209,97 @@ As presented in the [example](#example-program), a Flink Streaming program looks
 4. Specifying output for the processed data,
 5. Executing the program.
 
-As these steps are basically the same as in the batch API, we will only note the important differences.
-For stream processing jobs, the user needs to obtain a `StreamExecutionEnvironment` in contrast with the [batch API](programming_guide.html#program-skeleton) where one would need an `ExecutionEnvironment`. Otherwise, the process is essentially the same:
+We will now give an overview of each of those steps; please refer to the respective sections for
+more details.
+
+The `StreamExecutionEnvironment` is the basis for all Flink DataStream programs. You can
+obtain one using these static methods on class `StreamExecutionEnvironment`:
 
 {% highlight java %}
-StreamExecutionEnvironment.getExecutionEnvironment();
-StreamExecutionEnvironment.createLocalEnvironment(parallelism);
-StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles);
+getExecutionEnvironment()
+
+createLocalEnvironment()
+createLocalEnvironment(int parallelism)
+createLocalEnvironment(int parallelism, Configuration customConfiguration)
+
+createRemoteEnvironment(String host, int port, String... jarFiles)
+createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles)
 {% endhighlight %}
 
-For connecting to data streams the `StreamExecutionEnvironment` has many different methods, from basic file sources to completely general user defined data sources. We will go into details in the [basics](#basics) section.
+Typically, you only need to use `getExecutionEnvironment()`, since this
+will do the right thing depending on the context: if you are executing
+your program inside an IDE or as a regular Java program it will create
+a local environment that will execute your program on your local machine. If
+you create a JAR file from your program and invoke it through the [command line](cli.html)
+or the [web interface](web_client.html),
+the Flink cluster manager will execute your main method and `getExecutionEnvironment()` will return
+an execution environment for executing your program on a cluster.
 
-For example:
+For specifying data sources, the execution environment has several methods
+to read from files, sockets, and external systems. To just read
+data from a socket (also useful for debugging), you can use:
 
 {% highlight java %}
-env.socketTextStream(host, port);
-env.fromElements(elements…);
-env.addSource(sourceFunction)
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+DataStream<String> lines = env.socketTextStream("localhost", 9999);
 {% endhighlight %}
 
-After defining the data stream sources the user can specify transformations on the data streams to create a new data stream. Different data streams can be also combined together for joint transformations which are being showcased in the [transformations](#transformations) section.
+This will give you a DataStream on which you can then apply transformations. For
+more information on data sources and input formats, please refer to
+[Data Sources](#data-sources).
 
-For example:
+Once you have a DataStream you can apply transformations to create a new
+DataStream which you can then write to a socket, transform again,
+combine with other DataStreams, or push to an external system (e.g., a message queue, or a file system).
+You apply transformations by calling
+methods on DataStream with your own custom transformation functions. For example,
+a map transformation looks like this:
 
 {% highlight java %}
-dataStream.map(mapFunction).reduce(reduceFunction);
+DataStream<String> input = ...;
+
+DataStream<Integer> intValues = input.map(new MapFunction<String, Integer>() {
+    @Override
+    public Integer map(String value) {
+        return Integer.parseInt(value);
+    }
+});
 {% endhighlight %}
 
-The processed data can be pushed to different outputs called sinks. The user can define their own sinks or use any predefined filesystem, message queue or database sink.
+This will create a new DataStream by converting every String in the original
+stream to an Integer. For more information and a list of all the transformations,
+please refer to [Transformations](#transformations).
 
-For example:
+Once you have a DataStream containing your final results, you can push the result
+to an external system (HDFS, Kafka, Elasticsearch), write it to a socket, write to a file,
+or print it.
 
 {% highlight java %}
-dataStream.writeAsCsv(path);
-dataStream.print();
-dataStream.addSink(sinkFunction)
-{% endhighlight %}
+writeAsText(String path, ...)
+writeAsCsv(String path, ...)
+writeToSocket(String hostname, int port, ...)
+
+print()
 
-Once the complete program is specified `execute(programName)` is to be called on the `StreamExecutionEnvironment`. This will either execute on the local machine or submit the program for execution on a cluster, depending on the chosen execution environment.
+addSink(...)
+{% endhighlight %}
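+
+For instance, a minimal sketch (the output path is only an example) that writes a result stream
+both to a text file and to standard output:
+
+{% highlight java %}
+DataStream<Tuple2<String, Integer>> wordCounts = ...;
+
+wordCounts.writeAsText("file:///path/to/result");
+wordCounts.print();
+{% endhighlight %}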
 
+Once you have specified the complete program, you need to **trigger the program execution** by
+calling `execute()` on `StreamExecutionEnvironment`. This will either execute on
+the local machine or submit the program for execution on a cluster, depending on the chosen execution environment.
+        
 {% highlight java %}
-env.execute(programName);
+env.execute();
 {% endhighlight %}
 
 </div>
-
 <div data-lang="scala" markdown="1">
 
-As presented in the [example](#example-program) a Flink Streaming program looks almost identical to a regular Flink program. Each stream processing program consists of the following parts:
+<br />
+
+As presented in the [example](#example-program), Flink DataStream programs look like regular Scala
+programs with a `main()` method. Each program consists of the same basic parts:
 
 1. Obtaining a `StreamExecutionEnvironment`,
 2. Connecting to data stream sources,
@@ -212,216 +307,830 @@ As presented in the [example](#example-program) a Flink Streaming program looks
 4. Specifying output for the processed data,
 5. Executing the program.
 
-As these steps are basically the same as in the batch API we will only note the important differences.
-For stream processing jobs, the user needs to obtain a `StreamExecutionEnvironment` in contrast with the [batch API](programming_guide.html#program-skeleton) where one would need an `ExecutionEnvironment`. The process otherwise is essentially the same:
+We will now give an overview of each of those steps; please refer to the respective sections for
+more details.
+
+The `StreamExecutionEnvironment` is the basis for all Flink DataStream programs. You can
+obtain one using these static methods on class `StreamExecutionEnvironment`:
 
 {% highlight scala %}
-StreamExecutionEnvironment.getExecutionEnvironment
-StreamExecutionEnvironment.createLocalEnvironment(parallelism)
-StreamExecutionEnvironment.createRemoteEnvironment(host: String, port: String, parallelism: Int, jarFiles: String*)
+def getExecutionEnvironment
+
+def createLocalEnvironment(parallelism: Int =  Runtime.getRuntime.availableProcessors())
+
+def createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
+def createRemoteEnvironment(host: String, port: Int, parallelism: Int, jarFiles: String*)
 {% endhighlight %}
 
-For connecting to data streams the `StreamExecutionEnvironment` has many different methods, from basic file sources to completely general user defined data sources. We will go into details in the [basics](#basics) section.
+Typically, you only need to use `getExecutionEnvironment`, since this
+will do the right thing depending on the context: if you are executing
+your program inside an IDE or as a regular Java program it will create
+a local environment that will execute your program on your local machine. If
+you create a JAR file from your program and invoke it through the [command line](cli.html)
+or the [web interface](web_client.html),
+the Flink cluster manager will execute your main method and `getExecutionEnvironment()` will return
+an execution environment for executing your program on a cluster.
 
-For example:
+For specifying data sources, the execution environment has several methods
+to read from files, sockets, and external systems. To just read
+data from a socket (also useful for debugging), you can use:
 
 {% highlight scala %}
-env.socketTextStream(host, port)
-env.fromElements(elements…)
-env.addSource(sourceFunction)
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+val lines: DataStream[String] = env.socketTextStream("localhost", 9999)
 {% endhighlight %}
 
-After defining the data stream sources the user can specify transformations on the data streams to create a new data stream. Different data streams can be also combined together for joint transformations which are being showcased in the [transformations](#transformations) section.
+This will give you a DataStream on which you can then apply transformations. For
+more information on data sources and input formats, please refer to
+[Data Sources](#data-sources).
 
-For example:
+Once you have a DataStream you can apply transformations to create a new
+DataStream which you can then write to a file, transform again,
+combine with other DataStreams, or push to an external system.
+You apply transformations by calling
+methods on DataStream with your own custom transformation function. For example,
+a map transformation looks like this:
 
 {% highlight scala %}
-dataStream.map(mapFunction).reduce(reduceFunction)
+val input: DataStream[String] = ...
+
+val mapped = input.map { x => x.toInt }
 {% endhighlight %}
 
-The processed data can be pushed to different outputs called sinks. The user can define their own sinks or use any predefined filesystem, message queue or database sink.
+This will create a new DataStream by converting every String in the original
+stream to an Integer. For more information and a list of all the transformations,
+please refer to [Transformations](#transformations).
 
-For example:
+Once you have a DataStream containing your final results, you can push the result
+to an external system (HDFS, Kafka, Elasticsearch), write it to a socket, write to a file,
+or print it.
 
 {% highlight scala %}
-dataStream.writeAsCsv(path)
-dataStream.print
-dataStream.addSink(sinkFunction)
+writeAsText(path: String, ...)
+writeAsCsv(path: String, ...)
+writeToSocket(hostname: String, port: Int, ...)
+
+print()
+
+addSink(...)
 {% endhighlight %}
 
-Once the complete program is specified `execute(programName)` is to be called on the `StreamExecutionEnvironment`. This will either execute on the local machine or submit the program for execution on a cluster, depending on the chosen execution environment.
+Once you have specified the complete program, you need to **trigger the program execution** by
+calling `execute` on `StreamExecutionEnvironment`. This will either execute on
+the local machine or submit the program for execution on a cluster, depending on the chosen execution environment.
 
 {% highlight scala %}
-env.execute(programName)
+env.execute()
 {% endhighlight %}
 
 </div>
-
 </div>
 
 [Back to top](#top)
 
-Basics
-----------------
-
-### DataStream
+DataStream Abstraction
+----------------------
 
-The `DataStream` is the basic data abstraction provided by Flink Streaming. It represents a continuous, parallel, immutable stream of data of a certain type. By applying transformations the user can create new data streams or output the results of the computations. For instance the map transformation creates a new `DataStream` by applying a user defined function on each element of a given `DataStream`
+A `DataStream` is a possibly unbounded, immutable collection of data items of the same type.
 
-The transformations may return different data stream types allowing more elaborate transformations, for example the `groupBy(…)` method returns a `GroupedDataStream` which can be used for grouped transformations such as aggregating by key. We will discover more elaborate data stream types in the upcoming sections.
+Transformations may return different subtypes of `DataStream` allowing specialized transformations.
+For example, the `keyBy(…)` method returns a `KeyedStream`, which is a stream of data that
+is logically partitioned by a certain key and can be further windowed.
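+
+As a small sketch (assuming a stream of `Tuple2<String, Integer>` records), keying a stream
+makes windowed, per-key aggregations possible:
+
+{% highlight java %}
+DataStream<Tuple2<String, Integer>> stream = ...;
+
+// keyBy returns a keyed stream that can be windowed and aggregated per key
+stream.keyBy(0)
+      .timeWindow(Time.of(5, TimeUnit.SECONDS))
+      .sum(1);
+{% endhighlight %}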
 
-### Object Reuse Behavior
-
-Apache Flink is trying to reduce the number of object allocations for better performance.
+[Back to top](#top)
 
-By default, user defined functions (like `map()` or `reduce()`) are getting new objects on each call
-(or through an iterator). So it is possible to keep references to the objects inside the function
-(for example in a List).
+Lazy Evaluation
+---------------
 
-There is a switch at the `ExectionConfig` which allows users to enable the object reuse mode:
+All Flink DataStream programs are executed lazily: When the program's main method is executed, the data loading
+and transformations do not happen directly. Rather, each operation is created and added to the
+program's plan. The operations are actually executed when the execution is explicitly triggered by 
+an `execute()` call on the `StreamExecutionEnvironment` object. Whether the program is executed locally 
+or on a cluster depends on the type of `StreamExecutionEnvironment`.
 
-```
-env.getExecutionConfig().enableObjectReuse()
-```
+The lazy evaluation lets you construct sophisticated programs that Flink executes as one
+holistically planned unit.
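+
+As a small illustration (assuming a socket source on port 9999 is available), none of the
+transformations below run until the final `execute()` call:
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// these calls only build up the program's plan; no data is processed yet
+DataStream<String> lines = env.socketTextStream("localhost", 9999);
+DataStream<Integer> lengths = lines.map(new MapFunction<String, Integer>() {
+    @Override
+    public Integer map(String value) {
+        return value.length();
+    }
+});
+lengths.print();
+
+// only this call triggers the actual execution
+env.execute();
+{% endhighlight %}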
 
-For mutable types, Flink will reuse object
-instances. In practice that means that a `map()` function will always receive the same object
-instance (with its fields set to new values). The object reuse mode will lead to better performance
-because fewer objects are created, but the user has to manually take care of what they are doing
-with the object references.
-
-### Data Shipping Strategies
-
-The data shipping strategy controls how individual elements of a stream are distributed among the parallel instances of a transformation operator. This also controls the ordering of the records in the `DataStream`. There is partial ordering guarantee for the outputs with respect to the shipping strategy (outputs produced from each partition are guaranteed to arrive in the order they were produced).
-
-These are the supported shipping strategies:
-
- * *Forward*: Forward shipping directs the output data to the next operator on the same machine, avoiding expensive network I/O. It can only be used when the parallelism of the input operations matches the parallelism of the downstream operation. This is the default shipping strategy if no strategy is specified and if the parallelism allows it.
-Usage: `dataStream.forward()`
- * *Shuffle*: Shuffle randomly partitions the output data stream to the next operator using uniform distribution. Use this only when it is important that the partitioning is randomised. If you only care about an even load use *Rebalance*.
-Usage: `dataStream.shuffle()`
- * *Rebalance*: Rebalance directs the output data stream to the next operator in a round-robin fashion, achieving a balanced distribution. This is the default strategy if no strategy is defined and forward shipping is not possible because the parallelism of operations differs.
-Usage: `dataStream.rebalance()`
- * *Field/Key Partitioning*: Field/Key partitioning partitions the output data stream based on the hash code of a selected key of the tuples. Data points with the same key are directed to the same operator instance.
-Usage: `dataStream.partitionByHash(fields…)`
-* *Field/Key Grouping*: Field/Key grouping takes field/key partitioning one step further and seperates the elements into disjoint groups based on the hash code. These groups are processed separately by the next downstream operator.
-Usage: `dataStream.groupBy(fields…)`
- * *Broadcast*: Broadcast shipping sends the output data stream to all parallel instances of the next operator.
-Usage: `dataStream.broadcast()`
- * *Global*: All elements are directed to the first downstream instance of the operator.
-Usage: `dataStream.global()`
-
-Custom partitioning can also be used by giving a Partitioner function and a single field key to partition on, similarly to the batch API.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<Tuple2<String,Integer>> in = // [...]
-DataStream<Tuple2<String,Integer>> result =in
-    .partitionCustom(Partitioner<K> partitioner, key)
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
+[Back to top](#top)
 
-{% highlight scala %}
-val in: DataSet[(Int, String)] = // [...]
-val result = in
-    .partitionCustom(partitioner: Partitioner[K], key)
-{% endhighlight %}
-</div>
-</div>
 
-The shipping strategy does not remain in effect after a transformation, so it needs to be set again for subsequent operations.
+Transformations
+---------------
 
-### Connecting to the outside world
+Data transformations transform one or more DataStreams into a new DataStream. Programs can combine
+multiple transformations into sophisticated topologies.
 
-The user is expected to connect to the outside world through the source and the sink interfaces.
+This section gives a description of all the available transformations.
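+
+As a small sketch of how transformations compose (assuming an existing `DataStream<String> lines`),
+a filter, a map and a keyed aggregation can simply be chained:
+
+{% highlight java %}
+DataStream<String> lines = ...;
+
+// drop empty lines, key the remaining ones by their first character,
+// and keep a running sum of line lengths per key
+DataStream<Tuple2<String, Integer>> lengthsPerFirstChar = lines
+    .filter(new FilterFunction<String>() {
+        @Override
+        public boolean filter(String value) {
+            return !value.isEmpty();
+        }
+    })
+    .map(new MapFunction<String, Tuple2<String, Integer>>() {
+        @Override
+        public Tuple2<String, Integer> map(String value) {
+            return new Tuple2<String, Integer>(value.substring(0, 1), value.length());
+        }
+    })
+    .keyBy(0)
+    .sum(1);
+{% endhighlight %}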
 
-#### Sources
 
-Sources can by created by using `StreamExecutionEnvironment.addSource(sourceFunction)`.
-Either use one of the source functions that come with Flink or write a custom source
-by implementing the `SourceFunction` interface. By default, sources run with
-parallelism of 1. To create parallel sources the user's source function needs to implement
-`ParallelSourceFunction` or extend `RichParallelSourceFunction` in which cases the source will have
-the parallelism of the environment. The parallelism for ParallelSourceFunctions can be changed
-after creation by using `source.setParallelism(parallelism)`.
-
-The `SourceFunction` interface has two methods: `run(SourceContext)` and `cancel()`. The `run()`
-method is not expected to return until the source has either finished by itself or received
-a cancel request. The source can communicate with the outside world using the source context. For
-example, the `emit(element)` method is used to emit one element from the source. Most sources will
-have an infinite while loop inside the `run()` method to read from the input and emit elements.
-Upon invocation of the `cancel()` method the source is required to break out of its internal
-loop and return from the `run()` method. A common implementation for this is the following:
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 
-{% highlight java %}
-public static class MySource implements SourceFunction<Long> {
+<br />
 
-    // utility for job cancellation
-    private volatile boolean isRunning = false;
-    
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+          <td><strong>Map</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Takes one element and produces one element. A map function that doubles the values of the input stream:</p>
+    {% highlight java %}
+DataStream<Integer> dataStream = //...
+dataStream.map(new MapFunction<Integer, Integer>() {
     @Override
-    public void run(SourceContext<Long> ctx) throws Exception {
-        isRunning = true;
-        while (isRunning) {
-            // the source runs, isRunning flag should be checked frequently
-            }
+    public Integer map(Integer value) throws Exception {
+        return 2 * value;
+    }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+
+        <tr>
+          <td><strong>FlatMap</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:</p>
+    {% highlight java %}
+dataStream.flatMap(new FlatMapFunction<String, String>() {
+    @Override
+    public void flatMap(String value, Collector<String> out)
+        throws Exception {
+        for(String word: value.split(" ")){
+            out.collect(word);
         }
     }
-
-    // invoked by the framework in case of job cancellation
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Filter</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Evaluates a boolean function for each element and retains those for which the function returns true.
+            A filter that filters out zero values:
+            </p>
+    {% highlight java %}
+dataStream.filter(new FilterFunction<Integer>() {
     @Override
-    public void cancel() {
-        isRunning = false;
+    public boolean filter(Integer value) throws Exception {
+        return value != 0;
+    }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>KeyBy</strong><br>DataStream &rarr; KeyedStream</td>
+          <td>
+            <p>Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. 
+            Internally, this is implemented with hash partitioning. See <a href="#specifying-keys">keys</a> on how to specify keys.
+            This transformation returns a KeyedStream.</p>
+    {% highlight java %}
+dataStream.keyBy("someKey") // Key by field "someKey"
+dataStream.keyBy(0) // Key by the first element of a Tuple
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Reduce</strong><br>KeyedStream &rarr; DataStream</td>
+          <td>
+            <p>A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and
+            emits the new value.
+                    <br/>
+            	<br/>
+            A reduce function that creates a stream of partial sums:</p>
+            {% highlight java %}
+keyedStream.reduce(new ReduceFunction<Integer>() {
+    @Override
+    public Integer reduce(Integer value1, Integer value2)
+    throws Exception {
+        return value1 + value2;
+    }
+});
+            {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Fold</strong><br>KeyedStream &rarr; DataStream</td>
+          <td>
+          <p>A "rolling" fold on a keyed data stream with an initial value. 
+          Combines the current element with the last folded value and
+          emits the new value.
+          <br/>
+          <br/>
+          A fold function that creates a stream of partial sums:</p>
+          {% highlight java %}
+keyedStream.fold(0, new FoldFunction<Integer, Integer>() {
+  @Override
+  public Integer fold(Integer accumulator, Integer value)
+  throws Exception {
+      return accumulator + value;
+  }
+});
+          {% endhighlight %}
+          </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Aggregations</strong><br>KeyedStream &rarr; DataStream</td>
+          <td>
+            <p>Rolling aggregations on a keyed data stream. The difference between min
+            and minBy is that min returns the minimum value, whereas minBy returns
+            the element that has the minimum value in this field (same for max and maxBy).</p>
+    {% highlight java %}
+keyedStream.sum(0);
+keyedStream.sum("key");
+keyedStream.min(0);
+keyedStream.min("key");
+keyedStream.max(0);
+keyedStream.max("key");
+keyedStream.minBy(0);
+keyedStream.minBy("key");
+keyedStream.maxBy(0);
+keyedStream.maxBy("key");
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window</strong><br>KeyedStream &rarr; WindowedStream</td>
+          <td>
+            <p>Windows can be defined on already partitioned KeyedStreams. Windows group the data in each
+            key according to some characteristic (e.g., the data that arrived within the last 5 seconds).
+            See <a href="#windows">windows</a> for a complete description of windows.
+    {% highlight java %}
+dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))); // Last 5 seconds of data
+    {% endhighlight %}
+        </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>WindowAll</strong><br>DataStream &rarr; AllWindowedStream</td>
+          <td>
+              <p>Windows can be defined on regular DataStreams. Windows group all the stream events
+              according to some characteristic (e.g., the data that arrived within the last 5 seconds).
+              See <a href="#windows">windows</a> for a complete description of windows.</p>
+              <p><strong>WARNING:</strong> This is in many cases a <strong>non-parallel</strong> transformation. All records will be
+               gathered in one task for the windowAll operator.</p>
+  {% highlight java %}
+dataStream.windowAll(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))); // Last 5 seconds of data
+  {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Apply</strong><br>WindowedStream &rarr; DataStream<br>AllWindowedStream &rarr; DataStream</td>
+          <td>
+            <p>Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.</p>
+            <p><strong>Note:</strong> If you are using a windowAll transformation, you need to use an AllWindowFunction instead.</p>
+    {% highlight java %}
+windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
+    public void apply (Tuple tuple,
+            Window window,
+            Iterable<Tuple2<String, Integer>> values,
+            Collector<Integer> out) throws Exception {
+        int sum = 0;
+        for (Tuple2<String, Integer> t: values) {
+            sum += t.f1;
+        }
+        out.collect (sum);
+    }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Reduce</strong><br>WindowedStream &rarr; DataStream</td>
+          <td>
+            <p>Applies a functional reduce function to the window and returns the reduced value.</p>
+    {% highlight java %}
+windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
+    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
+        return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
+    }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Fold</strong><br>WindowedStream &rarr; DataStream</td>
+          <td>
+            <p>Applies a functional fold function to the window and returns the folded value.</p>
+    {% highlight java %}
+windowedStream.fold (new Tuple2<String,Integer>("Sum of all", 0), new FoldFunction<Tuple2<String,Integer>, Tuple2<String,Integer>>() {
+    public Tuple2<String, Integer> fold(Tuple2<String, Integer> acc, Tuple2<String, Integer> value) throws Exception {
+        return new Tuple2<String,Integer>(acc.f0, acc.f1 + value.f1);
+    }
+});
+    {% endhighlight %}
+          </td>
+        </tr>	
+        <tr>
+          <td><strong>Aggregations on windows</strong><br>WindowedStream &rarr; DataStream</td>
+          <td>
+            <p>Aggregates the contents of a window. The difference between min
+            and minBy is that min returns the minimum value, whereas minBy returns
+            the element that has the minimum value in this field (same for max and maxBy).</p>
+    {% highlight java %}
+windowedStream.sum(0);
+windowedStream.sum("key");
+windowedStream.min(0);
+windowedStream.min("key");
+windowedStream.max(0);
+windowedStream.max("key");
+windowedStream.minBy(0);
+windowedStream.minBy("key");
+windowedStream.maxBy(0);
+windowedStream.maxBy("key");
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Union</strong><br>DataStream* &rarr; DataStream</td>
+          <td>
+            <p>Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream
+            with itself you will still only get each element once.</p>
+    {% highlight java %}
+dataStream.union(otherStream1, otherStream2, ...);
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Join</strong><br>DataStream,DataStream &rarr; DataStream</td>
+          <td>
+            <p>Join two data streams on a given key and a common window.</p>
+    {% highlight java %}
+dataStream.join(otherStream)
+    .where(0).equalTo(1)
+    .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
+    .apply (new JoinFunction () {...});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window CoGroup</strong><br>DataStream,DataStream &rarr; DataStream</td>
+          <td>
+            <p>Cogroups two data streams on a given key and a common window.</p>
+    {% highlight java %}
+dataStream.coGroup(otherStream)
+    .where(0).equalTo(1)
+    .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
+    .apply (new CoGroupFunction () {...});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Connect</strong><br>DataStream,DataStream &rarr; ConnectedStreams</td>
+          <td>
+            <p>"Connects" two data streams retaining their types. Connect allowing for shared state between
+            the two streams.</p>
+    {% highlight java %}
+DataStream<Integer> someStream = //...
+DataStream<String> otherStream = //...
+
+ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>CoMap, CoFlatMap</strong><br>ConnectedStreams &rarr; DataStream</td>
+          <td>
+            <p>Similar to map and flatMap on a connected data stream</p>
+    {% highlight java %}
+connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
+    @Override
+    public Boolean map1(Integer value) {
+        return true;
     }
 
-}
-{% endhighlight %}
-
-In addition to the bounded data sources (with similar method signatures as the
-[batch API](programming_guide.html#data-sources)) there are several predefined stream sources
-accessible from the `StreamExecutionEnvironment`:
-
-* *Socket text stream*: Creates a new `DataStream` that contains the strings received
-from the given socket. Strings are decoded by the system's default character set. The user
-can optionally set the delimiters or the number of connection retries in case of errors.
-Usage: `env.socketTextStream(hostname, port,…)`
-
-* *Text file stream*: Creates a new `DataStream` that contains the lines of the files created
-(or modified) in a given directory. The system continuously monitors the given path, and processes
-any new files or modifications based on the settings. The file will be read with the system's
-default character set.
-Usage: `env.readFileStream(String path, long checkFrequencyMillis, WatchType watchType)`
-
-* *Message queue connectors*: There are pre-implemented connectors for a number of popular message
-queue services, please refer to the section on [connectors](#stream-connectors) for more details.
-
-* *Custom source*: Creates a new `DataStream` by using a user defined `SourceFunction` implementation.
-Usage: `env.addSource(sourceFunction)`
-
-#### Sinks
-
-`DataStreamSink` represents the different outputs of Flink Streaming programs. The user can either define his own `SinkFunction` implementation or chose one of the available implementations (methods of `DataStream`).
-
-For example:
+    @Override
+    public Boolean map2(String value) {
+        return false;
+    }
+});
+connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
+
+   @Override
+   public void flatMap1(Integer value, Collector<String> out) {
+       out.collect(value.toString());
+   }
+
+   @Override
+   public void flatMap2(String value, Collector<String> out) {
+       for (String word: value.split(" ")) {
+         out.collect(word);
+       }
+   }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Split</strong><br>DataStream &rarr; SplitStream</td>
+          <td>
+            <p>
+                Split the stream into two or more streams according to some criterion.
+                {% highlight java %}
+SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
+    @Override
+    public Iterable<String> select(Integer value) {
+        List<String> output = new ArrayList<String>();
+        if (value % 2 == 0) {
+            output.add("even");
+        }
+        else {
+            output.add("odd");
+        }
+        return output;
+    }
+});
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Select</strong><br>SplitStream &rarr; DataStream</td>
+          <td>
+            <p>
+                Select one or more streams from a split stream.
+                {% highlight java %}
+SplitStream<Integer> split;
+DataStream<Integer> even = split.select("even");
+DataStream<Integer> odd = split.select("odd");
+DataStream<Integer> all = split.select("even","odd");
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Iterate</strong><br>DataStream &rarr; IterativeStream &rarr; DataStream</td>
+          <td>
+            <p>
+                Creates a "feedback" loop in the flow, by redirecting the output of one operator
+                to some previous operator. This is especially useful for defining algorithms that
+                continuously update a model. The following code starts with a stream and applies
+		the iteration body continuously. Elements that are greater than 0 are sent back
+		to the feedback channel, and the rest of the elements are forwarded downstream.
+		See <a href="#iterations">iterations</a> for a complete description.
+                {% highlight java %}
+IterativeStream<Long> iteration = initialStream.iterate();
+DataStream<Long> iterationBody = iteration.map (/*do something*/);
+DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
+    @Override
+    public boolean filter(Long value) throws Exception {
+        return value > 0;
+    }
+});
+iteration.closeWith(feedback);
+DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
+    @Override
+    public boolean filter(Long value) throws Exception {
+        return value <= 0;
+    }
+});
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Extract Timestamps</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>
+                Extracts timestamps from records in order to work with windows
+                that use event time semantics. See <a href="#working-with-time">working with time</a>.
+                {% highlight java %}
+stream.assignTimestamps (new TimestampExtractor() {...});
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+  </tbody>
+</table>
 
- * `dataStream.print()` – Writes the `DataStream` to the standard output, practical for testing purposes
- * `dataStream.writeAsText(parameters)` – Writes the `DataStream` to a text file
- * `dataStream.writeAsCsv(parameters)` – Writes the `DataStream` to CSV format
- * `dataStream.addSink(sinkFunction)` – Custom sink implementation
+</div>
 
-There are pre-implemented connectors for a number of the most popular message queue services, please refer to the section on [connectors](#stream-connectors) for more detail.
+<div data-lang="scala" markdown="1">
 
-[Back to top](#top)
+<br />
 
-Transformations
-----------------
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+          <td><strong>Map</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Takes one element and produces one element. A map function that doubles the values of the input stream:</p>
+    {% highlight scala %}
+dataStream.map { x => x * 2 }
+    {% endhighlight %}
+          </td>
+        </tr>
+
+        <tr>
+          <td><strong>FlatMap</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:</p>
+    {% highlight scala %}
+dataStream.flatMap { str => str.split(" ") }
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Filter</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Evaluates a boolean function for each element and retains those for which the function returns true.
+            A filter that filters out zero values:
+            </p>
+    {% highlight scala %}
+dataStream.filter { _ != 0 }
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>KeyBy</strong><br>DataStream &rarr; KeyedStream</td>
+          <td>
+            <p>Logically partitions a stream into disjoint partitions, each partition containing elements of the same key.
+            Internally, this is implemented with hash partitioning. See <a href="#specifying-keys">keys</a> on how to specify keys.
+            This transformation returns a KeyedStream.</p>
+    {% highlight scala %}
+dataStream.keyBy("someKey") // Key by field "someKey"
+dataStream.keyBy(0) // Key by the first element of a Tuple
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Reduce</strong><br>KeyedStream &rarr; DataStream</td>
+          <td>
+            <p>A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and
+            emits the new value.
+                    <br/>
+            	<br/>
+            A reduce function that creates a stream of partial sums:</p>
+            {% highlight scala %}
+keyedStream.reduce { _ + _ }
+            {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Fold</strong><br>KeyedStream &rarr; DataStream</td>
+          <td>
+          <p>A "rolling" fold on a keyed data stream with an initial value.
+          Combines the current element with the last folded value and
+          emits the new value.
+          <br/>
+          <br/>
+          A fold function that creates a stream of partial sums:</p>
+          {% highlight scala %}
+keyedStream.fold(0) { _ + _ }
+          {% endhighlight %}
+          </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Aggregations</strong><br>KeyedStream &rarr; DataStream</td>
+          <td>
+            <p>Rolling aggregations on a keyed data stream. The difference between min
+            and minBy is that min returns the minimum value, whereas minBy returns
+            the element that has the minimum value in this field (same for max and maxBy).</p>
+    {% highlight scala %}
+keyedStream.sum(0)
+keyedStream.sum("key")
+keyedStream.min(0)
+keyedStream.min("key")
+keyedStream.max(0)
+keyedStream.max("key")
+keyedStream.minBy(0)
+keyedStream.minBy("key")
+keyedStream.maxBy(0)
+keyedStream.maxBy("key")
+    {% endhighlight %}
+          </td>
+        </tr>	
+        <tr>
+          <td><strong>Window</strong><br>KeyedStream &rarr; WindowedStream</td>
+          <td>
+            <p>Windows can be defined on already partitioned KeyedStreams. Windows group the data in each
+            key according to some characteristic (e.g., the data that arrived within the last 5 seconds).
+            See <a href="#windows">windows</a> for a description of windows.
+    {% highlight scala %}
+dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) // Last 5 seconds of data
+    {% endhighlight %}
+        </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>WindowAll</strong><br>DataStream &rarr; AllWindowedStream</td>
+          <td>
+              <p>Windows can be defined on regular DataStreams. Windows group all the stream events
+              according to some characteristic (e.g., the data that arrived within the last 5 seconds).
+              See <a href="#windows">windows</a> for a complete description of windows.</p>
+              <p><strong>WARNING:</strong> This is in many cases a <strong>non-parallel</strong> transformation. All records will be
+               gathered in one task for the windowAll operator.</p>
+  {% highlight scala %}
+dataStream.windowAll(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) // Last 5 seconds of data
+  {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Apply</strong><br>WindowedStream &rarr; DataStream<br>AllWindowedStream &rarr; DataStream</td>
+          <td>
+            <p>Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.</p>
+            <p><strong>Note:</strong> If you are using a windowAll transformation, you need to use an AllWindowFunction instead.</p>
+    {% highlight scala %}
+windowedStream.apply { applyFunction }
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Reduce</strong><br>WindowedStream &rarr; DataStream</td>
+          <td>
+            <p>Applies a functional reduce function to the window and returns the reduced value.</p>
+    {% highlight scala %}
+windowedStream.reduce { _ + _ }
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Fold</strong><br>WindowedStream &rarr; DataStream</td>
+          <td>
+            <p>Applies a functional fold function to the window and returns the folded value.</p>
+    {% highlight scala %}
+windowedStream.fold(0) { _ + _ }
+    {% endhighlight %}
+          </td>
+	</tr>
+        <tr>
+          <td><strong>Aggregations on windows</strong><br>WindowedStream &rarr; DataStream</td>
+          <td>
+            <p>Aggregates the contents of a window. The difference between min
+            and minBy is that min returns the minimum value, whereas minBy returns
+            the element that has the minimum value in this field (same for max and maxBy).</p>
+    {% highlight scala %}
+windowedStream.sum(0)
+windowedStream.sum("key")
+windowedStream.min(0)
+windowedStream.min("key")
+windowedStream.max(0)
+windowedStream.max("key")
+windowedStream.minBy(0)
+windowedStream.minBy("key")
+windowedStream.maxBy(0)
+windowedStream.maxBy("key")
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Union</strong><br>DataStream* &rarr; DataStream</td>
+          <td>
+            <p>Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream
+            with itself you will still only get each element once.</p>
+    {% highlight scala %}
+dataStream.union(otherStream1, otherStream2, ...)
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Join</strong><br>DataStream,DataStream &rarr; DataStream</td>
+          <td>
+            <p>Join two data streams on a given key and a common window.</p>
+    {% highlight scala %}
+dataStream.join(otherStream)
+    .where(0).equalTo(1)
+    .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
+    .apply { ... }
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window CoGroup</strong><br>DataStream,DataStream &rarr; DataStream</td>
+          <td>
+            <p>Cogroups two data streams on a given key and a common window.</p>
+    {% highlight scala %}
+dataStream.coGroup(otherStream)
+    .where(0).equalTo(1)
+    .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
+    .apply { ... }
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Connect</strong><br>DataStream,DataStream &rarr; ConnectedStreams</td>
+          <td>
+            <p>"Connects" two data streams retaining their types, allowing for shared state between
+            the two streams.</p>
+    {% highlight scala %}
+someStream : DataStream[Int] = ...
+otherStream : DataStream[String] = ...
+
+val connectedStreams = someStream.connect(otherStream)
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>CoMap, CoFlatMap</strong><br>ConnectedStreams &rarr; DataStream</td>
+          <td>
+            <p>Similar to map and flatMap on a connected data stream</p>
+    {% highlight scala %}
+connectedStreams.map(
+    (_ : Int) => true,
+    (_ : String) => false
+)
+connectedStreams.flatMap(
+    (num : Int) => List(num.toString),
+    (str : String) => str.split(" ").toList
+)
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Split</strong><br>DataStream &rarr; SplitStream</td>
+          <td>
+            <p>
+                Split the stream into two or more streams according to some criterion.
+                {% highlight scala %}
+val split = someDataStream.split(
+  (num: Int) =>
+    (num % 2) match {
+      case 0 => List("even")
+      case 1 => List("odd")
+    }
+)
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Select</strong><br>SplitStream &rarr; DataStream</td>
+          <td>
+            <p>
+                Select one or more streams from a split stream.
+                {% highlight scala %}
+
+val even = split select "even"
+val odd = split select "odd"
+val all = split.select("even","odd")
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Iterate</strong><br>DataStream &rarr; IterativeStream  &rarr; DataStream</td>
+          <td>
+            <p>
+                Creates a "feedback" loop in the flow, by redirecting the output of one operator
+                to some previous operator. This is especially useful for defining algorithms that
+                continuously update a model. The following code starts with a stream and applies
+		the iteration body continuously. Elements that are greater than 0 are sent back
+		to the feedback channel, and the rest of the elements are forwarded downstream.
+		See <a href="#iterations">iterations</a> for a complete description.
+                {% highlight scala %}
+initialStream.iterate {
+  iteration => {
+    val iterationBody = iteration.map {/*do something*/}
+    (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
+  }
+}
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Extract Timestamps</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>
+                Extracts timestamps from records in order to work with windows
+                that use event time semantics.
+                See <a href="#working-with-time">working with time</a>.
+                {% highlight scala %}
+stream.assignTimestamps { timestampExtractor }
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+  </tbody>
+</table>
 
-Transformations, also called operators, represent the users' business logic on the data stream. Operators consume data streams and produce new data streams. The user can chain and combine multiple operators on the data stream to produce the desired processing steps. Most of the operators work very similar to the batch Flink API allowing developers to reason about `DataStream` the same way as they would about `DataSet`. At the same time there are operators that exploit the streaming nature of the data to allow advanced functionality.
+</div>
+</div>
 
-### Basic transformations
+The following transformations are available on data streams of Tuples:
 
-Basic transformations can be seen as functions that operate on records of the data stream.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -435,116 +1144,137 @@ Basic transformations can be seen as functions that operate on records of the da
       <th class="text-center">Description</th>
     </tr>
   </thead>
-
   <tbody>
-    <tr>
-      <td><strong>Map</strong></td>
+   <tr>
+      <td><strong>Project</strong><br>DataStream &rarr; DataStream</td>
       <td>
-        <p>Takes one element and produces one element. A map that doubles the values of the input stream:</p>
+        <p>Selects a subset of fields from the tuples.
 {% highlight java %}
-dataStream.map(new MapFunction<Integer, Integer>() {
-            @Override
-            public Integer map(Integer value) throws Exception {
-                return 2 * value;
-            }
-        });
+DataStream<Tuple3<Integer, Double, String>> in = // [...]
+DataStream<Tuple2<String, Integer>> out = in.project(2,0);
 {% endhighlight %}
+        </p>
       </td>
     </tr>
+  </tbody>
+</table>
+
+</div>
+
+<div data-lang="scala" markdown="1">
+
+<br />
 
+<table class="table table-bordered">
+  <thead>
     <tr>
-      <td><strong>FlatMap</strong></td>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+   <tr>
+      <td><strong>Project</strong><br>DataStream &rarr; DataStream</td>
       <td>
-        <p>Takes one element and produces zero, one, or more elements. A flatmap that splits sentences to words:</p>
-{% highlight java %}
-dataStream.flatMap(new FlatMapFunction<String, String>() {
-            @Override
-            public void flatMap(String value, Collector<String> out) 
-                throws Exception {
-                for(String word: value.split(" ")){
-                    out.collect(word);
-                }
-            }
-        });
+        <p>Selects a subset of fields from the tuples.
+{% highlight scala %}
+val in : DataStream[(Int,Double,String)] = // [...]
+val out = in.project(2,0)
 {% endhighlight %}
+        </p>
       </td>
     </tr>
+  </tbody>
+</table>
+
+</div>
+</div>
+
+
+### Physical partitioning
+
+Flink also gives low-level control (if desired) on the exact stream partitioning after a transformation,
+via the following functions.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
 
+<table class="table table-bordered">
+  <thead>
     <tr>
-      <td><strong>Filter</strong></td>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+   <tr>
+      <td><strong>Hash partitioning</strong><br>DataStream &rarr; DataStream</td>
       <td>
-        <p>Evaluates a boolean function for each element and retains those for which the function returns true.
-	<br/>
-	<br/>
-        A filter that filters out zero values:
+        <p>
+            Identical to keyBy but returns a DataStream instead of a KeyedStream.
+            {% highlight java %}
+dataStream.partitionByHash("someKey");
+dataStream.partitionByHash(0);
+            {% endhighlight %}
         </p>
-{% highlight java %}
-dataStream.filter(new FilterFunction<Integer>() { 
-            @Override
-            public boolean filter(Integer value) throws Exception {
-                return value != 0;
-            }
-        });
-{% endhighlight %}
       </td>
     </tr>
-
-    <tr>
-      <td><strong>Reduce</strong></td>
+   <tr>
+      <td><strong>Custom partitioning</strong><br>DataStream &rarr; DataStream</td>
       <td>
-        <p>Combines a stream of elements into another stream by repeatedly combining two elements
-        into one and emits the current state after every reduction. Reduce may only be applied on a windowed or grouped data stream.
-        <br/>
-        
-        <strong>IMPORTANT:</strong> The streaming and the batch reduce functions have different semantics. A streaming reduce on a data stream emits the current reduced value for every new element on a data stream. On a windowed data stream it works as a batch reduce: it produces at most one value per window.
-        <br/>
-	<br/>
-         A reducer that sums up the incoming stream, the result is a stream of intermediate sums:</p>
-{% highlight java %}
-dataStream.reduce(new ReduceFunction<Integer>() {
-            @Override
-            public Integer reduce(Integer value1, Integer value2) 
-            throws Exception {
-                return value1 + value2;
-            }
-        });
-{% endhighlight %}
+        <p>
+            Uses a user-defined Partitioner to select the target task for each element.
+            {% highlight java %}
+dataStream.partitionCustom(new Partitioner(){...}, "someKey");
+dataStream.partitionCustom(new Partitioner(){...}, 0);
+            {% endhighlight %}
+        </p>
       </td>
     </tr>
-
-    <tr>
-      <td><strong>Fold</strong></td>
+   <tr>
+     <td><strong>Random partitioning</strong><br>DataStream &rarr; DataStream</td>
+     <td>
+       <p>
+            Partitions elements randomly according to a uniform distribution.
+            {% highlight java %}
+dataStream.partitionRandom();
+            {% endhighlight %}
+       </p>
+     </td>
+   </tr>
+   <tr>
+      <td><strong>Rebalancing (Round-robin partitioning)</strong><br>DataStream &rarr; DataStream</td>
       <td>
-        <p>Combines a stream element by element with an initial aggregator value. Fold may only be applied on a windowed or grouped data stream.
-        <br/>
-         A folder that appends strings one by one to the empty sting:</p>
-{% highlight java %}
-dataStream.fold("", new FoldFunction<String, String>() {
-            @Override
-            public String fold(String accumulator, String value) throws Exception {
-                return accumulator + value;
-            }
-       });
-{% endhighlight %}
+        <p>
+            Partitions elements round-robin, creating equal load per partition. Useful for performance
+            optimization in the presence of data skew.
+            {% highlight java %}
+dataStream.rebalance();
+            {% endhighlight %}
+        </p>
       </td>
     </tr>
-
-    <tr>
-      <td><strong>Union</strong></td>
+   <tr>
+      <td><strong>Broadcasting</strong><br>DataStream &rarr; DataStream</td>
       <td>
-        <p>Union of two or more data streams creating a new stream containing all the elements from all the streams. Node: If you union a data stream
-        with itself you will still only get each element once.</p>
-{% highlight java %}
-dataStream.union(otherStream1, otherStream2, …)
-{% endhighlight %}
+        <p>
+            Broadcasts elements to every partition.
+            {% highlight java %}
+dataStream.broadcast();
+            {% endhighlight %}
+        </p>
       </td>
     </tr>
   </tbody>
 </table>
 
-----------
+</div>
 
-The following transformations are available on data streams of Tuples:
+<div data-lang="scala" markdown="1">
+
+<br />
 
 <table class="table table-bordered">
   <thead>
@@ -555,25 +1285,89 @@ The following transformations are available on data streams of Tuples:
   </thead>
   <tbody>
    <tr>
-      <td><strong>Project</strong></td>
+      <td><strong>Hash partitioning</strong><br>DataStream &rarr; DataStream</td>
       <td>
-        <p>Selects a subset of fields from the tuples</p>
-{% highlight java %}
-DataStream<Tuple3<Integer, Double, String>> in = // [...]
-DataStream<Tuple2<String, Integer>> out = in.project(2,0);
-{% endhighlight %}
+        <p>
+            Identical to keyBy but returns a DataStream instead of a KeyedStream.
+            {% highlight scala %}
+dataStream.partitionByHash("someKey")
+dataStream.partitionByHash(0)
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+   <tr>
+      <td><strong>Custom partitioning</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>
+            Uses a user-defined Partitioner to select the target task for each element.
+            {% highlight scala %}
+dataStream.partitionCustom(partitioner, "someKey")
+dataStream.partitionCustom(partitioner, 0)
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+   <tr>
+     <td><strong>Random partitioning</strong><br>DataStream &rarr; DataStream</td>
+     <td>
+       <p>
+            Partitions elements randomly according to a uniform distribution.
+            {% highlight scala %}
+dataStream.partitionRandom()
+            {% endhighlight %}
+       </p>
+     </td>
+   </tr>
+   <tr>
+      <td><strong>Rebalancing (Round-robin partitioning)</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>
+            Partitions elements round-robin, creating equal load per partition. Useful for performance
+            optimization in the presence of data skew.
+            {% highlight scala %}
+dataStream.rebalance()
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+   <tr>
+      <td><strong>Broadcasting</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>
+            Broadcasts elements to every partition.
+            {% highlight scala %}
+dataStream.broadcast()
+            {% endhighlight %}
+        </p>
       </td>
     </tr>
   </tbody>
 </table>
 
 </div>
+</div>
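+
+For `partitionCustom`, the `Partitioner` decides, for every element, which parallel task instance
+receives it. The following is a minimal Java sketch, assuming a Tuple2 stream partitioned on its
+first field; the routing logic is purely illustrative:
+
+{% highlight java %}
+DataStream<Tuple2<Integer, String>> stream = // [...]
+
+DataStream<Tuple2<Integer, String>> partitioned = stream.partitionCustom(
+    new Partitioner<Integer>() {
+        @Override
+        public int partition(Integer key, int numPartitions) {
+            // illustrative routing: negative keys go to task 0, all others are hashed
+            return key < 0 ? 0 : Math.abs(key.hashCode()) % numPartitions;
+        }
+    },
+    0); // partition on the first tuple field
+{% endhighlight %}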
 
-<div data-lang="scala" markdown="1">
+### Task chaining and resource groups
 
-<br />
+Chaining two subsequent transformations means co-locating them within the same thread for better
+performance. Flink by default chains operators where this is possible (e.g., two subsequent map
+transformations). The API gives fine-grained control over chaining if desired:
+
+Use `StreamExecutionEnvironment.disableOperatorChaining()` if you want to disable chaining for
+the whole job. For more fine-grained control, the following functions are available. Note that
+these functions can only be used right after a DataStream transformation, as they refer to the
+previous transformation. For example, you can use `someStream.map(...).startNewChain()`, but
+you cannot use `someStream.startNewChain()`.
+
+A resource group is a slot in Flink, see
+[slots](config#configuring-taskmanager-processing-slots). You can
+manually isolate operators in separate slots if desired.
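+
+As a minimal sketch of how these calls compose (the functions and the socket source are
+illustrative only), note that the chaining hints always attach to the transformation written
+directly before them:
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+// env.disableOperatorChaining(); // would disable chaining for the whole job
+
+DataStream<String> lines = env.socketTextStream("localhost", 9999);
+
+lines
+    .filter(new FilterFunction<String>() {
+        @Override
+        public boolean filter(String value) { return !value.isEmpty(); }
+    })
+    .map(new MapFunction<String, String>() {
+        @Override
+        public String map(String value) { return value.trim(); }
+    })
+    .startNewChain()   // this map begins a new chain; it is not chained to the filter
+    .map(new MapFunction<String, String>() {
+        @Override
+        public String map(String value) { return value.toUpperCase(); }
+    })                 // chained to the previous map, so both mappers share one chain
+    .print();
+
+env.execute();
+{% endhighlight %}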
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 
+<br />
 
 <table class="table table-bordered">
   <thead>
@@ -582,742 +1376,1772 @@ DataStream<Tuple2<String, Integer>> out = in.project(2,0);
       <th class="text-center">Description</th>
     </tr>
   </thead>
-
   <tbody>
-
-    <tr>
-      <td><strong>Map</strong></td>
+   <tr>
+      <td>Start new chain</td>
       <td>
-        <p>Takes one element and produces one element. A map that doubles the values of the input stream:</p>
-{% highlight scala %}
-dataStream.map{ x => x * 2 }
+        <p>Begin a new chain, starting with this operator. The two
+	mappers will be chained, and filter will not be chained to
+	the first mapper.
+{% highlight java %}
+someStream.filter(...).map(...).startNewChain().map(...);
 {% endhighlight %}
+        </p>
       </td>
     </tr>
-
-    <tr>
-      <td><strong>FlatMap</strong></td>
+   <tr>
+      <td>Disable chaining</td>
       <td>
-        <p>Takes one element and produces zero, one, or more elements. A flatmap that splits sentences to words:</p>
-{% highlight scala %}
-data.flatMap { str => str.split(" ") }
+        <p>Do not chain the map operator.
+{% highlight java %}
+someStream.map(...).disableChaining();
 {% endhighlight %}
-      </td>
-    </tr>
-
-    <tr>
-      <td><strong>Filter</strong></td>
-      <td>
-        <p>Evaluates a boolean function for each element and retains those for which the function returns true.
-       	<br/>
-	<br/>
-        A filter that filters out zero values:
         </p>
-{% highlight scala %}
-dataStream.filter{ _ != 0 }
-{% endhighlight %}
       </td>
-    </tr>
-
-    <tr>
-      <td><strong>MapWithState</strong></td>
+    </tr>    
+   <tr>
+      <td>Start a new resource group</td>
       <td>
-        <p>Takes one element and produces one element using a stateful function. Note that the user state object needs to be serializable.
-	<br/>
-	<br/>
-	A map that produces a rolling average per key:</p>
-{% highlight scala %}
-dataStream.groupBy(..).mapWithState((in, state: Option[(Long, Int)]) => state match {
-	case Some((sum, count)) => ((sum + in)/(count + 1), Some((sum + in, count + 1)))
-	case None => (in, Some((in, 1)))
-})
+        <p>Start a new resource group containing this operator and all subsequent operators.
+{% highlight java %}
+someStream.filter(...).startNewResourceGroup();
 {% endhighlight %}
+        </p>
       </td>
     </tr>
-
-    <tr>
-      <td><strong>FlatMapWithState</strong></td>
+   <tr>
+      <td>Isolate resources</td>
       <td>
-        <p>Takes one element and produces zero, one, or more elements using a stateful function. Note that the user state object needs to be serializable.</p>
-{% highlight scala %}
-dataStream.flatMapWithState((I,Option[S]) => (Traversable[O], Option[S]))
+        <p>Isolate the operator in its own slot.
+{% highlight java %}
+someStream.map(...).isolateResources();
 {% endhighlight %}
+        </p>
       </td>
-    </tr>
+    </tr>        
+  </tbody>
+</table>
+
+</div>
+
+<div data-lang="scala" markdown="1">
+
+<br />
 
+<table class="table table-bordered">
+  <thead>
     <tr>
-      <td><strong>FilterWithState</strong></td>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+   <tr>
+      <td>Start new chain</td>
       <td>
-       <p>Evaluates a stateful boolean function for each element and retains those for which the function returns true. Note that the user state object needs to be serializable.
-       	<br/>
-	<br/>
-        A filter that only keeps the first 10 elements at each operator instance:
-        </p>
+        <p>Begin a new chain, starting with this operator. The two
+	mappers will be chained, and filter will not be chained to
+	the first mapper.
 {% highlight scala %}
-dataStream.filterWithState((in, count: Option[Int]) => count match {
-	case Some(c) => (c < 10, Some(c+1))
-	case None => (true, Some(1))
-})
+someStream.filter(...).map(...).startNewChain().map(...)
 {% endhighlight %}
+        </p>
       </td>
     </tr>
-
-
-    <tr>
-      <td><strong>Reduce</strong></td>
+   <tr>
+      <td>Disable chaining</td>
       <td>
-        <p>Combines a stream of elements into another stream by repeatedly combining two elements
-        into one and emits the current state after every reduction. Reduce may only be applied on a windowed or grouped data stream.
-        <br/>
-        
-        <strong>IMPORTANT:</strong> The streaming and the batch reduce functions have different semantics. A streaming reduce on a data stream emits the current reduced value for every new element on a data stream. On a windowed data stream it works as a batch reduce: it produces at most one value per window.
-        <br/>
-	<br/>
-         A reducer that sums up the incoming stream, the result is a stream of intermediate sums:</p>
+        <p>Do not chain the map operator.
 {% highlight scala %}
-dataStream.reduce{ _ + _}
+someStream.map(...).disableChaining()
 {% endhighlight %}
+        </p>
       </td>
-    </tr>
-
-    <tr>
-      <td><strong>Fold</strong></td>
-        <td>
-        <p>Combines a stream element by element with an initial aggregator value. Fold may only be applied windowed or grouped data stream.
-        <br/>
-         A folder that appends strings one by one to the empty sting:</p>
+    </tr>    
+   <tr>
+      <td>Start a new resource group</td>
+      <td>
+        <p>Start a new resource group containing this operator and all subsequent operators.
 {% highlight scala %}
-dataStream.fold{"", _ + _ }
+someStream.filter(...).startNewResourceGroup()
 {% endhighlight %}
+        </p>
       </td>
     </tr>
-
-    <tr>
-      <td><strong>Union</strong></td>
+   <tr>
+      <td>Isolate resources</td>
       <td>
-        <p>Union of two or more data streams creating a new stream containing all the elements from all the streams.</p>
+        <p>Isolate the operator in its own slot.
 {% highlight scala %}
-dataStream.union(otherStream1, otherStream2, …)
+someStream.map(...).isolateResources()
 {% endhighlight %}
+        </p>
       </td>
-    </tr>
-
+    </tr>        
   </tbody>
-
-
 </table>
 
 </div>
-
 </div>
 
-### Grouped operators
 
-Some transformations require that the elements of a `DataStream` are grouped on some key. The user can create a `GroupedDataStream` by calling the `groupBy(key)` method of a non-grouped `DataStream`.
-Keys can be of three types: field positions (applicable for tuple/array types), field expressions (applicable for pojo types), KeySelector instances.
+[Back to top](#top)
 
-Aggregation or reduce operators called on `GroupedDataStream`s produce elements on a per group basis.
+Specifying Keys
+----------------
 
-### Aggregations
+The `keyBy` transformation requires a key to be defined on
+its input DataStream.
 
-The Flink Streaming API supports different types of pre-defined aggregations of `GroupedDataStream`s and `WindowedDataStream`s. A common property of these operators, is that they produce the stream of intermediate aggregate values.
+A DataStream is keyed as follows:
+{% highlight java %}
+DataStream<...> input = // [...]
+DataStream<...> windowed = input
+	.keyBy(/*define key here*/)
+	.window(/*define window here*/);
+{% endhighlight %}
 
-Types of aggregations: `sum(field)`, `min(field)`, `max(field)`, `minBy(field, first)`, `maxBy(field, first)`.
+The data model of Flink is not based on key-value pairs. Therefore,
+you do not need to physically pack the data stream types into keys and
+values. Keys are "virtual": they are defined as functions over the
+actual data to guide the grouping operator.
 
-With `sum`, `min`, and `max` for every incoming tuple the selected field is replaced with the current aggregated value. Fields can be selected using either field positions or field expressions (similarly to grouping).
+See [the relevant section of the DataSet API documentation](programming_guide.html#specifying-keys) on how to specify keys.
+Just replace `DataSet` with `DataStream`, and `groupBy` with `keyBy`.
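+
+As a short sketch, a key can be specified as a field expression on a POJO or computed by a
+`KeySelector` function. The `WordCount` class below is made up for illustration:
+
+{% highlight java %}
+// an illustrative POJO; public fields (or getters/setters) can be used as keys
+public class WordCount {
+    public String word;
+    public int count;
+    public WordCount() {}
+}
+
+DataStream<WordCount> counts = // [...]
+
+// key by field expression ...
+counts.keyBy("word").sum("count");
+
+// ... or by a KeySelector function
+counts.keyBy(new KeySelector<WordCount, String>() {
+    @Override
+    public String getKey(WordCount wc) { return wc.word; }
+}).sum("count");
+{% endhighlight %}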
 
-With `minBy` and `maxBy` the output of the operator is the element with the current minimal or maximal value at the given field. If more components share the minimum or maximum value, the user can decide if the operator should return the first or last element. This can be set by the `first` boolean parameter.
 
-### Window operators
 
-Flink streaming provides very flexible data-driven windowing semantics to create arbitrary windows (also referred to as discretizations or slices) of the data streams and apply reduce, map or aggregation transformations on the windows acquired. Windowing can be used for instance to create rolling aggregations of the most recent N elements, where N could be defined by Time, Count or any arbitrary user defined measure. 
+Passing Functions to Flink
+--------------------------
 
-The user can control the size (eviction) of the windows and the frequency of transformation or aggregation calls (trigger) on them in an intuitive API. We will describe the exact semantics of these operators in the [policy based windowing](#policy-based-windowing) section.
+Some transformations take user-defined functions as arguments. 
 
-Some examples:
+See [the relevant section of the DataSet API documentation](programming_guide.html#passing-functions-to-flink).
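+
+As a brief illustration (the `LengthMapper` class is made up), a function can be passed either as
+an instance of a named class or inline as an anonymous class:
+
+{% highlight java %}
+// a named function class
+public class LengthMapper implements MapFunction<String, Integer> {
+    @Override
+    public Integer map(String value) {
+        return value.length();
+    }
+}
+
+DataStream<String> text = // [...]
+
+// pass an instance of the named class ...
+DataStream<Integer> lengths = text.map(new LengthMapper());
+
+// ... or pass the function inline as an anonymous class
+DataStream<Integer> lengthsInline = text.map(new MapFunction<String, Integer>() {
+    @Override
+    public Integer map(String value) {
+        return value.length();
+    }
+});
+{% endhighlight %}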
 
- * `dataStream.window(eviction).every(trigger).reduceWindow(…)`
- * `dataStream.window(…).every(…).mapWindow(…).flatten()`
- * `dataStream.window(…).every(…).groupBy(…).aggregate(…).getDiscretizedStream()`
 
-The core abstraction of the Windowing semantics is the `WindowedDataStream` and the `StreamWindow`. The `WindowedDataStream` is created when we first call the `window(…)` method of the DataStream and represents the windowed discretisation of the underlying stream. The user can think about it simply as a `DataStream<StreamWindow<T>>` where additional API functions are supplied to provide efficient transformations of individual windows. 
+[Back to top](#top)
 
-Please note at this point that the `.every(…)` call belongs together with the preceding `.window(…)` call and does not define a new transformation in itself.
 
-The result of a window transformation is again a `WindowedDataStream` which can also be used to further apply other windowed computations. In this sense, window transformations define mapping from stream windows to stream windows.
+Data Types
+----------
 
-The user has different ways of using the result of a window operation:
+Flink places some restrictions on the type of elements that are used in DataStreams and in results
+of transformations. The reason for this is that the system analyzes the types to determine
+efficient execution strategies.
 
- * `windowedDataStream.flatten()` - streams the results element wise and returns a `DataStream<T>` where T is the type of the underlying windowed stream
- * `windowedDataStream.getDiscretizedStream()` - returns a `DataStream<StreamWindow<T>>` for applying some advanced logic on the stream windows itself. Be careful here, as at this point, we need to materialise the full windows
- * Calling any window transformation further transforms the windows, while preserving the windowing logic
+See [the relevant section of the DataSet API documentation](programming_guide.html#data-types).
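+
+As a quick reminder of what those restrictions look like in practice, the following is a sketch of
+a POJO (the class is illustrative) that Flink's type analysis can handle: the class is public, has
+a public no-argument constructor, and its fields are either public or reachable through getters
+and setters.
+
+{% highlight java %}
+// an illustrative POJO usable as a DataStream element type
+public class SensorReading {
+    public String sensorId;   // public field
+    private double value;     // private field with getter/setter
+
+    public SensorReading() {} // public no-argument constructor
+
+    public double getValue() { return value; }
+    public void setValue(double value) { this.value = value; }
+}
+
+DataStream<SensorReading> readings = // [...]
+{% endhighlight %}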
 
-The next example would create windows that hold elements of the last 5 seconds, and the user defined transformation would be executed on the windows every second (sliding the window by 1 second):
+[Back to top](#top)
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-dataStream.window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(1, TimeUnit.SECONDS));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-dataStream.window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(1, TimeUnit.SECONDS))
-{% endhighlight %}
-</div>
-</div>
 
-This approach is often referred to as policy based windowing. Different policies (count, time, etc.) can be mixed as well, for example to downsample our stream, a window that takes the latest 100 elements of the stream every minute is created as follows:
+Data Sources
+------------
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
-{% highlight java %}
-dataStream.window(Count.of(100)).every(Time.of(1, TimeUnit.MINUTES));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-dataStream.window(Count.of(100)).every(Time.of(1, TimeUnit.MINUTES))
-{% endhighlight %}
-</div>
-</div>
 
-The user can also omit the `every(…)` call which results in a tumbling window emptying the window after every transformation call.
+<br />
 
-Several predefined policies are provided in the API, including delta-based, count-based and time-based policies. These can be accessed through the static methods provided by the `PolicyHelper` classes:
+Sources can be created using `StreamExecutionEnvironment.addSource(sourceFunction)`.
+You can either use one of the source functions that come with Flink or write a custom source
+by implementing the `SourceFunction` interface for non-parallel sources, or by implementing the
+`ParallelSourceFunction` interface or extending `RichParallelSourceFunction` for parallel sources.
+A sketch of a custom source is shown after the list of predefined sources below.
 
- * `Time.of(…)`
- * `Count.of(…)`
- * `Delta.of(…)`
- * `FullStream.window()`
+There are several predefined stream sources accessible from the `StreamExecutionEnvironment`:
 
-For detailed description of these policies please refer to the [Javadocs](http://flink.apache.org/docs/latest/api/java/).
+File-based:
 
-#### Policy based windowing
-The policy based windowing is a highly flexible way to specify stream discretisation also called windowing semantics. Two types of policies are used for such a specification:
+- `readTextFile(path)` / `TextInputFormat` - Reads files line wise and returns them as Strings.
 
- * `TriggerPolicy` defines when to trigger the reduce or transformation UDF on the current window and emit the result. In the API it completes a window statement such as: `window(…).every(…)`, while the triggering policy is passed within `every`. In case the user wants to use tumbling eviction policy (the window is emptied after the transformation) he can omit the `.every(…)` call and pass the trigger policy directly to the `.window(…)` call.
+- `readTextFileWithValue(path)` / `TextValueInputFormat` - Reads files line wise and returns them as
+  StringValues. StringValues are mutable strings.
 
- * `EvictionPolicy` defines the length of a window as a means of a predicate for evicting tuples when they are no longer needed. In the API this can be defined by the `window(…)` operation on a stream. There are mostly the same predefined policy types provided as for trigger policies.
+- `readFile(path)` / Any input format - Reads files as dictated by the input format.
 
-Trigger and eviction policies work totally independently of each other. The eviction policy continuously maintains a window, into which it adds new elements and based on the eviction logic removes older elements in the order of arrival. The trigger policy on the other hand only decided at each new incoming element, whether it should trigger computation (and output results) on the currently maintained window.
+- `readFileOfPrimitives(path, Class)` / `PrimitiveInputFormat` - Parses files of new-line (or another char sequence) delimited primitive data types such as `String` or `Integer`.
 
-Several predefined policies are provided in the API, including delta-based, punctuation based, count-based and time-based policies. Policies are in general UDFs and can implement any custom behaviour.
+- `readFileStream` - Creates a stream by appending elements when a file is modified.
 
-In addition to the `dataStream.window(…).every(…)` style, users can specifically pass the trigger and eviction policies during the window call:
+Socket-based:
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-dataStream.window(triggerPolicy, evictionPolicy);
-{% endhighlight %}
-</div>
+- `socketTextStream` - Reads from a socket. Elements can be separated by a delimiter.
 
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-dataStream.window(triggerPolicy, evictionPolicy)
-{% endhighlight %}
-</div>
+Collection-based:
 
-</div>
+- `fromCollection(Collection)` - Creates a data stream from a Java `java.util.Collection`. All elements
+  in the collection must be of the same type.
 
-By default triggers can only trigger when a new element arrives. This might not be suitable for all the use-cases with low data rates. To also provide triggering between elements, so called active policies can be used (the two interfaces controlling this special behaviour is `ActiveTriggerPolicy` and `CentralActiveTrigger`). The predefined time-based policies are already implemented in such a way and can hold as an example for user defined active policy implementations.
+- `fromCollection(Iterator, Class)` - Creates a data stream from an iterator. The class specifies the
+  data type of the elements returned by the iterator.
 
-Time-based trigger and eviction policies can work with user defined `TimeStamp` implementations, these policies already cover most use cases.
- 
-#### Reduce on windowed data streams
-The `WindowedDataStream<T>.reduceWindow(ReduceFunction<T>)` transformation calls the user-defined `ReduceFunction` at every trigger on the records currently in the window. The user can also use the different pre-implemented streaming aggregations such as `sum, min, max, minBy` and `maxBy`.
+- `fromElements(T ...)` - Creates a data stream from the given sequence of objects. All objects must be
+  of the same type.
 
-The following is an example for a window reduce that sums the elements in the last minute with 10 seconds slide interval:
+- `fromParallelCollection(SplittableIterator, Class)` - Creates a data stream from an iterator, in
+  parallel. The class specifies the data type of the elements returned by the iterator.
+
+- `generateSequence(from, to)` - Generates the sequence of numbers in the given interval, in
+  parallel.
+
+Custom:
+
+- `addSource` - Attach a new source function. For example, to read from Apache Kafka you can use
+    `addSource(new FlinkKafkaConsumer082<>(...))`. See [connectors](#connectors) for more details.
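+
+As a sketch of what a custom non-parallel source looks like (the class name and the emitted values
+are illustrative only), the `run` method emits records until `cancel` is called:
+
+{% highlight java %}
+// a simple source emitting an increasing counter
+public class CounterSource implements SourceFunction<Long> {
+
+    private volatile boolean isRunning = true;
+    private long counter = 0;
+
+    @Override
+    public void run(SourceContext<Long> ctx) throws Exception {
+        while (isRunning) {
+            ctx.collect(counter++);
+            Thread.sleep(100); // throttle the illustrative source a bit
+        }
+    }
+
+    @Override
+    public void cancel() {
+        isRunning = false;
+    }
+}
+
+// attach it to the environment
+DataStream<Long> counterStream = env.addSource(new CounterSource());
+{% endhighlight %}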
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-dataStream.window(Time.of(1, TimeUnit.MINUTES)).every(Time.of(10,TimeUnit.SECONDS)).sum(field);
-{% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
-{% highlight scala %}
-dataStream.window(Time.of(1, TimeUnit.MINUTES)).every(Time.of(10,TimeUnit.SECONDS)).sum(field)
-{% endhighlight %}
-</div>
 
-</div>
+<br />
 
+Sources can be created using `StreamExecutionEnvironment.addSource(sourceFunction)`.
+You can either use one of the source functions that come with Flink or write a custom source
+by implementing the `SourceFunction` interface for non-parallel sources, or by implementing the
+`ParallelSourceFunction` interface or extending `RichParallelSourceFunction` for parallel sources.
 
-#### Map on windowed data streams
-The `WindowedDataStream<T>.mapWindow(WindowMapFunction<T,O>)` transformation calls  `mapWindow(…)` for each `StreamWindow` in the discretised stream, providing access to all elements in the window through the iterable interface. At each function call the output `StreamWindow<O>` will consist of all the elements collected to the collector. This allows a straightforward way of mapping one stream window to another.
+There are several predefined stream sources accessible from the `StreamExecutionEnvironment`:
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-windowedDataStream.mapWindow(windowMapFunction);
-{% endhighlight %}
-</div>
+File-based:
 
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-windowedDataStream.mapWindow(windowMapFunction)
-{% endhighlight %}
-</div>
+- `readTextFile(path)` / `TextInputFormat` - Reads files line wise and returns them as Strings.
 
-</div>
+- `readTextFileWithValue(path)` / `TextValueInputFormat` - Reads files line wise and returns them as
+  StringValues. StringValues are mutable strings.
 
-#### Grouped transformations on windowed data streams
-Calling the `groupBy(…)` method on a windowed stream groups the elements by the given fields inside the stream windows. The window sizes (evictions) and slide sizes (triggers) will be calculated on the whole stream (in a global fashion), but the user defined functions will be applied on a per group basis inside the window. This means that for a call `windowedStream.groupBy(…).reduceWindow(…)` will transform each window into another window consisting of as many elements as keys in the original window, with the reduced values per key. Similarly the `mapWindow` transformation is applied pe

<TRUNCATED>