Posted to commits@flink.apache.org by gy...@apache.org on 2015/04/11 21:00:40 UTC

[1/3] flink git commit: [streaming] ITCase for WindowWordCount

Repository: flink
Updated Branches:
  refs/heads/master 36fcdae58 -> 46573a6ae


[streaming] ITCase for WindowWordCount


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/71b2d664
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/71b2d664
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/71b2d664

Branch: refs/heads/master
Commit: 71b2d664e0f98d9f5e9af8c84d2e749d4cece273
Parents: 36fcdae
Author: mbalassi <mb...@apache.org>
Authored: Thu Apr 9 12:35:52 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Fri Apr 10 18:14:18 2015 +0200

----------------------------------------------------------------------
 .../examples/windowing/WindowWordCount.java     | 20 ++++++--
 .../examples/test/join/WindowJoinITCase.java    |  1 -
 .../test/windowing/WindowWordCountITCase.java   | 50 ++++++++++++++++++++
 3 files changed, 65 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/71b2d664/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
index cef760f..bd3acc6 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
@@ -46,6 +46,10 @@ import org.apache.flink.streaming.examples.wordcount.WordCount;
  */
 public class WindowWordCount {
 
+	// window parameters with default values
+	private static int windowSize = 250;
+	private static int slideSize = 150;
+
 	// *************************************************************************
 	// PROGRAM
 	// *************************************************************************
@@ -65,8 +69,8 @@ public class WindowWordCount {
 		DataStream<Tuple2<String, Integer>> counts =
 		// split up the lines in pairs (2-tuples) containing: (word,1)
 		text.flatMap(new WordCount.Tokenizer())
-				// create windows of 250 records slided every 150 records
-				.window(Count.of(250)).every(Count.of(150))
+				// create windows of windowSize records slided every slideSize records
+				.window(Count.of(windowSize)).every(Count.of(slideSize))
 				// group by the tuple field "0" and sum up tuple field "1"
 				.groupBy(0).sum(1)
 				// flatten the windows to a single stream
@@ -97,17 +101,23 @@ public class WindowWordCount {
 		if (args.length > 0) {
 			// parse input arguments
 			fileOutput = true;
-			if (args.length == 2) {
+			if (args.length >= 2 && args.length <= 4) {
 				textPath = args[0];
 				outputPath = args[1];
+				if (args.length >= 3){
+					windowSize = Integer.parseInt(args[2]);
+
+					// if no slide size is specified use the window size
+					slideSize = args.length == 3 ? windowSize : Integer.parseInt(args[3]);
+				}
 			} else {
-				System.err.println("Usage: WindowWordCount <text path> <result path>");
+				System.err.println("Usage: WindowWordCount <text path> <result path> [<window size>] [<slide size>]");
 				return false;
 			}
 		} else {
 			System.out.println("Executing WindowWordCount example with built-in default data.");
 			System.out.println("  Provide parameters to read input data from a file.");
-			System.out.println("  Usage: WindowWordCount <text path> <result path>");
+			System.out.println("  Usage: WindowWordCount <text path> <result path> [<window size>] [<slide size>]");
 		}
 		return true;
 	}
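
For reference, with the optional parameters added above, the example could be driven like this (the paths and sizes below are placeholders, not part of the patch; compare the ITCase further down, which makes the same call):

    // hypothetical invocation with an explicit window size of 500 and slide size of 250
    WindowWordCount.main(new String[]{"/tmp/hamlet.txt", "/tmp/wordcounts", "500", "250"});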

http://git-wip-us.apache.org/repos/asf/flink/blob/71b2d664/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java
index 0c1fb39..a1bef5c 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java
@@ -30,7 +30,6 @@ public class WindowJoinITCase extends StreamingProgramTestBase {
 
 	@Override
 	protected void preSubmit() throws Exception {
-		setParallelism(1);
 		gradesPath = createTempFile("gradesText.txt", WindowJoinData.GRADES_INPUT);
 		salariesPath = createTempFile("salariesText.txt", WindowJoinData.SALARIES_INPUT);
 		resultPath = getTempDirPath("result");

http://git-wip-us.apache.org/repos/asf/flink/blob/71b2d664/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/WindowWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/WindowWordCountITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/WindowWordCountITCase.java
new file mode 100644
index 0000000..6fdd4ef
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/WindowWordCountITCase.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.test.windowing;
+
+import org.apache.flink.streaming.examples.windowing.WindowWordCount;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class WindowWordCountITCase extends StreamingProgramTestBase {
+
+	protected String textPath;
+	protected String resultPath;
+	protected String windowSize = "250";
+	protected String slideSize = "150";
+
+	@Override
+	protected void preSubmit() throws Exception {
+		textPath = createTempFile("text.txt", WordCountData.TEXT);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		// since the parallel tokenizers might run at different speeds,
+		// the exact output cannot be checked, only that it is well-formed:
+		// each result line should look like e.g. (faust,2)
+		checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d)+\\)");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		WindowWordCount.main(new String[]{textPath, resultPath, windowSize, slideSize});
+	}
+}


[3/3] flink git commit: [streaming] Removed unnecessary BatchIterator class

Posted by gy...@apache.org.
[streaming] Removed unnecessary BatchIterator class


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/46573a6a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/46573a6a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/46573a6a

Branch: refs/heads/master
Commit: 46573a6ae31d7fc7e794462f6428b4f8917cc4cb
Parents: e01894c
Author: Gyula Fora <gy...@apache.org>
Authored: Fri Apr 10 18:18:58 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Fri Apr 10 18:18:58 2015 +0200

----------------------------------------------------------------------
 .../api/invokable/operator/BatchIterator.java   | 25 --------------------
 1 file changed, 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/46573a6a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java
deleted file mode 100644
index c3475e9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-public interface BatchIterator<IN> extends Iterator<IN>, Serializable {
-	public void reset();
-}


[2/3] flink git commit: [FLINK-1838] [doc] Streaming programming guide update

Posted by gy...@apache.org.
[FLINK-1838] [doc] Streaming programming guide update


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e01894c4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e01894c4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e01894c4

Branch: refs/heads/master
Commit: e01894c442f9f10d0fa3c26286cfa3d0ca4cfa57
Parents: 71b2d66
Author: Gyula Fora <gy...@apache.org>
Authored: Tue Apr 7 18:53:56 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Fri Apr 10 18:14:56 2015 +0200

----------------------------------------------------------------------
 docs/streaming_guide.md | 191 +++++++++++++++++++++++++------------------
 1 file changed, 112 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e01894c4/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index 2b7f360..3a9a1c4 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -30,7 +30,7 @@ Introduction
 ------------
 
 
-Flink Streaming is an extension of the batch Flink API for high-throughput, low-latency data stream processing. The system can connect to and process data streams from many data sources like Apache Kafka, RabbitMQ, Apache Flume, Twitter and also from any user defined data source. Data streams can be transformed and modified using high-level functions similar to the ones provided by the batch processing API. Flink Streaming provides native support for iterative stream processing. The processed data can be pushed to different output types.
+Flink Streaming is a system for high-throughput, low-latency data stream processing. The system can connect to and process data streams from many different data sources like files, web sockets, message queues (Apache Kafka, RabbitMQ, Apache Flume, Twitter…) and also from any user defined data source using a very simple interface. Data streams can be transformed and modified to create new data streams using high-level functions similar to the ones provided by the batch processing API. Flink Streaming natively supports flexible, data-driven windowing semantics and iterative stream processing. The processed data can be pushed to different output types.
 
 Flink Streaming API
 -----------
@@ -70,13 +70,13 @@ Add the following dependency to your `pom.xml` to use the Flink Streaming.
 </div>
 </div>
 
-Create a data stream flow with our Java or Scala API as described below. In order to create your own Flink Streaming program we encourage you to start with the [skeleton](#program-skeleton) and gradually add your own [transformations](#transformations). The remaining sections act as references for additional transformations and advanced features.
+In order to create your own Flink Streaming program we encourage you to start with the [skeleton](#program-skeleton) and gradually add your own [transformations](#transformations). The remaining sections act as references for additional transformations and advanced features.
 
 
 Example Program
 ---------------
 
-The following program is a complete, working example of streaming WordCount. You can copy &amp; paste the code to run it locally.
+The following program is a complete, working example of streaming WordCount that incrementally counts the words coming from a web socket. You can copy &amp; paste the code to run it locally.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -86,7 +86,7 @@ public class StreamingWordCount {
 
     public static void main(String[] args) {
 
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         
         DataStream<Tuple2<String, Integer>> dataStream = env
                 .socketTextStream("localhost", 9999)
@@ -143,7 +143,7 @@ To run the example program start the input stream with netcat first from a termi
 nc -lk 9999
 ~~~
 
-The lines typed to this terminal are submitted as a source for your streaming job.
+The lines typed to this terminal will be the source data stream for your streaming job.
 
 [Back to top](#top)
 
@@ -155,39 +155,47 @@ Program Skeleton
 
 As presented in the [example](#example-program) a Flink Streaming program looks almost identical to a regular Flink program. Each stream processing program consists of the following parts:
 
-1. Creating a `StreamExecutionEnvironment`,
+1. Obtaining a `StreamExecutionEnvironment`,
 2. Connecting to data stream sources,
 3. Specifying transformations on the data streams,
 4. Specifying output for the processed data,
 5. Executing the program.
 
 As these steps are basically the same as in the batch API we will only note the important differences.
-For stream processing jobs, the user needs to obtain a `StreamExecutionEnvironment` in contrast with the [batch API](programming_guide.html) where one would need an `ExecutionEnvironment`. The process otherwise is essentially the same:
+For stream processing jobs, the user needs to obtain a `StreamExecutionEnvironment` in contrast with the [batch API](programming_guide.html#program-skeleton) where one would need an `ExecutionEnvironment`. The process otherwise is essentially the same:
 
 {% highlight java %}
 StreamExecutionEnvironment.getExecutionEnvironment();
 StreamExecutionEnvironment.createLocalEnvironment(parallelism);
-StreamExecutionEnvironment.createRemoteEnvironment(…);
+StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles);
 {% endhighlight %}
 
 For connecting to data streams the `StreamExecutionEnvironment` has many different methods, from basic file sources to completely general user defined data sources. We will go into details in the [basics](#basics) section.
 
+For example:
+
 {% highlight java %}
 env.socketTextStream(host, port);
 env.fromElements(elements…);
+env.addSource(sourceFunction);
 {% endhighlight %}
 
 After defining the data stream sources the user can specify transformations on the data streams to create a new data stream. Different data streams can be also combined together for joint transformations which are being showcased in the [transformations](#transformations) section.
 
+For example:
+
 {% highlight java %}
-dataStream.map(new Mapper()).reduce(new Reducer());
+dataStream.map(mapFunction).reduce(reduceFunction);
 {% endhighlight %}
 
 The processed data can be pushed to different outputs called sinks. The user can define their own sinks or use any predefined filesystem, message queue or database sink.
 
+For example:
+
 {% highlight java %}
 dataStream.writeAsCsv(path);
 dataStream.print();
+dataStream.addSink(sinkFunction);
 {% endhighlight %}
 
 Once the complete program is specified `execute(programName)` is to be called on the `StreamExecutionEnvironment`. This will either execute on the local machine or submit the program for execution on a cluster, depending on the chosen execution environment.
@@ -202,39 +210,47 @@ env.execute(programName);
 
 As presented in the [example](#example-program) a Flink Streaming program looks almost identical to a regular Flink program. Each stream processing program consists of the following parts:
 
-1. Creating a `StreamExecutionEnvironment`,
+1. Obtaining a `StreamExecutionEnvironment`,
 2. Connecting to data stream sources,
 3. Specifying transformations on the data streams,
 4. Specifying output for the processed data,
 5. Executing the program.
 
 As these steps are basically the same as in the batch API we will only note the important differences.
-For stream processing jobs, the user needs to obtain a `StreamExecutionEnvironment` in contrast with the batch API where one would need an `ExecutionEnvironment`. The process otherwise is essentially the same:
+For stream processing jobs, the user needs to obtain a `StreamExecutionEnvironment` in contrast with the [batch API](programming_guide.html#program-skeleton) where one would need an `ExecutionEnvironment`. The process otherwise is essentially the same:
 
 {% highlight scala %}
 StreamExecutionEnvironment.getExecutionEnvironment
 StreamExecutionEnvironment.createLocalEnvironment(parallelism)
-StreamExecutionEnvironment.createRemoteEnvironment(…)
+StreamExecutionEnvironment.createRemoteEnvironment(host: String, port: Int, parallelism: Int, jarFiles: String*)
 {% endhighlight %}
 
 For connecting to data streams the `StreamExecutionEnvironment` has many different methods, from basic file sources to completely general user defined data sources. We will go into details in the [basics](#basics) section.
 
+For example:
+
 {% highlight scala %}
 env.socketTextStream(host, port)
 env.fromElements(elements…)
+env.addSource(sourceFunction)
 {% endhighlight %}
 
 After defining the data stream sources the user can specify transformations on the data streams to create a new data stream. Different data streams can be also combined together for joint transformations which are being showcased in the [transformations](#transformations) section.
 
+For example:
+
 {% highlight scala %}
-dataStream.map(new Mapper).reduce(new Reducer)
+dataStream.map(mapFunction).reduce(reduceFunction)
 {% endhighlight %}
 
 The processed data can be pushed to different outputs called sinks. The user can define their own sinks or use any predefined filesystem, message queue or database sink.
 
+For example:
+
 {% highlight scala %}
 dataStream.writeAsCsv(path)
 dataStream.print
+dataStream.addSink(sinkFunction)
 {% endhighlight %}
 
 Once the complete program is specified `execute(programName)` is to be called on the `StreamExecutionEnvironment`. This will either execute on the local machine or submit the program for execution on a cluster, depending on the chosen execution environment.
@@ -254,64 +270,75 @@ Basics
 
 ### DataStream
 
-The `DataStream` is the basic data abstraction provided by the Flink Streaming API. It represents a continuous stream of data of a certain type from either a data source or a transformed data stream. You can apply transformations on either individual data points or windows of the `DataStream`. For example the map operator transforms each data point individually while window transformations work on intervals of data points at the same time.
+The `DataStream` is the basic data abstraction provided by Flink Streaming. It represents a continuous, parallel, immutable stream of data of a certain type. By applying transformations the user can create new data streams or output the results of the computations. For instance, the map transformation creates a new `DataStream` by applying a user defined function on each element of a given `DataStream`.
 
-The transformations may return different `DataStream` types allowing more elaborate transformations, for example the `groupBy(…)` method returns a `GroupedDataStream` which can be used for grouped transformations such as aggregating by key.
+The transformations may return different data stream types allowing more elaborate transformations, for example the `groupBy(…)` method returns a `GroupedDataStream` which can be used for grouped transformations such as aggregating by key. We will discover more elaborate data stream types in the upcoming sections.
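
To illustrate the abstraction described above, a minimal sketch (the input stream `text` is a placeholder `DataStream<String>`, not part of the patch):

{% highlight java %}
// derive a new stream of line lengths from a stream of lines;
// the map function is applied to each element individually
DataStream<Integer> lengths = text.map(new MapFunction<String, Integer>() {
	@Override
	public Integer map(String line) {
		return line.length();
	}
});
{% endhighlight %}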
 
 ### Partitioning
 
-Partitioning controls how individual data points are distributed among the parallel instances of the transformation operators. By default *Forward* partitioning is used. There are several partitioning types supported in Flink Streaming:
+Partitioning controls how individual data points of a stream are distributed among the parallel instances of the transformation operators. This also controls the ordering of the records in the `DataStream`. There is a partial ordering guarantee for the outputs with respect to the partitioning scheme (outputs produced from each partition are guaranteed to arrive in the order they were produced).
+
+There are several partitioning types supported in Flink Streaming:
 
- * *Forward*: Forward partitioning directs the output data to the next operator on the same machine (if possible) avoiding expensive network I/O. If there are more processing nodes than inputs or vice verse the load is distributed among the extra nodes in a round-robin fashion. This is the default partitioner.
+ * *Forward (default)*: Forward partitioning directs the output data to the next operator on the same machine (if possible), avoiding expensive network I/O. If there are more processing nodes than inputs or vice versa, the load is distributed among the extra nodes in a round-robin fashion. This is the default partitioner.
 Usage: `dataStream.forward()`
  * *Shuffle*: Shuffle partitioning randomly partitions the output data stream to the next operator using uniform distribution. Use this only when it is important that the partitioning is randomised. If you only care about an even load use *Distribute*
 Usage: `dataStream.shuffle()`
  * *Distribute*: Distribute partitioning directs the output data stream to the next operator in a round-robin fashion, achieving a balanced distribution.
 Usage: `dataStream.distribute()`
- * *Field/Key*: Field/Key partitioning partitions the output data stream based on the hash code of a selected key of the tuples. Data points with the same key are directed to the same operator instance. This partition is applied when using the `groupBy` operator.
+ * *Field/Key*: Field/Key partitioning partitions the output data stream based on the hash code of a selected key of the tuples. Data points with the same key are directed to the same operator instance. 
+Usage: `dataStream.groupBy(fields…)`
  * *Broadcast*: Broadcast partitioning sends the output data stream to all parallel instances of the next operator.
 Usage: `dataStream.broadcast()`
- * *Global*: All data points end up at the same operator instance. To achieve this use the parallelism setting of the corresponding operator.
-Usage: `operator.setParallelism(1)`
+ * *Global*: All data points are directed to the first instance of the operator. 
+Usage: `dataStream.global()`
+
+By default *Forward* partitioning is used. 
+
+Partitioning does not remain in effect after a transformation, so it needs to be set again for subsequent operations.
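
As a sketch of how this looks in practice (`mapper1` and `mapper2` are placeholder map functions, not part of the patch), the partitioning is set again before each operator:

{% highlight java %}
// broadcast into the first map, then shuffle before the second map;
// without the second call the default Forward partitioning would be used again
dataStream.broadcast().map(mapper1).shuffle().map(mapper2);
{% endhighlight %}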
 
 ### Connecting to the outside world
 
-The user is expected to connect to the outside world through the source and the sink interfaces. We provide a `cancel()` method where allocated resources can be freed up in case some other parts of the topology failed. The `cancel()` method is called upon termination.
+The user is expected to connect to the outside world through the source and the sink interfaces. 
 
 #### Sources
 
-The user can connect to data streams by the different implementations of `SourceFunction` using `StreamExecutionEnvironment.addSource(sourceFunction)`. In contrast with other operators, DataStreamSources have a default operator parallelism of 1.
+The user can create (connect to) data streams by using different implementations of the `SourceFunction` interface with `StreamExecutionEnvironment.addSource(sourceFunction)`. By default, sources run with a parallelism of 1.
 
 To create parallel sources the user's source function needs to implement `ParallelSourceFunction` or extend `RichParallelSourceFunction`, in which case the source will have the parallelism of the environment. The parallelism for ParallelSourceFunctions can be changed afterwards using `source.setParallelism(parallelism)`.
 
-There are several predefined ones similar to the ones of the batch API and some streaming specific ones like:
+The `SourceFunction` interface contains only two methods: `run(Collector<T> out)` and `cancel()`.
+The `run` method will be called only once when the program starts, and should encapsulate the logic for generating the `DataStream`. Any object collected to the collector will be part of the `DataStream`. The `cancel` method will be called upon termination, for example when some other part of the topology has failed, so that the source can stop emitting data and free its allocated resources.
 
- * `socketTextStream(hostname, port)`
- * `readTextStream(filepath)`
- * `generateSequence(from, to)`
- * `fromElements(elements…)`
- * `fromCollection(collection)`
- * `readTextFile(filepath)`
+In addition to the static data sources (with method signatures similar to those of the [batch API](programming_guide.html#data-sources)) there are several predefined stream sources accessible from the `StreamExecutionEnvironment`:
 
-These can be used to easily test and debug streaming programs.
-There are pre-implemented connectors for a number of the most popular message queue services, please refer to the section on [connectors](#stream-connectors) for more detail.
+* *Socket text stream*: Creates a new `DataStream` that contains the strings received continuously from the given socket. Strings are decoded by the system's default character set. The user can optionally set the delimiter or the number of connection retries in case of errors.
+Usage: `env.socketTextStream(hostname, port,…)`
+* *Text file stream*: Creates a new `DataStream` that contains the lines of the files created (or modified) in a given directory. The system continuously monitors the given path, and processes any new files or modifications based on the settings. The file will be read with the system's default character set.
+Usage: `env.readFileStream(String path, long checkFrequencyMillis, WatchType watchType)`
+* *Message queue connectors*: There are pre-implemented connectors for a number of the most popular message queue services; please refer to the section on [connectors](#stream-connectors) for more detail.
+* *Custom source*: Creates a new `DataStream` by using a user defined `SourceFunction` implementation.
+Usage: `env.addSource(sourceFunction)`
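
As an illustration of the `SourceFunction` contract described above, a minimal custom source might look like the following sketch (the class name and emitted values are made up, and `Collector` refers to `org.apache.flink.util.Collector`):

{% highlight java %}
// a hypothetical source that emits the numbers 0..99 and stops early when cancelled
public class CountingSource implements SourceFunction<Long> {

	private volatile boolean running = true;

	@Override
	public void run(Collector<Long> out) {
		for (long i = 0; i < 100 && running; i++) {
			out.collect(i);
		}
	}

	@Override
	public void cancel() {
		running = false;
	}
}
{% endhighlight %}

It would then be attached with `env.addSource(new CountingSource())`.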
 
 #### Sinks
 
-`DataStreamSink` represents the different outputs of a Flink Streaming program. There are several pre-defined implementations available right away:
+`DataStreamSink` represents the different outputs of Flink Streaming programs. The user can either define their own `SinkFunction` implementation or choose one of the available implementations (methods of `DataStream`).
+
+For example:
 
- * `dataStream.print()` – Writes the DataStream to the standard output, practical for testing purposes
- * `dataStream.writeAsText(parameters)` – Writes the DataStream to a text file
- * `dataStream.writeAsCsv(parameters)` – Writes the DataStream to CSV format
+ * `dataStream.print()` – Writes the `DataStream` to the standard output, practical for testing purposes
+ * `dataStream.writeAsText(parameters)` – Writes the `DataStream` to a text file
+ * `dataStream.writeAsCsv(parameters)` – Writes the `DataStream` to CSV format
+ * `dataStream.addSink(sinkFunction)` – Custom sink implementation
 
-The user can also implement arbitrary sink functionality by implementing the `SinkFunction` interface and using it with `dataStream.addSink(sinkFunction)`.
+There are pre-implemented connectors for a number of the most popular message queue services; please refer to the section on [connectors](#stream-connectors) for more detail.
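
For completeness, a custom sink is just as small; a hedged sketch (assuming the single method of the `SinkFunction` interface is `invoke(value)`, and the class name and output format are made up):

{% highlight java %}
// a hypothetical sink that simply prints every record with a prefix
public class PrefixPrintSink implements SinkFunction<String> {

	@Override
	public void invoke(String value) {
		System.out.println("sink> " + value);
	}
}
{% endhighlight %}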
 
 [Back to top](#top)
 
 Transformations
 ----------------
 
-Transformations represent the users' business logic on the data stream. The user can chain and combine multiple operators on the data stream to produce the desired processing steps. Most of the operators work very similar to the batch Flink API allowing developers to reason about `DataStream` the same way as they would about `DataSet`. At the same time there are operators that exploit the streaming nature of the data to allow advanced functionality.
+Transformations, also called operators, represent the users' business logic on the data stream. Operators consume data streams and produce new data streams. The user can chain and combine multiple operators on the data stream to produce the desired processing steps. Most of the operators work very similarly to the batch Flink API, allowing developers to reason about `DataStream` the same way as they would about `DataSet`. At the same time there are operators that exploit the streaming nature of the data to allow advanced functionality.
 
 ### Basic transformations
 
@@ -368,12 +395,8 @@ dataStream.flatMap(new FlatMapFunction<String, String>() {
       <td><strong>Filter</strong></td>
       <td>
         <p>Evaluates a boolean function for each element and retains those for which the function returns true.
-        <br/>
-        
-        <strong>IMPORTANT:</strong> The system assumes that the function does not modify the elements on which the predicate is applied. Violating this assumption
-        can lead to incorrect results.
-
-        <br/>
+	<br/>
+	<br/>
         A filter that filters out zero values:
         </p>
 {% highlight java %}
@@ -390,14 +413,14 @@ dataStream.filter(new FilterFunction<Integer>() {
     <tr>
       <td><strong>Reduce</strong></td>
       <td>
-        <p>Combines a group of elements into a single element by repeatedly combining two elements
+        <p>Combines a stream of elements into another stream by repeatedly combining two elements
         into one and emits the current state after every reduction. Reduce may be applied on a full, windowed or grouped data stream.
         <br/>
         
-        <strong>IMPORTANT:</strong> The streaming and the batch reduce functions have different semantics. A streaming reduce on a full or grouped data stream emits the current reduced value for every new element on a data stream. On a windowed data stream it works as a batch reduce: it produces at most one value.
+        <strong>IMPORTANT:</strong> The streaming and the batch reduce functions have different semantics. A streaming reduce on a data stream emits the current reduced value for every new element on a data stream. On a windowed data stream it works as a batch reduce: it produces at most one value per window.
         <br/>
-
-         A reducer that sums up the incoming stream:</p>
+	<br/>
+         A reducer that sums up the incoming stream; the result is a stream of intermediate sums:</p>
 {% highlight java %}
 dataStream.reduce(new ReduceFunction<Integer>() {
             @Override
@@ -413,7 +436,7 @@ dataStream.reduce(new ReduceFunction<Integer>() {
     <tr>
       <td><strong>Merge</strong></td>
       <td>
-        <p>Merges two or more datastreams creating a new stream containing all the elements from all the streams.</p>
+        <p>Merges two or more data streams creating a new stream containing all the elements from all the streams.</p>
 {% highlight java %}
 dataStream.merge(otherStream1, otherStream2, …)
 {% endhighlight %}
@@ -424,7 +447,7 @@ dataStream.merge(otherStream1, otherStream2, …)
 
 ----------
 
-The following transformations are available on data sets of Tuples:
+The following transformations are available on data streams of Tuples:
 
 <table class="table table-bordered">
   <thead>
@@ -439,8 +462,8 @@ The following transformations are available on data sets of Tuples:
       <td>
         <p>Selects a subset of fields from the tuples</p>
 {% highlight java %}
-DataSet<Tuple3<Integer, Double, String>> in = // [...]
-DataSet<Tuple2<String, Integer>> out = in.project(2,0);
+DataStream<Tuple3<Integer, Double, String>> in = // [...]
+DataStream<Tuple2<String, Integer>> out = in.project(2,0);
 {% endhighlight %}
       </td>
     </tr>
@@ -489,12 +512,8 @@ data.flatMap { str => str.split(" ") }
       <td><strong>Filter</strong></td>
       <td>
         <p>Evaluates a boolean function for each element and retains those for which the function returns true.
-        <br/>
-        
-        <strong>IMPORTANT:</strong> The system assumes that the function does not modify the elements on which the predicate is applied. Violating this assumption
-        can lead to incorrect results.
-        <br/>
-        
+       	<br/>
+	<br/>
         A filter that filters out zero values:
         </p>
 {% highlight scala %}
@@ -510,10 +529,10 @@ dataStream.filter{ _ != 0 }
         into one and emits the current state after every reduction. Reduce may be applied on a full, windowed or grouped data stream.
         <br/>
         
-        <strong>IMPORTANT:</strong> The streaming and the batch reduce functions have different semantics. A streaming reduce on a full or grouped data stream emits the current reduced value for every new element on a data stream. On a windowed data stream it works as a batch reduce: it produces at most one value.
+        <strong>IMPORTANT:</strong> The streaming and the batch reduce functions have different semantics. A streaming reduce on a data stream emits the current reduced value for every new element on a data stream. On a windowed data stream it works as a batch reduce: it produces at most one value per window.
         <br/>
-
-         A reducer that sums up the incoming stream:</p>
+	<br/>
+         A reducer that sums up the incoming stream; the result is a stream of intermediate sums:</p>
 {% highlight scala %}
 dataStream.reduce{ _ + _ }
 {% endhighlight %}
@@ -523,7 +542,7 @@ dataStream.reduce{ _ + _ }
     <tr>
       <td><strong>Merge</strong></td>
       <td>
-        <p>Merges two or more datastreams creating a new stream containing all the elements from all the streams.</p>
+        <p>Merges two or more data streams creating a new stream containing all the elements from all the streams.</p>
 {% highlight scala %}
 dataStream.merge(otherStream1, otherStream2, …)
 {% endhighlight %}
@@ -544,9 +563,11 @@ dataStream.merge(otherStream1, otherStream2, …)
 Some transformations require that the elements of a `DataStream` are grouped on some key. The user can create a `GroupedDataStream` by calling the `groupBy(key)` method of a non-grouped `DataStream`. 
 Keys can be of three types: fields positions (applicable for tuple/array types), field expressions (applicable for pojo types), KeySelector instances. 
 
+Aggregation or reduce operators called on `GroupedDataStream`s produce elements on a per group basis.
+
 ### Aggregations
 
-The Flink Streaming API supports different types of pre-defined aggregation operators similarly to the batch API.
+The Flink Streaming API supports different types of pre-defined aggregations on `DataStream`s. The common property of these operators, just like reduce on streams, is that they produce a stream of intermediate aggregate values.
 
 Types of aggregations: `sum(field)`, `min(field)`, `max(field)`, `minBy(field, first)`, `maxBy(field, first)`.
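
For example, a grouped rolling aggregation on a word count stream (a sketch, reusing the `(word, count)` tuple shape from the WindowWordCount example above; the stream name is a placeholder):

{% highlight java %}
// per word, emit the running sum of the counts; every new element
// produces an updated intermediate aggregate for its group
wordCounts.groupBy(0).sum(1);
{% endhighlight %}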
 
@@ -558,22 +579,26 @@ There is also an option to apply user defined aggregations with the usage of the
 
 ### Window operators
 
-Flink streaming provides very flexible windowing semantics to create arbitrary windows (also referred to as discretizations or slices) of the data streams and apply reduce, map or aggregation transformations on the windows acquired. Windowing can be used for instance to create rolling aggregations of the most recent N elements, where N could be defined by Time, Count or any arbitrary user defined measure.
+Flink streaming provides very flexible data-driven windowing semantics to create arbitrary windows (also referred to as discretizations or slices) of the data streams and apply reduce, map or aggregation transformations on the windows acquired. Windowing can be used for instance to create rolling aggregations of the most recent N elements, where N could be defined by Time, Count or any arbitrary user defined measure. 
 
-The user can control the size (eviction) of the windows and the frequency of reduction or aggregation calls (trigger) on them in an intuitive API (some examples):
+The user can control the size (eviction) of the windows and the frequency of transformation or aggregation calls (trigger) on them in an intuitive API. We will describe the exact semantics of these operators in the [policy based windowing](#policy-based-windowing) section.
+
+Some examples:
 
  * `dataStream.window(eviction).every(trigger).reduceWindow(…)`
  * `dataStream.window(…).every(…).mapWindow(…).flatten()`
  * `dataStream.window(…).every(…).groupBy(…).aggregate(…).getDiscretizedStream()`
 
-The core abstraction of the Windowing semantics is the `WindowedDataStream` and the `StreamWindow`. The `WindowedDataStream` is created when we call the `window(…)` method of the DataStream and represents the windowed discretisation of the underlying stream. The user can think about it simply as a `DataStream<StreamWindow<T>>` where additional API functions are supplied to provide efficient transformations of individual windows. 
+The core abstractions of the windowing semantics are the `WindowedDataStream` and the `StreamWindow`. The `WindowedDataStream` is created when we first call the `window(…)` method of the DataStream and represents the windowed discretisation of the underlying stream. The user can think about it simply as a `DataStream<StreamWindow<T>>` where additional API functions are supplied to provide efficient transformations of individual windows.
+
+Please note at this point that the `.every(…)` call belongs together with the preceding `.window(…)` call and does not define a new transformation in itself.
 
-The result of a window transformation is again a `WindowedDataStream` which can also be used to further transform the resulting windows. In this sense, window transformations define mapping from stream windows to stream windows.
+The result of a window transformation is again a `WindowedDataStream` which can also be used to apply further windowed computations. In this sense, window transformations define a mapping from stream windows to stream windows.
 
 The user has different ways of using the result of a window operation:
 
  * `windowedDataStream.flatten()` - streams the results element wise and returns a `DataStream<T>` where T is the type of the underlying windowed stream
- * `windowedDataStream.getDiscretizedStream()` - returns a `DataStream<StreamWindow<T>>` for applying some advanced logic on the stream windows itself
+ * `windowedDataStream.getDiscretizedStream()` - returns a `DataStream<StreamWindow<T>>` for applying some advanced logic on the stream windows themselves; be careful here, as at this point we need to materialise the full windows
  * Calling any window transformation further transforms the windows, while preserving the windowing logic
 
 The next example would create windows that hold elements of the last 5 seconds, and the user defined transformation would be executed on the windows every second (sliding the window by 1 second):
@@ -619,12 +644,14 @@ For detailed description of these policies please refer to the [Javadocs](http:/
 #### Policy based windowing
 The policy based windowing is a highly flexible way to specify stream discretisation also called windowing semantics. Two types of policies are used for such a specification:
 
- * `TriggerPolicy` defines when to trigger the reduce UDF on the current window and emit the result. In the API it completes a window statement such as: `window(…).every(…)`, while the triggering policy is passed within `every`. 
-
-Several predefined policies are provided in the API, including delta-based, punctuation based, count-based and time-based policies. Policies are in general UDFs and can implement any custom behaviour.
+ * `TriggerPolicy` defines when to trigger the reduce or transformation UDF on the current window and emit the result. In the API it completes a window statement such as: `window(…).every(…)`, while the triggering policy is passed within `every`. In case the user wants to use a tumbling eviction policy (the window is emptied after the transformation) they can omit the `.every(…)` call and pass the trigger policy directly to the `.window(…)` call.
 
  * `EvictionPolicy` defines the length of a window as a means of a predicate for evicting tuples when they are no longer needed. In the API this can be defined by the `window(…)` operation on a stream. There are mostly the same predefined policy types provided as for trigger policies.
 
+Trigger and eviction policies work totally independently of each other. The eviction policy continuously maintains a window, into which it adds new elements and, based on the eviction logic, removes older elements in the order of arrival. The trigger policy, on the other hand, only decides at each new incoming element whether it should trigger computation (and output results) on the currently maintained window.
+
+Several predefined policies are provided in the API, including delta-based, punctuation based, count-based and time-based policies. Policies are in general UDFs and can implement any custom behaviour.
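
To make the independence of the two policies concrete, a sketch mixing a count-based eviction with a time-based trigger (helper names follow the `Count.of`/`Time.of` style used elsewhere in this guide; the aggregated field is a placeholder):

{% highlight java %}
// keep the last 1000 elements in the window, but trigger (and emit
// the aggregate) every 5 seconds regardless of the element count
dataStream.window(Count.of(1000)).every(Time.of(5, TimeUnit.SECONDS)).sum(0);
{% endhighlight %}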
+
 In addition to the `dataStream.window(…).every(…)` style users can specifically pass the trigger and eviction policies during the window call:
 
 <div class="codetabs" markdown="1">
@@ -642,7 +669,7 @@ dataStream.window(triggerPolicy, evictionPolicy)
 
 </div>
 
-By default most triggers can only trigger when a new element arrives. This might not be suitable for all the use-cases, especially when time based windowing is applied. To also provide triggering between elements so called active policies (the two interfaces controlling this special behaviour is ActiveTriggerPolicy and CentralActiveTrigger) can be used. The predefined time-based policies are already implemented in such a way and can hold as an example for user defined active policy implementations. 
+By default triggers can only trigger when a new element arrives. This might not be suitable for all use-cases, especially those with low data rates. To also provide triggering between elements, so called active policies (the two interfaces controlling this special behaviour are `ActiveTriggerPolicy` and `CentralActiveTrigger`) can be used. The predefined time-based policies are already implemented in such a way and can serve as an example for user defined active policy implementations.
 
 Time-based trigger and eviction policies can work with user defined `TimeStamp` implementations, these policies already cover most use cases.
  
@@ -686,7 +713,7 @@ windowedDataStream.mapWindow(windowMapFunction)
 </div>
 
 #### Grouped transformations on windowed data streams
-Calling the `groupBy(…)` method on a windowed stream groups the elements by the given fields inside the stream windows. The window sizes (evictions) and slide sizes (triggers) will be calculated on the whole stream (in a global fashion), but the user defined functions will be applied on a per group basis. This means that for a call `windowedStream.groupBy(…).reduceWindow(…)` will transform each window into another window consisting of as many elements as groups, with the reduced values per key. Similarly the `mapWindow` transformation is applied per group as well.
+Calling the `groupBy(…)` method on a windowed stream groups the elements by the given fields inside the stream windows. The window sizes (evictions) and slide sizes (triggers) will be calculated on the whole stream (in a global fashion), but the user defined functions will be applied on a per group basis inside the window. This means that a call to `windowedStream.groupBy(…).reduceWindow(…)` will transform each window into another window consisting of as many elements as there are keys in the original window, with the reduced values per key. Similarly the `mapWindow` transformation is applied per group as well.
 
 The user can also create discretisation on a per group basis calling `window(…).every(…)` on an already grouped data stream. This will apply the discretisation logic independently for each key.
 
@@ -761,11 +788,13 @@ For example `dataStream.window(Count.of(100)).maxBy(field)` would create global
 
 ### Temporal database style operators
 
-While database style operators like joins (on key) and crosses are hard to define properly on data streams, a straightforward implementation is to apply these operators on windows of the data streams. 
+While database style operators like joins (on key) and crosses are hard to define properly on data streams, a straightforward interpretation is to apply these operators on windows of the data streams. 
+
+Currently join and cross operators are supported only on time windows. We are working on alleviating this limitation in the next release.
 
-Currently join and cross operators are supported on time windows.
+Temporal operators take the current windows of both streams and apply the join/cross logic on these window pairs.
 
-The Join transformation produces a new Tuple DataStream with two fields. Each tuple holds a joined element of the first input DataStream in the first tuple field and a matching element of the second input DataStream in the second field for the current window.
+The Join transformation produces a new Tuple DataStream with two fields. Each tuple holds a joined element of the first input DataStream in the first tuple field and a matching element of the second input DataStream in the second field for the current window. The user can also supply a custom join function to control the produced elements.
 
 The following code shows a default Join transformation using field position keys:
 
@@ -788,7 +817,7 @@ dataStream1.join(dataStream2)
 </div>
 </div>
 
-The Cross transformation combines two DataStreams into one DataStream. It builds all pairwise combinations of the elements of both input DataStreams in the current window, i.e., it builds a temporal Cartesian product.
+The Cross transformation combines two `DataStream`s into one `DataStream`. It builds all pairwise combinations of the elements of both input DataStreams in the current window, i.e., it builds a temporal Cartesian product. The user can also supply a custom cross function to control the produced elements.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -806,8 +835,8 @@ dataStream1 cross dataStream2 onWindow (windowing_params)
 
 ### Co operators
 
-Co operators allow the users to jointly transform two DataStreams of different types providing a simple way to jointly manipulate a shared state. It is designed to support joint stream transformations where merging is not appropriate due to different data types or in case the user needs explicit tracking of the joined stream origin.
-Co operators can be applied to ConnectedDataStreams which represent two DataStreams of possibly different types. A ConnectedDataStream can be created by calling the `connect(otherDataStream)` method of a DataStream. Please note that the two connected DataStreams can also be merged data streams.
+Co operators allow the users to jointly transform two `DataStream`s of different types providing a simple way to jointly manipulate streams with a shared state. It is designed to support joint stream transformations where merging is not appropriate due to different data types or in case the user needs explicit tracking of the origin of individual elements.
+Co operators can be applied to `ConnectedDataStream`s which represent two `DataStream`s of possibly different types. A `ConnectedDataStream` can be created by calling the `connect(otherDataStream)` method of a `DataStream`. Please note that the two connected `DataStream`s can also be merged data streams.
 
 #### Map on ConnectedDataStream
 Applies a CoMap transformation on two separate DataStreams, mapping them to a common output type. The transformation calls a `CoMapFunction.map1()` for each element of the first input and `CoMapFunction.map2()` for each element of the second input. Each CoMapFunction call returns exactly one element.
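
A minimal sketch of such a co-transformation (stream names and element types are placeholders, not part of the patch):

{% highlight java %}
// map an Integer stream and a String stream to a common String output
dataStream1.connect(dataStream2)
	.map(new CoMapFunction<Integer, String, String>() {
		@Override
		public String map1(Integer value) {
			return value.toString();
		}

		@Override
		public String map2(String value) {
			return value;
		}
	});
{% endhighlight %}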
@@ -936,6 +965,8 @@ Iterable<String> select(Integer value) {
 {% endhighlight %}
 
 Every output will be emitted to the selected outputs exactly once, even if you add the same output names more than once.
+
+The functionality provided by output splitting can also be achieved efficiently (due to operator chaining) by multiple filter operators.
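
For instance, splitting an Integer stream into even and odd elements could also be written as two independent filters over the same stream (a sketch; the variable names are placeholders):

{% highlight java %}
// two filters instead of split/select; operator chaining keeps this cheap
DataStream<Integer> even = dataStream.filter(new FilterFunction<Integer>() {
	@Override
	public boolean filter(Integer value) {
		return value % 2 == 0;
	}
});
DataStream<Integer> odd = dataStream.filter(new FilterFunction<Integer>() {
	@Override
	public boolean filter(Integer value) {
		return value % 2 != 0;
	}
});
{% endhighlight %}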
 </div>
 <div data-lang="scala" markdown="1">
 
@@ -961,6 +992,8 @@ Data streams only receive the elements directed to selected output names. The us
 The outputs of an operator are directed by implementing a function that returns the output names for the value. The data is sent to all the outputs returned by the function (referenced by their name). This way the direction of the outputs can be determined by the value of the data sent.
 
 Every output will be emitted to the selected outputs exactly once, even if you add the same output names more than once.
+
+The functionality provided by output splitting can also be achieved efficiently (due to operator chaining) by multiple filter operators.
 </div>
 
 </div>
@@ -1058,7 +1091,7 @@ Rich functions provide, in addition to the user-defined function (`map()`, `redu
 Lambda expressions with Java 8
 ------------
 
-For a more consice code one can rely on one of the main feautere of Java 8, lambda expressions. The following program has similar functionality to the one provided in the [example](#example-program) section, while showcasing the usage of lambda expressions.
+For more concise code one can rely on one of the main features of Java 8, lambda expressions. The following program has similar functionality to the one provided in the [example](#example-program) section, while showcasing the usage of lambda expressions.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java8" markdown="1">