Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/17 21:42:24 UTC

[1/7] incubator-flink git commit: [streaming] Updated deprecated iterative functionality and docs

Repository: incubator-flink
Updated Branches:
  refs/heads/release-0.8 cb607df8e -> 7ef04c625


[streaming] Updated deprecated iterative functionality and docs


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/c8e306b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/c8e306b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/c8e306b6

Branch: refs/heads/release-0.8
Commit: c8e306b6567a4b615c54e8d8f88116ff6f1a0e38
Parents: e34aca7
Author: Gyula Fora <gy...@apache.org>
Authored: Tue Dec 16 22:49:58 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Dec 17 21:41:08 2014 +0100

----------------------------------------------------------------------
 docs/streaming_guide.md                         | 18 +++----
 .../flink/streaming/api/JobGraphBuilder.java    | 18 +++----
 .../flink/streaming/api/StreamConfig.java       |  8 +--
 .../streaming/api/datastream/DataStream.java    | 29 +++++------
 .../api/datastream/IterativeDataStream.java     | 55 +++++---------------
 .../api/streamvertex/StreamIterationHead.java   |  4 +-
 .../api/streamvertex/StreamIterationTail.java   |  6 +--
 .../examples/iteration/IterateExample.java      |  4 +-
 8 files changed, 56 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c8e306b6/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index 379bb73..6e7f932 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -505,7 +505,7 @@ Every output will be emitted to the selected outputs exactly once, even if you a
 
 ### Iterations
 The Flink Streaming API supports implementing iterative stream processing dataflows similarly to the core Flink API. Iterative streaming programs also implement a step function and embed it into an `IterativeDataStream`.
-Unlike in the core API the user does not define the maximum number of iterations, but at the tail of each iteration the output is both streamed forward to the next operator and also streamed back to the iteration head. The user controls the output of the iteration tail using [output splitting](#output-splitting).
+Unlike in the core API the user does not define the maximum number of iterations, but at the tail of each iteration part of the output is streamed forward to the next operator and part is streamed back to the iteration head. The user controls the output of the iteration tail using [output splitting](#output-splitting).
 To start an iterative part of the program the user defines the iteration starting point:
 
 ~~~java
@@ -517,20 +517,18 @@ The operator applied on the iteration starting point is the head of the iteratio
 DataStream<Integer> head = iteration.map(new IterationHead());
 ~~~
 
-To close an iteration and define the iteration tail, the user calls `.closeWith(tail)` method of the `IterativeDataStream`:
+To close an iteration and define the iteration tail, the user calls the `.closeWith(iterationTail)` method of the `IterativeDataStream`.
 
-~~~java
-DataStream<Integer> tail = head.map(new IterationTail());
-iteration.closeWith(tail);
-~~~
-Or to use with output splitting:
+A common pattern is to use output splitting:
 
 ~~~java
-SplitDataStream<Integer> tail = head.map(new IterationTail()).split(outputSelector);
-iteration.closeWith(tail.select("iterate"));
+SplitDataStream<Integer> tailOperator = head.map(new IterationTail()).split(outputSelector);
+iteration.closeWith(tailOperator.select("iterate"));
 ~~~ 
 
-Because iterative streaming programs do not have a set number of iteratons for each data element, the streaming program has no information on the end of its input. From this it follows that iterative streaming programs run until the user manually stops the program. While this is acceptable under normal circumstances a method is provided to allow iterative programs to shut down automatically if no input received by the iteration head for a predefined number of milliseconds.
+In this case all output directed to the "iterate" edge is fed back to the iteration head.
+
+Because iterative streaming programs do not have a set number of iterations for each data element, the streaming program has no information on the end of its input. From this it follows that iterative streaming programs run until the user manually stops the program. While this is acceptable under normal circumstances, a method is provided to allow iterative programs to shut down automatically if no input is received by the iteration head for a predefined number of milliseconds.
 To use this function the user needs to call `iteration.setMaxWaitTime(millis)` to control the max wait time. 
 
 ### Rich functions
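
Putting the updated guide text together, here is a minimal end-to-end sketch of the iteration API (a hedged illustration, not part of the commit: `source`, `IterationHead`, `IterationTail` and `MySelector` are placeholder names, and `MySelector` is assumed to be an `OutputSelector` that routes records to either an "iterate" or an "output" channel):

~~~java
// Start the iterative part of the program on an existing DataStream<Integer>.
IterativeDataStream<Integer> iteration = source.iterate();

// The first operator applied on the iteration source is the iteration head.
DataStream<Integer> head = iteration.map(new IterationHead());

// Optional: shut the iteration down automatically if the head receives
// no input for 5000 milliseconds.
iteration.setMaxWaitTime(5000);

// Split the tail output: records selected as "iterate" are fed back to the
// head, records selected as "output" continue downstream.
SplitDataStream<Integer> tail = head.map(new IterationTail()).split(new MySelector());
iteration.closeWith(tail.select("iterate"));

DataStream<Integer> output = tail.select("output");
~~~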

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c8e306b6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 5fc4a1b..d63042a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -76,9 +76,9 @@ public class JobGraphBuilder {
 	private Map<String, byte[]> serializedFunctions;
 	private Map<String, byte[]> outputSelectors;
 	private Map<String, Class<? extends AbstractInvokable>> vertexClasses;
-	private Map<String, String> iterationIds;
-	private Map<String, String> iterationIDtoHeadName;
-	private Map<String, String> iterationIDtoTailName;
+	private Map<String, Integer> iterationIds;
+	private Map<Integer, String> iterationIDtoHeadName;
+	private Map<Integer, String> iterationIDtoTailName;
 	private Map<String, Integer> iterationTailCount;
 	private Map<String, Long> iterationWaitTime;
 	private Map<String, Map<String, OperatorState<?>>> operatorStates;
@@ -109,9 +109,9 @@ public class JobGraphBuilder {
 		serializedFunctions = new HashMap<String, byte[]>();
 		outputSelectors = new HashMap<String, byte[]>();
 		vertexClasses = new HashMap<String, Class<? extends AbstractInvokable>>();
-		iterationIds = new HashMap<String, String>();
-		iterationIDtoHeadName = new HashMap<String, String>();
-		iterationIDtoTailName = new HashMap<String, String>();
+		iterationIds = new HashMap<String, Integer>();
+		iterationIDtoHeadName = new HashMap<Integer, String>();
+		iterationIDtoTailName = new HashMap<Integer, String>();
 		iterationTailCount = new HashMap<String, Integer>();
 		iterationWaitTime = new HashMap<String, Long>();
 		operatorStates = new HashMap<String, Map<String, OperatorState<?>>>();
@@ -205,7 +205,7 @@ public class JobGraphBuilder {
 	 * @param waitTime
 	 *            Max wait time for next record
 	 */
-	public void addIterationHead(String vertexName, String iterationHead, String iterationID,
+	public void addIterationHead(String vertexName, String iterationHead, Integer iterationID,
 			int parallelism, long waitTime) {
 
 		addVertex(vertexName, StreamIterationHead.class, null, null, null, parallelism);
@@ -242,7 +242,7 @@ public class JobGraphBuilder {
 	 * @param waitTime
 	 *            Max waiting time for next record
 	 */
-	public void addIterationTail(String vertexName, String iterationTail, String iterationID,
+	public void addIterationTail(String vertexName, String iterationTail, Integer iterationID,
 			int parallelism, long waitTime) {
 
 		if (bufferTimeout.get(iterationTail) == 0) {
@@ -558,7 +558,7 @@ public class JobGraphBuilder {
 			vertex.setSlotSharingGroup(shareGroup);
 		}
 
-		for (String iterID : new HashSet<String>(iterationIds.values())) {
+		for (Integer iterID : new HashSet<Integer>(iterationIds.values())) {
 			CoLocationGroup ccg = new CoLocationGroup();
 			AbstractJobVertex tail = streamVertices.get(iterationIDtoTailName.get(iterID));
 			AbstractJobVertex head = streamVertices.get(iterationIDtoHeadName.get(iterID));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c8e306b6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 31af9cb..1d863a7 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -228,12 +228,12 @@ public class StreamConfig {
 		}
 	}
 
-	public void setIterationId(String iterationId) {
-		config.setString(ITERATION_ID, iterationId);
+	public void setIterationId(Integer iterationId) {
+		config.setInteger(ITERATION_ID, iterationId);
 	}
 
-	public String getIterationId() {
-		return config.getString(ITERATION_ID, "iteration-0");
+	public Integer getIterationId() {
+		return config.getInteger(ITERATION_ID, 0);
 	}
 
 	public void setIterationWaitTime(long time) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c8e306b6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 3fc685a..5d03100 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -345,24 +345,24 @@ public class DataStream<OUT> {
 	}
 
 	/**
-	 * Initiates an iterative part of the program that executes multiple times
-	 * and feeds back data streams. The iterative part needs to be closed by
-	 * calling {@link IterativeDataStream#closeWith(DataStream)}. The
-	 * transformation of this IterativeDataStream will be the iteration head.
-	 * The data stream given to the {@code closeWith(DataStream)} method is the
-	 * data stream that will be fed back and used as the input for the iteration
-	 * head. Unlike in batch processing by default the output of the iteration
-	 * stream is directed to both to the iteration head and the next component.
-	 * To direct tuples to the iteration head or the output specifically one can
-	 * use the {@code split(OutputSelector)} on the iteration tail while
-	 * referencing the iteration head as 'iterate'.
+	 * Initiates an iterative part of the program that feeds back data streams.
+	 * The iterative part needs to be closed by calling
+	 * {@link IterativeDataStream#closeWith(DataStream)}. The transformation of
+	 * this IterativeDataStream will be the iteration head. The data stream
+	 * given to the {@link IterativeDataStream#closeWith(DataStream)} method is
+	 * the data stream that will be fed back and used as the input for the
+	 * iteration head. A common usage pattern for streaming iterations is to use
+	 * output splitting to send a part of the closing data stream to the head.
+	 * Refer to {@link SingleOutputStreamOperator#split(OutputSelector)} for
+	 * more information.
 	 * <p>
 	 * The iteration edge will be partitioned the same way as the first input of
 	 * the iteration head.
 	 * <p>
 	 * By default a DataStream with iteration will never terminate, but the user
 	 * can use the {@link IterativeDataStream#setMaxWaitTime} call to set a max
-	 * waiting time for the iteration.
+	 * waiting time for the iteration head. If no data is received in the set
+	 * time, the stream terminates.
 	 * 
 	 * @return The iterative data stream created.
 	 */
@@ -1118,7 +1118,7 @@ public class DataStream<OUT> {
 		return returnStream;
 	}
 
-	protected <R> DataStream<OUT> addIterationSource(String iterationID, long waitTime) {
+	protected <R> DataStream<OUT> addIterationSource(Integer iterationID, long waitTime) {
 
 		DataStream<R> returnStream = new DataStreamSource<R>(environment, "iterationSource", null);
 
@@ -1162,8 +1162,7 @@ public class DataStream<OUT> {
 
 		if (inputStream instanceof IterativeDataStream) {
 			IterativeDataStream<OUT> iterativeStream = (IterativeDataStream<OUT>) inputStream;
-			returnStream.addIterationSource(iterativeStream.iterationID.toString(),
-					iterativeStream.waitTime);
+			returnStream.addIterationSource(iterativeStream.iterationID, iterativeStream.waitTime);
 		}
 
 		return returnStream;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c8e306b6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index 29f5eec..d8497ae 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -17,11 +17,6 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.streaming.partitioner.DistributePartitioner;
-
 /**
  * The iterative data stream represents the start of an iteration in a
  * {@link DataStream}.
@@ -52,49 +47,27 @@ public class IterativeDataStream<IN> extends
 
 	/**
 	 * Closes the iteration. This method defines the end of the iterative
-	 * program part. By default the DataStream represented by the parameter will
-	 * be fed back to the iteration head, however the user can explicitly select
-	 * which tuples should be iterated by {@code directTo(OutputSelector)}.
-	 * Tuples directed to 'iterate' will be fed back to the iteration head.
+	 * program part, whose output will be fed back to the start of the
+	 * iteration. <br/>
+	 * <br/>A common usage pattern for streaming iterations is to use output
+	 * splitting to send a part of the closing data stream to the head. Refer to
+	 * {@link SingleOutputStreamOperator#split(OutputSelector)} for more
+	 * information.
 	 * 
-	 * @param iterationResult
-	 *            The data stream that can be fed back to the next iteration.
 	 * 
-	 */
-	public DataStream<IN> closeWith(DataStream<IN> iterationResult) {
-		return closeWith(iterationResult, "iterate");
-	}
-
-	/**
-	 * Closes the iteration. This method defines the end of the iterative
-	 * program part. By default the DataStream represented by the parameter will
-	 * be fed back to the iteration head, however the user can explicitly select
-	 * which tuples should be iterated by {@code directTo(OutputSelector)}.
-	 * Tuples directed to 'iterate' will be fed back to the iteration head.
-	 * 
-	 * @param iterationTail
-	 *            The data stream that can be fed back to the next iteration.
-	 * @param iterationName
-	 *            Name of the iteration edge (backward edge to iteration head)
-	 *            when used with directed emits
+	 * @param iterationResult
+	 *            The data stream that is fed back to the next iteration head.
+	 * @return The stream that was fed back to the iteration. In most cases no
+	 *         further transformations are applied to this stream.
 	 * 
 	 */
-	public <R> DataStream<IN> closeWith(DataStream<IN> iterationTail, String iterationName) {
-		DataStream<R> returnStream = new DataStreamSink<R>(environment, "iterationSink", null);
+	public DataStream<IN> closeWith(DataStream<IN> iterationTail) {
+		DataStream<IN> iterationSink = new DataStreamSink<IN>(environment, "iterationSink", null);
 
-		jobGraphBuilder.addIterationTail(returnStream.getId(), iterationTail.getId(),
-				iterationID.toString(), iterationTail.getParallelism(), waitTime);
+		jobGraphBuilder.addIterationTail(iterationSink.getId(), iterationTail.getId(), iterationID,
+				iterationTail.getParallelism(), waitTime);
 
 		jobGraphBuilder.setIterationSourceSettings(iterationID.toString(), iterationTail.getId());
-
-		List<String> name = Arrays.asList(new String[] { iterationName });
-
-		for (DataStream<IN> stream : iterationTail.mergedStreams) {
-			String inputID = stream.getId();
-			jobGraphBuilder.setEdge(inputID, returnStream.getId(), new DistributePartitioner<IN>(
-					true), 0, name, false);
-		}
-
+		connectGraph(iterationTail, iterationSink.getId(), 0);
 		return iterationTail;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c8e306b6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
index 4dfecb1..f85fc7e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
@@ -36,7 +36,7 @@ public class StreamIterationHead<OUT extends Tuple> extends StreamVertex<OUT,OUT
 	private OutputHandler<OUT> outputHandler;
 
 	private static int numSources;
-	private String iterationId;
+	private Integer iterationId;
 	@SuppressWarnings("rawtypes")
 	private BlockingQueue<StreamRecord> dataChannel;
 	private long iterationWaitTime;
@@ -58,7 +58,7 @@ public class StreamIterationHead<OUT extends Tuple> extends StreamVertex<OUT,OUT
 		shouldWait = iterationWaitTime > 0;
 
 		try {
-			BlockingQueueBroker.instance().handIn(iterationId, dataChannel);
+			BlockingQueueBroker.instance().handIn(iterationId.toString(), dataChannel);
 		} catch (Exception e) {
 
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c8e306b6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
index b603686..1883c06 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
@@ -27,13 +27,13 @@ import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class StreamIterationTail<IN extends Tuple> extends StreamVertex<IN,IN> {
+public class StreamIterationTail<IN extends Tuple> extends StreamVertex<IN, IN> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationTail.class);
 
 	private InputHandler<IN> inputHandler;
 
-	private String iterationId;
+	private Integer iterationId;
 	@SuppressWarnings("rawtypes")
 	private BlockingQueue<StreamRecord> dataChannel;
 	private long iterationWaitTime;
@@ -50,7 +50,7 @@ public class StreamIterationTail<IN extends Tuple> extends StreamVertex<IN,IN> {
 			iterationId = configuration.getIterationId();
 			iterationWaitTime = configuration.getIterationWaitTime();
 			shouldWait = iterationWaitTime > 0;
-			dataChannel = BlockingQueueBroker.instance().get(iterationId);
+			dataChannel = BlockingQueueBroker.instance().get(iterationId.toString());
 		} catch (Exception e) {
 			throw new StreamVertexException(String.format(
 					"Cannot register inputs of StreamIterationSink %s", iterationId), e);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c8e306b6/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index 8fb42d6..4a51c41 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -68,7 +68,7 @@ public class IterateExample {
 		// obtain execution environment and set setBufferTimeout(0) to enable
 		// continuous flushing of the output buffers (lowest latency)
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
-				.setBufferTimeout(0);
+				.setBufferTimeout(1);
 
 		// create an iterative data stream from the input
 		IterativeDataStream<Tuple2<Double, Integer>> it = env.fromCollection(input).shuffle()
@@ -80,7 +80,7 @@ public class IterateExample {
 		// apply the step function to add new random value to the tuple and to
 		// increment the counter and split the output with the output selector
 		SplitDataStream<Tuple2<Double, Integer>> step = it.map(new Step()).shuffle()
-				.setBufferTimeout(1).split(new MySelector());
+				.split(new MySelector());
 
 		// close the iteration by selecting the tuples that were directed to the
 		// 'iterate' channel in the output selector


[7/7] incubator-flink git commit: [FLINK-1325] [streaming] Added closure cleaning to streaming

Posted by mb...@apache.org.
[FLINK-1325] [streaming] Added closure cleaning to streaming

This closes #273
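
For context, closure cleaning strips the implicit reference that an anonymous inner function keeps to its enclosing instance before the function is serialized and shipped. The `clean()` helpers added below delegate to `ClosureCleaner.clean(f, true)` and `ClosureCleaner.ensureSerializable(f)`. A hedged sketch of the failure mode this addresses (the `Pipeline` class and its field are illustrative, not from the commit):

~~~java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;

public class Pipeline {  // illustrative enclosing class, not Serializable
	private final Object heavyNonSerializableState = new Object();

	public DataStream<Integer> build(DataStream<Integer> input) {
		// The anonymous MapFunction implicitly captures Pipeline.this. Without
		// closure cleaning, serializing the function would pull in
		// heavyNonSerializableState and fail. ClosureCleaner.clean(f, true)
		// nulls the unused enclosing reference so the function serializes.
		return input.map(new MapFunction<Integer, Integer>() {
			@Override
			public Integer map(Integer value) {
				return value + 1;
			}
		});
	}
}
~~~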


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/02bad153
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/02bad153
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/02bad153

Branch: refs/heads/release-0.8
Commit: 02bad15318da525f6db938a41cd10c7203156314
Parents: c8e306b
Author: mbalassi <mb...@apache.org>
Authored: Wed Dec 17 16:46:01 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Dec 17 21:41:09 2014 +0100

----------------------------------------------------------------------
 .../api/datastream/CoBatchedDataStream.java     |  4 +-
 .../api/datastream/CoWindowDataStream.java      |  4 +-
 .../api/datastream/ConnectedDataStream.java     | 38 +++++++++-----
 .../streaming/api/datastream/DataStream.java    | 55 +++++++++++++-------
 .../api/datastream/GroupedDataStream.java       |  9 ++--
 .../datastream/SingleOutputStreamOperator.java  |  3 +-
 .../api/datastream/WindowedDataStream.java      | 18 ++++---
 .../environment/StreamExecutionEnvironment.java | 24 +++++++--
 8 files changed, 103 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02bad153/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java
index 387c356..3b58188 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java
@@ -110,10 +110,10 @@ public class CoBatchedDataStream<IN1, IN2> extends ConnectedDataStream<IN1, IN2>
 			CoReduceFunction<IN1, IN2, OUT> coReducer) {
 		CoBatchReduceInvokable<IN1, IN2, OUT> invokable;
 		if (isGrouped) {
-			invokable = new CoGroupedBatchReduceInvokable<IN1, IN2, OUT>(coReducer, batchSize1,
+			invokable = new CoGroupedBatchReduceInvokable<IN1, IN2, OUT>(clean(coReducer), batchSize1,
 					batchSize2, slideSize1, slideSize2, keySelector1, keySelector2);
 		} else {
-			invokable = new CoBatchReduceInvokable<IN1, IN2, OUT>(coReducer, batchSize1,
+			invokable = new CoBatchReduceInvokable<IN1, IN2, OUT>(clean(coReducer), batchSize1,
 					batchSize2, slideSize1, slideSize2);
 		}
 		return invokable;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02bad153/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
index 6e47873..c8c634a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
@@ -96,11 +96,11 @@ public class CoWindowDataStream<IN1, IN2> extends CoBatchedDataStream<IN1, IN2>
 			CoReduceFunction<IN1, IN2, OUT> coReducer) {
 		CoWindowReduceInvokable<IN1, IN2, OUT> invokable;
 		if (isGrouped) {
-			invokable = new CoGroupedWindowReduceInvokable<IN1, IN2, OUT>(coReducer, batchSize1,
+			invokable = new CoGroupedWindowReduceInvokable<IN1, IN2, OUT>(clean(coReducer), batchSize1,
 					batchSize2, slideSize1, slideSize2, keySelector1, keySelector2, timeStamp1,
 					timeStamp2);
 		} else {
-			invokable = new CoWindowReduceInvokable<IN1, IN2, OUT>(coReducer, batchSize1,
+			invokable = new CoWindowReduceInvokable<IN1, IN2, OUT>(clean(coReducer), batchSize1,
 					batchSize2, slideSize1, slideSize2, timeStamp1, timeStamp2);
 		}
 		return invokable;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02bad153/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 6336e68..dcc3dab 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -96,6 +97,18 @@ public class ConnectedDataStream<IN1, IN2> {
 		this.keySelector2 = coDataStream.keySelector2;
 	}
 
+	public <F> F clean(F f) {
+		if (getExecutionEnvironment().getConfig().isClosureCleanerEnabled()) {
+			ClosureCleaner.clean(f, true);
+		}
+		ClosureCleaner.ensureSerializable(f);
+		return f;
+	}
+
+	public StreamExecutionEnvironment getExecutionEnvironment() {
+		return environment;
+	}
+
 	/**
 	 * Returns the first {@link DataStream}.
 	 * 
@@ -404,8 +417,8 @@ public class ConnectedDataStream<IN1, IN2> {
 		TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoMapFunction.class,
 				coMapper.getClass(), 2, null, null);
 
-		return addCoFunction("coMap", coMapper, outTypeInfo, new CoMapInvokable<IN1, IN2, OUT>(
-				coMapper));
+		return addCoFunction("coMap", clean(coMapper), outTypeInfo,
+				new CoMapInvokable<IN1, IN2, OUT>(clean(coMapper)));
 	}
 
 	/**
@@ -428,8 +441,8 @@ public class ConnectedDataStream<IN1, IN2> {
 		TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoFlatMapFunction.class,
 				coFlatMapper.getClass(), 2, null, null);
 
-		return addCoFunction("coFlatMap", coFlatMapper, outTypeInfo,
-				new CoFlatMapInvokable<IN1, IN2, OUT>(coFlatMapper));
+		return addCoFunction("coFlatMap", clean(coFlatMapper), outTypeInfo,
+				new CoFlatMapInvokable<IN1, IN2, OUT>(clean(coFlatMapper)));
 	}
 
 	/**
@@ -453,7 +466,8 @@ public class ConnectedDataStream<IN1, IN2> {
 		TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoReduceFunction.class,
 				coReducer.getClass(), 2, null, null);
 
-		return addCoFunction("coReduce", coReducer, outTypeInfo, getReduceInvokable(coReducer));
+		return addCoFunction("coReduce", clean(coReducer), outTypeInfo,
+				getReduceInvokable(clean(coReducer)));
 	}
 
 	/**
@@ -517,8 +531,8 @@ public class ConnectedDataStream<IN1, IN2> {
 		TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoWindowFunction.class,
 				coWindowFunction.getClass(), 2, null, null);
 
-		return addCoFunction("coWindowReduce", coWindowFunction, outTypeInfo,
-				new CoWindowInvokable<IN1, IN2, OUT>(coWindowFunction, windowSize, slideInterval,
+		return addCoFunction("coWindowReduce", clean(coWindowFunction), outTypeInfo,
+				new CoWindowInvokable<IN1, IN2, OUT>(clean(coWindowFunction), windowSize, slideInterval,
 						timestamp1, timestamp2));
 	}
 
@@ -526,10 +540,10 @@ public class ConnectedDataStream<IN1, IN2> {
 			CoReduceFunction<IN1, IN2, OUT> coReducer) {
 		CoReduceInvokable<IN1, IN2, OUT> invokable;
 		if (isGrouped) {
-			invokable = new CoGroupedReduceInvokable<IN1, IN2, OUT>(coReducer, keySelector1,
+			invokable = new CoGroupedReduceInvokable<IN1, IN2, OUT>(clean(coReducer), keySelector1,
 					keySelector2);
 		} else {
-			invokable = new CoReduceInvokable<IN1, IN2, OUT>(coReducer);
+			invokable = new CoReduceInvokable<IN1, IN2, OUT>(clean(coReducer));
 		}
 		return invokable;
 	}
@@ -542,7 +556,7 @@ public class ConnectedDataStream<IN1, IN2> {
 				crossFunction.getClass(), 2, null, null);
 
 		CrossWindowFunction<IN1, IN2, OUT> crossWindowFunction = new CrossWindowFunction<IN1, IN2, OUT>(
-				crossFunction);
+				clean(crossFunction));
 
 		return addGeneralWindowCombine(crossWindowFunction, outTypeInfo, windowSize, slideInterval,
 				timestamp1, timestamp2);
@@ -593,8 +607,8 @@ public class ConnectedDataStream<IN1, IN2> {
 			throw new IllegalArgumentException("Slide interval must be positive");
 		}
 
-		return addCoFunction("coWindowReduce", coWindowFunction, outTypeInfo,
-				new CoWindowInvokable<IN1, IN2, OUT>(coWindowFunction, windowSize, slideInterval,
+		return addCoFunction("coWindowReduce", clean(coWindowFunction), outTypeInfo,
+				new CoWindowInvokable<IN1, IN2, OUT>(clean(coWindowFunction), windowSize, slideInterval,
 						timestamp1, timestamp2));
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02bad153/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 5d03100..474d57b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -35,6 +35,7 @@ import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -182,6 +183,18 @@ public class DataStream<OUT> {
 		return this.typeInfo;
 	}
 
+	public <F> F clean(F f) {
+		if (getExecutionEnvironment().getConfig().isClosureCleanerEnabled()) {
+			ClosureCleaner.clean(f, true);
+		}
+		ClosureCleaner.ensureSerializable(f);
+		return f;
+	}
+
+	public StreamExecutionEnvironment getExecutionEnvironment() {
+		return environment;
+	}
+
 	/**
 	 * Creates a new {@link DataStream} by merging {@link DataStream} outputs of
 	 * the same type with each other. The DataStreams merged using this operator
@@ -261,7 +274,7 @@ public class DataStream<OUT> {
 	 * @return The grouped {@link DataStream}
 	 */
 	public GroupedDataStream<OUT> groupBy(KeySelector<OUT, ?> keySelector) {
-		return new GroupedDataStream<OUT>(this, keySelector);
+		return new GroupedDataStream<OUT>(this, clean(keySelector));
 	}
 
 	/**
@@ -300,7 +313,7 @@ public class DataStream<OUT> {
 	 * @return
 	 */
 	public DataStream<OUT> partitionBy(KeySelector<OUT, ?> keySelector) {
-		return setConnectionType(new FieldsPartitioner<OUT>(keySelector));
+		return setConnectionType(new FieldsPartitioner<OUT>(clean(keySelector)));
 	}
 
 	/**
@@ -386,9 +399,10 @@ public class DataStream<OUT> {
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> map(MapFunction<OUT, R> mapper) {
 
-		TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(mapper, getType());
+		TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType());
 
-		return addFunction("map", mapper, getType(), outType, new MapInvokable<OUT, R>(mapper));
+		return addFunction("map", clean(mapper), getType(), outType, new MapInvokable<OUT, R>(
+				clean(mapper)));
 	}
 
 	/**
@@ -409,10 +423,10 @@ public class DataStream<OUT> {
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> flatMap(FlatMapFunction<OUT, R> flatMapper) {
 
-		TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(flatMapper, getType());
+		TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper), getType());
 
-		return addFunction("flatMap", flatMapper, getType(), outType, new FlatMapInvokable<OUT, R>(
-				flatMapper));
+		return addFunction("flatMap", clean(flatMapper), getType(), outType,
+				new FlatMapInvokable<OUT, R>(clean(flatMapper)));
 	}
 
 	/**
@@ -428,8 +442,8 @@ public class DataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
 
-		return addFunction("reduce", reducer, getType(), getType(), new StreamReduceInvokable<OUT>(
-				reducer));
+		return addFunction("reduce", clean(reducer), getType(), getType(),
+				new StreamReduceInvokable<OUT>(clean(reducer)));
 	}
 
 	/**
@@ -447,7 +461,8 @@ public class DataStream<OUT> {
 	 * @return The filtered DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> filter(FilterFunction<OUT> filter) {
-		return addFunction("filter", filter, getType(), getType(), new FilterInvokable<OUT>(filter));
+		return addFunction("filter", clean(filter), getType(), getType(), new FilterInvokable<OUT>(clean(
+				filter)));
 	}
 
 	/**
@@ -780,9 +795,9 @@ public class DataStream<OUT> {
 	}
 
 	/**
-	 * Writes a DataStream to the standard output stream (stdout).<br> For each
-	 * element of the DataStream the result of {@link Object#toString()} is
-	 * written.
+	 * Writes a DataStream to the standard output stream (stdout).<br>
+	 * For each element of the DataStream the result of
+	 * {@link Object#toString()} is written.
 	 * 
 	 * @return The closed DataStream.
 	 */
@@ -793,11 +808,11 @@ public class DataStream<OUT> {
 
 		return returnStream;
 	}
-	
+
 	/**
-	 * Writes a DataStream to the standard output stream (stderr).<br> For each
-	 * element of the DataStream the result of {@link Object#toString()} is
-	 * written.
+	 * Writes a DataStream to the standard output stream (stderr).<br>
+	 * For each element of the DataStream the result of
+	 * {@link Object#toString()} is written.
 	 * 
 	 * @return The closed DataStream.
 	 */
@@ -1112,7 +1127,7 @@ public class DataStream<OUT> {
 
 		StreamReduceInvokable<OUT> invokable = new StreamReduceInvokable<OUT>(aggregate);
 
-		SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("reduce", aggregate,
+		SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("reduce", clean(aggregate),
 				typeInfo, typeInfo, invokable);
 
 		return returnStream;
@@ -1229,8 +1244,8 @@ public class DataStream<OUT> {
 
 		try {
 			jobGraphBuilder.addStreamVertex(returnStream.getId(), new SinkInvokable<OUT>(
-					sinkFunction), inTypeInfo, null, "sink", SerializationUtils
-					.serialize(sinkFunction), degreeOfParallelism);
+					clean(sinkFunction)), inTypeInfo, null, "sink", SerializationUtils
+					.serialize(clean(sinkFunction)), degreeOfParallelism);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize SinkFunction");
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02bad153/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 32f664f..2620d3e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -60,9 +60,10 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 	 *            element of the input values with the same key.
 	 * @return The transformed DataStream.
 	 */
+	@Override
 	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
-		return addFunction("groupReduce", reducer, getType(), getType(),
-				new GroupedReduceInvokable<OUT>(reducer, keySelector));
+		return addFunction("groupReduce", clean(reducer), getType(), getType(),
+				new GroupedReduceInvokable<OUT>(clean(reducer), keySelector));
 	}
 
 	/**
@@ -178,10 +179,10 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 	@Override
 	protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
 
-		GroupedReduceInvokable<OUT> invokable = new GroupedReduceInvokable<OUT>(aggregate,
+		GroupedReduceInvokable<OUT> invokable = new GroupedReduceInvokable<OUT>(clean(aggregate),
 				keySelector);
 
-		SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("groupReduce", aggregate,
+		SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("groupReduce", clean(aggregate),
 				typeInfo, typeInfo, invokable);
 
 		return returnStream;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02bad153/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 76da27c..3e1c940 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -115,7 +115,8 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	 */
 	public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) {
 		try {
-			jobGraphBuilder.setOutputSelector(id, SerializationUtils.serialize(outputSelector));
+			jobGraphBuilder.setOutputSelector(id,
+					SerializationUtils.serialize(clean(outputSelector)));
 
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize OutputSelector");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02bad153/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index b0ab99f..09b2678 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -123,6 +123,10 @@ public class WindowedDataStream<OUT> {
 		this.userEvicters = windowedDataStream.userEvicters;
 		this.allCentral = windowedDataStream.allCentral;
 	}
+	
+	public <F> F clean(F f){
+		return dataStream.clean(f);
+	}
 
 	/**
 	 * Defines the slide size (trigger frequency) for the windowed data stream.
@@ -226,7 +230,7 @@ public class WindowedDataStream<OUT> {
 	 * @return The transformed DataStream
 	 */
 	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reduceFunction) {
-		return dataStream.addFunction("NextGenWindowReduce", reduceFunction, getType(), getType(),
+		return dataStream.addFunction("NextGenWindowReduce", clean(reduceFunction), getType(), getType(),
 				getReduceInvokable(reduceFunction));
 	}
 
@@ -250,7 +254,7 @@ public class WindowedDataStream<OUT> {
 		TypeInformation<R> outType = TypeExtractor
 				.getGroupReduceReturnTypes(reduceFunction, inType);
 
-		return dataStream.addFunction("NextGenWindowReduce", reduceFunction, inType, outType,
+		return dataStream.addFunction("NextGenWindowReduce", clean(reduceFunction), inType, outType,
 				getReduceGroupInvokable(reduceFunction));
 	}
 
@@ -453,7 +457,7 @@ public class WindowedDataStream<OUT> {
 		StreamInvokable<OUT, OUT> invokable = getReduceInvokable(aggregator);
 
 		SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.addFunction("windowReduce",
-				aggregator, getType(), getType(), invokable);
+				clean(aggregator), getType(), getType(), invokable);
 
 		return returnStream;
 	}
@@ -550,12 +554,12 @@ public class WindowedDataStream<OUT> {
 	private <R> StreamInvokable<OUT, R> getReduceGroupInvokable(GroupReduceFunction<OUT, R> reducer) {
 		StreamInvokable<OUT, R> invokable;
 		if (isGrouped) {
-			invokable = new GroupedWindowInvokable<OUT, R>(reducer, keySelector,
+			invokable = new GroupedWindowInvokable<OUT, R>(clean(reducer), keySelector,
 					getDistributedTriggers(), getDistributedEvicters(), getCentralTriggers(),
 					getCentralEvicters());
 
 		} else {
-			invokable = new WindowGroupReduceInvokable<OUT, R>(reducer, getTriggers(),
+			invokable = new WindowGroupReduceInvokable<OUT, R>(clean(reducer), getTriggers(),
 					getEvicters());
 		}
 		return invokable;
@@ -564,12 +568,12 @@ public class WindowedDataStream<OUT> {
 	private StreamInvokable<OUT, OUT> getReduceInvokable(ReduceFunction<OUT> reducer) {
 		StreamInvokable<OUT, OUT> invokable;
 		if (isGrouped) {
-			invokable = new GroupedWindowInvokable<OUT, OUT>(reducer, keySelector,
+			invokable = new GroupedWindowInvokable<OUT, OUT>(clean(reducer), keySelector,
 					getDistributedTriggers(), getDistributedEvicters(), getCentralTriggers(),
 					getCentralEvicters());
 
 		} else {
-			invokable = new WindowReduceInvokable<OUT>(reducer, getTriggers(), getEvicters());
+			invokable = new WindowReduceInvokable<OUT>(clean(reducer), getTriggers(), getEvicters());
 		}
 		return invokable;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02bad153/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 85fb90f..783fa28 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.commons.lang3.Validate;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -54,10 +55,10 @@ public abstract class StreamExecutionEnvironment {
 
 	private static int defaultLocalDop = Runtime.getRuntime().availableProcessors();
 
-	private int degreeOfParallelism = 1;
-
 	private long bufferTimeout = 100;
 
+	private ExecutionConfig config = new ExecutionConfig();
+
 	protected JobGraphBuilder jobGraphBuilder;
 
 	// --------------------------------------------------------------------------------------------
@@ -72,6 +73,21 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
+	 * Sets the config object.
+	 */
+	public void setConfig(ExecutionConfig config) {
+		Validate.notNull(config);
+		this.config = config;
+	}
+
+	/**
+	 * Gets the config object.
+	 */
+	public ExecutionConfig getConfig() {
+		return config;
+	}
+
+	/**
 	 * Gets the degree of parallelism with which operation are executed by
 	 * default. Operations can individually override this value to use a
 	 * specific degree of parallelism via {@link DataStream#setParallelism}.
@@ -80,7 +96,7 @@ public abstract class StreamExecutionEnvironment {
 	 *         override that value.
 	 */
 	public int getDegreeOfParallelism() {
-		return this.degreeOfParallelism;
+		return config.getDegreeOfParallelism();
 	}
 
 	/**
@@ -100,7 +116,7 @@ public abstract class StreamExecutionEnvironment {
 		if (degreeOfParallelism < 1) {
 			throw new IllegalArgumentException("Degree of parallelism must be at least one.");
 		}
-		this.degreeOfParallelism = degreeOfParallelism;
+		config.setDegreeOfParallelism(degreeOfParallelism);
 		return this;
 	}
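
As a usage sketch (hedged: it only exercises the accessors visible in this diff), parallelism settings now round-trip through the `ExecutionConfig`, and the closure cleaner consults the same object via `config.isClosureCleanerEnabled()`:

~~~java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// The degree of parallelism is now stored on the ExecutionConfig rather than
// in a field of the environment itself.
env.setDegreeOfParallelism(4);
ExecutionConfig config = env.getConfig();
assert config.getDegreeOfParallelism() == 4;
~~~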
 


[5/7] incubator-flink git commit: [FLINK-610] Replace Avro by Kryo as the GenericType serializer

Posted by mb...@apache.org.
[FLINK-610] Replace Avro by Kryo as the GenericType serializer

Data-intensive jobs using Kryo are probably going to be slower.

Set correct classloader

Try to use Kryo.copy() with fallback to serialization copy
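
The "Kryo.copy() with fallback to serialization copy" note refers to attempting Kryo's native deep copy first and, if a registered serializer does not support copying, round-tripping the record through a byte array instead. A hedged sketch of that pattern (not necessarily the exact body of KryoSerializer.copy()):

~~~java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class CopyWithFallback {
	static <T> T copy(Kryo kryo, T record) {
		try {
			// Fast path: Kryo's built-in deep copy.
			return kryo.copy(record);
		} catch (KryoException e) {
			// Fallback: serialize and immediately deserialize the record.
			ByteArrayOutputStream baos = new ByteArrayOutputStream();
			Output output = new Output(baos);
			kryo.writeObject(output, record);
			output.close();
			Input input = new Input(new ByteArrayInputStream(baos.toByteArray()));
			@SuppressWarnings("unchecked")
			T deepCopy = (T) kryo.readObject(input, (Class<T>) record.getClass());
			input.close();
			return deepCopy;
		}
	}
}
~~~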


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a835e5df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a835e5df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a835e5df

Branch: refs/heads/release-0.8
Commit: a835e5dfb97624f3132761c7933aaffb03b0d06f
Parents: 1ac9651
Author: Robert Metzger <rm...@apache.org>
Authored: Tue Dec 16 11:30:52 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Dec 17 21:41:09 2014 +0100

----------------------------------------------------------------------
 .../common/typeutils/SerializerTestBase.java    |   5 +-
 .../typeutils/SerializerTestInstance.java       |   2 -
 flink-java/pom.xml                              |   6 +
 .../api/java/typeutils/GenericTypeInfo.java     |  19 +--
 .../flink/api/java/typeutils/TypeExtractor.java |  17 +-
 .../java/typeutils/runtime/KryoSerializer.java  |  58 ++++---
 .../type/extractor/PojoTypeExtractionTest.java  | 166 +++++++++++--------
 .../AbstractGenericTypeSerializerTest.java      |   2 +-
 .../runtime/KryoGenericTypeComparatorTest.java  |   2 +-
 .../runtime/KryoGenericTypeSerializerTest.java  |  44 ++++-
 flink-scala/pom.xml                             |   8 +
 .../runtime/KryoGenericTypeSerializerTest.scala | 128 ++++++++++++++
 .../javaApiOperators/GroupReduceITCase.java     |  71 +++++++-
 .../util/CollectionDataSets.java                |  77 +++++++++
 14 files changed, 481 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a835e5df/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index d509284..5122af9 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -59,7 +59,10 @@ public abstract class SerializerTestBase<T> {
 	public void testInstantiate() {
 		try {
 			TypeSerializer<T> serializer = getSerializer();
-			
+			if(serializer.getClass().getName().endsWith("KryoSerializer")) {
+				// the kryo serializer will return null. We ignore this test for Kryo.
+				return;
+			}
 			T instance = serializer.createInstance();
 			assertNotNull("The created instance must not be null.", instance);
 			

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a835e5df/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
index c48e879..7f65995 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.api.common.typeutils;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
 
 public class SerializerTestInstance<T> extends SerializerTestBase<T> {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a835e5df/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 22826d8..14cb469 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -58,6 +58,12 @@ under the License.
 			<artifactId>asm</artifactId>
 		</dependency>
 		
+		<dependency>
+			<groupId>com.twitter</groupId>
+			<artifactId>chill_2.10</artifactId>
+			<version>0.5.1</version>
+		</dependency>
+
 		<!--  guava needs to be in "provided" scope, to make sure it is not included into the jars by the shading -->
 		<dependency>
 			<groupId>com.google.guava</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a835e5df/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index 8a1406b..5bc6cb9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -22,26 +22,16 @@ import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
 import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;
-
-import java.util.Collection;
+import org.apache.flink.api.java.typeutils.runtime.KryoSerializer;
 
 
 public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
 
 	private final Class<T> typeClass;
-	private final static Class<?>[] unsupportedByAvro = new Class[] {Collection.class};
-	
+
 	public GenericTypeInfo(Class<T> typeClass) {
 		this.typeClass = typeClass;
-		for (Class<?> unsupported: unsupportedByAvro) {
-			if(unsupported.isAssignableFrom(typeClass)) {
-				throw new RuntimeException("The type '"+typeClass+"' is currently not supported " +
-						"by the Avro Serializer that Flink is using for serializing " +
-						"arbitrary objects");
-			}
-		}
 	}
 	
 	@Override
@@ -76,10 +66,7 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType
 
 	@Override
 	public TypeSerializer<T> createSerializer() {
-		// NOTE: The TypeExtractor / pojo logic is assuming that we are using a Avro Serializer here
-		// in particular classes implementing GenericContainer are handled as GenericTypeInfos 
-		// (this will probably not work with Kryo)
-		return new AvroSerializer<T>(this.typeClass);
+		return new KryoSerializer<T>(this.typeClass);
 	}
 
 	@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a835e5df/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 3bceac5..e52e2af 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -30,7 +30,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.avro.generic.GenericContainer;
 import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.CrossFunction;
@@ -947,16 +946,16 @@ public class TypeExtractor {
 			// special case handling for Class, this should not be handled by the POJO logic
 			return new GenericTypeInfo<X>(clazz);
 		}
-		if(GenericContainer.class.isAssignableFrom(clazz)) {
-			// this is a type generated by Avro. GenericTypeInfo is able to handle this case because its using Avro.
-			return new GenericTypeInfo<X>(clazz);
-		}
+
 		try {
 			TypeInformation<X> pojoType = analyzePojo(clazz, new ArrayList<Type>(typeHierarchy), clazzTypeHint);
 			if (pojoType != null) {
 				return pojoType;
 			}
 		} catch (InvalidTypesException e) {
+			if(LOG.isDebugEnabled()) {
+				LOG.debug("Unable to handle type "+clazz+" as POJO. Message: "+e.getMessage(), e);
+			}
 			// ignore and create generic type info
 		}
 
@@ -1051,9 +1050,11 @@ public class TypeExtractor {
 				fieldTypeHierarchy.add(fieldType);
 				pojoFields.add(new PojoField(field, createTypeInfoWithTypeHierarchy(fieldTypeHierarchy, fieldType, null, null) ));
 			} catch (InvalidTypesException e) {
-				//pojoFields.add(new PojoField(field, new GenericTypeInfo( Object.class ))); // we need kryo to properly serialize this
-				throw new InvalidTypesException("Flink is currently unable to serialize this type: "+fieldType+""
-						+ "\nThe system is internally using the Avro serializer which is not able to handle that type.", e);
+				Class<?> genericClass = Object.class;
+				if(isClassType(fieldType)) {
+					genericClass = typeToClass(fieldType);
+				}
+				pojoFields.add(new PojoField(field, new GenericTypeInfo( genericClass )));
 			}
 		}
 

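The net effect of these TypeExtractor changes: a field that cannot be analyzed no longer aborts POJO extraction with an exception, but is recorded as a Kryo-backed GenericTypeInfo (with a debug log entry). A small sketch under the same assumptions; the Event class here is hypothetical:

import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;

public class PojoFallbackSketch {
	// Hypothetical POJO: the List field cannot be analyzed as a POJO or tuple type.
	public static class Event {
		public int id;
		public List<String> tags; // previously failed extraction, now a generic type
	}

	public static void main(String[] args) {
		TypeInformation<Event> info = TypeExtractor.createTypeInfo(Event.class);
		// The class as a whole is still a POJO; only the List field
		// falls back to GenericTypeInfo and hence to Kryo.
		System.out.println(info instanceof PojoTypeInfo); // prints true
	}
}
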
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a835e5df/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
index 7a98abf..f2c5848 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
@@ -19,20 +19,23 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 
+import com.twitter.chill.ScalaKryoInstantiator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 
 public class KryoSerializer<T> extends TypeSerializer<T> {
-	private static final long serialVersionUID = 1L;
+	private static final long serialVersionUID = 2L;
 
 	private final Class<T> type;
-	private final Class<? extends T> typeToInstantiate;
 
 	private transient Kryo kryo;
 	private transient T copyInstance;
@@ -44,21 +47,13 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 	private transient Output output;
 
 	public KryoSerializer(Class<T> type){
-		this(type,type);
-	}
-
-	public KryoSerializer(Class<T> type, Class<? extends T> typeToInstantiate){
-		if(type == null || typeToInstantiate == null){
+		if(type == null){
 			throw new NullPointerException("Type class cannot be null.");
 		}
-
 		this.type = type;
-		this.typeToInstantiate = typeToInstantiate;
-		kryo = new Kryo();
-		kryo.setAsmEnabled(true);
-		kryo.register(type);
 	}
 
+
 	@Override
 	public boolean isImmutableType() {
 		return false;
@@ -71,20 +66,36 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 
 	@Override
 	public T createInstance() {
-		checkKryoInitialized();
-		return kryo.newInstance(typeToInstantiate);
+		return null;
 	}
 
 	@Override
 	public T copy(T from) {
+		if(from == null) {
+			return null;
+		}
 		checkKryoInitialized();
-		return kryo.copy(from);
+		try {
+			return kryo.copy(from);
+		} catch(KryoException ke) {
+			// kryo was unable to copy it, so we do it through serialization:
+			ByteArrayOutputStream baout = new ByteArrayOutputStream();
+			Output output = new Output(baout);
+
+			kryo.writeObject(output, from);
+
+			output.flush();
+
+			ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray());
+			Input input = new Input(bain);
+
+			return (T)kryo.readObject(input, from.getClass());
+		}
 	}
 	
 	@Override
 	public T copy(T from, T reuse) {
-		checkKryoInitialized();
-		return kryo.copy(from);
+		return copy(from);
 	}
 
 	@Override
@@ -101,7 +112,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 			previousOut = target;
 		}
 		
-		kryo.writeObject(output, record);
+		kryo.writeClassAndObject(output, record);
 		output.flush();
 	}
 
@@ -113,7 +124,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 			input = new NoFetchingInput(inputStream);
 			previousIn = source;
 		}
-		return kryo.readObject(input, typeToInstantiate);
+		return (T) kryo.readClassAndObject(input);
 	}
 	
 	@Override
@@ -136,14 +147,14 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 	
 	@Override
 	public int hashCode() {
-		return type.hashCode() + 31 * typeToInstantiate.hashCode();
+		return type.hashCode();
 	}
 	
 	@Override
 	public boolean equals(Object obj) {
 		if (obj != null && obj instanceof KryoSerializer) {
 			KryoSerializer<?> other = (KryoSerializer<?>) obj;
-			return other.type == this.type && other.typeToInstantiate == this.typeToInstantiate;
+			return other.type == this.type;
 		} else {
 			return false;
 		}
@@ -153,9 +164,10 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 
 	private void checkKryoInitialized() {
 		if (this.kryo == null) {
-			this.kryo = new Kryo();
-			this.kryo.setAsmEnabled(true);
+			this.kryo = new ScalaKryoInstantiator().newKryo();
+			this.kryo.setRegistrationRequired(false);
 			this.kryo.register(type);
+			this.kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
 		}
 	}
 }

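To summarize the serializer changes: the second constructor argument is gone, createInstance() now returns null, records are written with writeClassAndObject (so subclasses of the declared type survive a round trip), and copy() falls back to a serialize/deserialize cycle when kryo.copy() fails. A brief usage sketch restricted to the API in this diff (names illustrative):

import java.util.ArrayList;
import java.util.Arrays;

import org.apache.flink.api.java.typeutils.runtime.KryoSerializer;

public class KryoSerializerSketch {
	public static void main(String[] args) {
		// Single-argument constructor: the typeToInstantiate parameter was removed.
		KryoSerializer<ArrayList> serializer = new KryoSerializer<ArrayList>(ArrayList.class);

		// A heterogeneous payload that the Avro serializer could not handle.
		ArrayList<Object> record = new ArrayList<Object>(Arrays.<Object>asList("a", 42, 3.14));

		// copy() tries kryo.copy() first and transparently falls back to
		// serializing and deserializing the record on a KryoException.
		ArrayList copied = serializer.copy(record);
		System.out.println(copied); // prints [a, 42, 3.14]

		// createInstance() no longer instantiates the type.
		System.out.println(serializer.createInstance()); // prints null
	}
}
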
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a835e5df/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
index e5ac1ca..893e63c 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
@@ -45,7 +45,7 @@ import com.google.common.collect.HashMultiset;
 
 /**
  *  Pojo Type tests
- *  
+ *
  *  A Pojo is a bean-style class with getters, setters and empty ctor
  *   OR a class with all fields public (or for every private field, there has to be a public getter/setter)
  *   everything else is a generic type (that can't be used for field selection)
@@ -55,12 +55,12 @@ public class PojoTypeExtractionTest {
 	public static class HasDuplicateField extends WC {
 		private int count; // duplicate
 	}
-	
+
 	@Test(expected=RuntimeException.class)
 	public void testDuplicateFieldException() {
 		TypeExtractor.createTypeInfo(HasDuplicateField.class);
 	}
-	
+
 	// test with correct pojo types
 	public static class WC { // is a pojo
 		public ComplexNestedClass complex; // is a pojo
@@ -84,6 +84,7 @@ public class PojoTypeExtractionTest {
 		public Tuple3<Long, Long, String> word; //Tuple Type with three basic types
 		public Object nothing; // generic type
 		public MyWritable hadoopCitizen;  // writableType
+		public List<String> collection;
 	}
 
 	// all public test
@@ -92,7 +93,7 @@ public class PojoTypeExtractionTest {
 		public HashMultiset<Integer> fancyIds; // generic type
 		public String[]	fancyArray;			 // generic type
 	}
-	
+
 	public static class ParentSettingGenerics extends PojoWithGenerics<Integer, Long> {
 		public String field3;
 	}
@@ -101,16 +102,16 @@ public class PojoTypeExtractionTest {
 		public T1 field1;
 		public T2 field2;
 	}
-	
+
 	public static class ComplexHierarchyTop extends ComplexHierarchy<Tuple1<String>> {}
 	public static class ComplexHierarchy<T> extends PojoWithGenerics<FromTuple,T> {}
-	
+
 	// extends from Tuple and adds a field
 	public static class FromTuple extends Tuple3<String, String, Long> {
 		private static final long serialVersionUID = 1L;
 		public int special;
 	}
-	
+
 	public static class IncorrectPojo {
 		private int isPrivate;
 		public int getIsPrivate() {
@@ -118,7 +119,7 @@ public class PojoTypeExtractionTest {
 		}
 		// setter is missing (intentional)
 	}
-	
+
 	// correct pojo
 	public static class BeanStylePojo {
 		public String abc;
@@ -136,7 +137,7 @@ public class PojoTypeExtractionTest {
 			this.a = a;
 		}
 	}
-	
+
 	// in this test, the location of the getters and setters is mixed across the type hierarchy.
 	public static class TypedPojoGetterSetterCheck extends GenericPojoGetterSetterCheck<String> {
 		public void setPackageProtected(String in) {
@@ -149,50 +150,64 @@ public class PojoTypeExtractionTest {
 			return packageProtected;
 		}
 	}
-	
+
 	@Test
 	public void testIncorrectPojos() {
 		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(IncorrectPojo.class);
 		Assert.assertTrue(typeForClass instanceof GenericTypeInfo<?>);
-		
+
 		typeForClass = TypeExtractor.createTypeInfo(WrongCtorPojo.class);
 		Assert.assertTrue(typeForClass instanceof GenericTypeInfo<?>);
 	}
-	
+
 	@Test
 	public void testCorrectPojos() {
 		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(BeanStylePojo.class);
 		Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
-		
+
 		typeForClass = TypeExtractor.createTypeInfo(TypedPojoGetterSetterCheck.class);
 		Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
 	}
-	
+
 	@Test
 	public void testPojoWC() {
 		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(WC.class);
 		checkWCPojoAsserts(typeForClass);
-		
+
 		WC t = new WC();
 		t.complex = new ComplexNestedClass();
 		TypeInformation<?> typeForObject = TypeExtractor.getForObject(t);
 		checkWCPojoAsserts(typeForObject);
 	}
-	
+
 	private void checkWCPojoAsserts(TypeInformation<?> typeInfo) {
 		Assert.assertFalse(typeInfo.isBasicType());
 		Assert.assertFalse(typeInfo.isTupleType());
-		Assert.assertEquals(9, typeInfo.getTotalFields());
+		Assert.assertEquals(10, typeInfo.getTotalFields());
 		Assert.assertTrue(typeInfo instanceof PojoTypeInfo);
 		PojoTypeInfo<?> pojoType = (PojoTypeInfo<?>) typeInfo;
-		
+
 		List<FlatFieldDescriptor> ffd = new ArrayList<FlatFieldDescriptor>();
-		String[] fields = {"count","complex.date", "complex.hadoopCitizen", "complex.nothing",
-				"complex.someFloat", "complex.someNumber", "complex.word.f0",
-				"complex.word.f1", "complex.word.f2"};
-		int[] positions = {8,0,1,2,
-				3,4,5,
-				6,7};
+		String[] fields = {"count",
+				"complex.date",
+				"complex.hadoopCitizen",
+				"complex.collection",
+				"complex.nothing",
+				"complex.someFloat",
+				"complex.someNumber",
+				"complex.word.f0",
+				"complex.word.f1",
+				"complex.word.f2"};
+		int[] positions = {9,
+				1,
+				2,
+				0,
+				3,
+				4,
+				5,
+				6,
+				7,
+				8};
 		Assert.assertEquals(fields.length, positions.length);
 		for(int i = 0; i < fields.length; i++) {
 			pojoType.getKey(fields[i], 0, ffd);
@@ -200,86 +215,93 @@ public class PojoTypeExtractionTest {
 			Assert.assertEquals("position of field "+fields[i]+" wrong", positions[i], ffd.get(0).getPosition());
 			ffd.clear();
 		}
-		
+
 		pojoType.getKey("complex.word.*", 0, ffd);
 		Assert.assertEquals(3, ffd.size());
 		// check if it returns 6,7,8
 		for(FlatFieldDescriptor ffdE : ffd) {
 			final int pos = ffdE.getPosition();
-			Assert.assertTrue(pos <= 7 );
-			Assert.assertTrue(5 <= pos );
-			if(pos == 5) {
-				Assert.assertEquals(Long.class, ffdE.getType().getTypeClass());
-			}
+			Assert.assertTrue(pos <= 8 );
+			Assert.assertTrue(6 <= pos );
 			if(pos == 6) {
 				Assert.assertEquals(Long.class, ffdE.getType().getTypeClass());
 			}
 			if(pos == 7) {
+				Assert.assertEquals(Long.class, ffdE.getType().getTypeClass());
+			}
+			if(pos == 8) {
 				Assert.assertEquals(String.class, ffdE.getType().getTypeClass());
 			}
 		}
 		ffd.clear();
-		
+
 		// scala style full tuple selection for pojos
 		pojoType.getKey("complex.word._", 0, ffd);
 		Assert.assertEquals(3, ffd.size());
 		ffd.clear();
-		
+
 		pojoType.getKey("complex.*", 0, ffd);
-		Assert.assertEquals(8, ffd.size());
+		Assert.assertEquals(9, ffd.size());
 		// check if it returns 0-8
 		for(FlatFieldDescriptor ffdE : ffd) {
 			final int pos = ffdE.getPosition();
-			Assert.assertTrue(ffdE.getPosition() <= 7 );
+			Assert.assertTrue(ffdE.getPosition() <= 8 );
 			Assert.assertTrue(0 <= ffdE.getPosition() );
+
 			if(pos == 0) {
-				Assert.assertEquals(Date.class, ffdE.getType().getTypeClass());
+				Assert.assertEquals(List.class, ffdE.getType().getTypeClass());
 			}
 			if(pos == 1) {
-				Assert.assertEquals(MyWritable.class, ffdE.getType().getTypeClass());
+				Assert.assertEquals(Date.class, ffdE.getType().getTypeClass());
 			}
 			if(pos == 2) {
-				Assert.assertEquals(Object.class, ffdE.getType().getTypeClass());
+				Assert.assertEquals(MyWritable.class, ffdE.getType().getTypeClass());
 			}
 			if(pos == 3) {
-				Assert.assertEquals(Float.class, ffdE.getType().getTypeClass());
+				Assert.assertEquals(Object.class, ffdE.getType().getTypeClass());
 			}
 			if(pos == 4) {
-				Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass());
+				Assert.assertEquals(Float.class, ffdE.getType().getTypeClass());
 			}
 			if(pos == 5) {
-				Assert.assertEquals(Long.class, ffdE.getType().getTypeClass());
+				Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass());
 			}
 			if(pos == 6) {
 				Assert.assertEquals(Long.class, ffdE.getType().getTypeClass());
 			}
 			if(pos == 7) {
+				Assert.assertEquals(Long.class, ffdE.getType().getTypeClass());
+			}
+			if(pos == 8) {
 				Assert.assertEquals(String.class, ffdE.getType().getTypeClass());
 			}
+			if(pos == 9) {
+				Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass());
+			}
 		}
 		ffd.clear();
-		
+
 		pojoType.getKey("*", 0, ffd);
-		Assert.assertEquals(9, ffd.size());
+		Assert.assertEquals(10, ffd.size());
 		// check if it returns 0-9
 		for(FlatFieldDescriptor ffdE : ffd) {
-			Assert.assertTrue(ffdE.getPosition() <= 8 );
+			Assert.assertTrue(ffdE.getPosition() <= 9 );
 			Assert.assertTrue(0 <= ffdE.getPosition() );
-			if(ffdE.getPosition() == 8) {
+			if(ffdE.getPosition() == 9) {
 				Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass());
 			}
 		}
 		ffd.clear();
-		
+
 		TypeInformation<?> typeComplexNested = pojoType.getTypeAt(0); // ComplexNestedClass complex
 		Assert.assertTrue(typeComplexNested instanceof PojoTypeInfo);
-		
-		Assert.assertEquals(6, typeComplexNested.getArity());
-		Assert.assertEquals(8, typeComplexNested.getTotalFields());
+
+		Assert.assertEquals(7, typeComplexNested.getArity());
+		Assert.assertEquals(9, typeComplexNested.getTotalFields());
 		PojoTypeInfo<?> pojoTypeComplexNested = (PojoTypeInfo<?>) typeComplexNested;
-		
+
 		boolean dateSeen = false, intSeen = false, floatSeen = false,
-				tupleSeen = false, objectSeen = false, writableSeen = false;
+				tupleSeen = false, objectSeen = false, writableSeen = false, collectionSeen = false;
 		for(int i = 0; i < pojoTypeComplexNested.getArity(); i++) {
 			PojoField field = pojoTypeComplexNested.getPojoFieldAt(i);
 			String name = field.field.getName();
@@ -330,6 +352,13 @@ public class PojoTypeExtractionTest {
 				writableSeen = true;
 				Assert.assertEquals(new WritableTypeInfo<MyWritable>(MyWritable.class), field.type);
 				Assert.assertEquals(MyWritable.class, field.type.getTypeClass());
+			} else if(name.equals("collection")) {
+				if(collectionSeen) {
+					Assert.fail("already seen");
+				}
+				collectionSeen = true;
+				Assert.assertEquals(new GenericTypeInfo(List.class), field.type);
+
 			} else {
 				Assert.fail("field "+field+" is not expected");
 			}
@@ -340,29 +369,29 @@ public class PojoTypeExtractionTest {
 		Assert.assertTrue("Field was not present", tupleSeen);
 		Assert.assertTrue("Field was not present", objectSeen);
 		Assert.assertTrue("Field was not present", writableSeen);
-		
+		Assert.assertTrue("Field was not present", collectionSeen);
+
 		TypeInformation<?> typeAtOne = pojoType.getTypeAt(1); // int count
 		Assert.assertTrue(typeAtOne instanceof BasicTypeInfo);
-		
+
 		Assert.assertEquals(typeInfo.getTypeClass(), WC.class);
 		Assert.assertEquals(typeInfo.getArity(), 2);
 	}
 
 	// Kryo is required for this; it is available now, so the test is enabled.
-	@Ignore
 	@Test
 	public void testPojoAllPublic() {
 		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(AllPublic.class);
 		checkAllPublicAsserts(typeForClass);
-		
+
 		TypeInformation<?> typeForObject = TypeExtractor.getForObject(new AllPublic() );
 		checkAllPublicAsserts(typeForObject);
 	}
-	
+
 	private void checkAllPublicAsserts(TypeInformation<?> typeInformation) {
 		Assert.assertTrue(typeInformation instanceof PojoTypeInfo);
-		Assert.assertEquals(9, typeInformation.getArity());
-		Assert.assertEquals(11, typeInformation.getTotalFields());
+		Assert.assertEquals(10, typeInformation.getArity());
+		Assert.assertEquals(12, typeInformation.getTotalFields());
 		// check if the three additional fields are identified correctly
 		boolean arrayListSeen = false, multisetSeen = false, strArraySeen = false;
 		PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeInformation;
@@ -390,9 +419,9 @@ public class PojoTypeExtractionTest {
 				strArraySeen = true;
 				Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, field.type);
 				Assert.assertEquals(String[].class, field.type.getTypeClass());
-			} else if(Arrays.asList("date", "someNumber", "someFloat", "word", "nothing", "hadoopCitizen").contains(name)) {
+			} else if(Arrays.asList("date", "someNumber", "someFloat", "word", "nothing", "hadoopCitizen", "collection").contains(name)) {
 				// ignore these, they are inherited from the ComplexNestedClass
-			} 
+			}
 			else {
 				Assert.fail("field "+field+" is not expected");
 			}
@@ -401,18 +430,18 @@ public class PojoTypeExtractionTest {
 		Assert.assertTrue("Field was not present", multisetSeen);
 		Assert.assertTrue("Field was not present", strArraySeen);
 	}
-	
+
 	@Test
 	public void testPojoExtendingTuple() {
 		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(FromTuple.class);
 		checkFromTuplePojo(typeForClass);
-		
+
 		FromTuple ft = new FromTuple();
 		ft.f0 = ""; ft.f1 = ""; ft.f2 = 0L;
 		TypeInformation<?> typeForObject = TypeExtractor.getForObject(ft);
 		checkFromTuplePojo(typeForObject);
 	}
-	
+
 	private void checkFromTuplePojo(TypeInformation<?> typeInformation) {
 		Assert.assertTrue(typeInformation instanceof PojoTypeInfo<?>);
 		Assert.assertEquals(4, typeInformation.getTotalFields());
@@ -431,7 +460,7 @@ public class PojoTypeExtractionTest {
 			}
 		}
 	}
-	
+
 	@Test
 	public void testPojoWithGenerics() {
 		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(ParentSettingGenerics.class);
@@ -453,13 +482,12 @@ public class PojoTypeExtractionTest {
 			}
 		}
 	}
-	
+
 	/**
 	 * Test if the TypeExtractor is accepting untyped generics,
 	 * making them GenericTypes
 	 */
 	@Test
-	@Ignore // kryo needed.
 	public void testPojoWithGenericsSomeFieldsGeneric() {
 		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(PojoWithGenerics.class);
 		Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
@@ -478,8 +506,8 @@ public class PojoTypeExtractionTest {
 			}
 		}
 	}
-	
-	
+
+
 	@Test
 	public void testPojoWithComplexHierarchy() {
 		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(ComplexHierarchyTop.class);
@@ -554,10 +582,10 @@ public class PojoTypeExtractionTest {
 		public VertexTyped() {
 		}
 	}
-	
+
 	@Test
 	public void testGetterSetterWithVertex() {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<VertexTyped> set = env.fromElements(new VertexTyped(0L, 3.0), new VertexTyped(1L, 1.0));
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a835e5df/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java
index cacc05b..d604105 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java
@@ -143,7 +143,7 @@ abstract public class AbstractGenericTypeSerializerTest {
 		}
 	}
 
-	private final <T> void runTests(T... instances) {
+	protected final <T> void runTests(T... instances) {
 		if (instances == null || instances.length == 0) {
 			throw new IllegalArgumentException();
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a835e5df/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java
index c6ef4db..37dba4e 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java
@@ -25,4 +25,4 @@ public class KryoGenericTypeComparatorTest extends AbstractGenericTypeComparator
 	protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
 		return new KryoSerializer<T>(type);
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a835e5df/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
index f6fc987..3c22b15 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
@@ -18,11 +18,53 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
+
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
 
 public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest {
+
+	@Test
+	public void testJavaList(){
+		Collection<Integer> a = new ArrayList<Integer>();
+
+		fillCollection(a);
+
+		runTests(a);
+	}
+
+	@Test
+	public void testJavaSet(){
+		Collection<Integer> b = new HashSet<Integer>();
+
+		fillCollection(b);
+
+		runTests(b);
+	}
+
+	@Test
+	public void testJavaDequeue(){
+		Collection<Integer> c = new LinkedList<Integer>();
+
+		fillCollection(c);
+
+		runTests(c);
+	}
+
+	private void fillCollection(Collection<Integer> coll){
+		coll.add(42);
+		coll.add(1337);
+		coll.add(49);
+		coll.add(1);
+	}
+
 	@Override
 	protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
 		return new KryoSerializer<T>(type);
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a835e5df/flink-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index ae16876..edeb7f2 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -95,6 +95,14 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a835e5df/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
new file mode 100644
index 0000000..ddbe322
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.runtime
+
+import org.apache.flink.api.common.typeutils.SerializerTestInstance
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
+import org.junit.Test
+
+import scala.reflect._
+
+class KryoGenericTypeSerializerTest {
+
+  @Test
+  def testScalaListSerialization: Unit = {
+    val a = List(42,1,49,1337)
+
+    runTests(a)
+  }
+
+  @Test
+  def testScalaMutablelistSerialization: Unit = {
+    val a = scala.collection.mutable.ListBuffer(42,1,49,1337)
+
+    runTests(a)
+  }
+
+  @Test
+  def testScalaMapSerialization: Unit = {
+    val a = Map(("1" -> 1), ("2" -> 2), ("42" -> 42), ("1337" -> 1337))
+
+    runTests(a)
+  }
+
+  @Test
+  def testMutableMapSerialization: Unit ={
+    val a = scala.collection.mutable.Map((1 -> "1"), (2 -> "2"), (3 -> "3"))
+
+    runTests(a)
+  }
+
+  @Test
+  def testScalaListComplexTypeSerialization: Unit = {
+    val a = ComplexType("1234", 42, List(1,2,3,4))
+    val b = ComplexType("4321", 24, List(4,3,2,1))
+    val c = ComplexType("1337", 1, List(1))
+    val list = List(a, b, c)
+
+    runTests(list)
+  }
+
+  @Test
+  def testHeterogenousScalaList: Unit = {
+    val a = new DerivedType("foo", "bar")
+    val b = new BaseType("foobar")
+    val c = new DerivedType2("bar", "foo")
+    val list = List(a,b,c)
+
+    runTests(list)
+  }
+
+  case class ComplexType(id: String, number: Int, values: List[Int]){
+    override def equals(obj: Any): Boolean ={
+      if(obj != null && obj.isInstanceOf[ComplexType]){
+        val complexType = obj.asInstanceOf[ComplexType]
+        id.equals(complexType.id) && number.equals(complexType.number) && values.equals(
+          complexType.values)
+      }else{
+        false
+      }
+    }
+  }
+
+  class BaseType(val name: String){
+    override def equals(obj: Any): Boolean = {
+      if(obj != null && obj.isInstanceOf[BaseType]){
+        obj.asInstanceOf[BaseType].name.equals(name)
+      }else{
+        false
+      }
+    }
+  }
+
+  class DerivedType(name: String, val sub: String) extends BaseType(name){
+    override def equals(obj: Any): Boolean = {
+      if(obj != null && obj.isInstanceOf[DerivedType]){
+        super.equals(obj) && obj.asInstanceOf[DerivedType].sub.equals(sub)
+      }else{
+        false
+      }
+    }
+  }
+
+  class DerivedType2(name: String, val sub: String) extends BaseType(name){
+    override def equals(obj: Any): Boolean = {
+      if(obj != null && obj.isInstanceOf[DerivedType2]){
+        super.equals(obj) && obj.asInstanceOf[DerivedType2].sub.equals(sub)
+      }else{
+        false
+      }
+    }
+  }
+
+  def runTests[T : ClassTag](objects: T *): Unit ={
+    val clsTag = classTag[T]
+    val typeInfo = new GenericTypeInfo[T](clsTag.runtimeClass.asInstanceOf[Class[T]])
+    val serializer = typeInfo.createSerializer()
+    val typeClass = typeInfo.getTypeClass
+
+    val instance = new SerializerTestInstance[T](serializer, typeClass, -1, objects: _*)
+
+    instance.testAll()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a835e5df/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index 60a0d89..8994ba9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -53,7 +53,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 @RunWith(Parameterized.class)
 public class GroupReduceITCase extends JavaProgramTestBase {
 	
-	private static int NUM_PROGRAMS = 26;
+	private static int NUM_PROGRAMS = 28;
 	
 	private int curProgId = config.getInteger("ProgramId", -1);
 	private String resultPath;
@@ -763,7 +763,74 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 					// return expected result
 					return "b\nccc\nee\n";
 				}
-				
+
+				case 27: {
+					/*
+					 * Test Java collections within pojos ( == test kryo)
+					 */
+					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+					env.setDegreeOfParallelism(1);
+
+					DataSet<CollectionDataSets.PojoWithCollection> ds = CollectionDataSets.getPojoWithCollection(env);
+					// group by the int field "key"
+					DataSet<String> reduceDs = ds.groupBy("key")
+							.reduceGroup(new GroupReduceFunction<CollectionDataSets.PojoWithCollection, String>() {
+								@Override
+								public void reduce(
+										Iterable<CollectionDataSets.PojoWithCollection> values,
+										Collector<String> out) throws Exception {
+									StringBuilder concat = new StringBuilder();
+									concat.append("call");
+									for(CollectionDataSets.PojoWithCollection value : values) {
+										concat.append("For key "+value.key+" we got: ");
+										for(CollectionDataSets.Pojo1 p :value.pojos) {
+											concat.append("pojo.a="+p.a);
+										}
+									}
+									out.collect(concat.toString());
+								}
+							});
+					reduceDs.writeAsText(resultPath);
+					env.execute();
+
+					// return expected result
+					return "callFor key 0 we got: pojo.a=apojo.a=bFor key 0 we got: pojo.a=a2pojo.a=b2\n";
+				}
+
+				case 28: {
+					/*
+					 * Group by generic type
+					 */
+					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+					env.setDegreeOfParallelism(1);
+
+					DataSet<CollectionDataSets.PojoWithCollection> ds = CollectionDataSets.getPojoWithCollection(env);
+					// group by the generic-typed field "bigInt"
+					DataSet<String> reduceDs = ds.groupBy("bigInt")
+							.reduceGroup(new GroupReduceFunction<CollectionDataSets.PojoWithCollection, String>() {
+								@Override
+								public void reduce(
+										Iterable<CollectionDataSets.PojoWithCollection> values,
+										Collector<String> out) throws Exception {
+									StringBuilder concat = new StringBuilder();
+									concat.append("call");
+									for(CollectionDataSets.PojoWithCollection value : values) {
+										concat.append("\nFor key "+value.bigInt+" we got:\n"+value);
+									}
+									out.collect(concat.toString());
+								}
+							});
+					reduceDs.writeAsText(resultPath);
+					env.execute();
+
+					// return expected result
+					return "call\n" +
+							"For key 92233720368547758070 we got:\n" +
+							"PojoWithCollection{pojos.size()=2, key=0, sqlDate=2033-05-18, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=10, mixed=[{someKey=1}, /this/is/wrong, uhlala]}\n" +
+							"For key 92233720368547758070 we got:\n" +
+							"PojoWithCollection{pojos.size()=2, key=0, sqlDate=1976-05-03, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=31104000, mixed=null}\n";
+				}
+
 				default: {
 					throw new IllegalArgumentException("Invalid program id");
 				}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a835e5df/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
index 1f812d9..895e996 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
@@ -18,11 +18,17 @@
 
 package org.apache.flink.test.javaApiOperators.util;
 
+import java.io.File;
 import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.Hashtable;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -33,6 +39,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.hadoop.io.IntWritable;
+import scala.math.BigInt;
 
 /**
  * #######################################################################################################
@@ -496,6 +503,13 @@ public class CollectionDataSets {
 	public static class Pojo1 {
 		public String a;
 		public String b;
+
+		public Pojo1() {}
+
+		public Pojo1(String a, String b) {
+			this.a = a;
+			this.b = b;
+		}
 	}
 
 	public static class Pojo2 {
@@ -561,5 +575,68 @@ public class CollectionDataSets {
 		return env.fromCollection(data);
 	}
 
+	public static class PojoWithCollection {
+		public List<Pojo1> pojos;
+		public int key;
+		public java.sql.Date sqlDate;
+		public BigInteger bigInt;
+		public BigDecimal bigDecimalKeepItNull;
+		public BigInt scalaBigInt;
+		public List<Object> mixed;
+
+		@Override
+		public String toString() {
+			return "PojoWithCollection{" +
+					"pojos.size()=" + pojos.size() +
+					", key=" + key +
+					", sqlDate=" + sqlDate +
+					", bigInt=" + bigInt +
+					", bigDecimalKeepItNull=" + bigDecimalKeepItNull +
+					", scalaBigInt=" + scalaBigInt +
+					", mixed=" + mixed +
+					'}';
+		}
+	}
+
+	public static DataSet<PojoWithCollection> getPojoWithCollection(ExecutionEnvironment env) {
+		List<PojoWithCollection> data = new ArrayList<PojoWithCollection>();
+
+		List<Pojo1> pojosList1 = new ArrayList<Pojo1>();
+		pojosList1.add(new Pojo1("a", "aa"));
+		pojosList1.add(new Pojo1("b", "bb"));
+
+		List<Pojo1> pojosList2 = new ArrayList<Pojo1>();
+		pojosList2.add(new Pojo1("a2", "aa2"));
+		pojosList2.add(new Pojo1("b2", "bb2"));
+
+		PojoWithCollection pwc1 = new PojoWithCollection();
+		pwc1.pojos = pojosList1;
+		pwc1.key = 0;
+		pwc1.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
+		pwc1.scalaBigInt = BigInt.int2bigInt(10);
+		pwc1.bigDecimalKeepItNull = null;
+		pwc1.sqlDate = new java.sql.Date(2000000000000L); // 2033 ;)
+		pwc1.mixed = new ArrayList<Object>();
+		Map<String, Integer> map = new HashMap<String, Integer>();
+		map.put("someKey", 1); // map.put("anotherKey", 2); map.put("third", 3);
+		pwc1.mixed.add(map);
+		pwc1.mixed.add(new File("/this/is/wrong"));
+		pwc1.mixed.add("uhlala");
+
+		PojoWithCollection pwc2 = new PojoWithCollection();
+		pwc2.pojos = pojosList2;
+		pwc2.key = 0;
+		pwc2.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
+		pwc2.scalaBigInt = BigInt.int2bigInt(31104000);
+		pwc2.bigDecimalKeepItNull = null;
+		pwc2.sqlDate = new java.sql.Date(200000000000L); // 1976
+
+
+		data.add(pwc1);
+		data.add(pwc2);
+
+		return env.fromCollection(data);
+	}
+
 }
 


[4/7] incubator-flink git commit: Fix invalid type hierarchy creation by Pojo logic

Posted by mb...@apache.org.
Fix invalid type hierarchy creation by Pojo logic


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/1ac9651a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/1ac9651a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/1ac9651a

Branch: refs/heads/release-0.8
Commit: 1ac9651abff2760a2a66c6cce0e3aa2e1bf5d1dd
Parents: 02bad15
Author: twalthr <in...@twalthr.com>
Authored: Wed Dec 10 22:02:07 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Dec 17 21:41:09 2014 +0100

----------------------------------------------------------------------
 .../flink/api/java/typeutils/TypeExtractor.java | 87 ++++++++++----------
 1 file changed, 43 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1ac9651a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 67b2a51..3bceac5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -232,29 +232,6 @@ public class TypeExtractor {
 		// get info from hierarchy
 		return (TypeInformation<OUT>) createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type);
 	}
-
-
-	/**
-	 * @param curT : start type
-	 * @return Type The immediate child of the top class
-	 */
-	private Type recursivelyGetTypeHierarchy(ArrayList<Type> typeHierarchy, Type curT, Class<?> stopAtClass) {
-		while (!(curT instanceof ParameterizedType && ((Class<?>) ((ParameterizedType) curT).getRawType()).equals(
-				stopAtClass))
-				&& !(curT instanceof Class<?> && ((Class<?>) curT).equals(stopAtClass))) {
-			typeHierarchy.add(curT);
-			
-			// parameterized type
-			if (curT instanceof ParameterizedType) {
-				curT = ((Class<?>) ((ParameterizedType) curT).getRawType()).getGenericSuperclass();
-			}
-			// class
-			else {
-				curT = ((Class<?>) curT).getGenericSuperclass();
-			}
-		}
-		return curT;
-	}
 	
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	private <IN1, IN2, OUT> TypeInformation<OUT> createTypeInfoWithTypeHierarchy(ArrayList<Type> typeHierarchy, Type t,
@@ -330,7 +307,7 @@ public class TypeExtractor {
 			int fieldCount = countFieldsInClass(tAsClass);
 			if(fieldCount != tupleSubTypes.length) {
 				// the class is not a real tuple because it contains additional fields. treat as a pojo
-				return (TypeInformation<OUT>) analyzePojo(tAsClass, new ArrayList<Type>(), null); // the typeHierarchy here should be sufficient, even though it stops at the Tuple.class.
+				return (TypeInformation<OUT>) analyzePojo(tAsClass, new ArrayList<Type>(typeHierarchy), null); // the typeHierarchy here should be sufficient, even though it stops at the Tuple.class.
 			}
 			
 			return new TupleTypeInfo(tAsClass, tupleSubTypes);
@@ -396,23 +373,11 @@ public class TypeExtractor {
 		}
 		// no tuple, no TypeVariable, no generic type
 		else if (t instanceof Class) {
-			return privateGetForClass((Class<OUT>) t, new ArrayList<Type>());
+			return privateGetForClass((Class<OUT>) t, typeHierarchy);
 		}
 		
 		throw new InvalidTypesException("Type Information could not be created.");
 	}
-	
-	private int countFieldsInClass(Class<?> clazz) {
-		int fieldCount = 0;
-		for(Field field : clazz.getFields()) { // get all fields
-			if(	!Modifier.isStatic(field.getModifiers()) &&
-				!Modifier.isTransient(field.getModifiers())
-				) {
-				fieldCount++;
-			}
-		}
-		return fieldCount;
-	}
 
 	private <IN1, IN2> TypeInformation<?> createTypeInfoFromInputs(TypeVariable<?> returnTypeVar, ArrayList<Type> returnTypeHierarchy, 
 			TypeInformation<IN1> in1TypeInfo, TypeInformation<IN2> in2TypeInfo) {
@@ -427,6 +392,11 @@ public class TypeExtractor {
 			returnTypeVar = (TypeVariable<?>) matReturnTypeVar;
 		}
 		
+		// no input information exists
+		if (in1TypeInfo == null && in2TypeInfo == null) {
+			return null;
+		}
+		
 		// create a new type hierarchy for the input
 		ArrayList<Type> inputTypeHierarchy = new ArrayList<Type>();
 		// copy the function part of the type hierarchy
@@ -753,6 +723,34 @@ public class TypeExtractor {
 	//  Utility methods
 	// --------------------------------------------------------------------------------------------
 	
+	/**
+	 * @param curT : start type
+	 * @return Type The immediate child of the top class
+	 */
+	private Type getTypeHierarchy(ArrayList<Type> typeHierarchy, Type curT, Class<?> stopAtClass) {
+		// skip first one
+		if (typeHierarchy.size() > 0 && typeHierarchy.get(0) == curT && isClassType(curT)) {
+			curT = typeToClass(curT).getGenericSuperclass();
+		}
+		while (!(isClassType(curT) && typeToClass(curT).equals(stopAtClass))) {
+			typeHierarchy.add(curT);
+			curT = typeToClass(curT).getGenericSuperclass();
+		}
+		return curT;
+	}
+	
+	private int countFieldsInClass(Class<?> clazz) {
+		int fieldCount = 0;
+		for(Field field : clazz.getFields()) { // get all fields
+			if(	!Modifier.isStatic(field.getModifiers()) &&
+				!Modifier.isTransient(field.getModifiers())
+				) {
+				fieldCount++;
+			}
+		}
+		return fieldCount;
+	}
+	
 	private static Type removeGenericWrapper(Type t) {
 		if(t instanceof ParameterizedType 	&& 
 				(Collector.class.isAssignableFrom(typeToClass(t))
@@ -954,7 +952,7 @@ public class TypeExtractor {
 			return new GenericTypeInfo<X>(clazz);
 		}
 		try {
-			TypeInformation<X> pojoType = analyzePojo(clazz, typeHierarchy, clazzTypeHint);
+			TypeInformation<X> pojoType = analyzePojo(clazz, new ArrayList<Type>(typeHierarchy), clazzTypeHint);
 			if (pojoType != null) {
 				return pojoType;
 			}
@@ -1032,12 +1030,12 @@ public class TypeExtractor {
 	}
 
 	private <X> TypeInformation<X> analyzePojo(Class<X> clazz, ArrayList<Type> typeHierarchy, ParameterizedType clazzTypeHint) {
-		// try to create Type hierarchy, if the incoming one is empty.
-		if(typeHierarchy.size() == 0) {
-			recursivelyGetTypeHierarchy(typeHierarchy, clazz, Object.class);
+		// try to create the type hierarchy if the incoming one contains only the bottommost type, or nothing at all.
+		if(typeHierarchy.size() <= 1) {
+			getTypeHierarchy(typeHierarchy, clazz, Object.class);
 		}
 		if(clazzTypeHint != null) {
-			recursivelyGetTypeHierarchy(typeHierarchy, clazzTypeHint, Object.class);
+			getTypeHierarchy(typeHierarchy, clazzTypeHint, Object.class);
 		}
 		
 		List<Field> fields = getAllDeclaredFields(clazz);
@@ -1049,8 +1047,9 @@ public class TypeExtractor {
 				return null;
 			}
 			try {
-				typeHierarchy.add(fieldType);
-				pojoFields.add(new PojoField(field, createTypeInfoWithTypeHierarchy(typeHierarchy, fieldType, null, null) ));
+				ArrayList<Type> fieldTypeHierarchy = new ArrayList<Type>(typeHierarchy);
+				fieldTypeHierarchy.add(fieldType);
+				pojoFields.add(new PojoField(field, createTypeInfoWithTypeHierarchy(fieldTypeHierarchy, fieldType, null, null) ));
 			} catch (InvalidTypesException e) {
 				//pojoFields.add(new PojoField(field, new GenericTypeInfo( Object.class ))); // we need kryo to properly serialize this
 				throw new InvalidTypesException("Flink is currently unable to serialize this type: "+fieldType+""


[3/7] incubator-flink git commit: [streaming] DataStream print functionality update

Posted by mb...@apache.org.
[streaming] DataStream print functionality update

PrintSinkFunction now explicitly identifies the emitting parallel subtask in its output
Added printToErr functionality


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a33ad5d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a33ad5d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a33ad5d8

Branch: refs/heads/release-0.8
Commit: a33ad5d8303295e57f0e6a8df3c071f010963029
Parents: cb607df
Author: mbalassi <mb...@apache.org>
Authored: Tue Dec 16 12:48:09 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Dec 17 21:41:08 2014 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    |  17 ++-
 .../api/function/sink/PrintSinkFunction.java    | 128 ++++++++++++++-----
 .../streamvertex/StreamingRuntimeContext.java   |   2 +-
 3 files changed, 112 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a33ad5d8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 978f5fa..3fc685a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -780,7 +780,7 @@ public class DataStream<OUT> {
 	}
 
 	/**
-	 * Writes a DataStream to the standard output stream (stdout). For each
+	 * Writes a DataStream to the standard output stream (stdout).<br> For each
 	 * element of the DataStream the result of {@link Object#toString()} is
 	 * written.
 	 * 
@@ -793,6 +793,21 @@ public class DataStream<OUT> {
 
 		return returnStream;
 	}
+	
+	/**
+	 * Writes a DataStream to the standard error stream (stderr).<br> For each
+	 * element of the DataStream the result of {@link Object#toString()} is
+	 * written.
+	 * 
+	 * @return The closed DataStream.
+	 */
+	public DataStreamSink<OUT> printToErr() {
+		DataStream<OUT> inputStream = this.copy();
+		PrintSinkFunction<OUT> printFunction = new PrintSinkFunction<OUT>(true);
+		DataStreamSink<OUT> returnStream = addSink(inputStream, printFunction, getType());
+
+		return returnStream;
+	}
 
 	/**
 	 * Writes a DataStream to the file specified by path in text format. For

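For reference, a minimal usage sketch of the two sinks, assuming the 0.8 streaming API (fromElements and execute as used elsewhere in this codebase; the class name is illustrative):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PrintSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Written to stdout; with parallelism > 1 each line is prefixed
		// with the 1-based index of the emitting subtask, e.g. "2> ".
		env.fromElements(1, 2, 3).print();

		// Same output format, routed to stderr.
		env.fromElements(4, 5, 6).printToErr();

		env.execute();
	}
}
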
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a33ad5d8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
index fc75da7..d460749 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
@@ -1,36 +1,98 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-
-/**
- * Dummy implementation of the SinkFunction writing every tuple to the standard
- * output. Used for print.
- * 
- * @param <IN>
- *            Input tuple type
- */
-public class PrintSinkFunction<IN> implements SinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void invoke(IN tuple) {
-		System.out.println(tuple);
-	}
-
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import java.io.PrintStream;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
+
+/**
+ * Implementation of the SinkFunction writing every tuple to the standard
+ * output or standard error stream.
+ * 
+ * @param <IN>
+ *            Input record type
+ */
+public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	private static final boolean STD_OUT = false;
+	private static final boolean STD_ERR = true;
+	
+	private boolean target; 
+	private transient PrintStream stream;
+	private transient String prefix;
+	
+	/**
+	 * Instantiates a print sink function that prints to standard out.
+	 */
+	public PrintSinkFunction() {}
+	
+	/**
+	 * Instantiates a print sink function that prints either to standard out or to standard error.
+	 * 
+	 * @param stdErr True, if the format should print to standard error instead of standard out.
+	 */
+	public PrintSinkFunction(boolean stdErr) {
+		target = stdErr;
+	}
+
+	public void setTargetToStandardOut() {
+		target = STD_OUT;
+	}
+	
+	public void setTargetToStandardErr() {
+		target = STD_ERR;
+	}
+	
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
+		// get the target stream
+		stream = target == STD_OUT ? System.out : System.err;
+		
+		// set the prefix if we have a >1 DOP
+		prefix = (context.getNumberOfParallelSubtasks() > 1) ? 
+				((context.getIndexOfThisSubtask() + 1) + "> ") : null;
+	}
+
+	@Override
+	public void invoke(IN record) {
+		if (prefix != null) {
+			stream.println(prefix + record.toString());
+		}
+		else {
+			stream.println(record.toString());
+		}
+	}
+	
+	@Override
+	public void close() throws Exception {
+		this.stream = null;
+		this.prefix = null;
+		super.close();
+	}
+	
+	@Override
+	public String toString() {
+		return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a33ad5d8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
index 49cf15f..798724e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
@@ -34,7 +34,7 @@ import org.apache.flink.streaming.state.OperatorState;
  */
 public class StreamingRuntimeContext extends RuntimeUDFContext {
 
-	private Environment env;
+	public Environment env;
 	private final Map<String, OperatorState<?>> operatorStates;
 
 	public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader,


[2/7] incubator-flink git commit: [streaming] [examples] Refactor and packaging for windowing examples

Posted by mb...@apache.org.
[streaming] [examples] Refactor and packaging for windowing examples

The current examples showcase the API; more meaningful examples are coming with the 0.9 release.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e34aca75
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e34aca75
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e34aca75

Branch: refs/heads/release-0.8
Commit: e34aca7545f4900725b470ff1ab2db4b48c2275f
Parents: a33ad5d
Author: mbalassi <mb...@apache.org>
Authored: Tue Dec 16 21:00:31 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Dec 17 21:41:08 2014 +0100

----------------------------------------------------------------------
 .../flink-streaming-examples/pom.xml            |  96 ++++++++++-
 .../examples/iteration/IterateExample.java      |  12 +-
 .../streaming/examples/join/WindowJoin.java     | 168 +++++++++++++++++++
 .../examples/window/join/WindowJoin.java        | 165 ------------------
 .../examples/windowing/DeltaExtractExample.java |  77 +++++++--
 .../windowing/MultiplePoliciesExample.java      | 104 ++++++++----
 .../examples/windowing/SlidingExample.java      |  99 +++++++----
 .../windowing/TimeWindowingExample.java         |  98 ++++++++---
 8 files changed, 547 insertions(+), 272 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e34aca75/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
index 1369828..d2d2b93 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
@@ -172,12 +172,12 @@ under the License.
 
 							<archive>
 								<manifestEntries>
-									<program-class>org.apache.flink.streaming.examples.window.join.WindowJoin</program-class>
+									<program-class>org.apache.flink.streaming.examples.join.WindowJoin</program-class>
 								</manifestEntries>
 							</archive>
 
 							<includes>
-								<include>org/apache/flink/streaming/examples/window/join/*.class</include>			
+								<include>org/apache/flink/streaming/examples/join/*.class</include>			
 							</includes>
 						</configuration>
 					</execution>
@@ -252,6 +252,98 @@ under the License.
 							</includes>
 						</configuration>
 					</execution>
+
+					<!-- DeltaExtract -->
+					<execution>
+						<id>DeltaExtract</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>DeltaExtract</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.windowing.DeltaExtractExample</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/windowing/DeltaExtractExample.class</include>
+								<include>org/apache/flink/streaming/examples/windowing/DeltaExtractExample$*.class</include>
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- MultiplePolicies -->
+					<execution>
+						<id>MultiplePolicies</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>MultiplePolicies</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.windowing.MultiplePoliciesExample</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.class</include>
+								<include>org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample$*.class</include>
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- SlidingExample -->
+					<execution>
+						<id>SlidingExample</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>SlidingExample</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.windowing.SlidingExample</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/windowing/SlidingExample.class</include>
+								<include>org/apache/flink/streaming/examples/windowing/SlidingExample$*.class</include>
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- TimeWindowing -->
+					<execution>
+						<id>TimeWindowing</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>TimeWindowing</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.windowing.TimeWindowingExample</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/windowing/TimeWindowingExample.class</include>
+								<include>org/apache/flink/streaming/examples/windowing/TimeWindowingExample$*.class</include>
+							</includes>
+						</configuration>
+					</execution>
 				</executions>
 			</plugin>
 		</plugins>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e34aca75/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index 54dbdb0..8fb42d6 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -21,9 +21,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeDataStream;
@@ -109,12 +110,12 @@ public class IterateExample {
 	 * Iteration step function which takes an input (Double , Integer) and
 	 * produces an output (Double + random, Integer + 1).
 	 */
-	public static class Step implements
-			MapFunction<Tuple2<Double, Integer>, Tuple2<Double, Integer>> {
+	public static class Step extends
+			RichMapFunction<Tuple2<Double, Integer>, Tuple2<Double, Integer>> {
 		private static final long serialVersionUID = 1L;
-		private Random rnd;
+		private transient Random rnd;
 
-		public Step() {
+		public void open(Configuration parameters) {
 			rnd = new Random();
 		}
 
@@ -122,7 +123,6 @@ public class IterateExample {
 		public Tuple2<Double, Integer> map(Tuple2<Double, Integer> value) throws Exception {
 			return new Tuple2<Double, Integer>(value.f0 + rnd.nextDouble(), value.f1 + 1);
 		}
-
 	}
 
 	/**

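The hunk above switches Step from a plain MapFunction to a RichMapFunction: the helper Random is made transient and created in open(), which runs once per parallel task instance before any elements are processed, so each instance gets its own fresh generator instead of a serialized copy. A minimal sketch of the resulting pattern, using only the 0.8 API already imported in this diff (the class name is illustrative):

~~~java
import java.util.Random;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class NoisyIncrement extends RichMapFunction<Double, Double> {
	private static final long serialVersionUID = 1L;

	// created per parallel task instance in open(), not shipped with the function
	private transient Random rnd;

	@Override
	public void open(Configuration parameters) {
		rnd = new Random();
	}

	@Override
	public Double map(Double value) throws Exception {
		return value + rnd.nextDouble();
	}
}
~~~
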
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e34aca75/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
new file mode 100644
index 0000000..93df823
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.join;
+
+import java.util.Random;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.source.RichSourceFunction;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * Example illustrating join over sliding windows of streams in Flink.
+ * 
+ * <p>
+ * This example will join two streams with a sliding window: one which
+ * emits grades and one which emits salaries of people.
+ * </p>
+ *
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>do windowed joins,
+ * <li>use tuple data types,
+ * <li>write a simple streaming program.
+ */
+public class WindowJoin {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// obtain execution environment
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// connect to the data sources for grades and salaries
+		DataStream<Tuple2<String, Integer>> grades = env.addSource(new GradeSource());
+		DataStream<Tuple2<String, Integer>> salaries = env.addSource(new SalarySource());
+
+		// apply a temporal join over the two streams, keyed on the names,
+		// over one-second windows
+		DataStream<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> joinedStream = grades
+				.join(salaries)
+				.onWindow(1000)
+				.where(0)
+				.equalTo(0);
+
+		// emit result
+		if (fileOutput) {
+			joinedStream.writeAsText(outputPath, 1);
+		} else {
+			joinedStream.print();
+		}
+
+		// execute program
+		env.execute("Windowed Join Example");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	private final static String[] names = { "tom", "jerry", "alice", "bob", "john", "grace" };
+	private final static int GRADE_COUNT = 5;
+	private final static int SALARY_MAX = 10000;
+	private final static int SLEEP_TIME = 10;
+
+	/**
+	 * Continuously emit tuples with random names and integers (grades).
+	 */
+	public static class GradeSource implements SourceFunction<Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		private Random rand;
+		private Tuple2<String, Integer> outTuple;
+
+		public GradeSource() {
+			rand = new Random();
+			outTuple = new Tuple2<String, Integer>();
+		}
+
+		@Override
+		public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
+			while (true) {
+				outTuple.f0 = names[rand.nextInt(names.length)];
+				outTuple.f1 = rand.nextInt(GRADE_COUNT) + 1;
+				out.collect(outTuple);
+				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
+			}
+		}
+	}
+
+	/**
+	 * Continuously emit tuples with random names and integers (salaries).
+	 */
+	public static class SalarySource extends RichSourceFunction<Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		private transient Random rand;
+		private transient Tuple2<String, Integer> outTuple;
+
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			rand = new Random();
+			outTuple = new Tuple2<String, Integer>();
+		}
+
+		@Override
+		public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
+			while (true) {
+				outTuple.f0 = names[rand.nextInt(names.length)];
+				outTuple.f1 = rand.nextInt(SALARY_MAX) + 1;
+				out.collect(outTuple);
+				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
+			}
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 1) {
+				outputPath = args[0];
+			} else {
+				System.err.println("Usage: WindowJoin <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing WindowJoin with generated data.");
+			System.out.println("  Provide parameter to write to file.");
+			System.out.println("  Usage: WindowJoin <result path>");
+		}
+		return true;
+	}
+}

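For readers skimming the new file: the join builder reads as a small pipeline, and the window length passed to onWindow appears to be in milliseconds, judging by the "one-second windows" comment above. An annotated restatement of the call (the per-argument reading is inferred from this example, not from separate documentation):

~~~java
DataStream<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> joinedStream = grades
		.join(salaries)     // second input of the join
		.onWindow(1000)     // window length, presumably milliseconds
		.where(0)           // key field of the first input (the name)
		.equalTo(0);        // key field of the second input
~~~
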
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e34aca75/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoin.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoin.java
deleted file mode 100644
index d5f921e..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoin.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.window.join;
-
-import java.util.Random;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-
-/**
- * Example illustrating join over sliding windows of streams in Flink.
- * 
- * <p>
- * his example will join two streams with a sliding window. One which emits
- * grades and one which emits salaries of people.
- * </p>
- *
- * <p>
- * This example shows how to:
- * <ul>
- * <li>do windowed joins,
- * <li>use tuple data types,
- * <li>write a simple streaming program.
- */
-public class WindowJoin {
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		// obtain execution environment
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// connect to the data sources for grades and salaries
-		DataStream<Tuple2<String, Integer>> grades = env.addSource(new GradeSource());
-		DataStream<Tuple2<String, Integer>> salaries = env.addSource(new SalarySource());
-
-		// apply a temporal join over the two stream based on the names over one
-		// second windows
-		DataStream<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> joinedStream = grades
-				.join(salaries)
-				.onWindow(1000)
-				.where(0)
-				.equalTo(0);
-
-		// emit result
-		if (fileOutput) {
-			joinedStream.writeAsText(outputPath, 1);
-		} else {
-			joinedStream.print();
-		}
-
-		// execute program
-		env.execute("Windowed Join Example");
-	}
-
-	// *************************************************************************
-	// USER FUNCTIONS
-	// *************************************************************************
-
-	private final static String[] names = { "tom", "jerry", "alice", "bob", "john", "grace" };
-	private final static int GRADE_COUNT = 5;
-	private final static int SALARY_MAX = 10000;
-	private final static int SLEEP_TIME = 10;
-
-	/**
-	 * Continuously emit tuples with random names and integers (grades).
-	 */
-	public static class GradeSource implements SourceFunction<Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		private Random rand;
-		private Tuple2<String, Integer> outTuple;
-
-		public GradeSource() {
-			rand = new Random();
-			outTuple = new Tuple2<String, Integer>();
-		}
-
-		@Override
-		public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
-			while (true) {
-				outTuple.f0 = names[rand.nextInt(names.length)];
-				outTuple.f1 = rand.nextInt(GRADE_COUNT) + 1;
-				out.collect(outTuple);
-				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
-			}
-		}
-	}
-
-	/**
-	 * Continuously emit tuples with random names and integers (salaries).
-	 */
-	public static class SalarySource implements SourceFunction<Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		private Random rand;
-		private Tuple2<String, Integer> outTuple;
-
-		public SalarySource() {
-			rand = new Random();
-			outTuple = new Tuple2<String, Integer>();
-		}
-
-		@Override
-		public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
-			while (true) {
-				outTuple.f0 = names[rand.nextInt(names.length)];
-				outTuple.f1 = rand.nextInt(SALARY_MAX) + 1;
-				out.collect(outTuple);
-				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
-			}
-		}
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length == 1) {
-				outputPath = args[0];
-			} else {
-				System.err.println("Usage: WindowJoin <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing WindowJoin with generated data.");
-			System.out.println("  Provide parameter to write to file.");
-			System.out.println("  Usage: WindowJoin <result path>");
-		}
-		return true;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e34aca75/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
index 0622dbf..1013e6f 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
@@ -34,32 +34,44 @@ import org.apache.flink.util.Collector;
  */
 public class DeltaExtractExample {
 
-	private static final int PARALLELISM = 1;
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
 
-	@SuppressWarnings({ "serial", "rawtypes", "unchecked" })
 	public static void main(String[] args) throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(PARALLELISM);
 
-		ReduceFunction<Tuple3<Double, Double, String>> concatStrings = new ReduceFunction<Tuple3<Double, Double, String>>() {
-			@Override
-			public Tuple3 reduce(Tuple3 value1, Tuple3 value2) throws Exception {
-				return new Tuple3(value1.f0, value2.f1, value1.f2 + "|" + value2.f2);
-			}
-		};
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
+		@SuppressWarnings({ "unchecked", "rawtypes" })
 		DataStream dstream = env
 				.addSource(new CountingSource())
 				.window(Delta.of(new EuclideanDistance(new FieldsFromTuple(0, 1)), new Tuple3(0d,
-						0d, "foo"), 1.2)).every(Count.of(2)).reduce(concatStrings);
+						0d, "foo"), 1.2))
+				.every(Count.of(2))
+				.reduce(new ConcatStrings());
+
+		// emit result
+		if (fileOutput) {
+			dstream.writeAsText(outputPath, 1);
+		} else {
+			dstream.print();
+		}
 
-		dstream.print();
-		env.execute();
+		// execute the program
+		env.execute("Delta Extract Example");
 
 	}
 
-	@SuppressWarnings("serial")
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
 	private static class CountingSource implements SourceFunction<Tuple3<Double, Double, String>> {
+		private static final long serialVersionUID = 1L;
 
 		private int counter = 0;
 
@@ -75,4 +87,41 @@ public class DeltaExtractExample {
 		}
 	}
 
+	private static final class ConcatStrings implements
+			ReduceFunction<Tuple3<Double, Double, String>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple3<Double, Double, String> reduce(Tuple3<Double, Double, String> value1,
+				Tuple3<Double, Double, String> value2) throws Exception {
+			return new Tuple3<Double, Double, String>(value1.f0, value2.f1, value1.f2 + "|"
+					+ value2.f2);
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 1) {
+				outputPath = args[0];
+			} else {
+				System.err.println("Usage: DeltaExtractExample <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing DeltaExtractExample with generated data.");
+			System.out.println("  Provide parameter to write to file.");
+			System.out.println("  Usage: DeltaExtractExample <result path>");
+		}
+		return true;
+	}
 }

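The Delta helper used above drives the window by a distance measure instead of a count or time. Reading off the example: the first argument is the distance function (Euclidean distance over tuple fields 0 and 1), the second is the initial reference element, and the third is the threshold at which the window fires. An annotated restatement under that reading (inferred from this example only):

~~~java
DataStream dstream = env
		.addSource(new CountingSource())
		.window(Delta.of(
				new EuclideanDistance(new FieldsFromTuple(0, 1)), // distance over fields 0 and 1
				new Tuple3(0d, 0d, "foo"),                        // initial reference element
				1.2))                                             // fire once the distance exceeds 1.2
		.every(Count.of(2))                                       // evaluate on every 2nd element
		.reduce(new ConcatStrings());
~~~
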
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e34aca75/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
index 9b242f6..6f031c3 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
@@ -30,57 +30,103 @@ import org.apache.flink.util.Collector;
  */
 public class MultiplePoliciesExample {
 
-	private static final int PARALLELISM = 2;
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
 
 	public static void main(String[] args) throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(PARALLELISM);
-
-		// This reduce function does a String concat.
-		GroupReduceFunction<String, String> reducer = new GroupReduceFunction<String, String>() {
-
-			/**
-			 * Auto generates version ID
-			 */
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void reduce(Iterable<String> values, Collector<String> out) throws Exception {
-				String output = "|";
-				for (String v : values) {
-					output = output + v + "|";
-				}
-				out.collect(output);
-			}
 
-		};
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		DataStream<String> stream = env.addSource(new BasicSource())
 				.groupBy(0)
 				.window(Count.of(2))
 				.every(Count.of(3), Count.of(5))
-				.reduceGroup(reducer);
+				.reduceGroup(new Concat());
 
-		stream.print();
+		// emit result
+		if (fileOutput) {
+			stream.writeAsText(outputPath, 1);
+		} else {
+			stream.print();
+		}
 
-		env.execute();
+		// execute the program
+		env.execute("Multiple Policies Example");
 	}
 
-	public static class BasicSource implements SourceFunction<String> {
+	/**
+	 * This source function indefinitely provides String inputs for the
+	 * topology.
+	 */
+	public static final class BasicSource implements SourceFunction<String> {
 
 		private static final long serialVersionUID = 1L;
 
-		String str1 = new String("streaming");
-		String str2 = new String("flink");
+		private final static String STR_1 = "streaming";
+		private final static String STR_2 = "flink";
 
 		@Override
 		public void invoke(Collector<String> out) throws Exception {
 			// continuous emit
 			while (true) {
-				out.collect(str1);
-				out.collect(str2);
+				out.collect(STR_1);
+				out.collect(STR_2);
 			}
 		}
 	}
 
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * This reduce function does a String concat.
+	 */
+	public static final class Concat implements GroupReduceFunction<String, String> {
+
+		/**
+		 * Auto generates version ID
+		 */
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void reduce(Iterable<String> values, Collector<String> out) throws Exception {
+			String output = "|";
+			for (String v : values) {
+				output = output + v + "|";
+			}
+			out.collect(output);
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 1) {
+				outputPath = args[0];
+			} else {
+				System.err.println("Usage: MultiplePoliciesExample <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing MultiplePoliciesExample with generated data.");
+			System.out.println("  Provide parameter to write to file.");
+			System.out.println("  Usage: MultiplePoliciesExample <result path>");
+		}
+		return true;
+	}
 }

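The distinctive call in this example is every(Count.of(3), Count.of(5)): passing two helpers registers two trigger policies on the same two-element window, so the reduce fires on every third and on every fifth element. An annotated restatement (the reading of the arguments is inferred from the example):

~~~java
DataStream<String> stream = env.addSource(new BasicSource())
		.groupBy(0)                       // maintain one window per key
		.window(Count.of(2))              // keep the latest 2 elements
		.every(Count.of(3), Count.of(5))  // two triggers: every 3rd and every 5th element
		.reduceGroup(new Concat());
~~~
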
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e34aca75/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
index c9c78b5..cf03477 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
@@ -31,55 +31,58 @@ import org.apache.flink.util.Collector;
  */
 public class SlidingExample {
 
-	private static final int PARALLELISM = 1;
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
 
 	public static void main(String[] args) throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(PARALLELISM);
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		/*
 		 * SIMPLE-EXAMPLE: Use this to always keep the newest 10 elements in the
 		 * buffer Resulting windows will have an overlap of 5 elements
 		 */
-		
+
 		// DataStream<String> stream = env.addSource(new CountingSource())
 		// .window(Count.of(10))
 		// .every(Count.of(5))
-		// .reduce(reduceFunction);
-		
+		// .reduce(new Concat());
+
 		/*
 		 * ADVANCED-EXAMPLE: Use this to have the last element of the last
 		 * window as first element of the next window while the window size is
 		 * always 5
 		 */
-		
-		// This reduce function does a String concat.
-		ReduceFunction<String> reduceFunction = new ReduceFunction<String>() {
-
-			/**
-			 * default version ID
-			 */
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public String reduce(String value1, String value2) throws Exception {
-				return value1 + "|" + value2;
-			}
-
-		};
 
 		DataStream<String> stream = env.addSource(new CountingSource())
-				.window(Count.of(5).withDelete(4))
-				.every(Count.of(4).startingAt(-1))
-				.reduce(reduceFunction);
-
-		stream.print();
+				.window(Count.of(5)
+				.withDelete(4))
+				.every(Count.of(4)
+				.startingAt(-1))
+				.reduce(new Concat());
+
+		// emit result
+		if (fileOutput) {
+			stream.writeAsText(outputPath, 1);
+		} else {
+			stream.print();
+		}
 
-		env.execute();
+		// execute the program
+		env.execute("Sliding Example");
 	}
 
-	@SuppressWarnings("serial")
-	private static class CountingSource implements SourceFunction<String> {
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	private static final class CountingSource implements SourceFunction<String> {
+		private static final long serialVersionUID = 1L;
 
 		private int counter = 0;
 
@@ -93,6 +96,44 @@ public class SlidingExample {
 				collector.collect("V" + counter++);
 			}
 		}
+	}
+
+	/**
+	 * This reduce function does a String concat.
+	 */
+	private static final class Concat implements ReduceFunction<String> {
+		private static final long serialVersionUID = 1L;
 
+		@Override
+		public String reduce(String value1, String value2) throws Exception {
+			return value1 + "|" + value2;
+		}
 	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+	
+	private static boolean fileOutput = false;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 1) {
+				outputPath = args[0];
+			} else {
+				System.err.println("Usage: SlidingExample <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing SlidingExample with generated data.");
+			System.out.println("  Provide parameter to write to file.");
+			System.out.println("  Usage: SlidingExample <result path>");
+		}
+		return true;
+	}
+
 }

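Combining the two in-code comments above, the eviction and trigger helpers compose as follows: the window buffers up to 5 elements and evicts 4 at a time, while the trigger fires every 4 elements with its counter started at -1, which is what carries the last element of one window over as the first element of the next. An annotated restatement (editorial reading of the example):

~~~java
DataStream<String> stream = env.addSource(new CountingSource())
		.window(Count.of(5).withDelete(4))   // keep 5 elements, evict 4 per eviction
		.every(Count.of(4).startingAt(-1))   // trigger every 4th element, counter offset -1
		.reduce(new Concat());
~~~
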
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e34aca75/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
index 8c26e4a..622aa82 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
@@ -21,9 +21,10 @@ import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.function.source.RichSourceFunction;
 import org.apache.flink.streaming.api.windowing.helper.Count;
 import org.apache.flink.streaming.api.windowing.helper.Time;
 import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
@@ -36,55 +37,59 @@ import org.apache.flink.util.Collector;
  */
 public class TimeWindowingExample {
 
-	private static final int PARALLELISM = 1;
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
 
 	public static void main(String[] args) throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(PARALLELISM);
 
-		// Prevent output from being blocked
-		env.setBufferTimeout(100);
-
-		KeySelector<Integer, Integer> myKey = new KeySelector<Integer, Integer>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Integer getKey(Integer value) throws Exception {
-				if (value < 2) {
-					return 0;
-				} else {
-					return 1;
-				}
-			}
+		if (!parseParameters(args)) {
+			return;
+		}
 
-		};
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		DataStream<Integer> stream = env.addSource(new CountingSourceWithSleep())
 				.window(Count.of(100))
 				.every(Time.of(1000, TimeUnit.MILLISECONDS))
-				.groupBy(myKey)
+				.groupBy(new MyKey())
 				.sum(0);
 
-		stream.print();
+		// emit result
+		if (fileOutput) {
+			stream.writeAsText(outputPath, 1);
+		} else {
+			stream.print();
+		}
 
-		env.execute();
+		// execute the program
+		env.execute("Time Windowing Example");
 	}
 
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
 	/**
 	 * This data source emit one element every 0.001 sec. The output is an
 	 * Integer counting the output elements. As soon as the counter reaches
 	 * 10000 it is reset to 0. On each reset the source waits 5 sec. before it
 	 * restarts to produce elements.
 	 */
-	@SuppressWarnings("serial")
-	private static class CountingSourceWithSleep implements SourceFunction<Integer> {
+	private static final class CountingSourceWithSleep extends RichSourceFunction<Integer> {
+		private static final long serialVersionUID = 1L;
 
 		private int counter = 0;
+		private transient Random rnd;
+		
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			rnd = new Random();
+		}
 
 		@Override
 		public void invoke(Collector<Integer> collector) throws Exception {
-			Random rnd = new Random();
 			// continuous emit
 			while (true) {
 				if (counter > 9999) {
@@ -99,10 +104,49 @@ public class TimeWindowingExample {
 				// too fast for local tests and you might always see
 				// SUM[k=1..9999](k) as result.
 				Thread.sleep(1);
-
 				counter++;
 			}
 		}
+	}
+
+	private static final class MyKey implements KeySelector<Integer, Integer> {
 
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer getKey(Integer value) throws Exception {
+			if (value < 2) {
+				return 0;
+			} else {
+				return 1;
+			}
+		}
+
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 1) {
+				outputPath = args[0];
+			} else {
+				System.err.println("Usage: TimeWindowingExample <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing TimeWindowingExample with generated data.");
+			System.out.println("  Provide parameter to write to file.");
+			System.out.println("  Usage: TimeWindowingExample <result path>");
+		}
+		return true;
 	}
 }

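This last windowing example mixes policy types: eviction is count-based while the trigger is time-based, and grouping uses a custom KeySelector. An annotated restatement (editorial reading of the example):

~~~java
DataStream<Integer> stream = env.addSource(new CountingSourceWithSleep())
		.window(Count.of(100))                        // keep the latest 100 elements
		.every(Time.of(1000, TimeUnit.MILLISECONDS))  // evaluate the window once per second
		.groupBy(new MyKey())                         // two groups: values < 2 and the rest
		.sum(0);                                      // sum the window contents per group
~~~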

[6/7] incubator-flink git commit: [FLINK-1333] Fixed getter/setter recognition for POJOs

Posted by mb...@apache.org.
[FLINK-1333] Fixed getter/setter recognition for POJOs

This closes #271
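
As the diff below shows, the extractor now also accepts "is<FieldName>" methods as getters for boolean fields and compares generic types for both getters and setters. A minimal sketch of a POJO that is only recognized after this fix (class and field names are illustrative, modeled on the new test):

~~~java
import java.util.Collection;

public class Tweet {
	// generic field: matched via the getter's generic return type after this fix
	private Collection<String> tags;
	// boolean field: the "is" prefix is now accepted as a getter name
	private boolean favorited;

	public Collection<String> getTags() { return tags; }
	public void setTags(Collection<String> tags) { this.tags = tags; }

	public boolean isFavorited() { return favorited; }
	public void setFavorited(boolean favorited) { this.favorited = favorited; }
}
~~~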


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/7ef04c62
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/7ef04c62
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/7ef04c62

Branch: refs/heads/release-0.8
Commit: 7ef04c625768515c874f3b015cf30f6631c4dade
Parents: a835e5d
Author: Robert Metzger <me...@web.de>
Authored: Tue Dec 16 22:00:50 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Dec 17 21:41:09 2014 +0100

----------------------------------------------------------------------
 .../flink/api/java/typeutils/TypeExtractor.java |  8 +++---
 .../type/extractor/PojoTypeExtractionTest.java  | 29 ++++++++++++++++++++
 2 files changed, 33 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7ef04c62/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index e52e2af..b528d00 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -989,12 +989,12 @@ public class TypeExtractor {
 			}
 			for(Method m : clazz.getMethods()) {
 				// check for getter
-				if(	// The name should be "get<FieldName>" or "<fieldName>" (for scala).
-					(m.getName().toLowerCase().equals("get"+fieldNameLow) || m.getName().toLowerCase().equals(fieldNameLow)) &&
+				if(	// The name should be "get<FieldName>" or "<fieldName>" (for scala) or "is<fieldName>" for boolean fields.
+					(m.getName().toLowerCase().equals("get"+fieldNameLow) || m.getName().toLowerCase().equals("is"+fieldNameLow) || m.getName().toLowerCase().equals(fieldNameLow)) &&
 					// no arguments for the getter
 					m.getParameterTypes().length == 0 &&
 					// return type is same as field type (or the generic variant of it)
-					(m.getReturnType().equals( fieldType ) || (fieldTypeGeneric != null && m.getGenericReturnType().equals(fieldTypeGeneric)) )
+					(m.getGenericReturnType().equals( fieldType ) || (fieldTypeGeneric != null && m.getGenericReturnType().equals(fieldTypeGeneric)) )
 				) {
 					if(hasGetter) {
 						throw new IllegalStateException("Detected more than one getter");
@@ -1004,7 +1004,7 @@ public class TypeExtractor {
 				// check for setters (<FieldName>_$eq for scala)
 				if((m.getName().toLowerCase().equals("set"+fieldNameLow) || m.getName().toLowerCase().equals(fieldNameLow+"_$eq")) &&
 					m.getParameterTypes().length == 1 && // one parameter of the field's type
-					( m.getParameterTypes()[0].equals( fieldType ) || (fieldTypeGeneric != null && m.getGenericParameterTypes()[0].equals(fieldTypeGeneric) ) )&&
+					( m.getGenericParameterTypes()[0].equals( fieldType ) || (fieldTypeGeneric != null && m.getGenericParameterTypes()[0].equals(fieldTypeGeneric) ) )&&
 					// return type is void.
 					m.getReturnType().equals(Void.TYPE)
 				) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7ef04c62/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
index 893e63c..7cff856 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.api.java.type.extractor;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Date;
 import java.util.List;
 
@@ -138,6 +139,34 @@ public class PojoTypeExtractionTest {
 		}
 	}
 
+	public static class PojoWithGenericFields {
+		private Collection<String> users;
+		private boolean favorited;
+
+		public boolean isFavorited() {
+			return favorited;
+		}
+
+		public void setFavorited(boolean favorited) {
+			this.favorited = favorited;
+		}
+
+		public Collection<String> getUsers() {
+			return users;
+		}
+
+		public void setUsers(Collection<String> users) {
+			this.users = users;
+		}
+	}
+	@Test
+	public void testPojoWithGenericFields() {
+		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(PojoWithGenericFields.class);
+
+		Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
+	}
+
+
 	// in this test, the location of the getters and setters is mixed across the type hierarchy.
 	public static class TypedPojoGetterSetterCheck extends GenericPojoGetterSetterCheck<String> {
 		public void setPackageProtected(String in) {