You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:26:10 UTC

[33/51] [abbrv] git commit: [streaming] Updated Streaming function interfaces to match main project

[streaming] Updated Streaming function interfaces to match main project


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e73ea295
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e73ea295
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e73ea295

Branch: refs/heads/master
Commit: e73ea29593cdb8b4d9a11137b2188ca72673e98c
Parents: 0465d30
Author: gyfora <gy...@gmail.com>
Authored: Mon Aug 4 14:05:07 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:22:50 2014 +0200

----------------------------------------------------------------------
 .../streaming/connectors/flume/FlumeSink.java   |  12 +-
 .../streaming/connectors/flume/FlumeSource.java |   4 +-
 .../streaming/connectors/kafka/KafkaSink.java   |  12 +-
 .../streaming/connectors/kafka/KafkaSource.java |   4 +-
 .../connectors/kafka/KafkaTopology.java         |   2 +-
 .../streaming/connectors/rabbitmq/RMQSink.java  |  12 +-
 .../connectors/rabbitmq/RMQSource.java          |   4 +-
 .../connectors/twitter/TwitterSource.java       |   2 +-
 .../connectors/twitter/TwitterStreaming.java    |  33 +++---
 .../streaming/connectors/rabbitmq/RMQTest.java  |  46 ++++----
 .../apache/flink/streaming/api/DataStream.java  | 110 +++++++++++--------
 .../streaming/api/collector/OutputSelector.java |   4 +-
 .../api/function/co/CoMapFunction.java          |  13 ++-
 .../api/function/co/RichCoMapFunction.java      |  27 +++++
 .../api/function/sink/PrintSinkFunction.java    |   2 +-
 .../api/function/sink/RichSinkFunction.java     |  30 +++++
 .../api/function/sink/SinkFunction.java         |   6 +-
 .../api/function/sink/WriteSinkFunction.java    |   2 +-
 .../api/function/source/FileSourceFunction.java |   2 +-
 .../api/function/source/FileStreamFunction.java |   2 +-
 .../function/source/FromElementsFunction.java   |   2 +-
 .../function/source/GenSequenceFunction.java    |   2 +-
 .../api/function/source/RichSourceFunction.java |  29 +++++
 .../api/function/source/SourceFunction.java     |  10 +-
 .../streaming/api/invokable/SinkInvokable.java  |   9 +-
 .../api/invokable/SourceInvokable.java          |   9 +-
 .../operator/BatchReduceInvokable.java          |   4 +-
 .../api/invokable/operator/FilterInvokable.java |  15 ++-
 .../invokable/operator/FlatMapInvokable.java    |  15 ++-
 .../api/invokable/operator/MapInvokable.java    |  15 ++-
 .../operator/StreamReduceInvokable.java         |  13 ++-
 .../operator/WindowReduceInvokable.java         |   4 +-
 .../invokable/operator/co/CoMapInvokable.java   |   9 +-
 .../api/streamcomponent/CoStreamTask.java       |   4 +-
 .../util/serialization/FunctionTypeWrapper.java |  26 ++---
 .../apache/flink/streaming/api/IterateTest.java |   2 +-
 .../flink/streaming/api/WriteAsCsvTest.java     |   2 +-
 .../flink/streaming/api/WriteAsTextTest.java    |   2 +-
 .../api/collector/DirectedOutputTest.java       |   4 +-
 .../api/invokable/operator/BatchReduceTest.java |  10 +-
 .../api/invokable/operator/CoMapTest.java       |   4 +-
 .../api/invokable/operator/FilterTest.java      |   6 +-
 .../api/invokable/operator/FlatMapTest.java     |  16 +--
 .../api/invokable/operator/MapTest.java         |  36 +++---
 .../streamcomponent/StreamComponentTest.java    |   4 +-
 .../examples/basictopology/BasicTopology.java   |   6 +-
 .../examples/cellinfo/CellInfoLocal.java        |   4 +-
 .../CollaborativeFilteringSink.java             |   2 +-
 .../CollaborativeFilteringSource.java           |   2 +-
 .../examples/iterative/kmeans/KMeansSink.java   |   2 +-
 .../examples/iterative/kmeans/KMeansSource.java |   2 +-
 .../iterative/pagerank/PageRankSink.java        |   2 +-
 .../iterative/pagerank/PageRankSource.java      |   2 +-
 .../examples/iterative/sssp/SSSPSink.java       |   2 +-
 .../examples/iterative/sssp/SSSPSource.java     |   2 +-
 .../flink/streaming/examples/join/JoinSink.java |   2 +-
 .../streaming/examples/join/JoinSourceOne.java  |   2 +-
 .../streaming/examples/join/JoinSourceTwo.java  |   2 +-
 .../ml/IncrementalLearningSkeleton.java         |   4 +-
 .../streaming/examples/ml/IncrementalOLS.java   |   4 +-
 .../window/join/WindowJoinSourceOne.java        |   2 +-
 .../window/join/WindowJoinSourceTwo.java        |   2 +-
 .../examples/wordcount/WordCountCounter.java    |   6 +-
 .../examples/wordcount/WordCountSplitter.java   |   6 +-
 .../testdata_checksum/ASTopology.data.md5       |   1 +
 .../testdata_checksum/MovieLens100k.data.md5    |   1 +
 .../resources/testdata_checksum/hamlet.txt.md5  |   1 +
 .../testdata_checksum/terainput.txt.md5         |   1 +
 68 files changed, 389 insertions(+), 252 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 6f943d1..69e34e6 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -30,7 +30,7 @@ import org.apache.flume.api.RpcClient;
 import org.apache.flume.api.RpcClientFactory;
 import org.apache.flume.event.EventBuilder;
 
