You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/01 02:36:52 UTC

[11/16] git commit: [FLINK-701] Several cleanups after SAM refactoring. - Lambda detection compiles on earlier java versions - Add lambda detection test. - Fix JavaDocs

[FLINK-701] Several cleanups after SAM refactoring.
  - Lambda detection compiles on earlier java versions
  - Add lambda detection test.
  - Fix JavaDocs


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/bc89e911
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/bc89e911
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/bc89e911

Branch: refs/heads/master
Commit: bc89e911b2cd433bb841beab9e8e513ca0c3525d
Parents: 22b24f2
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jul 31 19:46:14 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jul 31 20:18:00 2014 +0200

----------------------------------------------------------------------
 .../examples/SpargelConnectedComponents.java    |   4 +-
 .../spargel/java/examples/SpargelPageRank.java  |   8 +-
 .../SpargelPageRankCountingVertices.java        |  11 +-
 .../apache/flink/client/web/JobsServlet.java    |   1 -
 .../org/apache/flink/compiler/PactCompiler.java |   6 +-
 .../apache/flink/compiler/CompilerTestBase.java |   6 +-
 .../flink/compiler/util/DummyCrossStub.java     |   4 -
 .../common/functions/AbstractRichFunction.java  |   8 +-
 .../api/common/functions/CoGroupFunction.java   |  38 +++++--
 .../api/common/functions/CombineFunction.java   |  24 ++++-
 .../api/common/functions/CrossFunction.java     |  38 +++++--
 .../api/common/functions/ExecutionContext.java  |  51 ----------
 .../api/common/functions/FilterFunction.java    |  25 ++++-
 .../common/functions/FlatCombineFunction.java   |  22 +++-
 .../api/common/functions/FlatJoinFunction.java  |  45 +++++++-
 .../api/common/functions/FlatMapFunction.java   |  23 +++--
 .../flink/api/common/functions/Function.java    |   1 -
 .../common/functions/GenericCollectorMap.java   |  10 +-
 .../common/functions/GroupReduceFunction.java   |  34 +++++--
 .../api/common/functions/JoinFunction.java      |  40 +++++++-
 .../flink/api/common/functions/MapFunction.java |  21 +++-
 .../api/common/functions/ReduceFunction.java    |  26 ++++-
 .../api/common/functions/RichFunction.java      |   1 -
 .../api/common/functions/RuntimeContext.java    |  28 -----
 .../common/functions/util/FunctionUtils.java    |  38 ++++---
 .../flink/api/common/operators/Union.java       |   1 -
 .../operators/base/BulkIterationBase.java       |   1 +
 .../base/CollectorMapOperatorBase.java          |   2 +-
 .../api/common/operators/util/OperatorUtil.java |   3 +-
 .../common/operators/util/OperatorUtilTest.java |   5 +
 .../java/org/apache/flink/api/java/DataSet.java |  49 ++++-----
 .../flink/api/java/ExecutionEnvironment.java    |   2 +-
 .../api/java/functions/FlatMapIterator.java     |   2 +-
 .../api/java/functions/RichCoGroupFunction.java |  39 +------
 .../api/java/functions/RichCrossFunction.java   |  35 +------
 .../api/java/functions/RichFilterFunction.java  |  28 +----
 .../java/functions/RichFlatCombineFunction.java |   9 ++
 .../java/functions/RichFlatJoinFunction.java    |  43 +-------
 .../api/java/functions/RichFlatMapFunction.java |  28 +----
 .../java/functions/RichGroupReduceFunction.java |  32 +-----
 .../api/java/functions/RichJoinFunction.java    |  12 ++-
 .../api/java/functions/RichMapFunction.java     |  29 +-----
 .../api/java/functions/RichReduceFunction.java  |  35 +------
 .../api/java/operators/CoGroupOperator.java     |   2 +-
 .../flink/api/java/operators/CrossOperator.java |   2 +-
 .../api/java/operators/DistinctOperator.java    |  28 ++---
 .../api/java/operators/GroupReduceOperator.java |   7 +-
 .../flink/api/java/operators/Grouping.java      |   4 +-
 .../flink/api/java/operators/JoinOperator.java  |   6 +-
 .../api/java/operators/SortedGrouping.java      |   4 +-
 .../api/java/operators/UnsortedGrouping.java    |   2 +-
 .../PlanUnwrappingReduceGroupOperator.java      |   2 +-
 .../java/record/functions/CoGroupFunction.java  |   2 +-
 .../java/record/functions/CrossFunction.java    |   6 +-
 .../api/java/record/functions/JoinFunction.java |  15 +--
 .../api/java/record/functions/MapFunction.java  |   2 +-
 .../java/record/functions/ReduceFunction.java   |   2 +-
 .../api/java/record/operators/MapOperator.java  |   1 +
 .../DeltaIterationTranslationTest.java          |   2 +-
 .../translation/DistrinctTranslationTest.java   |  57 +++++++++++
 .../java/type/extractor/TypeExtractorTest.java  |   4 +
 .../javaApiOperators/lambdas/CoGroupITCase.java |   1 +
 .../javaApiOperators/lambdas/CrossITCase.java   |   1 +
 .../javaApiOperators/lambdas/FilterITCase.java  |  70 ++-----------
 .../lambdas/FlatJoinITCase.java                 |   1 +
 .../javaApiOperators/lambdas/FlatMapITCase.java |   1 +
 .../lambdas/GroupReduceITCase.java              |   1 +
 .../javaApiOperators/lambdas/JoinITCase.java    |   1 +
 .../lambdas/LambdaExtractionTest.java           |  82 +++++++++++++++
 .../javaApiOperators/lambdas/MapITCase.java     |   1 +
 .../javaApiOperators/lambdas/ReduceITCase.java  | 102 +++++--------------
 .../SolutionSetFastUpdateOutputCollector.java   |   6 +-
 .../io/SolutionSetUpdateOutputCollector.java    |   2 +-
 .../task/AbstractIterativePactTask.java         |   4 +-
 .../iterative/task/IterationHeadPactTask.java   |   7 +-
 .../task/IterationIntermediatePactTask.java     |   9 +-
 .../task/IterationSynchronizationSinkTask.java  |   4 +-
 .../iterative/task/IterationTailPactTask.java   |   9 +-
 .../operators/RuntimeExecutionContext.java      |  56 ----------
 .../flink/test/operators/CrossITCase.java       |  18 +---
 80 files changed, 658 insertions(+), 734 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
index a4ba6fa..0f1f26d 100644
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.spargel.java.examples;
 
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.spargel.java.MessageIterator;
 import org.apache.flink.spargel.java.MessagingFunction;
