You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:26:26 UTC

[49/51] [abbrv] git commit: [streaming] GroupReduce operator added + StreamCollector bugfix

[streaming] GroupReduce operator added + StreamCollector bugfix


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/309727ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/309727ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/309727ef

Branch: refs/heads/master
Commit: 309727ef94cb0bdca7e82ea31f020abf2c7075e4
Parents: 74d3742
Author: gyfora <gy...@gmail.com>
Authored: Wed Aug 6 22:36:53 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:23:43 2014 +0200

----------------------------------------------------------------------
 .../connectors/twitter/TwitterLocal.java        | 34 +++++++---
 .../api/collector/StreamCollector.java          | 12 ++--
 .../streaming/api/datastream/DataStream.java    | 46 ++++++++++---
 .../datastream/SingleOutputStreamOperator.java  |  4 +-
 .../streaming/api/invokable/SinkInvokable.java  | 16 +----
 .../api/invokable/SourceInvokable.java          | 18 +----
 .../api/invokable/StreamComponentInvokable.java | 15 ++++-
 .../api/invokable/StreamRecordInvokable.java    |  5 ++
 .../api/invokable/UserTaskInvokable.java        |  6 ++
 .../operator/BatchReduceInvokable.java          |  1 +
 .../api/invokable/operator/FilterInvokable.java | 16 +----
 .../invokable/operator/FlatMapInvokable.java    | 16 +----
 .../operator/GroupReduceInvokable.java          | 71 ++++++++++++++++++++
 .../api/invokable/operator/MapInvokable.java    | 17 +----
 .../operator/StreamReduceInvokable.java         | 18 ++---
 .../operator/WindowReduceInvokable.java         |  1 +
 .../api/invokable/operator/co/CoInvokable.java  |  5 ++
 .../invokable/operator/co/CoMapInvokable.java   |  1 +
 .../AbstractStreamComponent.java                |  5 +-
 .../api/collector/StreamCollectorTest.java      |  6 +-
 .../examples/wordcount/WordCountCounter.java    | 32 ++-------
 .../examples/wordcount/WordCountLocal.java      |  5 +-
 .../examples/wordcount/WordCountSplitter.java   |  7 +-
 23 files changed, 203 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
index 878a7ad..668647d 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
@@ -19,6 +19,9 @@
 
 package org.apache.flink.streaming.connectors.twitter;
 
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -28,19 +31,19 @@ import org.apache.flink.streaming.examples.wordcount.WordCountCounter;
 import org.apache.flink.util.Collector;
 
 /**
- * This program demonstrate the use of TwitterSource. 
- * Its aim is to count the frequency of the languages of tweets
+ * This program demonstrates the use of TwitterSource. Its aim is to count the
+ * frequency of the languages of tweets
  */