-public abstract class FlumeSink<IN> extends SinkFunction<IN> {
+public abstract class FlumeSink<IN> implements SinkFunction<IN> {
 	private static final long serialVersionUID = 1L;
 
 	private static final Log LOG = LogFactory.getLog(FlumeSink.class);
@@ -51,18 +51,18 @@ public abstract class FlumeSink<IN> extends SinkFunction<IN> {
 	 * Receives tuples from the Apache Flink {@link DataStream} and forwards them to
 	 * Apache Flume.
 	 * 
-	 * @param tuple
+	 * @param value
 	 *            The tuple arriving from the datastream
 	 */
 	@Override
-	public void invoke(IN tuple) {
+	public void invoke(IN value) {
 
 		if (!initDone) {
 			client = new FlinkRpcClientFacade();
 			client.init(host, port);
 		}
 
-		byte[] data = serialize(tuple);
+		byte[] data = serialize(value);
 		if (!closeWithoutSend) {
 			client.sendDataToFlume(data);
 		}
@@ -75,11 +75,11 @@ public abstract class FlumeSink<IN> extends SinkFunction<IN> {
 	/**
 	 * Serializes tuples into byte arrays.
 	 * 
-	 * @param tuple
+	 * @param value
 	 *            The tuple used for the serialization
 	 * @return The serialized byte array.
 	 */
-	public abstract byte[] serialize(IN tuple);
+	public abstract byte[] serialize(IN value);
 
 	private class FlinkRpcClientFacade {
 		private RpcClient client;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
index b141efb..c296319 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
@@ -22,7 +22,7 @@ package org.apache.flink.streaming.connectors.flume;
 import java.util.List;
 
 import org.apache.flink.streaming.api.DataStream;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.function.source.RichSourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flume.Context;
 import org.apache.flume.channel.ChannelProcessor;
@@ -30,7 +30,7 @@ import org.apache.flume.source.AvroSource;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.flume.source.avro.Status;
 
-public abstract class FlumeSource<OUT> extends SourceFunction<OUT> {
+public abstract class FlumeSource<OUT> extends RichSourceFunction<OUT> {
 	private static final long serialVersionUID = 1L;
 
 	String host;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
index 955e8dc..183860e 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
@@ -27,7 +27,7 @@ import kafka.producer.ProducerConfig;
 
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 
-public abstract class KafkaSink<IN, OUT> extends SinkFunction<IN> {
+public abstract class KafkaSink<IN, OUT> implements SinkFunction<IN> {
 	private static final long serialVersionUID = 1L;
 
 	private kafka.javaapi.producer.Producer<Integer, OUT> producer;
@@ -62,16 +62,16 @@ public abstract class KafkaSink<IN, OUT> extends SinkFunction<IN> {
 	/**
 	 * Called when new data arrives to the sink, and forwards it to Kafka.
 	 * 
-	 * @param tuple
+	 * @param value
 	 *            The incoming data
 	 */
 	@Override
-	public void invoke(IN tuple) {
+	public void invoke(IN value) {
 		if (!initDone) {
 			initialize();
 		}
 
-		OUT out = serialize(tuple);
+		OUT out = serialize(value);
 		KeyedMessage<Integer, OUT> data = new KeyedMessage<Integer, OUT>(topicId, out);
 
 		if (!closeWithoutSend) {
@@ -86,11 +86,11 @@ public abstract class KafkaSink<IN, OUT> extends SinkFunction<IN> {
 	/**
 	 * Serializes tuples into byte arrays.
 	 * 
-	 * @param tuple
+	 * @param value
 	 *            The tuple used for the serialization
 	 * @return The serialized byte array.
 	 */
-	public abstract OUT serialize(IN tuple);
+	public abstract OUT serialize(IN value);
 
 	/**
 	 * Closes the connection immediately and no further data will be sent.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
index 228069a..6e18b20 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
@@ -30,10 +30,10 @@ import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
 
 import org.apache.flink.streaming.api.DataStream;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.function.source.RichSourceFunction;
 import org.apache.flink.util.Collector;
 
-public abstract class KafkaSource<OUT> extends SourceFunction<OUT> {
+public abstract class KafkaSource<OUT> extends RichSourceFunction<OUT> {
 	private static final long serialVersionUID = 1L;
 
 	private final String zkQuorum;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
index d605fb8..295f1cc 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
@@ -27,7 +27,7 @@ import org.apache.flink.util.Collector;
 
 public class KafkaTopology {
 
-	public static final class MySource extends SourceFunction<Tuple1<String>> {
+	public static final class MySource implements SourceFunction<Tuple1<String>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index c6f0ef5..2e3a8a6 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -29,7 +29,7 @@ import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 
-public abstract class RMQSink<IN> extends SinkFunction<IN> {
+public abstract class RMQSink<IN> implements SinkFunction<IN> {
 	private static final long serialVersionUID = 1L;
 
 	private static final Log LOG = LogFactory.getLog(RMQSink.class);
@@ -69,18 +69,18 @@ public abstract class RMQSink<IN> extends SinkFunction<IN> {
 	/**
 	 * Called when new data arrives to the sink, and forwards it to RMQ.
 	 * 
-	 * @param tuple
+	 * @param value
 	 *            The incoming data
 	 */
 	@Override
-	public void invoke(IN tuple) {
+	public void invoke(IN value) {
 		if (!initDone) {
 			initializeConnection();
 		}
 
 		try {
 			channel.queueDeclare(QUEUE_NAME, false, false, false, null);
-			byte[] msg = serialize(tuple);
+			byte[] msg = serialize(value);
 			if (!closeWithoutSend) {
 				channel.basicPublish("", QUEUE_NAME, null, msg);
 			}
@@ -98,11 +98,11 @@ public abstract class RMQSink<IN> extends SinkFunction<IN> {
 	/**
 	 * Serializes tuples into byte arrays.
 	 * 
-	 * @param tuple
+	 * @param value
 	 *            The tuple used for the serialization
 	 * @return The serialized byte array.
 	 */
-	public abstract byte[] serialize(IN tuple);
+	public abstract byte[] serialize(IN value);
 
 	/**
 	 * Closes the connection.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 8303b1a..fa0be0d 100755
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -24,7 +24,7 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.streaming.api.DataStream;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.function.source.RichSourceFunction;
 import org.apache.flink.util.Collector;
 
 import com.rabbitmq.client.Channel;
@@ -32,7 +32,7 @@ import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.QueueingConsumer;
 
-public abstract class RMQSource<OUT> extends SourceFunction<OUT> {
+public abstract class RMQSource<OUT> extends RichSourceFunction<OUT> {
 	private static final long serialVersionUID = 1L;
 
 	private static final Log LOG = LogFactory.getLog(RMQSource.class);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
index bc0995d..17e3b02 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -44,7 +44,7 @@ import com.twitter.hbc.httpclient.auth.OAuth1;
  * Implementation of {@link SourceFunction} specialized to emit tweets from Twitter.
  * It can connect to Twitter Streaming API, collect tweets and 
  */
-public class TwitterSource extends SourceFunction<String> {
+public class TwitterSource implements SourceFunction<String> {
 
 	private static final Log LOG = LogFactory.getLog(TwitterSource.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
index ee986ea..6a464ea 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
@@ -31,7 +31,8 @@ public class TwitterStreaming {
 	private static final int PARALLELISM = 1;
 	private static final int SOURCE_PARALLELISM = 1;
 
-	public static class TwitterSink extends SinkFunction<Tuple5<Long, Long, String, String, String>> {
+	public static class TwitterSink implements
+			SinkFunction<Tuple5<Long, Long, String, String, String>> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -44,40 +45,38 @@ public class TwitterStreaming {
 		}
 
 	}
-	
+
 	public static class SelectDataFlatMap extends
 			JSONParseFlatMap<String, Tuple5<Long, Long, String, String, String>> {
 
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void flatMap(String value,
-				Collector<Tuple5<Long, Long, String, String, String>> out)
+		public void flatMap(String value, Collector<Tuple5<Long, Long, String, String, String>> out)
 				throws Exception {
 
 			out.collect(new Tuple5<Long, Long, String, String, String>(
 					convertDateString2Long(getField(value, "id")),
 					convertDateString2LongDate(getField(value, "created_at")),
-					colationOfNull(getField(value, "user.name")),
-					colationOfNull(getField(value, "text")),
-					getField(value, "lang")));
+					colationOfNull(getField(value, "user.name")), colationOfNull(getField(value,
+							"text")), getField(value, "lang")));
 		}
-		
-		protected String colationOfNull(String in){
-			if(in==null){
+
+		protected String colationOfNull(String in) {
+			if (in == null) {
 				return " ";
 			}
 			return in;
 		}
-		
+
 		protected Long convertDateString2LongDate(String dateString) {
-			if (dateString!=(null)) {
+			if (dateString != (null)) {
 				String[] dateArray = dateString.split(" ");
-				return Long.parseLong(dateArray[2])*100000+Long.parseLong(dateArray[5]);
+				return Long.parseLong(dateArray[2]) * 100000 + Long.parseLong(dateArray[5]);
 			}
 			return 0L;
 		}
-		
+
 		protected Long convertDateString2Long(String dateString) {
 			if (dateString != null) {
 				return Long.parseLong(dateString);
@@ -87,14 +86,14 @@ public class TwitterStreaming {
 	}
 
 	public static void main(String[] args) {
-		
+
 		String path = "/home/eszes/git/auth.properties";
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment
 				.createLocalEnvironment(PARALLELISM);
 
-		DataStream<String> streamSource = env.addSource(
-				new TwitterSource(path,100), SOURCE_PARALLELISM);
+		DataStream<String> streamSource = env.addSource(new TwitterSource(path, 100),
+				SOURCE_PARALLELISM);
 
 		DataStream<Tuple5<Long, Long, String, String, String>> selectedDataStream = streamSource
 				.flatMap(new SelectDataFlatMap());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTest.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTest.java
index c6a43f2..ad704d7 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.flink.streaming.connectors.rabbitmq;
 
-
 import java.util.HashSet;
 import java.util.Set;
 
@@ -28,8 +27,8 @@ import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.junit.Test;
 
 public class RMQTest {
-	
-	public static final class MySink extends SinkFunction<Tuple1<String>> {
+
+	public static final class MySink implements SinkFunction<Tuple1<String>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -37,38 +36,37 @@ public class RMQTest {
 			result.add(tuple.f0);
 		}
 
-		
 	}
-	
+
 	private static Set<String> expected = new HashSet<String>();
 	private static Set<String> result = new HashSet<String>();
-	
+
 	@SuppressWarnings("unused")
-  private static void fillExpected() {
+	private static void fillExpected() {
 		expected.add("one");
 		expected.add("two");
 		expected.add("three");
 		expected.add("four");
 		expected.add("five");
 	}
-	
+
 	@Test
 	public void RMQTest1() throws Exception {
-//		
-//		StreamExecutionEnvironment env = new StreamExecutionEnvironment();
-//
-//		DataStream<Tuple1<String>> dataStream1 = env
-//				.addSource(new RMQSource("localhost", "hello"), 1)
-//				.addSink(new MySink());
-//		
-//		DataStream<Tuple1<String>> dataStream2 = env
-//				.fromElements("one", "two", "three", "four", "five", "q")
-//				.addSink(new RMQSink("localhost", "hello"));
-//
-//		env.execute();
-//		
-//		fillExpected();
-//		
-//		assertEquals(expected, result);
+		//
+		// StreamExecutionEnvironment env = new StreamExecutionEnvironment();
+		//
+		// DataStream<Tuple1<String>> dataStream1 = env
+		// .addSource(new RMQSource("localhost", "hello"), 1)
+		// .addSink(new MySink());
+		//
+		// DataStream<Tuple1<String>> dataStream2 = env
+		// .fromElements("one", "two", "three", "four", "five", "q")
+		// .addSink(new RMQSink("localhost", "hello"));
+		//
+		// env.execute();
+		//
+		// fillExpected();
+		//
+		// assertEquals(expected, result);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
index 430f09b..7aff259 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
@@ -19,19 +19,24 @@
 
 package org.apache.flink.streaming.api;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.java.functions.RichFilterFunction;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
 import org.apache.flink.streaming.api.function.sink.PrintSinkFunction;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.function.sink.WriteFormatAsCsv;
@@ -184,7 +189,8 @@ public class DataStream<T> {
 	 * Sets the mutability of the operator represented by the DataStream. If the
 	 * operator is set to mutable, the tuples received in the user defined
 	 * functions, will be reused after the function call. Setting an operator to
-	 * mutable greatly reduces garbage collection overhead and thus scalability.
+	 * mutable reduces garbage collection overhead and thus increases
+	 * scalability.
 	 * 
 	 * @param isMutable
 	 *            The mutability of the operator.
@@ -309,38 +315,42 @@ public class DataStream<T> {
 
 	/**
 	 * Applies a Map transformation on a {@link DataStream}. The transformation
-	 * calls a {@link RichMapFunction} for each element of the DataStream. Each
-	 * MapFunction call returns exactly one element.
+	 * calls a {@link MapFunction} for each element of the DataStream. Each
+	 * MapFunction call returns exactly one element. The user can also extend
+	 * {@link RichMapFunction} to gain access to other features provided by the
+	 * {@link RichFunction} interface.
 	 * 
 	 * @param mapper
-	 *            The RichMapFunction that is called for each element of the
+	 *            The MapFunction that is called for each element of the
 	 *            DataStream.
 	 * @param <R>
 	 *            output type
 	 * @return The transformed DataStream.
 	 */
-	public <R> StreamOperator<R> map(RichMapFunction<T, R> mapper) {
+	public <R> StreamOperator<R> map(MapFunction<T, R> mapper) {
 		return addFunction("map", mapper, new FunctionTypeWrapper<T, Tuple, R>(mapper,
-				RichMapFunction.class, 0, -1, 1), new MapInvokable<T, R>(mapper));
+				MapFunction.class, 0, -1, 1), new MapInvokable<T, R>(mapper));
 	}
 
 	/**
 	 * Applies a FlatMap transformation on a {@link DataStream}. The
-	 * transformation calls a {@link RichFlatMapFunction} for each element of
-	 * the DataStream. Each RichFlatMapFunction call can return any number of
-	 * elements including none.
+	 * transformation calls a {@link FlatMapFunction} for each element of the
+	 * DataStream. Each FlatMapFunction call can return any number of elements
+	 * including none. The user can also extend {@link RichFlatMapFunction} to
+	 * gain access to other features provided by the {@link RichFunction}
+	 * interface.
 	 * 
 	 * @param flatMapper
-	 *            The RichFlatMapFunction that is called for each element of the
+	 *            The FlatMapFunction that is called for each element of the
 	 *            DataStream
 	 * 
 	 * @param <R>
 	 *            output type
 	 * @return The transformed DataStream.
 	 */
-	public <R> StreamOperator<R> flatMap(RichFlatMapFunction<T, R> flatMapper) {
+	public <R> StreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
 		return addFunction("flatMap", flatMapper, new FunctionTypeWrapper<T, Tuple, R>(flatMapper,
-				RichFlatMapFunction.class, 0, -1, 1), new FlatMapInvokable<T, R>(flatMapper));
+				FlatMapFunction.class, 0, -1, 1), new FlatMapInvokable<T, R>(flatMapper));
 	}
 
 	/**
@@ -348,7 +358,9 @@ public class DataStream<T> {
 	 * transformation calls a {@link CoMapFunction#map1(Tuple)} for each element
 	 * of the first DataStream (on which .coMapWith was called) and
 	 * {@link CoMapFunction#map2(Tuple)} for each element of the second
-	 * DataStream. Each CoMapFunction call returns exactly one element.
+	 * DataStream. Each CoMapFunction call returns exactly one element. The user
+	 * can also extend {@link RichCoMapFunction} to gain access to other
+	 * features provided by the {@link RichFunction} interface.
 	 * 
 	 * @param coMapper
 	 *            The CoMapFunction used to jointly transform the two input
@@ -367,63 +379,67 @@ public class DataStream<T> {
 
 	/**
 	 * Applies a reduce transformation on preset chunks of the DataStream. The
-	 * transformation calls a {@link RichGroupReduceFunction} for each tuple
-	 * batch of the predefined size. Each RichGroupReduceFunction call can
-	 * return any number of elements including none.
+	 * transformation calls a {@link GroupReduceFunction} for each tuple batch
+	 * of the predefined size. Each GroupReduceFunction call can return any
+	 * number of elements including none. The user can also extend
+	 * {@link RichGroupReduceFunction} to gain access to other features provided
+	 * by the {@link RichFunction} interface.
 	 * 
 	 * 
 	 * @param reducer
-	 *            The RichGroupReduceFunction that is called for each tuple
-	 *            batch.
+	 *            The GroupReduceFunction that is called for each tuple batch.
 	 * @param batchSize
 	 *            The number of tuples grouped together in the batch.
 	 * @param <R>
 	 *            output type
 	 * @return The modified DataStream.
 	 */
-	public <R> StreamOperator<R> batchReduce(RichGroupReduceFunction<T, R> reducer, int batchSize) {
+	public <R> StreamOperator<R> batchReduce(GroupReduceFunction<T, R> reducer, int batchSize) {
 		return addFunction("batchReduce", reducer, new FunctionTypeWrapper<T, Tuple, R>(reducer,
-				RichGroupReduceFunction.class, 0, -1, 1), new BatchReduceInvokable<T, R>(reducer,
+				GroupReduceFunction.class, 0, -1, 1), new BatchReduceInvokable<T, R>(reducer,
 				batchSize));
 	}
 
 	/**
 	 * Applies a reduce transformation on preset "time" chunks of the
-	 * DataStream. The transformation calls a {@link RichGroupReduceFunction} on
+	 * DataStream. The transformation calls a {@link GroupReduceFunction} on
 	 * records received during the predefined time window. The window shifted
-	 * after each reduce call. Each RichGroupReduceFunction call can return any
-	 * number of elements including none.
+	 * after each reduce call. Each GroupReduceFunction call can return any
+	 * number of elements including none. The user can also extend
+	 * {@link RichGroupReduceFunction} to gain access to other features provided
+	 * by the {@link RichFunction} interface.
 	 * 
 	 * 
 	 * @param reducer
-	 *            The RichGroupReduceFunction that is called for each time
-	 *            window.
+	 *            The GroupReduceFunction that is called for each time window.
 	 * @param windowSize
 	 *            The time window to run the reducer on, in milliseconds.
 	 * @param <R>
 	 *            output type
 	 * @return The modified DataStream.
 	 */
-	public <R> StreamOperator<R> windowReduce(RichGroupReduceFunction<T, R> reducer, long windowSize) {
+	public <R> StreamOperator<R> windowReduce(GroupReduceFunction<T, R> reducer, long windowSize) {
 		return addFunction("batchReduce", reducer, new FunctionTypeWrapper<T, Tuple, R>(reducer,
-				RichGroupReduceFunction.class, 0, -1, 1), new WindowReduceInvokable<T, R>(reducer,
+				GroupReduceFunction.class, 0, -1, 1), new WindowReduceInvokable<T, R>(reducer,
 				windowSize));
 	}
 
 	/**
 	 * Applies a Filter transformation on a {@link DataStream}. The
-	 * transformation calls a {@link RichFilterFunction} for each element of the
+	 * transformation calls a {@link FilterFunction} for each element of the
 	 * DataStream and retains only those element for which the function returns
-	 * true. Elements for which the function returns false are filtered.
+	 * true. Elements for which the function returns false are filtered. The
+	 * user can also extend {@link RichFilterFunction} to gain access to other
+	 * features provided by the {@link RichFunction} interface.
 	 * 
 	 * @param filter
-	 *            The RichFilterFunction that is called for each element of the
+	 *            The FilterFunction that is called for each element of the
 	 *            DataSet.
 	 * @return The filtered DataStream.
 	 */
-	public StreamOperator<T> filter(RichFilterFunction<T> filter) {
+	public StreamOperator<T> filter(FilterFunction<T> filter) {
 		return addFunction("filter", filter, new FunctionTypeWrapper<T, Tuple, T>(filter,
-				RichFilterFunction.class, 0, -1, 0), new FilterInvokable<T>(filter));
+				FilterFunction.class, 0, -1, 0), new FilterInvokable<T>(filter));
 	}
 
 	/**
@@ -745,14 +761,15 @@ public class DataStream<T> {
 	/**
 	 * Initiates an iterative part of the program that executes multiple times
 	 * and feeds back data streams. The iterative part needs to be closed by
-	 * calling {@link IterativeDataStream#closeWith(DataStream)}. The data
-	 * stream given to the {@code closeWith(DataStream)} method is the data
-	 * stream that will be fed back and used as the input for the iteration
+	 * calling {@link IterativeDataStream#closeWith(DataStream)}. The
+	 * transformation of this IterativeDataStream will be the iteration head.
+	 * The data stream given to the {@code closeWith(DataStream)} method is the
+	 * data stream that will be fed back and used as the input for the iteration
 	 * head. Unlike in batch processing by default the output of the iteration
 	 * stream is directed to both to the iteration head and the next component.
 	 * To direct tuples to the iteration head or the output specifically one can
-	 * use the {@code directTo(OutputSelector)} while referencing the iteration
-	 * head as 'iterate'.
+	 * use the {@code split(OutputSelector)} on the iteration tail while
+	 * referencing the iteration head as 'iterate'.
 	 * 
 	 * The iteration edge will be partitioned the same way as the first input of
 	 * the iteration head.
@@ -786,8 +803,8 @@ public class DataStream<T> {
 	 *            type of the return stream
 	 * @return the data stream constructed
 	 */
-	private <R> StreamOperator<R> addFunction(String functionName,
-			final AbstractRichFunction function, TypeSerializerWrapper<T, Tuple, R> typeWrapper,
+	private <R> StreamOperator<R> addFunction(String functionName, final Function function,
+			TypeSerializerWrapper<T, Tuple, R> typeWrapper,
 			UserTaskInvokable<T, R> functionInvokable) {
 
 		DataStream<T> inputStream = this.copy();
@@ -796,7 +813,8 @@ public class DataStream<T> {
 
 		try {
 			jobGraphBuilder.addTask(returnStream.getId(), functionInvokable, typeWrapper,
-					functionName, SerializationUtils.serialize(function), degreeOfParallelism);
+					functionName, SerializationUtils.serialize((Serializable) function),
+					degreeOfParallelism);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize user defined function");
 		}
@@ -816,16 +834,16 @@ public class DataStream<T> {
 	}
 
 	protected <T1, T2, R> StreamOperator<R> addCoFunction(String functionName,
-			DataStream<T1> inputStream1, DataStream<T2> inputStream2,
-			final AbstractRichFunction function, TypeSerializerWrapper<T1, T2, R> typeWrapper,
-			CoInvokable<T1, T2, R> functionInvokable) {
+			DataStream<T1> inputStream1, DataStream<T2> inputStream2, final Function function,
+			TypeSerializerWrapper<T1, T2, R> typeWrapper, CoInvokable<T1, T2, R> functionInvokable) {
 
 		StreamOperator<R> returnStream = new TwoInputStreamOperator<T1, T2, R>(environment,
 				functionName);
 
 		try {
 			jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable, typeWrapper,
-					functionName, SerializationUtils.serialize(function), degreeOfParallelism);
+					functionName, SerializationUtils.serialize((Serializable) function),
+					degreeOfParallelism);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize user defined function");
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
index 6d63385..798d8fa 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
@@ -51,10 +51,10 @@ public abstract class OutputSelector<T> implements Serializable {
 	 * directTo operator. The tuple will be emitted only to output names which
 	 * are added to the outputs collection.
 	 * 
-	 * @param outputObject
+	 * @param value
 	 *            Output object for which the output selection should be made.
 	 * @param outputs
 	 *            Selected output names should be added to this collection.
 	 */
-	public abstract void select(T outputObject, Collection<String> outputs);
+	public abstract void select(T value, Collection<String> outputs);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
index 8404a80..d1ef3d0 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
@@ -18,12 +18,13 @@
 
 package org.apache.flink.streaming.api.function.co;
 
-import org.apache.flink.api.common.functions.AbstractRichFunction;
+import java.io.Serializable;
 
+import org.apache.flink.api.common.functions.Function;
 
-public abstract class CoMapFunction<IN1, IN2, OUT> extends AbstractRichFunction {
-	private static final long serialVersionUID = 1L;
-	
-	public abstract OUT map1(IN1 value);
-	public abstract OUT map2(IN2 value);
+public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {
+
+	public OUT map1(IN1 value);
+
+	public OUT map2(IN2 value);
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java
new file mode 100755
index 0000000..c93fc81
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.co;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+
+public abstract class RichCoMapFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
+		CoMapFunction<IN1, IN2, OUT> {
+
+	private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
index 026c18e..4728800 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
@@ -27,7 +27,7 @@ package org.apache.flink.streaming.api.function.sink;
  * @param <IN>
  *            Input tuple type
  */
-public class PrintSinkFunction<IN> extends SinkFunction<IN> {
+public class PrintSinkFunction<IN> implements SinkFunction<IN> {
 	private static final long serialVersionUID = 1L;
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java
new file mode 100755
index 0000000..4bbbdc4
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java
@@ -0,0 +1,30 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+
+public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {
+
+	private static final long serialVersionUID = 1L;
+
+	public abstract void invoke(IN value);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
index 668837f..24f45e8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
@@ -21,11 +21,9 @@ package org.apache.flink.streaming.api.function.sink;
 
 import java.io.Serializable;
 
-import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.Function;
 
-public abstract class SinkFunction<IN> extends AbstractRichFunction implements Serializable {
-
-	private static final long serialVersionUID = 1L;
+public interface SinkFunction<IN> extends Function, Serializable {
 
 	public abstract void invoke(IN value);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
index 774dd63..1cfcfaf 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
@@ -32,7 +32,7 @@ import java.util.ArrayList;
  * @param <IN>
  *            Input tuple type
  */
-public abstract class WriteSinkFunction<IN> extends SinkFunction<IN> {
+public abstract class WriteSinkFunction<IN> implements SinkFunction<IN> {
 	private static final long serialVersionUID = 1L;
 
 	protected final String path;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
index 3a732be..6c8cd3a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
@@ -25,7 +25,7 @@ import java.io.IOException;
 
 import org.apache.flink.util.Collector;
 
-public class FileSourceFunction extends SourceFunction<String> {
+public class FileSourceFunction implements SourceFunction<String> {
 	private static final long serialVersionUID = 1L;
 
 	private final String path;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
index 9cfb2ce..799e700 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
@@ -25,7 +25,7 @@ import java.io.IOException;
 
 import org.apache.flink.util.Collector;
 
-public class FileStreamFunction extends SourceFunction<String> {
+public class FileStreamFunction implements SourceFunction<String> {
 	private static final long serialVersionUID = 1L;
 
 	private final String path;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
index 89f5182..98e012b 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
@@ -24,7 +24,7 @@ import java.util.Collection;
 
 import org.apache.flink.util.Collector;
 
-public class FromElementsFunction<T> extends SourceFunction<T> {
+public class FromElementsFunction<T> implements SourceFunction<T> {
 	private static final long serialVersionUID = 1L;
 
 	Iterable<T> iterable;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
index d402374..ad7586c 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.Collector;
  * Source Function used to generate the number sequence
  * 
  */
-public class GenSequenceFunction extends SourceFunction<Long> {
+public class GenSequenceFunction implements SourceFunction<Long> {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java
new file mode 100755
index 0000000..94311a1
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java
@@ -0,0 +1,29 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.function.source;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+
+public abstract class RichSourceFunction<OUT> extends AbstractRichFunction implements
+		SourceFunction<OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
index 44e3387..0bdd7d6 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
@@ -19,12 +19,12 @@
 
 package org.apache.flink.streaming.api.function.source;
 
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.util.Collector;
+import java.io.Serializable;
 
-public abstract class SourceFunction<OUT> extends AbstractRichFunction {
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.util.Collector;
 
-	private static final long serialVersionUID = 1L;
+public interface SourceFunction<OUT> extends Function, Serializable {
 
-	public abstract void invoke(Collector<OUT> collector) throws Exception;
+	public void invoke(Collector<OUT> collector) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index b733362..50bdd42 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.streaming.api.invokable;
 
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 
@@ -48,11 +49,15 @@ public class SinkInvokable<IN> extends StreamRecordInvokable<IN, IN> {
 
 	@Override
 	public void open(Configuration parameters) throws Exception {
-		sinkFunction.open(parameters);
+		if (sinkFunction instanceof RichFunction) {
+			((RichFunction) sinkFunction).open(parameters);
+		}
 	}
 
 	@Override
 	public void close() throws Exception {
-		sinkFunction.close();
+		if (sinkFunction instanceof RichFunction) {
+			((RichFunction) sinkFunction).close();
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
index d049bbf..d7710ae 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.invokable;
 
 import java.io.Serializable;
 
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 
@@ -43,11 +44,15 @@ public class SourceInvokable<OUT> extends StreamComponentInvokable<OUT> implemen
 
 	@Override
 	public void open(Configuration parameters) throws Exception {
-		sourceFunction.open(parameters);
+		if (sourceFunction instanceof RichFunction) {
+			((RichFunction) sourceFunction).open(parameters);
+		}
 	}
 
 	@Override
 	public void close() throws Exception {
-		sourceFunction.close();
+		if (sourceFunction instanceof RichFunction) {
+			((RichFunction) sourceFunction).close();
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
index 61ba5a9..c3c861b 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
@@ -22,13 +22,13 @@ package org.apache.flink.streaming.api.invokable.operator;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 
 public class BatchReduceInvokable<IN, OUT> extends StreamReduceInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 	private int batchSize;
 
-	public BatchReduceInvokable(RichGroupReduceFunction<IN, OUT> reduceFunction, int batchSize) {
+	public BatchReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, int batchSize) {
 		this.reducer = reduceFunction;
 		this.batchSize = batchSize;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index 56ab680..b64f08b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -19,7 +19,8 @@
 
 package org.apache.flink.streaming.api.invokable.operator;
 
-import org.apache.flink.api.java.functions.RichFilterFunction;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
 
@@ -27,9 +28,9 @@ public class FilterInvokable<IN> extends UserTaskInvokable<IN, IN> {
 
 	private static final long serialVersionUID = 1L;
 
-	RichFilterFunction<IN> filterFunction;
+	FilterFunction<IN> filterFunction;
 
-	public FilterInvokable(RichFilterFunction<IN> filterFunction) {
+	public FilterInvokable(FilterFunction<IN> filterFunction) {
 		this.filterFunction = filterFunction;
 	}
 
@@ -54,11 +55,15 @@ public class FilterInvokable<IN> extends UserTaskInvokable<IN, IN> {
 
 	@Override
 	public void open(Configuration parameters) throws Exception {
-		filterFunction.open(parameters);
+		if (filterFunction instanceof RichFunction) {
+			((RichFunction) filterFunction).open(parameters);
+		}
 	}
 
 	@Override
 	public void close() throws Exception {
-		filterFunction.close();
+		if (filterFunction instanceof RichFunction) {
+			((RichFunction) filterFunction).close();
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
index 7796230..cc9fcc1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
@@ -19,16 +19,17 @@
 
 package org.apache.flink.streaming.api.invokable.operator;
 
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
 
 public class FlatMapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 
-	private RichFlatMapFunction<IN, OUT> flatMapper;
+	private FlatMapFunction<IN, OUT> flatMapper;
 
-	public FlatMapInvokable(RichFlatMapFunction<IN, OUT> flatMapper) {
+	public FlatMapInvokable(FlatMapFunction<IN, OUT> flatMapper) {
 		this.flatMapper = flatMapper;
 	}
 
@@ -49,11 +50,15 @@ public class FlatMapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
 
 	@Override
 	public void open(Configuration parameters) throws Exception {
-		flatMapper.open(parameters);
+		if (flatMapper instanceof RichFunction) {
+			((RichFunction) flatMapper).open(parameters);
+		}
 	}
 
 	@Override
 	public void close() throws Exception {
-		flatMapper.close();
+		if (flatMapper instanceof RichFunction) {
+			((RichFunction) flatMapper).close();
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
index 23fc31e..9dbb678 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
@@ -19,16 +19,17 @@
 
 package org.apache.flink.streaming.api.invokable.operator;
 
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
 
 public class MapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 
-	private RichMapFunction<IN, OUT> mapper;
+	private MapFunction<IN, OUT> mapper;
 
-	public MapInvokable(RichMapFunction<IN, OUT> mapper) {
+	public MapInvokable(MapFunction<IN, OUT> mapper) {
 		this.mapper = mapper;
 	}
 
@@ -49,11 +50,15 @@ public class MapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
 
 	@Override
 	public void open(Configuration parameters) throws Exception {
-		mapper.open(parameters);
+		if (mapper instanceof RichFunction) {
+			((RichFunction) mapper).open(parameters);
+		}
 	}
 
 	@Override
 	public void close() throws Exception {
-		mapper.close();
+		if (mapper instanceof RichFunction) {
+			((RichFunction) mapper).close();
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
index 1a402a1..548a298 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -21,25 +21,30 @@ package org.apache.flink.streaming.api.invokable.operator;
 
 import java.util.Iterator;
 
-import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
 
 public abstract class StreamReduceInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
-	protected RichGroupReduceFunction<IN, OUT> reducer;
+	protected GroupReduceFunction<IN, OUT> reducer;
 	protected BatchIterator<IN> userIterator;
 	protected BatchIterable userIterable;
 
 	@Override
 	public void open(Configuration parameters) throws Exception {
 		userIterable = new BatchIterable();
-		reducer.open(parameters);
+		if (reducer instanceof RichFunction) {
+			((RichFunction) reducer).open(parameters);
+		}
 	}
 
 	@Override
 	public void close() throws Exception {
-		reducer.close();
+		if (reducer instanceof RichFunction) {
+			((RichFunction) reducer).close();
+		}
 	}
 
 	protected class BatchIterable implements Iterable<IN> {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
index 430a68e..309656b 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
@@ -22,7 +22,7 @@ package org.apache.flink.streaming.api.invokable.operator;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 
 public class WindowReduceInvokable<IN, OUT> extends StreamReduceInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
@@ -30,7 +30,7 @@ public class WindowReduceInvokable<IN, OUT> extends StreamReduceInvokable<IN, OU
 	volatile boolean isRunning;
 	boolean window;
 
-	public WindowReduceInvokable(RichGroupReduceFunction<IN, OUT> reduceFunction, long windowSize) {
+	public WindowReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize) {
 		this.reducer = reduceFunction;
 		this.windowSize = windowSize;
 		this.window = true;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
index ac71b22..cd51081 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.streaming.api.invokable.operator.co;
 
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.function.co.CoMapFunction;
 
@@ -56,12 +57,16 @@ public class CoMapInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
 
 	@Override
 	public void open(Configuration parameters) throws Exception {
-		mapper.open(parameters);
+		if (mapper instanceof RichFunction) {
+			((RichFunction) mapper).open(parameters);
+		}
 	}
 
 	@Override
 	public void close() throws Exception {
-		mapper.close();
+		if (mapper instanceof RichFunction) {
+			((RichFunction) mapper).close();
+		}
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
index 0e03915..c06e664 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
@@ -25,7 +25,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.io.network.api.MutableReader;
@@ -82,7 +82,7 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
 	}
 
 	@SuppressWarnings({ "unchecked", "rawtypes" })
-	private void setDeserializers(Object function, Class<? extends AbstractRichFunction> clazz) {
+	private void setDeserializers(Object function, Class<? extends Function> clazz) {
 		TypeInformation<IN1> inputTypeInfo1 = (TypeInformation<IN1>) typeWrapper
 				.getInputTypeInfo1();
 		inputDeserializer1 = new StreamRecordSerializer<IN1>(inputTypeInfo1);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
index 54471ae..2ac6a47 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
@@ -21,22 +21,20 @@ package org.apache.flink.streaming.util.serialization;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
-public class FunctionTypeWrapper<IN1, IN2, OUT> extends
-		TypeSerializerWrapper<IN1, IN2, OUT> {
+public class FunctionTypeWrapper<IN1, IN2, OUT> extends TypeSerializerWrapper<IN1, IN2, OUT> {
 	private static final long serialVersionUID = 1L;
 
-	private AbstractRichFunction function;
-	private Class<? extends AbstractRichFunction> functionSuperClass;
+	private Function function;
+	private Class<? extends Function> functionSuperClass;
 	private int inTypeParameter1;
 	private int inTypeParameter2;
 	private int outTypeParameter;
 
-	public FunctionTypeWrapper(AbstractRichFunction function,
-			Class<? extends AbstractRichFunction> functionSuperClass, int inTypeParameter1,
-			int inTypeParameter2, int outTypeParameter) {
+	public FunctionTypeWrapper(Function function, Class<? extends Function> functionSuperClass,
+			int inTypeParameter1, int inTypeParameter2, int outTypeParameter) {
 		this.function = function;
 		this.functionSuperClass = functionSuperClass;
 		this.inTypeParameter1 = inTypeParameter1;
@@ -54,18 +52,18 @@ public class FunctionTypeWrapper<IN1, IN2, OUT> extends
 	@Override
 	protected void setTypeInfo() {
 		if (inTypeParameter1 != -1) {
-			inTypeInfo1 = TypeExtractor.createTypeInfo(functionSuperClass,
-					function.getClass(), inTypeParameter1, null, null);
+			inTypeInfo1 = TypeExtractor.createTypeInfo(functionSuperClass, function.getClass(),
+					inTypeParameter1, null, null);
 		}
 
 		if (inTypeParameter2 != -1) {
-			inTypeInfo2 = TypeExtractor.createTypeInfo(functionSuperClass,
-					function.getClass(), inTypeParameter2, null, null);
+			inTypeInfo2 = TypeExtractor.createTypeInfo(functionSuperClass, function.getClass(),
+					inTypeParameter2, null, null);
 		}
 
 		if (outTypeParameter != -1) {
-			outTypeInfo = TypeExtractor.createTypeInfo(functionSuperClass,
-					function.getClass(), outTypeParameter, null, null);
+			outTypeInfo = TypeExtractor.createTypeInfo(functionSuperClass, function.getClass(),
+					outTypeParameter, null, null);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 68403a8..2b3edc2 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -64,7 +64,7 @@ public class IterateTest {
 
 	}
 
-	public static final class MySink extends SinkFunction<Boolean> {
+	public static final class MySink implements SinkFunction<Boolean> {
 
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
index e296733..28cbc6e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
@@ -48,7 +48,7 @@ public class WriteAsCsvTest {
 	private static List<String> expected4 = new ArrayList<String>();
 	private static List<String> expected5 = new ArrayList<String>();
 
-	public static final class MySource1 extends SourceFunction<Tuple1<Integer>> {
+	public static final class MySource1 implements SourceFunction<Tuple1<Integer>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
index 0f22262..337ca4e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
@@ -50,7 +50,7 @@ public class WriteAsTextTest {
 	private static List<String> expected4 = new ArrayList<String>();
 	private static List<String> expected5 = new ArrayList<String>();
 
-	public static final class MySource1 extends SourceFunction<Tuple1<Integer>> {
+	public static final class MySource1 implements SourceFunction<Tuple1<Integer>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index e2991b4..f2c647f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -51,7 +51,7 @@ public class DirectedOutputTest {
 		}
 	}
 
-	private static class EvenSink extends SinkFunction<Long> {
+	private static class EvenSink implements SinkFunction<Long> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -61,7 +61,7 @@ public class DirectedOutputTest {
 		}
 	}
 
-	private static class OddSink extends SinkFunction<Long> {
+	private static class OddSink implements SinkFunction<Long> {
 
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
index c23c9a7..49f4509 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
@@ -23,7 +23,7 @@ import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
 
-import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.streaming.api.DataStream;
 import org.apache.flink.streaming.api.LocalStreamEnvironment;
@@ -42,8 +42,8 @@ public class BatchReduceTest {
 	private static final int PARALlELISM = 1;
 	private static final long MEMORYSIZE = 32;
 
-	public static final class MyBatchReduce extends
-			RichGroupReduceFunction<Tuple1<Double>, Tuple1<Double>> {
+	public static final class MyBatchReduce implements
+			GroupReduceFunction<Tuple1<Double>, Tuple1<Double>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -62,7 +62,7 @@ public class BatchReduceTest {
 		}
 	}
 
-	public static final class MySink extends SinkFunction<Tuple1<Double>> {
+	public static final class MySink implements SinkFunction<Tuple1<Double>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -72,7 +72,7 @@ public class BatchReduceTest {
 
 	}
 
-	public static final class MySource extends SourceFunction<Tuple1<Double>> {
+	public static final class MySource implements SourceFunction<Tuple1<Double>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
index 2c3f480..82a5f89 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
@@ -39,7 +39,7 @@ public class CoMapTest implements Serializable {
 	private static Set<String> result;
 	private static Set<String> expected = new HashSet<String>();
 
-	private final static class EmptySink extends SinkFunction<Boolean> {
+	private final static class EmptySink implements SinkFunction<Boolean> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -47,7 +47,7 @@ public class CoMapTest implements Serializable {
 		}
 	}
 
-	private final static class MyCoMap extends
+	private final static class MyCoMap implements
 			CoMapFunction<String, Integer, Boolean> {
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e73ea295/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
index 2d4fe7a..0cba0bf 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
@@ -23,7 +23,7 @@ import java.io.Serializable;
 import java.util.HashSet;
 import java.util.Set;
 
-import org.apache.flink.api.java.functions.RichFilterFunction;
+import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.streaming.api.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.util.LogUtils;
@@ -36,7 +36,7 @@ public class FilterTest implements Serializable {
 
 	private static Set<Integer> set = new HashSet<Integer>();
 
-	private static class MySink extends SinkFunction<Integer> {
+	private static class MySink implements SinkFunction<Integer> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -45,7 +45,7 @@ public class FilterTest implements Serializable {
 		}
 	}
 
-	static class MyFilter extends RichFilterFunction<Integer> {
+	static class MyFilter implements FilterFunction<Integer> {
 		private static final long serialVersionUID = 1L;
 
 		@Override