@@ -70,7 +70,7 @@ public class SpargelConnectedComponents {
 	 * A map function that takes a Long value and creates a 2-tuple out of it:
 	 * <pre>(Long value) -> (value, value)</pre>
 	 */
-	public static final class IdAssigner extends RichMapFunction<Long, Tuple2<Long, Long>> {
+	public static final class IdAssigner implements MapFunction<Long, Tuple2<Long, Long>> {
 		@Override
 		public Tuple2<Long, Long> map(Long value) {
 			return new Tuple2<Long, Long>(value, value);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
index 9dfc327..6d3dc95 100644
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.spargel.java.examples;
 
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.spargel.java.MessageIterator;
@@ -48,7 +48,7 @@ public class SpargelPageRank {
 		
 		// enumerate some sample edges and assign an initial uniform probability (rank)
 		DataSet<Tuple2<Long, Double>> intialRanks = env.generateSequence(1, numVertices)
-								.map(new RichMapFunction<Long, Tuple2<Long, Double>>() {
+								.map(new MapFunction<Long, Tuple2<Long, Double>>() {
 									public Tuple2<Long, Double> map(Long value) {
 										return new Tuple2<Long, Double>(value, 1.0/numVertices);
 									}
@@ -56,7 +56,7 @@ public class SpargelPageRank {
 		
 		// generate some random edges. the transition probability on each edge is 1/num-out-edges of the source vertex
 		DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = env.generateSequence(1, numVertices)
-								.flatMap(new RichFlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
+								.flatMap(new FlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
 									public void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) {
 										int numOutEdges = (int) (Math.random() * (numVertices / 2));
 										for (int i = 0; i < numOutEdges; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
index 43c0b84..01d2cd7 100644
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.spargel.java.examples;
 
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.RichMapFunction;
-import org.apache.flink.api.java.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
@@ -53,7 +54,7 @@ public class SpargelPageRankCountingVertices {
 		
 		// generate some random edges. the transition probability on each edge is 1/num-out-edges of the source vertex
 		DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = env.generateSequence(1, NUM_VERTICES)
-								.flatMap(new RichFlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
+								.flatMap(new FlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
 									public void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) {
 										int numOutEdges = (int) (Math.random() * (NUM_VERTICES / 2));
 										for (int i = 0; i < numOutEdges; i++) {
@@ -67,12 +68,12 @@ public class SpargelPageRankCountingVertices {
 		
 		// count the number of vertices
 		DataSet<Long> count = vertices
-			.map(new RichMapFunction<Long, Long>() {
+			.map(new MapFunction<Long, Long>() {
 				public Long map(Long value) {
 					return 1L;
 				}
 			})
-			.reduce(new RichReduceFunction<Long>() {
+			.reduce(new ReduceFunction<Long>() {
 				public Long reduce(Long value1, Long value2) {
 					return value1 + value2;
 				}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java
index c62f0d7..cd53115 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java
@@ -180,7 +180,6 @@ public class JobsServlet extends HttpServlet {
 		// parse the request
 		ServletFileUpload uploadHandler = new ServletFileUpload(fileItemFactory);
 		try {
-			@SuppressWarnings("unchecked")
 			Iterator<FileItem> itr = ((List<FileItem>) uploadHandler.parseRequest(req)).iterator();
 
 			// go over the form fields and look for our file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index b55dea0..e22a365 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -37,7 +37,6 @@ import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Union;
 import org.apache.flink.api.common.operators.base.BulkIterationBase;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
-import org.apache.flink.api.common.operators.base.CollectorMapOperatorBase;
 import org.apache.flink.api.common.operators.base.CrossOperatorBase;
 import org.apache.flink.api.common.operators.base.DeltaIterationBase;
 import org.apache.flink.api.common.operators.base.FilterOperatorBase;
@@ -648,6 +647,7 @@ public class PactCompiler {
 			this.forceDOP = forceDOP;
 		}
 
+		@SuppressWarnings("deprecation")
 		@Override
 		public boolean preVisit(Operator<?> c) {
 			// check if we have been here before
@@ -671,8 +671,8 @@ public class PactCompiler {
 			else if (c instanceof MapOperatorBase) {
 				n = new MapNode((MapOperatorBase<?, ?, ?>) c);
 			}
-			else if (c instanceof CollectorMapOperatorBase) {
-				n = new CollectorMapNode((CollectorMapOperatorBase<?, ?, ?>) c);
+			else if (c instanceof org.apache.flink.api.common.operators.base.CollectorMapOperatorBase) {
+				n = new CollectorMapNode((org.apache.flink.api.common.operators.base.CollectorMapOperatorBase<?, ?, ?>) c);
 			}
 			else if (c instanceof FlatMapOperatorBase) {
 				n = new FlatMapNode((FlatMapOperatorBase<?, ?, ?>) c);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java b/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
index b2c163b..0115c31 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
@@ -27,7 +27,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.base.GenericDataSourceBase;
@@ -179,7 +179,7 @@ public abstract class CompilerTestBase implements java.io.Serializable {
 		}
 		
 		@SuppressWarnings("unchecked")
-		public <T extends PlanNode> T getNode(String name, Class<? extends RichFunction> stubClass) {
+		public <T extends PlanNode> T getNode(String name, Class<? extends Function> stubClass) {
 			List<PlanNode> nodes = this.map.get(name);
 			if (nodes == null || nodes.isEmpty()) {
 				throw new RuntimeException("No node found with the given name and stub class.");
@@ -241,7 +241,7 @@ public abstract class CompilerTestBase implements java.io.Serializable {
 		}
 		
 		@SuppressWarnings("unchecked")
-		public <T extends Operator<?>> T getNode(String name, Class<? extends RichFunction> stubClass) {
+		public <T extends Operator<?>> T getNode(String name, Class<? extends Function> stubClass) {
 			List<Operator<?>> nodes = this.map.get(name);
 			if (nodes == null || nodes.isEmpty()) {
 				throw new RuntimeException("No node found with the given name and stub class.");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java b/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java
index 736ee14..d6ecc40 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java
@@ -16,18 +16,14 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.util;
 
-import java.io.Serializable;
-
 import org.apache.flink.api.java.record.functions.CrossFunction;
 import org.apache.flink.types.Record;
 
 public class DummyCrossStub extends CrossFunction {
 	private static final long serialVersionUID = 1L;
 
-
 	@Override
 	public Record cross(Record first, Record second) throws Exception {
 		return first;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
index 07b957d..981de14 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
 
 import java.io.Serializable;
@@ -24,9 +23,10 @@ import java.io.Serializable;
 import org.apache.flink.configuration.Configuration;
 
 /**
- * An abstract stub implementation for user-defined functions. It offers default implementations
- * for {@link #open(Configuration)} and {@link #close()}. It also offers access to the
- * {@link RuntimeContext} and {@link IterationRuntimeContext}.
+ * An abstract stub implementation for rich user-defined functions.
+ * Rich functions have additional methods for initialization ({@link #open(Configuration)}) and
+ * teardown ({@link #close()}), as well as access to their runtime execution context via
+ * {@link #getRuntimeContext()}.
  */
 public abstract class AbstractRichFunction implements RichFunction, Serializable {
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
index 5c200af..cc8fe78 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
 
 import java.io.Serializable;
@@ -24,18 +23,41 @@ import java.util.Iterator;
 
 import org.apache.flink.util.Collector;
 
-
+/**
+ * The interface for CoGroup functions. CoGroup functions combine two data sets by first grouping each data set
+ * after a key and then "joining" the groups by calling this function with the two sets for each key. 
+ * If a key is present in only one of the two inputs, it may be that one of the groups is empty.
+ * <p>
+ * The basic syntax for using CoGroup on two data sets is as follows:
+ * <pre><blockquote>
+ * DataSet<X> set1 = ...;
+ * DataSet<Y> set2 = ...;
+ * 
+ * set1.coGroup(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyCoGroupFunction());
+ * </blockquote></pre>
+ * <p>
+ * {@code set1} is here considered the first input, {@code set2} the second input.
+ * <p>
+ * Some keys may only be contained in one of the two original data sets. In that case, the CoGroup function is invoked
+ * with an empty input for the side of the data set that did not contain elements with that specific key.
+ * 
+ * @param <V1> The data type of the first input data set.
+ * @param <V2> The data type of the second input data set.
+ * @param <O> The data type of the returned elements.
+ */
 public interface CoGroupFunction<V1, V2, O> extends Function, Serializable {
 	
 	/**
 	 * This method must be implemented to provide a user implementation of a
-	 * coGroup. It is called for each two key-value pairs that share the same
-	 * key and come from different inputs.
+	 * coGroup. It is called for each pair of element groups where the elements share the
+	 * same key.
+	 * 
+	 * @param first The records from the first input.
+	 * @param second The records from the second input.
+	 * @param out A collector to return elements.
 	 * 
-	 * @param first The records from the first input which were paired with the key.
-	 * @param second The records from the second input which were paired with the key.
-	 * @param out A collector that collects all output pairs.
+	 * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
+	 *                   and may trigger the recovery logic.
 	 */
 	void coGroup(Iterator<V1> first, Iterator<V2> second, Collector<O> out) throws Exception;
-	
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
index d72c4c8..aed7fae 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
@@ -16,16 +16,34 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
 
 import java.io.Serializable;
 import java.util.Iterator;
 
 /**
- * Generic interface used for combiners.
+ * Generic interface used for combine functions ("combiners"). Combiners act as auxiliaries to a {@link GroupReduceFunction}
+ * and "pre-reduce" the data. The combine functions typically do not see the entire group of elements, but
+ * only a sub-group.
+ * <p>
+ * Combine functions are frequently helpful in increasing the program efficiency, because they allow the system to
+ * reduce the data volume earlier, before the entire groups have been collected.
+ * <p>
+ * This special variant of the combine function reduces the group of elements into a single element. A variant
+ * that can return multiple values per group is defined in {@link FlatCombineFunction}.
+ * 
+ * @param <T> The data type processed by the combine function.
  */
 public interface CombineFunction<T> extends Function, Serializable {
 
-	T combine(Iterator<T> records) throws Exception;
+	/**
+	 * The combine method, called (potentially multiple times) with subgroups of elements.
+	 * 
+	 * @param values The elements to be combined.
+	 * @return The single resulting value.
+	 * 
+	 * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
+	 *                   and may trigger the recovery logic.
+	 */
+	T combine(Iterator<T> values) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
index 0c8bc97..5e2c122 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
@@ -16,27 +16,43 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
 
 import java.io.Serializable;
 
-
 /**
- * @param <IN1> First input type
- * @param <IN2> Second input type
- * @param <OUT> Output type
+ * Interface for Cross functions. Cross functions are applied to the Cartesian product of their inputs
+ * and are called for each pair of elements.
+ * 
+ * They are optional, a means of convenience that can be used to directly manipulate the
+ * pair of elements, instead of processing 2-tuples that contain the pairs.
+ * <p>
+ * The basic syntax for using Cross on two data sets is as follows:
+ * <pre><blockquote>
+ * DataSet<X> set1 = ...;
+ * DataSet<Y> set2 = ...;
+ * 
+ * set1.cross(set2).with(new MyCrossFunction());
+ * </blockquote></pre>
+ * <p>
+ * {@code set1} is here considered the first input, {@code set2} the second input.
+ * 
+ * @param <IN1> The type of the elements in the first input.
+ * @param <IN2> The type of the elements in the second input.
+ * @param <OUT> The type of the result elements.
  */
 public interface CrossFunction<IN1, IN2, OUT> extends Function, Serializable {
 
 	/**
-	 * User defined function for the cross operator.
+	 * Cross UDF method. Called once per pair of elements in the Cartesian product of the inputs.
+	 * 
+	 * @param val1 Element from first input.
+	 * @param val2 Element from the second input.
+	 * @return The result element.
 	 * 
-	 * @param record1 Record from first input
-	 * @param record2 Record from the second input
-	 * @return result of cross UDF.
-	 * @throws Exception
+	 * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
+	 *                   and may trigger the recovery logic.
 	 */
-	OUT cross(IN1 record1, IN2 record2) throws Exception;
+	OUT cross(IN1 val1, IN2 val2) throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/ExecutionContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/ExecutionContext.java
deleted file mode 100644
index 9540dc1..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/ExecutionContext.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.functions;
-
-
-/**
- * The execution context provides basic information about the parallel runtime context
- * in which a stub instance lives. Such information includes the current number of
- * parallel stub instances, the stub's parallel task index, the pact name, or the iteration context.
- */
-public interface ExecutionContext {
-	
-	/**
-	 * Gets the name of the task. This is the name supplied to the contract upon instantiation. If
-	 * no name was given at contract instantiation time, a default name will be returned.
-	 * 
-	 * @return The task's name.
-	 */
-	String getTaskName();
-	
-	/**
-	 * Gets the number of parallel subtasks in which the stub is executed.
-	 * 
-	 * @return The number of parallel subtasks in which the stub is executed.
-	 */
-	int getNumberOfSubtasks();
-	
-	/**
-	 * Gets the subtask's parallel task number.
-	 * 
-	 * @return The subtask's parallel task number.
-	 */
-	int getSubtaskIndex();
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java
index 2f68477..2380f60 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java
@@ -16,18 +16,33 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
+
 import java.io.Serializable;
 
+/**
+ * Base interface for Filter functions. A filter function takes elements and evaluates a
+ * predicate on them to decide whether to keep the element, or to discard it.
+ * <p>
+ * The basic syntax for using a FilterFunction is as follows:
+ * <pre><blockquote>
+ * DataSet<X> input = ...;
+ * 
+ * DataSet<X> result = input.filter(new MyFilterFunction());
+ * </blockquote></pre>
+ * 
+ * @param <T> The type of the filtered elements.
+ */
 public interface FilterFunction<T> extends Function, Serializable {
 	
 	/**
-	 * User defined function for a filter.
+	 * The filter function that evaluates the predicate.
+	 * 
+	 * @param value The value to be filtered.
+	 * @return True for values that should be retained, false for values to be filtered out.
 	 * 
-	 * @param value Incoming tuples
-	 * @return true for tuples that are allowed to pass the filter
-	 * @throws Exception
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+	 *                   to fail and may trigger recovery.
 	 */
 	boolean filter(T value) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
index b2c8f30..edf1614 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
 
 import java.io.Serializable;
@@ -25,9 +24,28 @@ import java.util.Iterator;
 import org.apache.flink.util.Collector;
 
 /**
- * Generic interface used for combiners.
+ * Generic interface used for combine functions ("combiners"). Combiners act as auxiliaries to a {@link GroupReduceFunction}
+ * and "pre-reduce" the data. The combine functions typically do not see the entire group of elements, but
+ * only a sub-group.
+ * <p>
+ * Combine functions are frequently helpful in increasing the program efficiency, because they allow the system to
+ * reduce the data volume earlier, before the entire groups have been collected.
+ * <p>
+ * This special variant of the combine function supports returning more than one element per group.
+ * It is frequently less efficient to use than the {@link CombineFunction}.
+ * 
+ * @param <T> The data type processed by the combine function.
  */
 public interface FlatCombineFunction<T> extends Function, Serializable {
 
+	/**
+	 * The combine method, called (potentially multiple times) with subgroups of elements.
+	 * 
+	 * @param values The elements to be combined.
+	 * @param out The collector to use to return values from the function.
+	 * 
+	 * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
+	 *                   and may trigger the recovery logic.
+	 */
 	void combine(Iterator<T> values, Collector<T> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java
index 6a6b971..3ad2c82 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java
@@ -16,15 +16,54 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
 
 import org.apache.flink.util.Collector;
 
 import java.io.Serializable;
 
-
+/**
+ * Interface for Join functions. Joins combine two data sets by joining their
+ * elements on specified keys. This function is called with each pair of joining elements.
+ * <p>
+ * This particular variant of the join function supports returning zero, one, or more result values
+ * per pair of joining values. 
+ * <p>
+ * By default, the join strictly follows the semantics of an "inner join" in SQL.
+ * This means that elements are filtered out
+ * if their key is not contained in the other data set.
+ * <p>
+ * The basic syntax for using Join on two data sets is as follows:
+ * <pre><blockquote>
+ * DataSet<X> set1 = ...;
+ * DataSet<Y> set2 = ...;
+ * 
+ * set1.join(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyJoinFunction());
+ * </blockquote></pre>
+ * <p>
+ * {@code set1} is here considered the first input, {@code set2} the second input.
+ * <p>
+ * The Join function is an optional part of a join operation. If no JoinFunction is provided,
+ * the result of the operation is a sequence of 2-tuples, where the elements in the tuple are those that
+ * the JoinFunction would have been invoked with.
+ * <P>
+ * Note: You can use a {@link CoGroupFunction} to perform an outer join.
+ * 
+ * @param <IN1> The type of the elements in the first input.
+ * @param <IN2> The type of the elements in the second input.
+ * @param <OUT> The type of the result elements.
+ */
 public interface FlatJoinFunction<IN1, IN2, OUT> extends Function, Serializable {
 
-	void join (IN1 left, IN2 right, Collector<OUT> out) throws Exception;
+	/**
+	 * The join method, called once per joined pair of elements.
+	 * 
+	 * @param first The element from first input.
+	 * @param second The element from second input.
+	 * @param out The collector used to return zero, one, or more elements.
+	 * 
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+	 *                   to fail and may trigger recovery.
+	 */
+	void join (IN1 first, IN2 second, Collector<OUT> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
index a8696cf..0c32eae 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
@@ -16,27 +16,36 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
 
 import org.apache.flink.util.Collector;
 
 import java.io.Serializable;
 
-
 /**
- *
- * @param <T>
- * @param <O>
+ * Base interface for flatMap functions. FlatMap functions take elements and transform them
+ * into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists
+ * and arrays. Operations that produce strictly one result element per input element can also
+ * use the {@link MapFunction}.
+ * <p>
+ * The basic syntax for using a FlatMapFunction is as follows:
+ * <pre><blockquote>
+ * DataSet<X> input = ...;
+ * 
+ * DataSet<Y> result = input.flatMap(new MyFlatMapFunction());
+ * </blockquote></pre>
+ * 
+ * @param <T> Type of the input elements.
+ * @param <O> Type of the returned elements.
  */
 public interface FlatMapFunction<T, O> extends Function, Serializable {
 
 	/**
-	 * The core method of FlatMappable. Takes an element from the input data set and transforms
+	 * The core method of the FlatMapFunction. Takes an element from the input data set and transforms
 	 * it into zero, one, or more elements.
 	 *
 	 * @param value The input value.
-	 * @param out The collector for for emitting result values.
+	 * @param out The collector for returning result values.
 	 *
 	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
 	 *                   to fail and may trigger recovery.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
index c2a201f..9e9d88d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
@@ -24,5 +24,4 @@ package org.apache.flink.api.common.functions;
  * can be called as Java 8 lambdas
  */
 public interface Function {
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java
index 41cfa1d..edac4a8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java
@@ -16,13 +16,17 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
 
 import org.apache.flink.util.Collector;
 
-
-
+/**
+ * Variant of the flat map that is used for backwards compatibility in the deprecated Record-API.
+ *
+ * @param <T> The input data type.
+ * @param <O> The result data type.
+ */
+@Deprecated
 public interface GenericCollectorMap<T, O> extends RichFunction {
 	
 	void map(T record, Collector<O> out) throws Exception;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
index 984d1fd..5fa959c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
 
 import java.io.Serializable;
@@ -24,22 +23,35 @@ import java.util.Iterator;
 
 import org.apache.flink.util.Collector;
 
-
 /**
- *
- * @param <T> Incoming types
- * @param <O> Outgoing types
+ * The interface for group reduce functions. GroupReduceFunctions process groups of elements.
+ * They may aggregate them to a single value, or produce multiple result values for each group.
+ * The group may be defined by sharing a common grouping key, or the group may simply be
+ * all elements of a data set.
+ * <p>
+ * For a reduce function that works incrementally by always combining two elements, see 
+ * {@link ReduceFunction}.
+ * <p>
+ * The basic syntax for using a grouped GroupReduceFunction is as follows:
+ * <pre><blockquote>
+ * DataSet<X> input = ...;
+ * 
+ * DataSet<X> result = input.groupBy(<key-definition>).reduceGroup(new MyGroupReduceFunction());
+ * </blockquote></pre>
+ * 
+ * @param <T> Type of the elements that this function processes.
+ * @param <O> The type of the elements returned by the user-defined function.
  */
 public interface GroupReduceFunction<T, O> extends Function, Serializable {
+	
 	/**
+	 * The reduce method. The function receives one call per group of elements.
 	 * 
-	 * The central function to be implemented for a reducer. The function receives per call one
-	 * key and all the values that belong to that key. Each key is guaranteed to be processed by exactly
-	 * one function call across all involved instances across all computing nodes.
-	 * 
-	 * @param records All records that belong to the given input key.
+	 * @param values All records that belong to the given input key.
 	 * @param out The collector to hand results to.
-	 * @throws Exception
+	 *
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+	 *                   to fail and may trigger recovery.
 	 */
 	void reduce(Iterator<T> values, Collector<O> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java
index 02f526a..9a8943c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java
@@ -16,13 +16,49 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
 
 import java.io.Serializable;
 
-
+/**
+ * Interface for Join functions. Joins combine two data sets by joining their
+ * elements on specified keys. This function is called with each pair of joining elements.
+ * <p>
+ * By default, the join strictly follows the semantics of an "inner join" in SQL.
+ * This means that elements are filtered out
+ * if their key is not contained in the other data set.
+ * <p>
+ * The basic syntax for using Join on two data sets is as follows:
+ * <pre><blockquote>
+ * DataSet<X> set1 = ...;
+ * DataSet<Y> set2 = ...;
+ * 
+ * set1.join(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyJoinFunction());
+ * </blockquote></pre>
+ * <p>
+ * {@code set1} is here considered the first input, {@code set2} the second input.
+ * <p>
+ * The Join function is an optional part of a join operation. If no JoinFunction is provided,
+ * the result of the operation is a sequence of 2-tuples, where the elements in the tuple are those that
+ * the JoinFunction would have been invoked with.
+ * <P>
+ * Note: You can use a {@link CoGroupFunction} to perform an outer join.
+ * 
+ * @param <IN1> The type of the elements in the first input.
+ * @param <IN2> The type of the elements in the second input.
+ * @param <OUT> The type of the result elements.
+ */
 public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {
 
+	/**
+	 * The join method, called once per joined pair of elements.
+	 * 
+	 * @param first The element from first input.
+	 * @param second The element from second input.
+	 * @return The resulting element.
+	 * 
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+	 *                   to fail and may trigger recovery.
+	 */
 	OUT join(IN1 first, IN2 second) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
index 4e2520d..dccc980 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
@@ -16,16 +16,31 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
 
-
 import java.io.Serializable;
 
+/**
+ * Base interface for Map functions. Map functions take elements and transform them,
+ * element wise. A Map function always produces a single result element for each input element.
+ * Typical applications are parsing elements, converting data types, or projecting out fields.
+ * Operations that produce multiple result elements from a single input element can be implemented
+ * using the {@link FlatMapFunction}.
+ * <p>
+ * The basic syntax for using a MapFunction is as follows:
+ * <pre><blockquote>
+ * DataSet<X> input = ...;
+ * 
+ * DataSet<Y> result = input.map(new MyMapFunction());
+ * </blockquote></pre>
+ * 
+ * @param <T> Type of the input elements.
+ * @param <O> Type of the returned elements.
+ */
 public interface MapFunction<T, O> extends Function, Serializable {
 
 	/**
-	 * The core method of Mappable. Takes an element from the input data set and transforms
+	 * The mapping method. Takes an element from the input data set and transforms
 	 * it into exactly one element.
 	 *
 	 * @param value The input value.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java
index 04f690a..7a61c97 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java
@@ -16,16 +16,36 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
 
-
 import java.io.Serializable;
 
+/**
+ * Base interface for Reduce functions. Reduce functions combine groups of elements to
+ * a single value, by taking always two elements and combining them into one. Reduce functions
+ * may be used on entire data sets, or on grouped data sets. In the latter case, each group is reduced
+ * individually.
+ * <p>
+ * For reduce functions that work on an entire group at the same time (such as the 
+ * MapReduce/Hadoop-style reduce), see {@link GroupReduceFunction}. In the general case,
+ * ReduceFunctions are considered faster, because they allow the system to use more efficient
+ * execution strategies.
+ * <p>
+ * The basic syntax for using a grouped ReduceFunction is as follows:
+ * <pre><blockquote>
+ * DataSet<X> input = ...;
+ * 
+ * DataSet<X> result = input.groupBy(<key-definition>).reduce(new MyReduceFunction());
+ * </blockquote></pre>
+ * <p>
+ * Like all functions, the ReduceFunction needs to be serializable, as defined in {@link java.io.Serializable}.
+ * 
+ * @param <T> Type of the elements that this function processes.
+ */
 public interface ReduceFunction<T> extends Function, Serializable {
 
 	/**
-	 * The core method of Reducible, combining two values into one value of the same type.
+	 * The core method of ReduceFunction, combining two values into one value of the same type.
 	 * The reduce function is consecutively applied to all values of a group until only a single value remains.
 	 *
 	 * @param value1 The first value to combine.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
index ffc3ac2..8f53252 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
 
 import org.apache.flink.configuration.Configuration;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index 3cf30ff..e18858b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -105,34 +105,6 @@ public interface RuntimeContext {
 	 * Convenience function to create a counter object for histograms.
 	 */
 	Histogram getHistogram(String name);
-
-//	/**
-//	 * I propose to remove this and only keep the other more explicit functions
-//	 * (to add or get an accumulator object)
-//	 * 
-//	 * Get an existing or new named accumulator object. Use this function to get
-//	 * an counter for an custom accumulator type. For the integrated
-//	 * accumulators you better use convenience functions (e.g. getIntCounter).
-//	 * 
-//	 * There is no need to register accumulators - they will be created when a
-//	 * UDF asks the first time for a counter that does not exist yet locally.
-//	 * This implies that there can be conflicts when a counter is requested with
-//	 * the same name but with different types, either in the same UDF or in
-//	 * different. In the last case the conflict occurs during merging.
-//	 * 
-//	 * @param name
-//	 * @param accumulatorClass
-//	 *            If the accumulator was not created previously
-//	 * @return
-//	 */
-//	<V, A> Accumulator<V, A> getAccumulator(String name,
-//			Class<? extends Accumulator<V, A>> accumulatorClass);
-//
-//	/**
-//	 * See getAccumulable
-//	 */
-//	<T> SimpleAccumulator<T> getSimpleAccumulator(String name,
-//			Class<? extends SimpleAccumulator<T>> accumulatorClass);
 	
 	// --------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
index bc4ffd0..6455c77 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
@@ -18,18 +18,15 @@
 
 package org.apache.flink.api.common.functions.util;
 
-
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
 
-import java.lang.invoke.SerializedLambda;
 import java.lang.reflect.Method;
 
 public class FunctionUtils {
 
-
 	public static void openFunction (Function function, Configuration parameters) throws Exception{
 		if (function instanceof RichFunction) {
 			RichFunction richFunction = (RichFunction) function;
@@ -61,21 +58,30 @@ public class FunctionUtils {
 		}
 	}
 
-	public static boolean isSerializedLambdaFunction(Function function) {
-		Class<?> clazz = function.getClass();
-		try {
-			Method replaceMethod = clazz.getDeclaredMethod("writeReplace");
-			replaceMethod.setAccessible(true);
-			Object serializedForm = replaceMethod.invoke(function);
-			if (serializedForm instanceof SerializedLambda) {
-				return true;
+	public static boolean isLambdaFunction(Function function) {
+		if (function == null) {
+			throw new IllegalArgumentException();
+		}
+		
+		for (Class<?> clazz = function.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
+			try {
+				Method replaceMethod = clazz.getDeclaredMethod("writeReplace");
+				replaceMethod.setAccessible(true);
+				Object serialVersion = replaceMethod.invoke(function);
+				
+				if (serialVersion.getClass().getName().equals("java.lang.invoke.SerializedLambda")) {
+					return true;
+				}
 			}
-			else {
-				return false;
+			catch (NoSuchMethodException e) {
+				// thrown if the method is not there. fall through the loop
+			}
+			catch (Throwable t) {
+				// this should not happen, we are not executing any method code.
+				throw new RuntimeException("Error while checking whether function is a lambda.", t);
 			}
 		}
-		catch (Exception e) {
-			return false;
-		}
+		
+		return false;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
index c416765..6761c17 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.operators;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
index 66bea7f..c91e6ab 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
@@ -48,6 +48,7 @@ import org.apache.flink.util.Visitor;
 /**
  * 
  */
+@SuppressWarnings("deprecation")
 public class BulkIterationBase<T> extends SingleInputOperator<T, T, AbstractRichFunction> implements IterationOperator {
 	
 	private static String DEFAULT_NAME = "<Unnamed Bulk Iteration>";

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
index ef00a46..6909dd4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.operators.base;
 
 import org.apache.flink.api.common.functions.GenericCollectorMap;
@@ -33,6 +32,7 @@ import org.apache.flink.api.common.operators.util.UserCodeWrapper;
  * 
  * @see GenericCollectorMap
  */
+@Deprecated
 public class CollectorMapOperatorBase<IN, OUT, FT extends GenericCollectorMap<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
 	
 	public CollectorMapOperatorBase(UserCodeWrapper<FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java
index 7d6495b..2683583 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java
@@ -48,6 +48,7 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase;
 /**
  * Convenience methods when dealing with {@link Operator}s.
  */
+@SuppressWarnings("deprecation")
 public class OperatorUtil {
 	
 	@SuppressWarnings("rawtypes")
@@ -126,7 +127,7 @@ public class OperatorUtil {
 	 * @param inputs
 	 *        all input contracts to this contract
 	 */
-	@SuppressWarnings({ "deprecation", "rawtypes", "unchecked" })
+	@SuppressWarnings({ "rawtypes", "unchecked" })
 	public static void setInputs(final Operator<?> contract, final List<List<Operator>> inputs) {
 		if (contract instanceof GenericDataSinkBase) {
 			if (inputs.size() != 1) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java
index 30091ab..88809bb 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java
@@ -42,6 +42,7 @@ import org.junit.Test;
 /**
  * Tests {@link OperatorUtil}.
  */
+@SuppressWarnings("deprecation")
 public class OperatorUtilTest {
 	/**
 	 * Test {@link OperatorUtil#getContractClass(Class)}
@@ -115,13 +116,17 @@ public class OperatorUtilTest {
 		assertEquals(GenericDataSourceBase.class, result);
 	}
 
+	@SuppressWarnings("serial")
 	static abstract class CoGrouper implements CoGroupFunction<IntValue, IntValue, IntValue> {}
 
+	@SuppressWarnings("serial")
 	static abstract class Crosser implements CrossFunction<IntValue, IntValue, IntValue> {}
 
 	static abstract class Mapper implements GenericCollectorMap<IntValue, IntValue> {}
 
+	@SuppressWarnings("serial")
 	static abstract class Matcher implements FlatJoinFunction<IntValue, IntValue, IntValue> {}
 
+	@SuppressWarnings("serial")
 	static abstract class Reducer implements GroupReduceFunction<IntValue, IntValue> {}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index e7199f9..3d1238a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -50,9 +50,9 @@ import org.apache.flink.api.java.operators.MapOperator;
 import org.apache.flink.api.java.operators.ProjectOperator.Projection;
 import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.operators.ReduceOperator;
+import org.apache.flink.api.java.operators.SortedGrouping;
 import org.apache.flink.api.java.operators.UnionOperator;
 import org.apache.flink.api.java.operators.UnsortedGrouping;
-import org.apache.flink.api.java.record.functions.CrossFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
@@ -65,8 +65,8 @@ import org.apache.flink.types.TypeInformation;
  * A DataSet represents a collection of elements of the same type.<br/>
  * A DataSet can be transformed into another DataSet by applying a transformation as for example 
  * <ul>
- *   <li>{@link DataSet#map(org.apache.flink.api.java.functions.RichMapFunction)},</li>
- *   <li>{@link DataSet#reduce(org.apache.flink.api.java.functions.RichReduceFunction)},</li>
+ *   <li>{@link DataSet#map(org.apache.flink.api.common.functions.MapFunction)},</li>
+ *   <li>{@link DataSet#reduce(org.apache.flink.api.common.functions.ReduceFunction)},</li>
  *   <li>{@link DataSet#join(DataSet)}, or</li>
  *   <li>{@link DataSet#coGroup(DataSet)}.</li>
  * </ul>
@@ -135,7 +135,7 @@ public abstract class DataSet<T> {
 		if (mapper == null) {
 			throw new NullPointerException("Map function must not be null.");
 		}
-		if (FunctionUtils.isSerializedLambdaFunction(mapper)) {
+		if (FunctionUtils.isLambdaFunction(mapper)) {
 			throw new UnsupportedLambdaExpressionException();
 		}
 		return new MapOperator<T, R>(this, mapper);
@@ -157,7 +157,7 @@ public abstract class DataSet<T> {
 		if (flatMapper == null) {
 			throw new NullPointerException("FlatMap function must not be null.");
 		}
-		if (FunctionUtils.isSerializedLambdaFunction(flatMapper)) {
+		if (FunctionUtils.isLambdaFunction(flatMapper)) {
 			throw new UnsupportedLambdaExpressionException();
 		}
 		return new FlatMapOperator<T, R>(this, flatMapper);
@@ -197,13 +197,13 @@ public abstract class DataSet<T> {
 	 * 
 	 * @param fieldIndexes The field indexes of the input tuples that are retained.
 	 * 					   The order of fields in the output tuple corresponds to the order of field indexes.
-	 * @return A Projection that needs to be converted into a {@link ProjectOperator} to complete the 
+	 * @return A Projection that needs to be converted into a {@link org.apache.flink.api.java.operators.ProjectOperator} to complete the 
 	 *           Project transformation by calling {@link Projection#types()}.
 	 * 
 	 * @see Tuple
 	 * @see DataSet
 	 * @see Projection
-	 * @see ProjectOperator
+	 * @see org.apache.flink.api.java.operators.ProjectOperator
 	 */
 	public Projection<T> project(int... fieldIndexes) {
 		return new Projection<T>(this, fieldIndexes);
@@ -303,7 +303,7 @@ public abstract class DataSet<T> {
 		if (reducer == null) {
 			throw new NullPointerException("GroupReduce function must not be null.");
 		}
-		if (FunctionUtils.isSerializedLambdaFunction(reducer)) {
+		if (FunctionUtils.isLambdaFunction(reducer)) {
 			throw new UnsupportedLambdaExpressionException();
 		}
 		return new GroupReduceOperator<T, R>(this, reducer);
@@ -366,17 +366,15 @@ public abstract class DataSet<T> {
 	 * <ul>
 	 *   <li>{@link UnsortedGrouping#sortGroup(int, org.apache.flink.api.common.operators.Order)} to get a {@link SortedGrouping}. 
 	 *   <li>{@link UnsortedGrouping#aggregate(Aggregations, int)} to apply an Aggregate transformation.
-	 *   <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.java.functions.RichReduceFunction)} to apply a Reduce transformation.
-	 *   <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)} to apply a GroupReduce transformation.
+	 *   <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.common.functions.ReduceFunction)} to apply a Reduce transformation.
+	 *   <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)} to apply a GroupReduce transformation.
 	 * </ul>
 	 *  
 	 * @param keyExtractor The KeySelector function which extracts the key values from the DataSet on which it is grouped. 
 	 * @return An UnsortedGrouping on which a transformation needs to be applied to obtain a transformed DataSet.
 	 * 
 	 * @see KeySelector
-	 * @see Grouping
 	 * @see UnsortedGrouping
-	 * @see SortedGrouping
 	 * @see AggregateOperator
 	 * @see ReduceOperator
 	 * @see org.apache.flink.api.java.operators.GroupReduceOperator
@@ -395,17 +393,15 @@ public abstract class DataSet<T> {
 	 * <ul>
 	 *   <li>{@link UnsortedGrouping#sortGroup(int, org.apache.flink.api.common.operators.Order)} to get a {@link SortedGrouping}. 
 	 *   <li>{@link UnsortedGrouping#aggregate(Aggregations, int)} to apply an Aggregate transformation.
-	 *   <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.java.functions.RichReduceFunction)} to apply a Reduce transformation.
-	 *   <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)} to apply a GroupReduce transformation.
+	 *   <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.common.functions.ReduceFunction)} to apply a Reduce transformation.
+	 *   <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)} to apply a GroupReduce transformation.
 	 * </ul> 
 	 * 
 	 * @param fields One or more field positions on which the DataSet will be grouped. 
 	 * @return A Grouping on which a transformation needs to be applied to obtain a transformed DataSet.
 	 * 
 	 * @see Tuple
-	 * @see Grouping
 	 * @see UnsortedGrouping
-	 * @see SortedGrouping
 	 * @see AggregateOperator
 	 * @see ReduceOperator
 	 * @see org.apache.flink.api.java.operators.GroupReduceOperator
@@ -424,17 +420,15 @@ public abstract class DataSet<T> {
 	 * <ul>
 	 *   <li>{@link UnsortedGrouping#sortGroup(int, org.apache.flink.api.common.operators.Order)} to get a {@link SortedGrouping}.
 	 *   <li>{@link UnsortedGrouping#aggregate(Aggregations, int)} to apply an Aggregate transformation.
-	 *   <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.java.functions.RichReduceFunction)} to apply a Reduce transformation.
-	 *   <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)} to apply a GroupReduce transformation.
+	 *   <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.common.functions.ReduceFunction)} to apply a Reduce transformation.
+	 *   <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)} to apply a GroupReduce transformation.
 	 * </ul>
 	 *
 	 * @param fields One or more field expressions on which the DataSet will be grouped.
 	 * @return A Grouping on which a transformation needs to be applied to obtain a transformed DataSet.
 	 *
 	 * @see Tuple
-	 * @see Grouping
 	 * @see UnsortedGrouping
-	 * @see SortedGrouping
 	 * @see AggregateOperator
 	 * @see ReduceOperator
 	 * @see org.apache.flink.api.java.operators.GroupReduceOperator
@@ -462,7 +456,6 @@ public abstract class DataSet<T> {
 	 * @return A JoinOperatorSets to continue the definition of the Join transformation.
 	 * 
 	 * @see JoinOperatorSets
-	 * @see JoinOperator
 	 * @see DataSet
 	 */
 	public <R> JoinOperatorSets<T, R> join(DataSet<R> other) {
@@ -484,7 +477,6 @@ public abstract class DataSet<T> {
 	 * @return A JoinOperatorSets to continue the definition of the Join transformation.
 	 * 
 	 * @see JoinOperatorSets
-	 * @see JoinOperator
 	 * @see DataSet
 	 */
 	public <R> JoinOperatorSets<T, R> joinWithTiny(DataSet<R> other) {
@@ -506,7 +498,6 @@ public abstract class DataSet<T> {
 	 * @return A JoinOperatorSet to continue the definition of the Join transformation.
 	 * 
 	 * @see JoinOperatorSets
-	 * @see JoinOperator
 	 * @see DataSet
 	 */
 	public <R> JoinOperatorSets<T, R> joinWithHuge(DataSet<R> other) {
@@ -570,7 +561,7 @@ public abstract class DataSet<T> {
 	 * second input being the second field of the tuple.
 	 * 
 	 * <p>
-	 * Call {@link DefaultCross.with(CrossFunction)} to define a {@link CrossFunction} which is called for
+	 * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a {@link org.apache.flink.api.common.functions.CrossFunction} which is called for
 	 * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.</br>
 	 * 
 	 * @param other The other DataSet with which this DataSet is crossed. 
@@ -599,14 +590,15 @@ public abstract class DataSet<T> {
 	 * second input being the second field of the tuple.
 	 *   
 	 * <p>
-	 * Call {@link DefaultCross.with(CrossFunction)} to define a {@link CrossFunction} which is called for
+	 * Call {@link DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a
+	 * {@link org.apache.flink.api.common.functions.CrossFunction} which is called for
 	 * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.</br>
 	 * 
 	 * @param other The other DataSet with which this DataSet is crossed. 
 	 * @return A DefaultCross that returns a Tuple2 for each pair of crossed elements.
 	 * 
 	 * @see DefaultCross
-	 * @see CrossFunction
+	 * @see org.apache.flink.api.common.functions.CrossFunction
 	 * @see DataSet
 	 * @see Tuple2
 	 */
@@ -628,14 +620,15 @@ public abstract class DataSet<T> {
 	 * second input being the second field of the tuple.
 	 *   
 	 * <p>
-	 * Call {@link DefaultCross.with(CrossFunction)} to define a {@link CrossFunction} which is called for
+	 * Call {@link DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a
+	 * {@link org.apache.flink.api.common.functions.CrossFunction} which is called for
 	 * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.</br>
 	 * 
 	 * @param other The other DataSet with which this DataSet is crossed. 
 	 * @return A DefaultCross that returns a Tuple2 for each pair of crossed elements.
 	 * 
 	 * @see DefaultCross
-	 * @see CrossFunction
+	 * @see org.apache.flink.api.common.functions.CrossFunction
 	 * @see DataSet
 	 * @see Tuple2
 	 */

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index ebd1422..af0ea52 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -112,7 +112,7 @@ public abstract class ExecutionEnvironment {
 	 * individually override this value to use a specific degree of parallelism via
 	 * {@link Operator#setParallelism(int)}. Other operations may need to run with a different
 	 * degree of parallelism - for example calling
-	 * {@link DataSet#reduce(org.apache.flink.api.java.functions.RichReduceFunction)} over the entire
+	 * {@link DataSet#reduce(org.apache.flink.api.common.functions.ReduceFunction)} over the entire
 	 * set will insert eventually an operation that runs non-parallel (degree of parallelism of one).
 	 * 
 	 * @return The degree of parallelism used by operations, unless they override that value. This method

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
index 012ab57..0a83235 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
@@ -23,7 +23,7 @@ import java.util.Iterator;
 import org.apache.flink.util.Collector;
 
 /**
- * A variant of the {@link RichFlatMapFunction} that returns elements through an iterator, rather then
+ * A convenience variant of the {@link RichFlatMapFunction} that returns elements through an iterator, rather than
  * through a collector. In all other respects, it behaves exactly like the FlatMapFunction.
  * <p>
  * The function needs to be serializable, as defined in {@link java.io.Serializable}.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
index 8aaaf86..e78c31e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
@@ -22,29 +22,14 @@ import java.util.Iterator;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.util.Collector;
 
 /**
- * The abstract base class for CoGroup functions. CoGroup functions combine two data sets by first grouping each data set
- * after a key and then "joining" the groups by calling this function with the two sets for each key. 
- * If a key is present in only one of the two inputs, it may be that one of the groups is empty.
- * <p>
- * The basic syntax for using CoGoup on two data sets is as follows:
- * <pre><blockquote>
- * DataSet<X> set1 = ...;
- * DataSet<Y> set2 = ...;
- * 
- * set1.coGroup(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyCoGroupFunction());
- * </blockquote></pre>
- * <p>
- * {@code set1} is here considered the first input, {@code set2} the second input.
- * The keys can be defined through tuple field positions or key extractors.
- * See {@link org.apache.flink.api.java.operators.Keys} for details.
- * <p>
- * Some keys may only be contained in one of the two original data sets. In that case, the CoGroup function is invoked
- * with in empty input for the side of the data set that did not contain elements with that specific key.
- * <p>
- * All functions need to be serializable, as defined in {@link java.io.Serializable}.
+ * Rich variant of the {@link CoGroupFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
  * 
  * @param <IN1> The type of the elements in the first input.
  * @param <IN2> The type of the elements in the second input.
@@ -54,20 +39,6 @@ public abstract class RichCoGroupFunction<IN1, IN2, OUT> extends AbstractRichFun
 
 	private static final long serialVersionUID = 1L;
 	
-	
-	/**
-	 * The core method of the CoGroupFunction. This method is called for each pair of groups that have the same
-	 * key. The elements of the groups are returned by the respective iterators.
-	 * 
-	 * It is possible that one of the two groups is empty, in which case the respective iterator has no elements.
-	 * 
-	 * @param first The group from the first input.
-	 * @param second The group from the second input.
-	 * @param out The collector through which to return the result elements.
-	 * 
-	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
-	 *                   to fail and may trigger recovery.
-	 */
 	@Override
 	public abstract void coGroup(Iterator<IN1> first, Iterator<IN2> second, Collector<OUT> out) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCrossFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCrossFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCrossFunction.java
index a4e1248..58be279 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCrossFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCrossFunction.java
@@ -20,26 +20,13 @@ package org.apache.flink.api.java.functions;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.CrossFunction;
-
+import org.apache.flink.api.common.functions.RichFunction;
 
 /**
- * The abstract base class for Cross functions. Cross functions build a Cartesian produce of their inputs
- * and call the function or each pair of elements.
- * They are a means of convenience and can be used to directly produce manipulate the
- * pair of elements, instead of having the operator build 2-tuples, and then using a
- * MapFunction over those 2-tuples.
- * <p>
- * The basic syntax for using Cross on two data sets is as follows:
- * <pre><blockquote>
- * DataSet<X> set1 = ...;
- * DataSet<Y> set2 = ...;
- * 
- * set1.cross(set2).with(new MyCrossFunction());
- * </blockquote></pre>
- * <p>
- * {@code set1} is here considered the first input, {@code set2} the second input.
- * <p>
- * All functions need to be serializable, as defined in {@link java.io.Serializable}.
+ * Rich variant of the {@link CrossFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
  * 
  * @param <IN1> The type of the elements in the first input.
  * @param <IN2> The type of the elements in the second input.
@@ -48,19 +35,7 @@ import org.apache.flink.api.common.functions.CrossFunction;
 public abstract class RichCrossFunction<IN1, IN2, OUT> extends AbstractRichFunction implements CrossFunction<IN1, IN2, OUT> {
 	
 	private static final long serialVersionUID = 1L;
-	
 
-	/**
-	 * The core method of the cross operation. The method will be invoked for each pair of elements
-	 * in the Cartesian product.
-	 * 
-	 * @param first The element from the first input.
-	 * @param second The element from the second input.
-	 * @return The result element.
-	 * 
-	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
-	 *                   to fail and may trigger recovery.
-	 */
 	@Override
 	public abstract OUT cross(IN1 first, IN2 second) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFilterFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFilterFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFilterFunction.java
index e3baa74..9057a0f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFilterFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFilterFunction.java
@@ -20,19 +20,13 @@ package org.apache.flink.api.java.functions;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichFunction;
 
 /**
- * The abstract base class for Filter functions. A filter function take elements and evaluates a
- * predicate on them to decide whether to keep the element, or to discard it.
- * <p>
- * The basic syntax for using a FilterFunction is as follows:
- * <pre><blockquote>
- * DataSet<X> input = ...;
- * 
- * DataSet<X> result = input.filter(new MyFilterFunction());
- * </blockquote></pre>
- * <p>
- * Like all functions, the FilterFunction needs to be serializable, as defined in {@link java.io.Serializable}.
+ * Rich variant of the {@link FilterFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
  * 
  * @param <T> The type of the filtered elements.
  */
@@ -40,18 +34,6 @@ public abstract class RichFilterFunction<T> extends AbstractRichFunction impleme
 	
 	private static final long serialVersionUID = 1L;
 	
-	/**
-	 * The core method of the FilterFunction. The method is called for each element in the input,
-	 * and determines whether the element should be kept or filtered out. If the method returns true,
-	 * the element passes the filter and is kept, if the method returns false, the element is
-	 * filtered out.
-	 * 
-	 * @param value The input value to be filtered.
-	 * @return Flag to indicate whether to keep the value (true) or to discard it (false).
-	 * 
-	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
-	 *                   to fail and may trigger recovery.
-	 */
 	@Override
 	public abstract boolean filter(T value) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
index 8c326c6..97c15ff 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
@@ -21,10 +21,19 @@ package org.apache.flink.api.java.functions;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.util.Collector;
 
 import java.util.Iterator;
 
+/**
+ * Rich variant of the {@link FlatCombineFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <T> The data type of the elements to be combined.
+ */
 public abstract class RichFlatCombineFunction<T> extends AbstractRichFunction implements FlatCombineFunction<T> {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatJoinFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatJoinFunction.java
index 15b4539..6918364 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatJoinFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatJoinFunction.java
@@ -20,36 +20,14 @@ package org.apache.flink.api.java.functions;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.util.Collector;
 
 /**
- * The abstract base class for Join functions. Join functions combine two data sets by joining their
- * elements on specified keys and calling this function with each pair of joining elements.
- * By default, this follows strictly the semantics of an "inner join" in SQL.
- * the semantics are those of an "inner join", meaning that elements are filtered out
- * if their key is not contained in the other data set.
- * <p>
- * Per the semantics of an inner join, the function is 
- * <p>
- * The basic syntax for using Join on two data sets is as follows:
- * <pre><blockquote>
- * DataSet<X> set1 = ...;
- * DataSet<Y> set2 = ...;
- * 
- * set1.join(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyJoinFunction());
- * </blockquote></pre>
- * <p>
- * {@code set1} is here considered the first input, {@code set2} the second input.
- * The keys can be defined through tuple field positions or key extractors.
- * See {@link org.apache.flink.api.java.operators.Keys} for details.
- * <p>
- * The Join function is actually not a necessary part of a join operation. If no JoinFunction is provided,
- * the result of the operation is a sequence of Tuple2, where the elements in the tuple are those that
- * the JoinFunction would have been invoked with.
- * <P>
- * Note: You can use a {@link RichCoGroupFunction} to perform an outer join.
- * <p>
- * All functions need to be serializable, as defined in {@link java.io.Serializable}.
+ * Rich variant of the {@link FlatJoinFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
  * 
  * @param <IN1> The type of the elements in the first input.
  * @param <IN2> The type of the elements in the second input.
@@ -59,17 +37,6 @@ public abstract class RichFlatJoinFunction<IN1, IN2, OUT> extends AbstractRichFu
 
 	private static final long serialVersionUID = 1L;
 
-	/**
-	 * The user-defined method for performing transformations after a join.
-	 * The method is called with matching pairs of elements from the inputs.
-	 * 
-	 * @param first The element from first input.
-	 * @param second The element from second input.
-	 * @return The resulting element.
-	 * 
-	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
-	 *                   to fail and may trigger recovery.
-	 */
 	@Override
 	public abstract void join(IN1 first, IN2 second, Collector<OUT> out) throws Exception;
 }