-public class TwitterLocal {
+public class TwitterLocal implements Serializable {
 
+	private static final long serialVersionUID = 1L;
 	private static final int PARALLELISM = 1;
 	private static final int SOURCE_PARALLELISM = 1;
 
 	/**
-	 * FlatMapFunction to determine the language of tweets if possible 
+	 * FlatMapFunction to determine the language of tweets if possible
 	 */
-	public static class SelectLanguageFlatMap extends
-			JSONParseFlatMap<String, String> {
+	public static class SelectLanguageFlatMap extends JSONParseFlatMap<String, String> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -54,7 +57,9 @@ public class TwitterLocal {
 		}
 
 		/**
-		 * Change the null String to space character. Useful when null is undesirable.
+		 * Change the null String to space character. Useful when null is
+		 * undesirable.
+		 * 
 		 * @param in
 		 * @return
 		 */
@@ -83,11 +88,18 @@ public class TwitterLocal {
 		DataStream<String> streamSource = env.addSource(new TwitterSource(path, 100),
 				SOURCE_PARALLELISM);
 
-
 		DataStream<Tuple2<String, Integer>> dataStream = streamSource
-				.flatMap(new SelectLanguageFlatMap())
-				.partitionBy(0)
-				.map(new WordCountCounter());
+				.flatMap(new SelectLanguageFlatMap()).partitionBy(0)
+				.map(new MapFunction<String, Tuple2<String, Integer>>() {
+
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public Tuple2<String, Integer> map(String value) throws Exception {
+
+						return new Tuple2<String, Integer>(value, 1);
+					}
+				}).groupReduce(new WordCountCounter(), 0);
 
 		dataStream.addSink(new SinkFunction<Tuple2<String, Integer>>() {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
index fe21c29..e76cf94 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
@@ -60,9 +60,12 @@ public class StreamCollector<OUT> implements Collector<OUT> {
 	 */
 	public StreamCollector(int channelID,
 			SerializationDelegate<StreamRecord<OUT>> serializationDelegate) {
-
 		this.serializationDelegate = serializationDelegate;
-		this.streamRecord = new StreamRecord<OUT>();
+		if (serializationDelegate != null) {
+			this.streamRecord = serializationDelegate.getInstance();
+		} else {
+			this.streamRecord = new StreamRecord<OUT>();
+		}
 		this.channelID = channelID;
 		this.outputs = new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
 		this.outputMap = new HashMap<String, List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>>();
@@ -82,8 +85,9 @@ public class StreamCollector<OUT> implements Collector<OUT> {
 		for (String outputName : outputNames) {
 			if (outputName != null) {
 				if (!outputMap.containsKey(outputName)) {
-					outputMap.put(outputName,
-							new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>());
+					outputMap
+							.put(outputName,
+									new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>());
 					outputMap.get(outputName).add(output);
 				} else {
 					if (!outputMap.get(outputName).contains(output)) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 86b3322..bec55e0 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -30,10 +30,12 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.RichFilterFunction;
 import org.apache.flink.api.java.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -48,6 +50,7 @@ import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
 import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
 import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
+import org.apache.flink.streaming.api.invokable.operator.GroupReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
 import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
 import org.apache.flink.streaming.partitioner.BroadcastPartitioner;
@@ -185,16 +188,16 @@ public abstract class DataStream<OUT> {
 	 * Sets the partitioning of the {@link DataStream} so that the output tuples
 	 * are partitioned by their hashcode and are sent to only one component.
 	 * 
-	 * @param keyposition
+	 * @param keyPosition
 	 *            The field used to compute the hashcode.
 	 * @return The DataStream with field partitioning set.
 	 */
-	public DataStream<OUT> partitionBy(int keyposition) {
-		if (keyposition < 0) {
+	public DataStream<OUT> partitionBy(int keyPosition) {
+		if (keyPosition < 0) {
 			throw new IllegalArgumentException("The position of the field must be non-negative");
 		}
 
-		return setConnectionType(new FieldsPartitioner<OUT>(keyposition));
+		return setConnectionType(new FieldsPartitioner<OUT>(keyPosition));
 	}
 
 	/**
@@ -280,6 +283,29 @@ public abstract class DataStream<OUT> {
 	}
 
 	/**
+	 * Applies a group and a reduce transformation on the DataStream grouped on
+	 * the given key position. The {@link ReduceFunction} will receive input
+	 * values based on the key value. Only input values with the same key will
+	 * go to the same reducer. The user can also extend
+	 * {@link RichReduceFunction} to gain access to other features provided
+	 * by the {@link RichFunction} interface.
+	 * 
+	 * @param reducer
+	 *            The {@link ReduceFunction} that will be called for every
+	 *            element of the input values with the same key.
+	 * @param keyPosition
+	 *            The key position in the input values on which the grouping is
+	 *            made.
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> groupReduce(ReduceFunction<OUT> reducer,
+			int keyPosition) {
+		return addFunction("groupReduce", reducer,
+				new FunctionTypeWrapper<OUT, Tuple, OUT>(reducer, ReduceFunction.class, 0, -1, 0),
+				new GroupReduceInvokable<OUT>(reducer, keyPosition)).partitionBy(keyPosition);
+	}
+
+	/**
 	 * Applies a reduce transformation on preset chunks of the DataStream. The
 	 * transformation calls a {@link GroupReduceFunction} for each tuple batch
 	 * of the predefined size. Each GroupReduceFunction call can return any
@@ -505,8 +531,8 @@ public abstract class DataStream<OUT> {
 	 */
 	private DataStreamSink<OUT> writeAsText(DataStream<OUT> inputStream, String path,
 			WriteFormatAsText<OUT> format, int batchSize, OUT endTuple) {
-		DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByBatches<OUT>(
-				path, format, batchSize, endTuple), null);
+		DataStreamSink<OUT> returnStream = addSink(inputStream,
+				new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), null);
 		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
 		jobGraphBuilder.setMutability(returnStream.getId(), false);
 		return returnStream;
@@ -657,8 +683,8 @@ public abstract class DataStream<OUT> {
 	 */
 	private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream, String path,
 			WriteFormatAsCsv<OUT> format, int batchSize, OUT endTuple) {
-		DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByBatches<OUT>(
-				path, format, batchSize, endTuple), null);
+		DataStreamSink<OUT> returnStream = addSink(inputStream,
+				new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), null);
 		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
 		jobGraphBuilder.setMutability(returnStream.getId(), false);
 		return returnStream;
@@ -801,8 +827,8 @@ public abstract class DataStream<OUT> {
 				sinkFunction, SinkFunction.class, 0, -1, 0));
 	}
 
-	private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream, SinkFunction<OUT> sinkFunction,
-			TypeSerializerWrapper<OUT, Tuple, OUT> typeWrapper) {
+	private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream,
+			SinkFunction<OUT> sinkFunction, TypeSerializerWrapper<OUT, Tuple, OUT> typeWrapper) {
 		DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink");
 
 		try {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 6d660b1..b86fac0 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -77,7 +77,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	 *            The mutability of the operator.
 	 * @return The operator with mutability set.
 	 */
-	public DataStream<OUT> setMutability(boolean isMutable) {
+	public SingleOutputStreamOperator<OUT, O> setMutability(boolean isMutable) {
 		jobGraphBuilder.setMutability(id, isMutable);
 		return this;
 	}
@@ -90,7 +90,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	 *            The maximum time between two output flushes.
 	 * @return The operator with buffer timeout set.
 	 */
-	public DataStream<OUT> setBufferTimeout(long timeoutMillis) {
+	public SingleOutputStreamOperator<OUT, O> setBufferTimeout(long timeoutMillis) {
 		jobGraphBuilder.setBufferTimeout(id, timeoutMillis);
 		return this;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index 50bdd42..887df8b 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -19,8 +19,6 @@
 
 package org.apache.flink.streaming.api.invokable;
 
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 
 public class SinkInvokable<IN> extends StreamRecordInvokable<IN, IN> {
@@ -29,6 +27,7 @@ public class SinkInvokable<IN> extends StreamRecordInvokable<IN, IN> {
 	private SinkFunction<IN> sinkFunction;
 
 	public SinkInvokable(SinkFunction<IN> sinkFunction) {
+		super(sinkFunction);
 		this.sinkFunction = sinkFunction;
 	}
 
@@ -47,17 +46,4 @@ public class SinkInvokable<IN> extends StreamRecordInvokable<IN, IN> {
 		}
 	}
 
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		if (sinkFunction instanceof RichFunction) {
-			((RichFunction) sinkFunction).open(parameters);
-		}
-	}
-
-	@Override
-	public void close() throws Exception {
-		if (sinkFunction instanceof RichFunction) {
-			((RichFunction) sinkFunction).close();
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
index d7710ae..a4be1e8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
@@ -21,8 +21,6 @@ package org.apache.flink.streaming.api.invokable;
 
 import java.io.Serializable;
 
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 
 public class SourceInvokable<OUT> extends StreamComponentInvokable<OUT> implements Serializable {
@@ -31,10 +29,9 @@ public class SourceInvokable<OUT> extends StreamComponentInvokable<OUT> implemen
 
 	private SourceFunction<OUT> sourceFunction;
 
-	public SourceInvokable() {
-	}
 
 	public SourceInvokable(SourceFunction<OUT> sourceFunction) {
+		super(sourceFunction);
 		this.sourceFunction = sourceFunction;
 	}
 
@@ -42,17 +39,4 @@ public class SourceInvokable<OUT> extends StreamComponentInvokable<OUT> implemen
 		sourceFunction.invoke(collector);
 	}
 
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		if (sourceFunction instanceof RichFunction) {
-			((RichFunction) sourceFunction).open(parameters);
-		}
-	}
-
-	@Override
-	public void close() throws Exception {
-		if (sourceFunction instanceof RichFunction) {
-			((RichFunction) sourceFunction).close();
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
index 02ee5fd..ed718f1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
@@ -22,6 +22,8 @@ package org.apache.flink.streaming.api.invokable;
 import java.io.Serializable;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
 
@@ -35,6 +37,11 @@ public abstract class StreamComponentInvokable<OUT> extends AbstractRichFunction
 	@SuppressWarnings("unused")
 	private int channelID;
 	protected Collector<OUT> collector;
+	protected Function userFunction;
+
+	public StreamComponentInvokable(Function userFunction) {
+		this.userFunction = userFunction;
+	}
 
 	public void setCollector(Collector<OUT> collector) {
 		this.collector = collector;
@@ -47,12 +54,16 @@ public abstract class StreamComponentInvokable<OUT> extends AbstractRichFunction
 
 	@Override
 	public void open(Configuration parameters) throws Exception {
-		System.out.println("Open not implemented");
+		if (userFunction instanceof RichFunction) {
+			((RichFunction) userFunction).open(parameters);
+		}
 	}
 
 	@Override
 	public void close() throws Exception {
-		System.out.println("Close not implemented");
+		if (userFunction instanceof RichFunction) {
+			((RichFunction) userFunction).close();
+		}
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
index b1cdde1..27dc05a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.invokable;
 
 import java.io.IOException;
 
+import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.util.Collector;
@@ -29,6 +30,10 @@ import org.apache.flink.util.MutableObjectIterator;
 public abstract class StreamRecordInvokable<IN, OUT> extends
 		StreamComponentInvokable<OUT> {
 
+	public StreamRecordInvokable(Function userFunction) {
+		super(userFunction);
+	}
+
 	private static final long serialVersionUID = 1L;
 
 	protected MutableObjectIterator<StreamRecord<IN>> recordIterator;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserTaskInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserTaskInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserTaskInvokable.java
index 0a88efd..8d1da6b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserTaskInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserTaskInvokable.java
@@ -21,8 +21,14 @@ package org.apache.flink.streaming.api.invokable;
 
 import java.io.Serializable;
 
+import org.apache.flink.api.common.functions.Function;
+
 public abstract class UserTaskInvokable<IN, OUT> extends
 		StreamRecordInvokable<IN, OUT> implements Serializable {
 
+	public UserTaskInvokable(Function userFunction) {
+		super(userFunction);
+	}
+
 	private static final long serialVersionUID = 1L;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
index c3c861b..4e0a7a5 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
@@ -29,6 +29,7 @@ public class BatchReduceInvokable<IN, OUT> extends StreamReduceInvokable<IN, OUT
 	private int batchSize;
 
 	public BatchReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, int batchSize) {
+		super(reduceFunction);
 		this.reducer = reduceFunction;
 		this.batchSize = batchSize;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index b64f08b..388920c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -20,8 +20,6 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
 
 public class FilterInvokable<IN> extends UserTaskInvokable<IN, IN> {
@@ -31,6 +29,7 @@ public class FilterInvokable<IN> extends UserTaskInvokable<IN, IN> {
 	FilterFunction<IN> filterFunction;
 
 	public FilterInvokable(FilterFunction<IN> filterFunction) {
+		super(filterFunction);
 		this.filterFunction = filterFunction;
 	}
 
@@ -53,17 +52,4 @@ public class FilterInvokable<IN> extends UserTaskInvokable<IN, IN> {
 		}
 	}
 
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		if (filterFunction instanceof RichFunction) {
-			((RichFunction) filterFunction).open(parameters);
-		}
-	}
-
-	@Override
-	public void close() throws Exception {
-		if (filterFunction instanceof RichFunction) {
-			((RichFunction) filterFunction).close();
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
index cc9fcc1..4cb4712 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
@@ -20,8 +20,6 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
 
 public class FlatMapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
@@ -30,6 +28,7 @@ public class FlatMapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
 	private FlatMapFunction<IN, OUT> flatMapper;
 
 	public FlatMapInvokable(FlatMapFunction<IN, OUT> flatMapper) {
+		super(flatMapper);
 		this.flatMapper = flatMapper;
 	}
 
@@ -48,17 +47,4 @@ public class FlatMapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
 		}
 	}
 
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		if (flatMapper instanceof RichFunction) {
-			((RichFunction) flatMapper).open(parameters);
-		}
-	}
-
-	@Override
-	public void close() throws Exception {
-		if (flatMapper instanceof RichFunction) {
-			((RichFunction) flatMapper).close();
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java
new file mode 100755
index 0000000..67af978
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java
@@ -0,0 +1,71 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+/** Per-key running reduce: folds each incoming record into the value stored for its key and emits the updated value. */
+public class GroupReduceInvokable<IN> extends UserTaskInvokable<IN, IN> {
+	private static final long serialVersionUID = 1L;
+	// reducer: user-supplied combine function; keyPosition: index passed to getField() to obtain the grouping key.
+	private ReduceFunction<IN> reducer;
+	private int keyPosition;
+	private Map<Object, IN> values; // latest value per key; entries are only added or replaced in this class, never removed
+	/** Hands the reducer to the superclass constructor and starts with an empty key-to-value map. */
+	public GroupReduceInvokable(ReduceFunction<IN> reducer, int keyPosition) {
+		super(reducer);
+		this.reducer = reducer;
+		this.keyPosition = keyPosition;
+		values = new HashMap<Object, IN>();
+	}
+	/** Drains recordIterator until next() returns null, calling reduce() and then resetReuse() for each record. */
+	@Override
+	protected void immutableInvoke() throws Exception {
+		while ((reuse = recordIterator.next(reuse)) != null) {
+			reduce();
+			resetReuse(); // declared outside this class; presumably swaps in a fresh reuse record -- TODO confirm
+		}
+	}
+	/** Drains recordIterator until next() returns null, calling reduce() per record; resetReuse() is not called here. */
+	@Override
+	protected void mutableInvoke() throws Exception {
+		while ((reuse = recordIterator.next(reuse)) != null) {
+			reduce(); // NOTE(review): reduce() keeps reuse.getObject() in the map; if next(reuse) refills that same object, stored values could change -- confirm
+		}
+	}
+	/** Emits the running value for the current record's key: the record's own value on first sight, else reducer.reduce(stored, new). */
+	private void reduce() throws Exception {
+		Object key = reuse.getField(keyPosition); // used as a HashMap key, so grouping follows the field's equals()/hashCode()
+		IN currentValue = values.get(key);
+		IN nextValue = reuse.getObject();
+		if (currentValue != null) { // a non-null value is already stored for this key
+			IN reduced = reducer.reduce(currentValue, nextValue);
+			values.put(key, reduced);
+			collector.collect(reduced);
+		} else { // no value stored yet for this key (or the stored value was null)
+			values.put(key, nextValue);
+			collector.collect(nextValue);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
index 9dbb678..53e85e0 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
@@ -20,8 +20,6 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
 
 public class MapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
@@ -30,6 +28,7 @@ public class MapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
 	private MapFunction<IN, OUT> mapper;
 
 	public MapInvokable(MapFunction<IN, OUT> mapper) {
+		super(mapper);
 		this.mapper = mapper;
 	}
 
@@ -47,18 +46,4 @@ public class MapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
 			collector.collect(mapper.map(reuse.getObject()));
 		}
 	}
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		if (mapper instanceof RichFunction) {
-			((RichFunction) mapper).open(parameters);
-		}
-	}
-
-	@Override
-	public void close() throws Exception {
-		if (mapper instanceof RichFunction) {
-			((RichFunction) mapper).close();
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
index 548a298..de0a18a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -21,12 +21,17 @@ package org.apache.flink.streaming.api.invokable.operator;
 
 import java.util.Iterator;
 
+import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
 
 public abstract class StreamReduceInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
+
+	public StreamReduceInvokable(Function userFunction) {
+		super(userFunction);
+	}
+
 	private static final long serialVersionUID = 1L;
 	protected GroupReduceFunction<IN, OUT> reducer;
 	protected BatchIterator<IN> userIterator;
@@ -35,16 +40,7 @@ public abstract class StreamReduceInvokable<IN, OUT> extends UserTaskInvokable<I
 	@Override
 	public void open(Configuration parameters) throws Exception {
 		userIterable = new BatchIterable();
-		if (reducer instanceof RichFunction) {
-			((RichFunction) reducer).open(parameters);
-		}
-	}
-
-	@Override
-	public void close() throws Exception {
-		if (reducer instanceof RichFunction) {
-			((RichFunction) reducer).close();
-		}
+		super.open(parameters);
 	}
 
 	protected class BatchIterable implements Iterable<IN> {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
index 309656b..3405641 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
@@ -31,6 +31,7 @@ public class WindowReduceInvokable<IN, OUT> extends StreamReduceInvokable<IN, OU
 	boolean window;
 
 	public WindowReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize) {
+		super(reduceFunction);
 		this.reducer = reduceFunction;
 		this.windowSize = windowSize;
 		this.window = true;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index d854e89..4ac75a1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.streaming.api.invokable.operator.co;
 
+import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
@@ -28,6 +29,10 @@ import org.apache.flink.util.MutableObjectIterator;
 public abstract class CoInvokable<IN1, IN2, OUT> extends
 		StreamComponentInvokable<OUT> {
 
+	public CoInvokable(Function userFunction) {
+		super(userFunction);
+	}
+
 	private static final long serialVersionUID = 1L;
 
 	protected Collector<OUT> collector;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
index cd51081..ab10b40 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
@@ -29,6 +29,7 @@ public class CoMapInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
 	private CoMapFunction<IN1, IN2, OUT> mapper;
 
 	public CoMapInvokable(CoMapFunction<IN1, IN2, OUT> mapper) {
+		super(mapper);
 		this.mapper = mapper;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
index e2e8816..324b5ba 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
@@ -108,6 +108,7 @@ public abstract class AbstractStreamComponent<OUT> extends AbstractInvokable {
 		outTypeInfo = typeWrapper.getOutputTypeInfo();
 		outSerializer = new StreamRecordSerializer<OUT>(outTypeInfo);
 		outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(outSerializer);
+		outSerializationDelegate.setInstance(outSerializer.createInstance());
 	}
 
 	protected void setConfigOutputs(
@@ -116,8 +117,8 @@ public abstract class AbstractStreamComponent<OUT> extends AbstractInvokable {
 		setCollector();
 
 		int numberOfOutputs = configuration.getNumberOfOutputs();
-		bufferTimeout= configuration.getBufferTimeout();
-		
+		bufferTimeout = configuration.getBufferTimeout();
+
 		for (int i = 0; i < numberOfOutputs; i++) {
 			setPartitioner(i, outputs);
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
index 72b09c9..99f05ac 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
@@ -35,9 +35,11 @@ public class StreamCollectorTest {
 	@Test
 	public void testCollect() {
 		MockRecordWriter recWriter = MockRecordWriterFactory.create();
+		SerializationDelegate<StreamRecord<Tuple1<Integer>>> sd = new SerializationDelegate<StreamRecord<Tuple1<Integer>>>(
+				null);
+		sd.setInstance(new StreamRecord<Tuple1<Integer>>().setObject(new Tuple1<Integer>()));
 
-		StreamCollector<Tuple1<Integer>> collector = new StreamCollector<Tuple1<Integer>>(2,
-				new SerializationDelegate<StreamRecord<Tuple1<Integer>>>(null));
+		StreamCollector<Tuple1<Integer>> collector = new StreamCollector<Tuple1<Integer>>(2, sd);
 		collector.addOutput(recWriter, new ArrayList<String>());
 		collector.collect(new Tuple1<Integer>(3));
 		collector.collect(new Tuple1<Integer>(4));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountCounter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountCounter.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountCounter.java
index f106afe..6ee824a 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountCounter.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountCounter.java
@@ -19,38 +19,16 @@
 
 package org.apache.flink.streaming.examples.wordcount;
 
-import java.util.HashMap;
-import java.util.Map;
-
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.common.functions.MapFunction;
 
-public class WordCountCounter implements MapFunction<String, Tuple2<String, Integer>> {
+public class WordCountCounter implements ReduceFunction<Tuple2<String, Integer>> {
 	private static final long serialVersionUID = 1L;
 
-	private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
-	private String word = "";
-	private Integer count = 0;
-
-	private Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>();
-
-	// Increments the counter of the occurrence of the input word
 	@Override
-	public Tuple2<String, Integer> map(String inTuple) throws Exception {
-		word = inTuple;
-
-		if (wordCounts.containsKey(word)) {
-			count = wordCounts.get(word) + 1;
-			wordCounts.put(word, count);
-		} else {
-			count = 1;
-			wordCounts.put(word, 1);
-		}
-
-		outTuple.f0 = word;
-		outTuple.f1 = count;
-
-		return outTuple;
+	public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
+			Tuple2<String, Integer> value2) throws Exception {
+		return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
index fc31930..ba5a6e2 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
@@ -19,10 +19,10 @@
 
 package org.apache.flink.streaming.examples.wordcount;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.TestDataUtil;
-import org.apache.flink.api.java.tuple.Tuple2;
 
 public class WordCountLocal {
 
@@ -35,7 +35,8 @@ public class WordCountLocal {
 
 		DataStream<Tuple2<String, Integer>> dataStream = env
 				.readTextFile("src/test/resources/testdata/hamlet.txt")
-				.flatMap(new WordCountSplitter()).partitionBy(0).map(new WordCountCounter());
+				.flatMap(new WordCountSplitter())
+				.groupReduce(new WordCountCounter(), 0);
 
 		dataStream.print();
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/309727ef/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountSplitter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountSplitter.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountSplitter.java
index eb0dfd3..b962d4d 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountSplitter.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountSplitter.java
@@ -20,17 +20,18 @@
 package org.apache.flink.streaming.examples.wordcount;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 
-public class WordCountSplitter implements FlatMapFunction<String, String> {
+public class WordCountSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
 	private static final long serialVersionUID = 1L;
 
 	// Splits the lines according on spaces
 	@Override
-	public void flatMap(String inTuple, Collector<String> out) throws Exception {
+	public void flatMap(String inTuple, Collector<Tuple2<String, Integer>> out) throws Exception {
 
 		for (String word : inTuple.split(" ")) {
-			out.collect(word);
+			out.collect(new Tuple2<String, Integer>(word, 1));
 		}
 	}
 }
\ No newline at end of file