You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/08/01 09:29:29 UTC

[13/22] [FLINK-701] Refactor Java API to use SAM interfaces. Introduce RichFunction stubs for all UDFs.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
new file mode 100644
index 0000000..0c8bc97
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+
+/**
+ * @param <IN1> First input type
+ * @param <IN2> Second input type
+ * @param <OUT> Output type
+ */
+public interface CrossFunction<IN1, IN2, OUT> extends Function, Serializable {
+
+	/**
+	 * User defined function for the cross operator.
+	 * 
+	 * @param record1 Record from first input
+	 * @param record2 Record from the second input
+	 * @return result of cross UDF.
+	 * @throws Exception
+	 */
+	OUT cross(IN1 record1, IN2 record2) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java
new file mode 100644
index 0000000..2f68477
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+import java.io.Serializable;
+
+public interface FilterFunction<T> extends Function, Serializable {
+	
+	/**
+	 * User defined function for a filter.
+	 * 
+	 * @param value Incoming tuples
+	 * @return true for tuples that are allowed to pass the filter
+	 * @throws Exception
+	 */
+	boolean filter(T value) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
new file mode 100644
index 0000000..b2c8f30
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+import org.apache.flink.util.Collector;
+
+/**
+ * Generic interface used for combiners.
+ */
+public interface FlatCombineFunction<T> extends Function, Serializable {
+
+	void combine(Iterator<T> values, Collector<T> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java
new file mode 100644
index 0000000..6a6b971
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+
+public interface FlatJoinFunction<IN1, IN2, OUT> extends Function, Serializable {
+
+	void join (IN1 left, IN2 right, Collector<OUT> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
new file mode 100644
index 0000000..a8696cf
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+
+/**
+ *
+ * @param <T> Type of the input elements.
+ * @param <O> Type of the elements emitted to the collector.
+ */
+public interface FlatMapFunction<T, O> extends Function, Serializable {
+
+	/**
+	 * The core method of the FlatMapFunction. Takes an element from the input data set and transforms
+	 * it into zero, one, or more elements.
+	 *
+	 * @param value The input value.
+	 * @param out The collector for emitting result values.
+	 *
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+	 *                   to fail and may trigger recovery.
+	 */
+	void flatMap(T value, Collector<O> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
index d3b7db4..c2a201f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
@@ -16,82 +16,13 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
 
-import org.apache.flink.configuration.Configuration;
-
 /**
- * An base interface for all user-defined functions. This class defines methods for
- * the life cycle of the functions, as well as methods to access the context in which the functions
- * are executed.
+ * A base interface for all user-defined functions. This interface is empty in order
+ * to allow the function interfaces that extend it to be SAM (single abstract method)
+ * interfaces, so that they can be implemented as Java 8 lambdas.
  */
 public interface Function {
-	
-	/**
-	 * Initialization method for the function. It is called before the actual working methods 
-	 * (like <i>map</i> or <i>join</i>) and thus suitable for one time setup work. For functions that
-	 * are part of an iteration, this method will be invoked at the beginning of each iteration superstep.
-	 * <p>
-	 * The configuration object passed to the function can be used for configuration and initialization.
-	 * The configuration contains all parameters that were configured on the function in the program
-	 * composition.
-	 * 
-	 * <pre><blockquote>
-	 * public class MyMapper extends FilterFunction<String> {
-	 * 
-	 *     private String searchString;
-	 *     
-	 *     public void open(Configuration parameters) {
-	 *         this.searchString = parameters.getString("foo");
-	 *     }
-	 *     
-	 *     public boolean filter(String value) {
-	 *         return value.equals(searchString);
-	 *     }
-	 * }
-	 * </blockquote></pre>
-	 * <p>
-	 * By default, this method does nothing.
-	 * 
-	 * @param parameters The configuration containing the parameters attached to the contract. 
-	 * 
-	 * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
-	 *                   runtime catches an exception, it aborts the task and lets the fail-over logic
-	 *                   decide whether to retry the task execution.
-	 * 
-	 * @see org.apache.flink.configuration.Configuration
-	 */
-	void open(Configuration parameters) throws Exception;
 
-	/**
-	 * Teardown method for the user code. It is called after the last call to the main working methods
-	 * (e.g. <i>map</i> or <i>join</i>). For functions that  are part of an iteration, this method will
-	 * be invoked after each iteration superstep.
-	 * <p>
-	 * This method can be used for clean up work.
-	 * 
-	 * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
-	 *                   runtime catches an exception, it aborts the task and lets the fail-over logic
-	 *                   decide whether to retry the task execution.
-	 */
-	void close() throws Exception;
-	
-	
-	/**
-	 * Gets the context that contains information about the UDF's runtime.
-	 * 
-	 * Context information are for example {@link org.apache.flink.api.common.accumulators.Accumulator}s
-	 * or the {@link org.apache.flink.api.common.cache.DistributedCache}.
-	 * 
-	 * @return The UDF's runtime context.
-	 */
-	RuntimeContext getRuntimeContext();
-	
-	/**
-	 * Sets the function's runtime context. Called by the framework when creating a parallel instance of the function.
-	 *  
-	 * @param t The runtime context.
-	 */
-	void setRuntimeContext(RuntimeContext t);
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCoGrouper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCoGrouper.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCoGrouper.java
deleted file mode 100644
index 59669a2..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCoGrouper.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.functions;
-
-import java.util.Iterator;
-
-import org.apache.flink.util.Collector;
-
-
-public interface GenericCoGrouper<V1, V2, O> extends Function {
-	
-	/**
-	 * This method must be implemented to provide a user implementation of a
-	 * coGroup. It is called for each two key-value pairs that share the same
-	 * key and come from different inputs.
-	 * 
-	 * @param records1 The records from the first input which were paired with the key.
-	 * @param records2 The records from the second input which were paired with the key.
-	 * @param out A collector that collects all output pairs.
-	 */
-	void coGroup(Iterator<V1> records1, Iterator<V2> records2, Collector<O> out) throws Exception;
-	
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java
index ada4eeb..41cfa1d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java
@@ -23,7 +23,7 @@ import org.apache.flink.util.Collector;
 
 
 
-public interface GenericCollectorMap<T, O> extends Function {
+public interface GenericCollectorMap<T, O> extends RichFunction {
 	
 	void map(T record, Collector<O> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCombine.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCombine.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCombine.java
deleted file mode 100644
index 8dfe758..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCombine.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.functions;
-
-import java.util.Iterator;
-
-import org.apache.flink.util.Collector;
-
-/**
- * Generic interface used for combiners.
- */
-public interface GenericCombine<T> extends Function {
-
-	void combine(Iterator<T> records, Collector<T> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCrosser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCrosser.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCrosser.java
deleted file mode 100644
index 3de9b1d..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCrosser.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.functions;
-
-import org.apache.flink.util.Collector;
-
-
-/**
- * @param <V1> First input type
- * @param <V2> Second input type
- * @param <O> Output type
- */
-public interface GenericCrosser<V1, V2, O> extends Function {
-
-	/**
-	 * User defined function for the cross operator.
-	 * 
-	 * @param record1 Record from first input
-	 * @param record2 Record from the second input
-	 * @param out Collector to submit resulting records.
-	 * @throws Exception
-	 */
-	void cross(V1 record1, V2 record2, Collector<O> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericFilter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericFilter.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericFilter.java
deleted file mode 100644
index f34b038..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericFilter.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.functions;
-
-
-public interface GenericFilter<T> extends Function {
-	
-	/**
-	 * User defined function for a filter.
-	 * 
-	 * @param value Incoming tuples
-	 * @return true for tuples that are allowed to pass the filter
-	 * @throws Exception
-	 */
-	boolean filter(T value) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericFlatMap.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericFlatMap.java
deleted file mode 100644
index efb1d49..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericFlatMap.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.functions;
-
-import org.apache.flink.util.Collector;
-
-
-/**
- *
- * @param <T>
- * @param <O>
- */
-public interface GenericFlatMap<T, O> extends Function {
-	
-	/**
-	 * User defined function to perform transformations on records.
-	 * This method allows to submit an arbitrary number of records
-	 * per incoming tuple.
-	 * 
-	 * @param record incoming record
-	 * @param out outgoing collector to return none, one or more records
-	 * @throws Exception
-	 */
-	void flatMap(T record, Collector<O> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericGroupReduce.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericGroupReduce.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericGroupReduce.java
deleted file mode 100644
index e8d9910..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericGroupReduce.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.functions;
-
-import java.util.Iterator;
-
-import org.apache.flink.util.Collector;
-
-
-/**
- *
- * @param <T> Incoming types
- * @param <O> Outgoing types
- */
-public interface GenericGroupReduce<T, O> extends Function {
-	/**
-	 * 
-	 * The central function to be implemented for a reducer. The function receives per call one
-	 * key and all the values that belong to that key. Each key is guaranteed to be processed by exactly
-	 * one function call across all involved instances across all computing nodes.
-	 * 
-	 * @param records All records that belong to the given input key.
-	 * @param out The collector to hand results to.
-	 * @throws Exception
-	 */
-	void reduce(Iterator<T> records, Collector<O> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericJoiner.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericJoiner.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericJoiner.java
deleted file mode 100644
index 77c2ac9..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericJoiner.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.functions;
-
-import org.apache.flink.util.Collector;
-
-
-public interface GenericJoiner<V1, V2, O> extends Function {
-	
-	void join(V1 value1, V2 value2, Collector<O> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericMap.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericMap.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericMap.java
deleted file mode 100644
index 316bf5d..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericMap.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.functions;
-
-
-public interface GenericMap<T, O> extends Function {
-	
-	/**
-	 * A user-implemented function that modifies or transforms an incoming object and
-	 * returns the result.
-	 */
-	O map(T record) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericReduce.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericReduce.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericReduce.java
deleted file mode 100644
index 9e75f2e..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericReduce.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.functions;
-
-
-public interface GenericReduce<T> extends Function {
-	
-	T reduce(T value1, T value2) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
new file mode 100644
index 0000000..984d1fd
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+import org.apache.flink.util.Collector;
+
+
+/**
+ *
+ * @param <T> Incoming types
+ * @param <O> Outgoing types
+ */
+public interface GroupReduceFunction<T, O> extends Function, Serializable {
+	/**
+	 * 
+	 * The central function to be implemented for a reducer. The function receives per call one
+	 * key and all the values that belong to that key. Each key is guaranteed to be processed by exactly
+	 * one function call across all involved instances across all computing nodes.
+	 * 
+	 * @param values All records that belong to the given input key.
+	 * @param out The collector to hand results to.
+	 * @throws Exception
+	 */
+	void reduce(Iterator<T> values, Collector<O> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java
new file mode 100644
index 0000000..02f526a
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+
+public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {
+
+	OUT join(IN1 first, IN2 second) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
new file mode 100644
index 0000000..4e2520d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+
+
+import java.io.Serializable;
+
+public interface MapFunction<T, O> extends Function, Serializable {
+
+	/**
+	 * The core method of MapFunction. Takes an element from the input data set and transforms
+	 * it into exactly one element.
+	 *
+	 * @param value The input value.
+	 * @return The transformed value.
+	 *
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+	 *                   to fail and may trigger recovery.
+	 */
+	O map(T value) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java
new file mode 100644
index 0000000..04f690a
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+
+
+import java.io.Serializable;
+
+public interface ReduceFunction<T> extends Function, Serializable {
+
+	/**
+	 * The core method of ReduceFunction, combining two values into one value of the same type.
+	 * The reduce function is consecutively applied to all values of a group until only a single value remains.
+	 *
+	 * @param value1 The first value to combine.
+	 * @param value2 The second value to combine.
+	 * @return The combined value of both input values.
+	 *
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+	 *                   to fail and may trigger recovery.
+	 */
+	T reduce(T value1, T value2) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
new file mode 100644
index 0000000..ffc3ac2
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * A base interface for all rich user-defined functions. This interface defines methods for
+ * the life cycle of the functions, as well as methods to access the context in which the functions
+ * are executed.
+ */
+public interface RichFunction extends Function {
+	
+	/**
+	 * Initialization method for the function. It is called before the actual working methods 
+	 * (like <i>map</i> or <i>join</i>) and thus suitable for one time setup work. For functions that
+	 * are part of an iteration, this method will be invoked at the beginning of each iteration superstep.
+	 * <p>
+	 * The configuration object passed to the function can be used for configuration and initialization.
+	 * The configuration contains all parameters that were configured on the function in the program
+	 * composition.
+	 * 
+	 * <pre><blockquote>
+	 * public class MyFilter extends RichFilterFunction<String> {
+	 * 
+	 *     private String searchString;
+	 *     
+	 *     public void open(Configuration parameters) {
+	 *         this.searchString = parameters.getString("foo");
+	 *     }
+	 *     
+	 *     public boolean filter(String value) {
+	 *         return value.equals(searchString);
+	 *     }
+	 * }
+	 * </blockquote></pre>
+	 * <p>
+	 * By default, this method does nothing.
+	 * 
+	 * @param parameters The configuration containing the parameters attached to the contract. 
+	 * 
+	 * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
+	 *                   runtime catches an exception, it aborts the task and lets the fail-over logic
+	 *                   decide whether to retry the task execution.
+	 * 
+	 * @see org.apache.flink.configuration.Configuration
+	 */
+	void open(Configuration parameters) throws Exception;
+
+	/**
+	 * Teardown method for the user code. It is called after the last call to the main working methods
+	 * (e.g. <i>map</i> or <i>join</i>). For functions that are part of an iteration, this method will
+	 * be invoked after each iteration superstep.
+	 * <p>
+	 * This method can be used for clean up work.
+	 * 
+	 * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
+	 *                   runtime catches an exception, it aborts the task and lets the fail-over logic
+	 *                   decide whether to retry the task execution.
+	 */
+	void close() throws Exception;
+	
+	
+	/**
+	 * Gets the context that contains information about the UDF's runtime.
+	 * 
+	 * Context information includes, for example, {@link org.apache.flink.api.common.accumulators.Accumulator}s
+	 * or the {@link org.apache.flink.api.common.cache.DistributedCache}.
+	 * 
+	 * @return The UDF's runtime context.
+	 */
+	RuntimeContext getRuntimeContext();
+	
+	/**
+	 * Sets the function's runtime context. Called by the framework when creating a parallel instance of the function.
+	 *  
+	 * @param t The runtime context.
+	 */
+	void setRuntimeContext(RuntimeContext t);
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index a5f0b19..3cf30ff 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -34,7 +34,7 @@ import org.apache.flink.api.common.cache.DistributedCache;
  * the current degree of parallelism) and other constructs like accumulators and broadcast variables.
  * <p>
  * A function can, during runtime, obtain the RuntimeContext via a call to
- * {@link org.apache.flink.api.common.functions.AbstractFunction#getRuntimeContext()}.
+ * {@link AbstractRichFunction#getRuntimeContext()}.
  */
 public interface RuntimeContext {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
new file mode 100644
index 0000000..bc4ffd0
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions.util;
+
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+
+import java.lang.invoke.SerializedLambda;
+import java.lang.reflect.Method;
+
+public class FunctionUtils {
+
+
+	public static void openFunction (Function function, Configuration parameters) throws Exception{
+		if (function instanceof RichFunction) {
+			RichFunction richFunction = (RichFunction) function;
+			richFunction.open (parameters);
+		}
+	}
+
+	public static void closeFunction (Function function) throws Exception{
+		if (function instanceof RichFunction) {
+			RichFunction richFunction = (RichFunction) function;
+			richFunction.close ();
+		}
+	}
+
+	public static void setFunctionRuntimeContext (Function function, RuntimeContext context){
+		if (function instanceof RichFunction) {
+			RichFunction richFunction = (RichFunction) function;
+			richFunction.setRuntimeContext(context);
+		}
+	}
+
+	public static RuntimeContext getFunctionRuntimeContext (Function function, RuntimeContext defaultContext){
+		if (function instanceof RichFunction) {
+			RichFunction richFunction = (RichFunction) function;
+			return richFunction.getRuntimeContext();
+		}
+		else {
+			return defaultContext;
+		}
+	}
+
+	public static boolean isSerializedLambdaFunction(Function function) {
+		Class<?> clazz = function.getClass();
+		try {
+			Method replaceMethod = clazz.getDeclaredMethod("writeReplace");
+			replaceMethod.setAccessible(true);
+			Object serializedForm = replaceMethod.invoke(function);
+			if (serializedForm instanceof SerializedLambda) {
+				return true;
+			}
+			else {
+				return false;
+			}
+		}
+		catch (Exception e) {
+			return false;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
index b140dda..c416765 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
@@ -19,13 +19,13 @@
 
 package org.apache.flink.api.common.operators;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 
 /**
  * This operator represents a Union between two inputs.
  */
-public class Union<T> extends DualInputOperator<T, T, T, AbstractFunction> {
+public class Union<T> extends DualInputOperator<T, T, T, AbstractRichFunction> {
 	
 	private final static String NAME = "Union";
 	
@@ -34,7 +34,7 @@ public class Union<T> extends DualInputOperator<T, T, T, AbstractFunction> {
 	 */
 	public Union(BinaryOperatorInformation<T, T, T> operatorInfo) {
 		// we pass it an AbstractFunction, because currently all operators expect some form of UDF
-		super(new UserCodeClassWrapper<AbstractFunction>(AbstractFunction.class), operatorInfo, NAME);
+		super(new UserCodeClassWrapper<AbstractRichFunction>(AbstractRichFunction.class), operatorInfo, NAME);
 	}
 	
 	public Union(Operator<T> input1, Operator<T> input2) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
index ffcab50..66bea7f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
@@ -29,7 +29,7 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.aggregators.AggregatorRegistry;
 import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
-import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.GenericCollectorMap;
 import org.apache.flink.api.common.operators.IterationOperator;
 import org.apache.flink.api.common.operators.Operator;
@@ -48,7 +48,7 @@ import org.apache.flink.util.Visitor;
 /**
  * 
  */
-public class BulkIterationBase<T> extends SingleInputOperator<T, T, AbstractFunction> implements IterationOperator {
+public class BulkIterationBase<T> extends SingleInputOperator<T, T, AbstractRichFunction> implements IterationOperator {
 	
 	private static String DEFAULT_NAME = "<Unnamed Bulk Iteration>";
 	
@@ -78,7 +78,7 @@ public class BulkIterationBase<T> extends SingleInputOperator<T, T, AbstractFunc
 	 * @param name
 	 */
 	public BulkIterationBase(UnaryOperatorInformation<T, T> operatorInfo, String name) {
-		super(new UserCodeClassWrapper<AbstractFunction>(AbstractFunction.class), operatorInfo, name);
+		super(new UserCodeClassWrapper<AbstractRichFunction>(AbstractRichFunction.class), operatorInfo, name);
 		inputPlaceHolder = new PartialSolutionPlaceHolder<T>(this, this.getOperatorInfo());
 	}
 
@@ -230,7 +230,7 @@ public class BulkIterationBase<T> extends SingleInputOperator<T, T, AbstractFunc
 	/**
 	 * Special Mapper that is added before a termination criterion and is only a container for an special aggregator
 	 */
-	public static class TerminationCriterionMapper<X> extends AbstractFunction implements Serializable, GenericCollectorMap<X, Nothing> {
+	public static class TerminationCriterionMapper<X> extends AbstractRichFunction implements Serializable, GenericCollectorMap<X, Nothing> {
 		private static final long serialVersionUID = 1L;
 		
 		private TerminationCriterionAggregator aggregator;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
index a9ae97c..4b85a31 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.api.common.operators.base;
 
-import org.apache.flink.api.common.functions.GenericCoGrouper;
+import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.DualInputOperator;
 import org.apache.flink.api.common.operators.Ordering;
@@ -28,9 +28,9 @@ import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 
 /**
- * @see GenericCoGrouper
+ * @see org.apache.flink.api.common.functions.CoGroupFunction
  */
-public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends GenericCoGrouper<IN1, IN2, OUT>> extends DualInputOperator<IN1, IN2, OUT, FT> {
+public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1, IN2, OUT>> extends DualInputOperator<IN1, IN2, OUT, FT> {
 	
 	/**
 	 * The ordering for the order inside a group from input one.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
index 33e150d..a66ea72 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.api.common.operators.base;
 
-import org.apache.flink.api.common.functions.GenericCrosser;
+import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.DualInputOperator;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
@@ -28,9 +28,9 @@ import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 
 
 /**
- * @see GenericCrosser
+ * @see org.apache.flink.api.common.functions.CrossFunction
  */
-public class CrossOperatorBase<IN1, IN2, OUT, FT extends GenericCrosser<?, ?, ?>> extends DualInputOperator<IN1, IN2, OUT, FT> {
+public class CrossOperatorBase<IN1, IN2, OUT, FT extends CrossFunction<?, ?, ?>> extends DualInputOperator<IN1, IN2, OUT, FT> {
 	
 	public CrossOperatorBase(UserCodeWrapper<FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, String name) {
 		super(udf, operatorInfo, name);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
index 89e5008..8e955b1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
@@ -23,7 +23,7 @@ import java.util.Collections;
 import java.util.Map;
 
 import org.apache.flink.api.common.aggregators.AggregatorRegistry;
-import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.DualInputOperator;
 import org.apache.flink.api.common.operators.IterationOperator;
@@ -51,7 +51,7 @@ import org.apache.flink.util.Visitor;
  * This class is a subclass of {@code DualInputOperator}. The solution set is considered the first input, the
  * workset is considered the second input.
  */
-public class DeltaIterationBase<ST, WT> extends DualInputOperator<ST, WT, ST, AbstractFunction> implements IterationOperator {
+public class DeltaIterationBase<ST, WT> extends DualInputOperator<ST, WT, ST, AbstractRichFunction> implements IterationOperator {
 
 	private final Operator<ST> solutionSetPlaceholder;
 
@@ -88,7 +88,7 @@ public class DeltaIterationBase<ST, WT> extends DualInputOperator<ST, WT, ST, Ab
 	}
 
 	public DeltaIterationBase(BinaryOperatorInformation<ST, WT, ST> operatorInfo, int[] keyPositions, String name) {
-		super(new UserCodeClassWrapper<AbstractFunction>(AbstractFunction.class), operatorInfo, name);
+		super(new UserCodeClassWrapper<AbstractRichFunction>(AbstractRichFunction.class), operatorInfo, name);
 		this.solutionSetKeyFields = keyPositions;
 		solutionSetPlaceholder = new SolutionSetPlaceHolder<ST>(this, new OperatorInformation<ST>(operatorInfo.getFirstInputType()));
 		worksetPlaceholder = new WorksetPlaceHolder<WT>(this, new OperatorInformation<WT>(operatorInfo.getSecondInputType()));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
index 34896a2..3c28c43 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.api.common.operators.base;
 
-import org.apache.flink.api.common.functions.GenericFlatMap;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.operators.SingleInputOperator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
@@ -28,9 +28,9 @@ import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 
 
 /**
- * @see GenericFlatMap
+ * @see org.apache.flink.api.common.functions.FlatMapFunction
  */
-public class FilterOperatorBase<T, FT extends GenericFlatMap<T, T>> extends SingleInputOperator<T, T, FT> {
+public class FilterOperatorBase<T, FT extends FlatMapFunction<T, T>> extends SingleInputOperator<T, T, FT> {
 	
 	public FilterOperatorBase(UserCodeWrapper<FT> udf, UnaryOperatorInformation<T, T> operatorInfo, String name) {
 		super(udf, operatorInfo, name);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
index 0de236e..89575b6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.api.common.operators.base;
 
-import org.apache.flink.api.common.functions.GenericFlatMap;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.operators.SingleInputOperator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
@@ -28,9 +28,9 @@ import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 
 
 /**
- * @see GenericFlatMap
+ * @see org.apache.flink.api.common.functions.FlatMapFunction
  */
-public class FlatMapOperatorBase<IN, OUT, FT extends GenericFlatMap<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
+public class FlatMapOperatorBase<IN, OUT, FT extends FlatMapFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
 	
 	public FlatMapOperatorBase(UserCodeWrapper<FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
 		super(udf, operatorInfo, name);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
index a24826a..ac55489 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
@@ -19,8 +19,9 @@
 
 package org.apache.flink.api.common.operators.base;
 
-import org.apache.flink.api.common.functions.GenericCombine;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
+
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.SingleInputOperator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
@@ -30,9 +31,9 @@ import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 
 
 /**
- * @see GenericGroupReduce
+ * @see org.apache.flink.api.common.functions.GroupReduceFunction
  */
-public class GroupReduceOperatorBase<IN, OUT, FT extends GenericGroupReduce<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
+public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
 
 	/**
 	 * The ordering for the order inside a reduce group.
@@ -91,15 +92,15 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GenericGroupReduce<IN,
 	/**
 	 * Marks the group reduce operation as combinable. Combinable operations may pre-reduce the
 	 * data before the actual group reduce operations. Combinable user-defined functions
-	 * must implement the interface {@link GenericCombine}.
+	 * must implement the interface {@link org.apache.flink.api.common.functions.FlatCombineFunction}.
 	 * 
 	 * @param combinable Flag to mark the group reduce operation as combinable.
 	 */
 	public void setCombinable(boolean combinable) {
 		// sanity check
-		if (combinable && !GenericCombine.class.isAssignableFrom(this.userFunction.getUserCodeClass())) {
-			throw new IllegalArgumentException("Cannot set a UDF as combinable if it does not implement the interface " + 
-					GenericCombine.class.getName());
+		if (combinable && !FlatCombineFunction.class.isAssignableFrom(this.userFunction.getUserCodeClass())) {
+			throw new IllegalArgumentException("Cannot set a UDF as combinable if it does not implement the interface " +
+					FlatCombineFunction.class.getName());
 		} else {
 			this.combinable = combinable;
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
index b4eeeaa..2ce0529 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.api.common.operators.base;
 
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.DualInputOperator;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
@@ -27,9 +27,9 @@ import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 
 /**
- * @see GenericJoiner
+ * @see org.apache.flink.api.common.functions.FlatJoinFunction
  */
-public class JoinOperatorBase<IN1, IN2, OUT, FT extends GenericJoiner<IN1, IN2, OUT>> extends DualInputOperator<IN1, IN2, OUT, FT>
+public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN2, OUT>> extends DualInputOperator<IN1, IN2, OUT, FT>
 {
 	public JoinOperatorBase(UserCodeWrapper<FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, int[] keyPositions1, int[] keyPositions2, String name) {
 		super(udf, operatorInfo, keyPositions1, keyPositions2, name);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
index efd8fa9..26fde05 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.api.common.operators.base;
 
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.SingleInputOperator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
@@ -33,7 +33,7 @@ import org.apache.flink.api.common.operators.util.UserCodeWrapper;
  * @param <OUT> The result type.
  * @param <FT> The type of the user-defined function.
  */
-public class MapOperatorBase<IN, OUT, FT extends GenericMap<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
+public class MapOperatorBase<IN, OUT, FT extends MapFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
 	
 	public MapOperatorBase(UserCodeWrapper<FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
 		super(udf, operatorInfo, name);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
index 62996ea..e6c435f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.api.common.operators.base;
 
-import org.apache.flink.api.common.functions.GenericReduce;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.operators.SingleInputOperator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
@@ -31,12 +31,12 @@ import org.apache.flink.api.common.operators.util.UserCodeWrapper;
  * Base data flow operator for Reduce user-defined functions. Accepts reduce functions
  * and key positions. The key positions are expected in the flattened common data model.
  * 
- * @see GenericReduce
+ * @see org.apache.flink.api.common.functions.ReduceFunction
  *
  * @param <T> The type (parameters and return type) of the reduce function.
  * @param <FT> The type of the reduce function.
  */
-public class ReduceOperatorBase<T, FT extends GenericReduce<T>> extends SingleInputOperator<T, T, FT> {
+public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleInputOperator<T, T, FT> {
 
 	/**
 	 * Creates a grouped reduce data flow operator.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java
index 2ebae24..7d6495b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java
@@ -25,11 +25,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import org.apache.flink.api.common.functions.GenericCoGrouper;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.GenericCollectorMap;
-import org.apache.flink.api.common.functions.GenericCrosser;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.io.InputFormat;
@@ -56,10 +56,10 @@ public class OperatorUtil {
 
 	static {
 		STUB_CONTRACTS.put(GenericCollectorMap.class, CollectorMapOperatorBase.class);
-		STUB_CONTRACTS.put(GenericGroupReduce.class, GroupReduceOperatorBase.class);
-		STUB_CONTRACTS.put(GenericCoGrouper.class, CoGroupOperatorBase.class);
-		STUB_CONTRACTS.put(GenericCrosser.class, CrossOperatorBase.class);
-		STUB_CONTRACTS.put(GenericJoiner.class, JoinOperatorBase.class);
+		STUB_CONTRACTS.put(GroupReduceFunction.class, GroupReduceOperatorBase.class);
+		STUB_CONTRACTS.put(CoGroupFunction.class, CoGroupOperatorBase.class);
+		STUB_CONTRACTS.put(CrossFunction.class, CrossOperatorBase.class);
+		STUB_CONTRACTS.put(FlatJoinFunction.class, JoinOperatorBase.class);
 		STUB_CONTRACTS.put(FileInputFormat.class, GenericDataSourceBase.class);
 		STUB_CONTRACTS.put(FileOutputFormat.class, GenericDataSinkBase.class);
 		STUB_CONTRACTS.put(InputFormat.class, GenericDataSourceBase.class);
@@ -67,7 +67,7 @@ public class OperatorUtil {
 	}
 
 	/**
-	 * Returns the associated {@link Operator} type for the given {@link org.apache.flink.api.common.functions.Function} class.
+	 * Returns the associated {@link Operator} type for the given stub class (e.g. a {@link org.apache.flink.api.common.functions.Function} or an input/output format).
 	 * 
 	 * @param stubClass
 	 *        the stub class

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java
index 647ceab..30091ab 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java
@@ -21,12 +21,12 @@ package org.apache.flink.api.common.operators.util;
 
 import static org.junit.Assert.assertEquals;
 
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.GenericCoGrouper;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.GenericCollectorMap;
-import org.apache.flink.api.common.functions.GenericCrosser;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.io.DelimitedInputFormat;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
@@ -36,7 +36,6 @@ import org.apache.flink.api.common.operators.base.GenericDataSinkBase;
 import org.apache.flink.api.common.operators.base.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase;
-import org.apache.flink.api.common.operators.util.OperatorUtil;
 import org.apache.flink.types.IntValue;
 import org.junit.Test;
 
@@ -85,7 +84,7 @@ public class OperatorUtilTest {
 	 */
 	@Test
 	public void getContractClassShouldReturnNullForStub() {
-		final Class<?> result = OperatorUtil.getContractClass(Function.class);
+		final Class<?> result = OperatorUtil.getContractClass(RichFunction.class);
 		assertEquals(null, result);
 	}
 
@@ -116,13 +115,13 @@ public class OperatorUtilTest {
 		assertEquals(GenericDataSourceBase.class, result);
 	}
 
-	static abstract class CoGrouper implements GenericCoGrouper<IntValue, IntValue, IntValue> {}
+	static abstract class CoGrouper implements CoGroupFunction<IntValue, IntValue, IntValue> {}
 
-	static abstract class Crosser implements GenericCrosser<IntValue, IntValue, IntValue> {}
+	static abstract class Crosser implements CrossFunction<IntValue, IntValue, IntValue> {}
 
 	static abstract class Mapper implements GenericCollectorMap<IntValue, IntValue> {}
 
-	static abstract class Matcher implements GenericJoiner<IntValue, IntValue, IntValue> {}
+	static abstract class Matcher implements FlatJoinFunction<IntValue, IntValue, IntValue> {}
 
-	static abstract class Reducer implements GenericGroupReduce<IntValue, IntValue> {}
+	static abstract class Reducer implements GroupReduceFunction<IntValue, IntValue> {}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/pom.xml b/flink-examples/flink-java-examples/pom.xml
index 549e95b..ea0db5d 100644
--- a/flink-examples/flink-java-examples/pom.xml
+++ b/flink-examples/flink-java-examples/pom.xml
@@ -317,7 +317,6 @@ under the License.
 					
 				</executions>
 			</plugin>
-
 		</plugins>
 	</build>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/clustering/KMeans.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/clustering/KMeans.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/clustering/KMeans.java
index 4c22db1..8767aca 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/clustering/KMeans.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/clustering/KMeans.java
@@ -21,8 +21,9 @@ package org.apache.flink.example.java.clustering;
 import java.io.Serializable;
 import java.util.Collection;
 
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
@@ -197,7 +198,7 @@ public class KMeans {
 	// *************************************************************************
 	
 	/** Converts a Tuple2<Double,Double> into a Point. */
-	public static final class TuplePointConverter extends MapFunction<Tuple2<Double, Double>, Point> {
+	public static final class TuplePointConverter implements MapFunction<Tuple2<Double, Double>, Point> {
 
 		@Override
 		public Point map(Tuple2<Double, Double> t) throws Exception {
@@ -206,7 +207,7 @@ public class KMeans {
 	}
 	
 	/** Converts a Tuple3<Integer, Double,Double> into a Centroid. */
-	public static final class TupleCentroidConverter extends MapFunction<Tuple3<Integer, Double, Double>, Centroid> {
+	public static final class TupleCentroidConverter implements MapFunction<Tuple3<Integer, Double, Double>, Centroid> {
 
 		@Override
 		public Centroid map(Tuple3<Integer, Double, Double> t) throws Exception {
@@ -215,7 +216,7 @@ public class KMeans {
 	}
 	
 	/** Determines the closest cluster center for a data point. */
-	public static final class SelectNearestCenter extends MapFunction<Point, Tuple2<Integer, Point>> {
+	public static final class SelectNearestCenter extends RichMapFunction<Point, Tuple2<Integer, Point>> {
 		private Collection<Centroid> centroids;
 
 		/** Reads the centroid values from a broadcast variable into a collection. */
@@ -248,7 +249,7 @@ public class KMeans {
 	}
 	
 	/** Appends a count variable to the tuple. */ 
-	public static final class CountAppender extends MapFunction<Tuple2<Integer, Point>, Tuple3<Integer, Point, Long>> {
+	public static final class CountAppender implements MapFunction<Tuple2<Integer, Point>, Tuple3<Integer, Point, Long>> {
 
 		@Override
 		public Tuple3<Integer, Point, Long> map(Tuple2<Integer, Point> t) {
@@ -257,7 +258,7 @@ public class KMeans {
 	}
 	
 	/** Sums and counts point coordinates. */
-	public static final class CentroidAccumulator extends ReduceFunction<Tuple3<Integer, Point, Long>> {
+	public static final class CentroidAccumulator implements ReduceFunction<Tuple3<Integer, Point, Long>> {
 
 		@Override
 		public Tuple3<Integer, Point, Long> reduce(Tuple3<Integer, Point, Long> val1, Tuple3<Integer, Point, Long> val2) {
@@ -266,7 +267,7 @@ public class KMeans {
 	}
 	
 	/** Computes new centroid from coordinate sum and count of points. */
-	public static final class CentroidAverager extends MapFunction<Tuple3<Integer, Point, Long>, Centroid> {
+	public static final class CentroidAverager implements MapFunction<Tuple3<Integer, Point, Long>, Centroid> {
 
 		@Override
 		public Centroid map(Tuple3<Integer, Point, Long> value) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/ConnectedComponents.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/ConnectedComponents.java
index b71347a..0d38e06 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/ConnectedComponents.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/ConnectedComponents.java
@@ -20,10 +20,11 @@
 package org.apache.flink.example.java.graph;
 
 import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.JoinFunction;
-import org.apache.flink.api.java.functions.MapFunction;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsFirst;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsSecond;
@@ -127,7 +128,7 @@ public class ConnectedComponents implements ProgramDescription {
 	 * Function that turns a value into a 2-tuple where both fields are that value.
 	 */
 	@ConstantFields("0 -> 0,1") 
-	public static final class DuplicateValue<T> extends MapFunction<T, Tuple2<T, T>> {
+	public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {
 		
 		@Override
 		public Tuple2<T, T> map(T vertex) {
@@ -138,7 +139,7 @@ public class ConnectedComponents implements ProgramDescription {
 	/**
 	 * Undirected edges by emitting for each input edge the input edges itself and an inverted version.
 	 */
-	public static final class UndirectEdge extends FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+	public static final class UndirectEdge implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 		Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();
 		
 		@Override
@@ -157,7 +158,7 @@ public class ConnectedComponents implements ProgramDescription {
 	 */
 	@ConstantFieldsFirst("1 -> 0")
 	@ConstantFieldsSecond("1 -> 1")
-	public static final class NeighborWithComponentIDJoin extends JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+	public static final class NeighborWithComponentIDJoin implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
 		@Override
 		public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
@@ -165,11 +166,10 @@ public class ConnectedComponents implements ProgramDescription {
 		}
 	}
 	
-	/**
-	 * The input is nested tuples ( (vertex-id, candidate-component) , (vertex-id, current-component) )
-	 */
+
+
 	@ConstantFieldsFirst("0")
-	public static final class ComponentIdFilter extends JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+	public static final class ComponentIdFilter implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
 		@Override
 		public void join(Tuple2<Long, Long> candidate, Tuple2<Long, Long> old, Collector<Tuple2<Long, Long>> out) {
@@ -177,10 +177,10 @@ public class ConnectedComponents implements ProgramDescription {
 				out.collect(candidate);
 			}
 		}
-		@Override
-		public Tuple2<Long, Long> join(Tuple2<Long, Long> first, Tuple2<Long, Long> second) { return null; }
 	}
 
+
+
 	@Override
 	public String getDescription() {
 		return "Parameters: <vertices-path> <edges-path> <result-path> <max-number-of-iterations>";

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java
index fba18fc..2d794bd 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java
@@ -22,10 +22,10 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.JoinFunction;
-import org.apache.flink.api.java.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 import org.apache.flink.api.java.DataSet;
@@ -119,7 +119,7 @@ public class EnumTrianglesBasic {
 	// *************************************************************************
 
 	/** Converts a Tuple2 into an Edge */
-	public static class TupleEdgeConverter extends MapFunction<Tuple2<Integer, Integer>, Edge> {
+	public static class TupleEdgeConverter implements MapFunction<Tuple2<Integer, Integer>, Edge> {
 		private final Edge outEdge = new Edge();
 		
 		@Override
@@ -130,7 +130,7 @@ public class EnumTrianglesBasic {
 	}
 	
 	/** Projects an edge (pair of vertices) such that the id of the first is smaller than the id of the second. */
-	private static class EdgeByIdProjector extends MapFunction<Edge, Edge> {
+	private static class EdgeByIdProjector implements MapFunction<Edge, Edge> {
 	
 		@Override
 		public Edge map(Edge inEdge) throws Exception {
@@ -149,7 +149,7 @@ public class EnumTrianglesBasic {
 	 *  The first vertex of a triad is the shared vertex, the second and third vertex are ordered by vertexId. 
 	 *  Assumes that input edges share the first vertex and are in ascending order of the second vertex.
 	 */
-	private static class TriadBuilder extends GroupReduceFunction<Edge, Triad> {
+	private static class TriadBuilder implements GroupReduceFunction<Edge, Triad> {
 		private final List<Integer> vertices = new ArrayList<Integer>();
 		private final Triad outTriad = new Triad();
 		
@@ -180,7 +180,7 @@ public class EnumTrianglesBasic {
 	}
 	
 	/** Filters triads (three vertices connected by two edges) without a closing third edge. */
-	private static class TriadFilter extends JoinFunction<Triad, Edge, Triad> {
+	private static class TriadFilter implements JoinFunction<Triad, Edge, Triad> {
 		
 		@Override
 		public Triad join(Triad triad, Edge edge) throws Exception {