You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/01 02:36:52 UTC
[11/16] git commit: [FLINK-701] Several cleanups after SAM
refactoring. - Lambda detection compiles on earlier java versions - Add
lambda detection test. - Fix JavaDocs
[FLINK-701] Several cleanups after SAM refactoring.
- Lambda detection compiles on earlier java versions
- Add lambda detection test.
- Fix JavaDocs
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/bc89e911
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/bc89e911
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/bc89e911
Branch: refs/heads/master
Commit: bc89e911b2cd433bb841beab9e8e513ca0c3525d
Parents: 22b24f2
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jul 31 19:46:14 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jul 31 20:18:00 2014 +0200
----------------------------------------------------------------------
.../examples/SpargelConnectedComponents.java | 4 +-
.../spargel/java/examples/SpargelPageRank.java | 8 +-
.../SpargelPageRankCountingVertices.java | 11 +-
.../apache/flink/client/web/JobsServlet.java | 1 -
.../org/apache/flink/compiler/PactCompiler.java | 6 +-
.../apache/flink/compiler/CompilerTestBase.java | 6 +-
.../flink/compiler/util/DummyCrossStub.java | 4 -
.../common/functions/AbstractRichFunction.java | 8 +-
.../api/common/functions/CoGroupFunction.java | 38 +++++--
.../api/common/functions/CombineFunction.java | 24 ++++-
.../api/common/functions/CrossFunction.java | 38 +++++--
.../api/common/functions/ExecutionContext.java | 51 ----------
.../api/common/functions/FilterFunction.java | 25 ++++-
.../common/functions/FlatCombineFunction.java | 22 +++-
.../api/common/functions/FlatJoinFunction.java | 45 +++++++-
.../api/common/functions/FlatMapFunction.java | 23 +++--
.../flink/api/common/functions/Function.java | 1 -
.../common/functions/GenericCollectorMap.java | 10 +-
.../common/functions/GroupReduceFunction.java | 34 +++++--
.../api/common/functions/JoinFunction.java | 40 +++++++-
.../flink/api/common/functions/MapFunction.java | 21 +++-
.../api/common/functions/ReduceFunction.java | 26 ++++-
.../api/common/functions/RichFunction.java | 1 -
.../api/common/functions/RuntimeContext.java | 28 -----
.../common/functions/util/FunctionUtils.java | 38 ++++---
.../flink/api/common/operators/Union.java | 1 -
.../operators/base/BulkIterationBase.java | 1 +
.../base/CollectorMapOperatorBase.java | 2 +-
.../api/common/operators/util/OperatorUtil.java | 3 +-
.../common/operators/util/OperatorUtilTest.java | 5 +
.../java/org/apache/flink/api/java/DataSet.java | 49 ++++-----
.../flink/api/java/ExecutionEnvironment.java | 2 +-
.../api/java/functions/FlatMapIterator.java | 2 +-
.../api/java/functions/RichCoGroupFunction.java | 39 +------
.../api/java/functions/RichCrossFunction.java | 35 +------
.../api/java/functions/RichFilterFunction.java | 28 +----
.../java/functions/RichFlatCombineFunction.java | 9 ++
.../java/functions/RichFlatJoinFunction.java | 43 +-------
.../api/java/functions/RichFlatMapFunction.java | 28 +----
.../java/functions/RichGroupReduceFunction.java | 32 +-----
.../api/java/functions/RichJoinFunction.java | 12 ++-
.../api/java/functions/RichMapFunction.java | 29 +-----
.../api/java/functions/RichReduceFunction.java | 35 +------
.../api/java/operators/CoGroupOperator.java | 2 +-
.../flink/api/java/operators/CrossOperator.java | 2 +-
.../api/java/operators/DistinctOperator.java | 28 ++---
.../api/java/operators/GroupReduceOperator.java | 7 +-
.../flink/api/java/operators/Grouping.java | 4 +-
.../flink/api/java/operators/JoinOperator.java | 6 +-
.../api/java/operators/SortedGrouping.java | 4 +-
.../api/java/operators/UnsortedGrouping.java | 2 +-
.../PlanUnwrappingReduceGroupOperator.java | 2 +-
.../java/record/functions/CoGroupFunction.java | 2 +-
.../java/record/functions/CrossFunction.java | 6 +-
.../api/java/record/functions/JoinFunction.java | 15 +--
.../api/java/record/functions/MapFunction.java | 2 +-
.../java/record/functions/ReduceFunction.java | 2 +-
.../api/java/record/operators/MapOperator.java | 1 +
.../DeltaIterationTranslationTest.java | 2 +-
.../translation/DistrinctTranslationTest.java | 57 +++++++++++
.../java/type/extractor/TypeExtractorTest.java | 4 +
.../javaApiOperators/lambdas/CoGroupITCase.java | 1 +
.../javaApiOperators/lambdas/CrossITCase.java | 1 +
.../javaApiOperators/lambdas/FilterITCase.java | 70 ++-----------
.../lambdas/FlatJoinITCase.java | 1 +
.../javaApiOperators/lambdas/FlatMapITCase.java | 1 +
.../lambdas/GroupReduceITCase.java | 1 +
.../javaApiOperators/lambdas/JoinITCase.java | 1 +
.../lambdas/LambdaExtractionTest.java | 82 +++++++++++++++
.../javaApiOperators/lambdas/MapITCase.java | 1 +
.../javaApiOperators/lambdas/ReduceITCase.java | 102 +++++--------------
.../SolutionSetFastUpdateOutputCollector.java | 6 +-
.../io/SolutionSetUpdateOutputCollector.java | 2 +-
.../task/AbstractIterativePactTask.java | 4 +-
.../iterative/task/IterationHeadPactTask.java | 7 +-
.../task/IterationIntermediatePactTask.java | 9 +-
.../task/IterationSynchronizationSinkTask.java | 4 +-
.../iterative/task/IterationTailPactTask.java | 9 +-
.../operators/RuntimeExecutionContext.java | 56 ----------
.../flink/test/operators/CrossITCase.java | 18 +---
80 files changed, 658 insertions(+), 734 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
index a4ba6fa..0f1f26d 100644
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
@@ -18,7 +18,7 @@
package org.apache.flink.spargel.java.examples;
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.spargel.java.MessageIterator;
import org.apache.flink.spargel.java.MessagingFunction;
@@ -70,7 +70,7 @@ public class SpargelConnectedComponents {
* A map function that takes a Long value and creates a 2-tuple out of it:
* <pre>(Long value) -> (value, value)</pre>
*/
- public static final class IdAssigner extends RichMapFunction<Long, Tuple2<Long, Long>> {
+ public static final class IdAssigner implements MapFunction<Long, Tuple2<Long, Long>> {
@Override
public Tuple2<Long, Long> map(Long value) {
return new Tuple2<Long, Long>(value, value);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
index 9dfc327..6d3dc95 100644
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
@@ -18,8 +18,8 @@
package org.apache.flink.spargel.java.examples;
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.spargel.java.MessageIterator;
@@ -48,7 +48,7 @@ public class SpargelPageRank {
// enumerate some sample edges and assign an initial uniform probability (rank)
DataSet<Tuple2<Long, Double>> intialRanks = env.generateSequence(1, numVertices)
- .map(new RichMapFunction<Long, Tuple2<Long, Double>>() {
+ .map(new MapFunction<Long, Tuple2<Long, Double>>() {
public Tuple2<Long, Double> map(Long value) {
return new Tuple2<Long, Double>(value, 1.0/numVertices);
}
@@ -56,7 +56,7 @@ public class SpargelPageRank {
// generate some random edges. the transition probability on each edge is 1/num-out-edges of the source vertex
DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = env.generateSequence(1, numVertices)
- .flatMap(new RichFlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
+ .flatMap(new FlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
public void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) {
int numOutEdges = (int) (Math.random() * (numVertices / 2));
for (int i = 0; i < numOutEdges; i++) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
index 43c0b84..01d2cd7 100644
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
@@ -18,9 +18,10 @@
package org.apache.flink.spargel.java.examples;
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.RichMapFunction;
-import org.apache.flink.api.java.functions.RichReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
@@ -53,7 +54,7 @@ public class SpargelPageRankCountingVertices {
// generate some random edges. the transition probability on each edge is 1/num-out-edges of the source vertex
DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = env.generateSequence(1, NUM_VERTICES)
- .flatMap(new RichFlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
+ .flatMap(new FlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
public void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) {
int numOutEdges = (int) (Math.random() * (NUM_VERTICES / 2));
for (int i = 0; i < numOutEdges; i++) {
@@ -67,12 +68,12 @@ public class SpargelPageRankCountingVertices {
// count the number of vertices
DataSet<Long> count = vertices
- .map(new RichMapFunction<Long, Long>() {
+ .map(new MapFunction<Long, Long>() {
public Long map(Long value) {
return 1L;
}
})
- .reduce(new RichReduceFunction<Long>() {
+ .reduce(new ReduceFunction<Long>() {
public Long reduce(Long value1, Long value2) {
return value1 + value2;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java
index c62f0d7..cd53115 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java
@@ -180,7 +180,6 @@ public class JobsServlet extends HttpServlet {
// parse the request
ServletFileUpload uploadHandler = new ServletFileUpload(fileItemFactory);
try {
- @SuppressWarnings("unchecked")
Iterator<FileItem> itr = ((List<FileItem>) uploadHandler.parseRequest(req)).iterator();
// go over the form fields and look for our file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index b55dea0..e22a365 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -37,7 +37,6 @@ import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.Union;
import org.apache.flink.api.common.operators.base.BulkIterationBase;
import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
-import org.apache.flink.api.common.operators.base.CollectorMapOperatorBase;
import org.apache.flink.api.common.operators.base.CrossOperatorBase;
import org.apache.flink.api.common.operators.base.DeltaIterationBase;
import org.apache.flink.api.common.operators.base.FilterOperatorBase;
@@ -648,6 +647,7 @@ public class PactCompiler {
this.forceDOP = forceDOP;
}
+ @SuppressWarnings("deprecation")
@Override
public boolean preVisit(Operator<?> c) {
// check if we have been here before
@@ -671,8 +671,8 @@ public class PactCompiler {
else if (c instanceof MapOperatorBase) {
n = new MapNode((MapOperatorBase<?, ?, ?>) c);
}
- else if (c instanceof CollectorMapOperatorBase) {
- n = new CollectorMapNode((CollectorMapOperatorBase<?, ?, ?>) c);
+ else if (c instanceof org.apache.flink.api.common.operators.base.CollectorMapOperatorBase) {
+ n = new CollectorMapNode((org.apache.flink.api.common.operators.base.CollectorMapOperatorBase<?, ?, ?>) c);
}
else if (c instanceof FlatMapOperatorBase) {
n = new FlatMapNode((FlatMapOperatorBase<?, ?, ?>) c);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java b/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
index b2c163b..0115c31 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
@@ -27,7 +27,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.base.GenericDataSourceBase;
@@ -179,7 +179,7 @@ public abstract class CompilerTestBase implements java.io.Serializable {
}
@SuppressWarnings("unchecked")
- public <T extends PlanNode> T getNode(String name, Class<? extends RichFunction> stubClass) {
+ public <T extends PlanNode> T getNode(String name, Class<? extends Function> stubClass) {
List<PlanNode> nodes = this.map.get(name);
if (nodes == null || nodes.isEmpty()) {
throw new RuntimeException("No node found with the given name and stub class.");
@@ -241,7 +241,7 @@ public abstract class CompilerTestBase implements java.io.Serializable {
}
@SuppressWarnings("unchecked")
- public <T extends Operator<?>> T getNode(String name, Class<? extends RichFunction> stubClass) {
+ public <T extends Operator<?>> T getNode(String name, Class<? extends Function> stubClass) {
List<Operator<?>> nodes = this.map.get(name);
if (nodes == null || nodes.isEmpty()) {
throw new RuntimeException("No node found with the given name and stub class.");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java b/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java
index 736ee14..d6ecc40 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java
@@ -16,18 +16,14 @@
* limitations under the License.
*/
-
package org.apache.flink.compiler.util;
-import java.io.Serializable;
-
import org.apache.flink.api.java.record.functions.CrossFunction;
import org.apache.flink.types.Record;
public class DummyCrossStub extends CrossFunction {
private static final long serialVersionUID = 1L;
-
@Override
public Record cross(Record first, Record second) throws Exception {
return first;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
index 07b957d..981de14 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.api.common.functions;
import java.io.Serializable;
@@ -24,9 +23,10 @@ import java.io.Serializable;
import org.apache.flink.configuration.Configuration;
/**
- * An abstract stub implementation for user-defined functions. It offers default implementations
- * for {@link #open(Configuration)} and {@link #close()}. It also offers access to the
- * {@link RuntimeContext} and {@link IterationRuntimeContext}.
+ * An abstract stub implementation for rich user-defined functions.
+ * Rich functions have additional methods for initialization ({@link #open(Configuration)}) and
+ * teardown ({@link #close()}), as well as access to their runtime execution context via
+ * {@link #getRuntimeContext()}.
*/
public abstract class AbstractRichFunction implements RichFunction, Serializable {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
index 5c200af..cc8fe78 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.api.common.functions;
import java.io.Serializable;
@@ -24,18 +23,41 @@ import java.util.Iterator;
import org.apache.flink.util.Collector;
-
+/**
+ * The interface for CoGroup functions. CoGroup functions combine two data sets by first grouping each data set
+ * on a key and then "joining" the groups by calling this function with the two groups for each key.
+ * If a key is present in only one of the two inputs, it may be that one of the groups is empty.
+ * <p>
+ * The basic syntax for using CoGroup on two data sets is as follows:
+ * <pre><blockquote>
+ * DataSet&lt;X&gt; set1 = ...;
+ * DataSet&lt;Y&gt; set2 = ...;
+ *
+ * set1.coGroup(set2).where(&lt;key-definition&gt;).equalTo(&lt;key-definition&gt;).with(new MyCoGroupFunction());
+ * </blockquote></pre>
+ * <p>
+ * {@code set1} is here considered the first input, {@code set2} the second input.
+ * <p>
+ * Some keys may only be contained in one of the two original data sets. In that case, the CoGroup function is invoked
+ * with an empty input for the side of the data set that did not contain elements with that specific key.
+ *
+ * @param <V1> The data type of the first input data set.
+ * @param <V2> The data type of the second input data set.
+ * @param <O> The data type of the returned elements.
+ */
public interface CoGroupFunction<V1, V2, O> extends Function, Serializable {
/**
* This method must be implemented to provide a user implementation of a
- * coGroup. It is called for each two key-value pairs that share the same
- * key and come from different inputs.
+ * coGroup. It is called for each pair of element groups where the elements share the
+ * same key.
+ *
+ * @param first The records from the first input.
+ * @param second The records from the second input.
+ * @param out A collector to return elements.
*
- * @param first The records from the first input which were paired with the key.
- * @param second The records from the second input which were paired with the key.
- * @param out A collector that collects all output pairs.
+ * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
+ * and may trigger the recovery logic.
*/
void coGroup(Iterator<V1> first, Iterator<V2> second, Collector<O> out) throws Exception;
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
index d72c4c8..aed7fae 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
@@ -16,16 +16,34 @@
* limitations under the License.
*/
-
package org.apache.flink.api.common.functions;
import java.io.Serializable;
import java.util.Iterator;
/**
- * Generic interface used for combiners.
+ * Generic interface used for combine functions ("combiners"). Combiners act as auxiliaries to a {@link GroupReduceFunction}
+ * and "pre-reduce" the data. The combine functions typically do not see the entire group of elements, but
+ * only a sub-group.
+ * <p>
+ * Combine functions are frequently helpful in increasing the program efficiency, because they allow the system to
+ * reduce the data volume earlier, before the entire groups have been collected.
+ * <p>
+ * This special variant of the combine function reduces the group of elements into a single element. A variant
+ * that can return multiple values per group is defined in {@link FlatCombineFunction}.
+ *
+ * @param <T> The data type processed by the combine function.
*/
public interface CombineFunction<T> extends Function, Serializable {
- T combine(Iterator<T> records) throws Exception;
+ /**
+ * The combine method, called (potentially multiple times) with subgroups of elements.
+ *
+ * @param values The elements to be combined.
+ * @return The single resulting value.
+ *
+ * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
+ * and may trigger the recovery logic.
+ */
+ T combine(Iterator<T> values) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
index 0c8bc97..5e2c122 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
@@ -16,27 +16,43 @@
* limitations under the License.
*/
-
package org.apache.flink.api.common.functions;
import java.io.Serializable;
-
/**
- * @param <IN1> First input type
- * @param <IN2> Second input type
- * @param <OUT> Output type
+ * Interface for Cross functions. Cross functions are applied to the Cartesian product of their inputs
+ * and are called for each pair of elements.
+ * <p>
+ * They are optional, a means of convenience that can be used to directly manipulate the
+ * pair of elements, instead of processing 2-tuples that contain the pairs.
+ * <p>
+ * The basic syntax for using Cross on two data sets is as follows:
+ * <pre><blockquote>
+ * DataSet&lt;X&gt; set1 = ...;
+ * DataSet&lt;Y&gt; set2 = ...;
+ *
+ * set1.cross(set2).with(new MyCrossFunction());
+ * </blockquote></pre>
+ * <p>
+ * {@code set1} is here considered the first input, {@code set2} the second input.
+ *
+ * @param <IN1> The type of the elements in the first input.
+ * @param <IN2> The type of the elements in the second input.
+ * @param <OUT> The type of the result elements.
*/
public interface CrossFunction<IN1, IN2, OUT> extends Function, Serializable {
/**
- * User defined function for the cross operator.
+ * Cross UDF method. Called once per pair of elements in the Cartesian product of the inputs.
+ *
+ * @param val1 Element from the first input.
+ * @param val2 Element from the second input.
+ * @return The result element.
*
- * @param record1 Record from first input
- * @param record2 Record from the second input
- * @return result of cross UDF.
- * @throws Exception
+ * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
+ * and may trigger the recovery logic.
*/
- OUT cross(IN1 record1, IN2 record2) throws Exception;
+ OUT cross(IN1 val1, IN2 val2) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/ExecutionContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/ExecutionContext.java
deleted file mode 100644
index 9540dc1..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/ExecutionContext.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.functions;
-
-
-/**
- * The execution context provides basic information about the parallel runtime context
- * in which a stub instance lives. Such information includes the current number of
- * parallel stub instances, the stub's parallel task index, the pact name, or the iteration context.
- */
-public interface ExecutionContext {
-
- /**
- * Gets the name of the task. This is the name supplied to the contract upon instantiation. If
- * no name was given at contract instantiation time, a default name will be returned.
- *
- * @return The task's name.
- */
- String getTaskName();
-
- /**
- * Gets the number of parallel subtasks in which the stub is executed.
- *
- * @return The number of parallel subtasks in which the stub is executed.
- */
- int getNumberOfSubtasks();
-
- /**
- * Gets the subtask's parallel task number.
- *
- * @return The subtask's parallel task number.
- */
- int getSubtaskIndex();
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java
index 2f68477..2380f60 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java
@@ -16,18 +16,33 @@
* limitations under the License.
*/
-
package org.apache.flink.api.common.functions;
+
import java.io.Serializable;
+/**
+ * Base interface for Filter functions. A filter function takes elements and evaluates a
+ * predicate on them to decide whether to keep the element, or to discard it.
+ * <p>
+ * The basic syntax for using a FilterFunction is as follows:
+ * <pre><blockquote>
+ * DataSet&lt;X&gt; input = ...;
+ *
+ * DataSet&lt;X&gt; result = input.filter(new MyFilterFunction());
+ * </blockquote></pre>
+ *
+ * @param <T> The type of the filtered elements.
+ */
public interface FilterFunction<T> extends Function, Serializable {
/**
- * User defined function for a filter.
+ * The filter function that evaluates the predicate.
+ *
+ * @param value The value to be filtered.
+ * @return True for values that should be retained, false for values to be filtered out.
*
- * @param value Incoming tuples
- * @return true for tuples that are allowed to pass the filter
- * @throws Exception
+ * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+ * to fail and may trigger recovery.
*/
boolean filter(T value) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
index b2c8f30..edf1614 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.api.common.functions;
import java.io.Serializable;
@@ -25,9 +24,28 @@ import java.util.Iterator;
import org.apache.flink.util.Collector;
/**
- * Generic interface used for combiners.
+ * Generic interface used for combine functions ("combiners"). Combiners act as auxiliaries to a {@link GroupReduceFunction}
+ * and "pre-reduce" the data. The combine functions typically do not see the entire group of elements, but
+ * only a sub-group.
+ * <p>
+ * Combine functions are frequently helpful in increasing the program efficiency, because they allow the system to
+ * reduce the data volume earlier, before the entire groups have been collected.
+ * <p>
+ * This special variant of the combine function supports returning more than one element per group.
+ * It is frequently less efficient to use than the {@link CombineFunction}.
+ *
+ * @param <T> The data type processed by the combine function.
*/
public interface FlatCombineFunction<T> extends Function, Serializable {
+ /**
+ * The combine method, called (potentially multiple times) with subgroups of elements.
+ *
+ * @param values The elements to be combined.
+ * @param out The collector to use to return values from the function.
+ *
+ * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
+ * and may trigger the recovery logic.
+ */
void combine(Iterator<T> values, Collector<T> out) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java
index 6a6b971..3ad2c82 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java
@@ -16,15 +16,54 @@
* limitations under the License.
*/
-
package org.apache.flink.api.common.functions;
import org.apache.flink.util.Collector;
import java.io.Serializable;
-
+/**
+ * Interface for Join functions. Joins combine two data sets by joining their
+ * elements on specified keys. This function is called with each pair of joining elements.
+ * <p>
+ * This particular variant of the join function supports returning zero, one, or more result values
+ * per pair of joining values.
+ * <p>
+ * By default, the join strictly follows the semantics of an "inner join" in SQL,
+ * meaning that elements are filtered out if their key is not contained in the
+ * other data set.
+ * <p>
+ * The basic syntax for using Join on two data sets is as follows:
+ * <pre><blockquote>
+ * DataSet<X> set1 = ...;
+ * DataSet<Y> set2 = ...;
+ *
+ * set1.join(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyJoinFunction());
+ * </blockquote></pre>
+ * <p>
+ * {@code set1} is here considered the first input, {@code set2} the second input.
+ * <p>
+ * The Join function is an optional part of a join operation. If no JoinFunction is provided,
+ * the result of the operation is a sequence of 2-tuples, where the elements in the tuple are those that
+ * the JoinFunction would have been invoked with.
+ * <P>
+ * Note: You can use a {@link CoGroupFunction} to perform an outer join.
+ *
+ * @param <IN1> The type of the elements in the first input.
+ * @param <IN2> The type of the elements in the second input.
+ * @param <OUT> The type of the result elements.
+ */
public interface FlatJoinFunction<IN1, IN2, OUT> extends Function, Serializable {
- void join (IN1 left, IN2 right, Collector<OUT> out) throws Exception;
+ /**
+ * The join method, called once per joined pair of elements.
+ *
+ * @param first The element from first input.
+ * @param second The element from second input.
+ * @param out The collector used to return zero, one, or more elements.
+ *
+ * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+ * to fail and may trigger recovery.
+ */
+ void join (IN1 first, IN2 second, Collector<OUT> out) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
index a8696cf..0c32eae 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
@@ -16,27 +16,36 @@
* limitations under the License.
*/
-
package org.apache.flink.api.common.functions;
import org.apache.flink.util.Collector;
import java.io.Serializable;
-
/**
- *
- * @param <T>
- * @param <O>
+ * Base interface for flatMap functions. FlatMap functions take elements and transform them
+ * into zero, one, or more elements. Typical applications are splitting elements or unnesting lists
+ * and arrays. Operations that produce strictly one result element per input element can also
+ * use the {@link MapFunction}.
+ * <p>
+ * The basic syntax for using a FlatMapFunction is as follows:
+ * <pre><blockquote>
+ * DataSet<X> input = ...;
+ *
+ * DataSet<Y> result = input.flatMap(new MyFlatMapFunction());
+ * </blockquote></pre>
+ *
+ * @param <T> Type of the input elements.
+ * @param <O> Type of the returned elements.
*/
public interface FlatMapFunction<T, O> extends Function, Serializable {
/**
- * The core method of FlatMappable. Takes an element from the input data set and transforms
+ * The core method of the FlatMapFunction. Takes an element from the input data set and transforms
* it into zero, one, or more elements.
*
* @param value The input value.
- * @param out The collector for for emitting result values.
+ * @param out The collector for returning result values.
*
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
index c2a201f..9e9d88d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
@@ -24,5 +24,4 @@ package org.apache.flink.api.common.functions;
* can be called as Java 8 lambdas
*/
public interface Function {
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java
index 41cfa1d..edac4a8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java
@@ -16,13 +16,17 @@
* limitations under the License.
*/
-
package org.apache.flink.api.common.functions;
import org.apache.flink.util.Collector;
-
-
+/**
+ * Variant of the flat map that is used for backwards compatibility in the deprecated Record API.
+ *
+ * @param <T> The input data type.
+ * @param <O> The result data type.
+ */
+@Deprecated
public interface GenericCollectorMap<T, O> extends RichFunction {
void map(T record, Collector<O> out) throws Exception;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
index 984d1fd..5fa959c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.api.common.functions;
import java.io.Serializable;
@@ -24,22 +23,35 @@ import java.util.Iterator;
import org.apache.flink.util.Collector;
-
/**
- *
- * @param <T> Incoming types
- * @param <O> Outgoing types
+ * The interface for group reduce functions. GroupReduceFunctions process groups of elements.
+ * They may aggregate them to a single value, or produce multiple result values for each group.
+ * The group may be defined by sharing a common grouping key, or the group may simply be
+ * all elements of a data set.
+ * <p>
+ * For a reduce function that works incrementally by always combining two elements, see
+ * {@link ReduceFunction}.
+ * <p>
+ * The basic syntax for using a grouped GroupReduceFunction is as follows:
+ * <pre><blockquote>
+ * DataSet<X> input = ...;
+ *
+ * DataSet<X> result = input.groupBy(<key-definition>).reduceGroup(new MyGroupReduceFunction());
+ * </blockquote></pre>
+ *
+ * @param <T> Type of the elements that this function processes.
+ * @param <O> The type of the elements returned by the user-defined function.
*/
public interface GroupReduceFunction<T, O> extends Function, Serializable {
+
/**
+ * The reduce method. The function receives one call per group of elements.
*
- * The central function to be implemented for a reducer. The function receives per call one
- * key and all the values that belong to that key. Each key is guaranteed to be processed by exactly
- * one function call across all involved instances across all computing nodes.
- *
- * @param records All records that belong to the given input key.
+ * @param values All records that belong to the given input key.
* @param out The collector to hand results to.
- * @throws Exception
+ *
+ * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+ * to fail and may trigger recovery.
*/
void reduce(Iterator<T> values, Collector<O> out) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java
index 02f526a..9a8943c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java
@@ -16,13 +16,49 @@
* limitations under the License.
*/
-
package org.apache.flink.api.common.functions;
import java.io.Serializable;
-
+/**
+ * Interface for Join functions. Joins combine two data sets by joining their
+ * elements on specified keys. This function is called with each pair of joining elements.
+ * <p>
+ * By default, the join strictly follows the semantics of an "inner join" in SQL,
+ * meaning that elements are filtered out if their key is not contained in the
+ * other data set.
+ * <p>
+ * The basic syntax for using Join on two data sets is as follows:
+ * <pre><blockquote>
+ * DataSet<X> set1 = ...;
+ * DataSet<Y> set2 = ...;
+ *
+ * set1.join(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyJoinFunction());
+ * </blockquote></pre>
+ * <p>
+ * {@code set1} is here considered the first input, {@code set2} the second input.
+ * <p>
+ * The Join function is an optional part of a join operation. If no JoinFunction is provided,
+ * the result of the operation is a sequence of 2-tuples, where the elements in the tuple are those that
+ * the JoinFunction would have been invoked with.
+ * <P>
+ * Note: You can use a {@link CoGroupFunction} to perform an outer join.
+ *
+ * @param <IN1> The type of the elements in the first input.
+ * @param <IN2> The type of the elements in the second input.
+ * @param <OUT> The type of the result elements.
+ */
public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {
+ /**
+ * The join method, called once per joined pair of elements.
+ *
+ * @param first The element from first input.
+ * @param second The element from second input.
+ * @return The resulting element.
+ *
+ * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+ * to fail and may trigger recovery.
+ */
OUT join(IN1 first, IN2 second) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
index 4e2520d..dccc980 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
@@ -16,16 +16,31 @@
* limitations under the License.
*/
-
package org.apache.flink.api.common.functions;
-
import java.io.Serializable;
+/**
+ * Base interface for Map functions. Map functions take elements and transform them,
+ * element wise. A Map function always produces a single result element for each input element.
+ * Typical applications are parsing elements, converting data types, or projecting out fields.
+ * Operations that produce multiple result elements from a single input element can be implemented
+ * using the {@link FlatMapFunction}.
+ * <p>
+ * The basic syntax for using a MapFunction is as follows:
+ * <pre><blockquote>
+ * DataSet<X> input = ...;
+ *
+ * DataSet<Y> result = input.map(new MyMapFunction());
+ * </blockquote></pre>
+ *
+ * @param <T> Type of the input elements.
+ * @param <O> Type of the returned elements.
+ */
public interface MapFunction<T, O> extends Function, Serializable {
/**
- * The core method of Mappable. Takes an element from the input data set and transforms
+ * The mapping method. Takes an element from the input data set and transforms
* it into exactly one element.
*
* @param value The input value.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java
index 04f690a..7a61c97 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java
@@ -16,16 +16,36 @@
* limitations under the License.
*/
-
package org.apache.flink.api.common.functions;
-
import java.io.Serializable;
+/**
+ * Base interface for Reduce functions. Reduce functions combine groups of elements to
+ * a single value, by always taking two elements and combining them into one. Reduce functions
+ * may be used on entire data sets, or on grouped data sets. In the latter case, each group is reduced
+ * individually.
+ * <p>
+ * For a reduce function that works on an entire group at the same time (such as the
+ * MapReduce/Hadoop-style reduce), see {@link GroupReduceFunction}. In the general case,
+ * ReduceFunctions are considered faster, because they allow the system to use more efficient
+ * execution strategies.
+ * <p>
+ * The basic syntax for using a grouped ReduceFunction is as follows:
+ * <pre><blockquote>
+ * DataSet<X> input = ...;
+ *
+ * DataSet<X> result = input.groupBy(<key-definition>).reduce(new MyReduceFunction());
+ * </blockquote></pre>
+ * <p>
+ * Like all functions, the ReduceFunction needs to be serializable, as defined in {@link java.io.Serializable}.
+ *
+ * @param <T> Type of the elements that this function processes.
+ */
public interface ReduceFunction<T> extends Function, Serializable {
/**
- * The core method of Reducible, combining two values into one value of the same type.
+ * The core method of ReduceFunction, combining two values into one value of the same type.
* The reduce function is consecutively applied to all values of a group until only a single value remains.
*
* @param value1 The first value to combine.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
index ffc3ac2..8f53252 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.api.common.functions;
import org.apache.flink.configuration.Configuration;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index 3cf30ff..e18858b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -105,34 +105,6 @@ public interface RuntimeContext {
* Convenience function to create a counter object for histograms.
*/
Histogram getHistogram(String name);
-
-// /**
-// * I propose to remove this and only keep the other more explicit functions
-// * (to add or get an accumulator object)
-// *
-// * Get an existing or new named accumulator object. Use this function to get
-// * an counter for an custom accumulator type. For the integrated
-// * accumulators you better use convenience functions (e.g. getIntCounter).
-// *
-// * There is no need to register accumulators - they will be created when a
-// * UDF asks the first time for a counter that does not exist yet locally.
-// * This implies that there can be conflicts when a counter is requested with
-// * the same name but with different types, either in the same UDF or in
-// * different. In the last case the conflict occurs during merging.
-// *
-// * @param name
-// * @param accumulatorClass
-// * If the accumulator was not created previously
-// * @return
-// */
-// <V, A> Accumulator<V, A> getAccumulator(String name,
-// Class<? extends Accumulator<V, A>> accumulatorClass);
-//
-// /**
-// * See getAccumulable
-// */
-// <T> SimpleAccumulator<T> getSimpleAccumulator(String name,
-// Class<? extends SimpleAccumulator<T>> accumulatorClass);
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
index bc4ffd0..6455c77 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
@@ -18,18 +18,15 @@
package org.apache.flink.api.common.functions.util;
-
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
-import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
public class FunctionUtils {
-
public static void openFunction (Function function, Configuration parameters) throws Exception{
if (function instanceof RichFunction) {
RichFunction richFunction = (RichFunction) function;
@@ -61,21 +58,30 @@ public class FunctionUtils {
}
}
- public static boolean isSerializedLambdaFunction(Function function) {
- Class<?> clazz = function.getClass();
- try {
- Method replaceMethod = clazz.getDeclaredMethod("writeReplace");
- replaceMethod.setAccessible(true);
- Object serializedForm = replaceMethod.invoke(function);
- if (serializedForm instanceof SerializedLambda) {
- return true;
+ public static boolean isLambdaFunction(Function function) {
+ if (function == null) {
+ throw new IllegalArgumentException();
+ }
+
+ for (Class<?> clazz = function.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
+ try {
+ Method replaceMethod = clazz.getDeclaredMethod("writeReplace");
+ replaceMethod.setAccessible(true);
+ Object serialVersion = replaceMethod.invoke(function);
+
+ if (serialVersion.getClass().getName().equals("java.lang.invoke.SerializedLambda")) {
+ return true;
+ }
}
- else {
- return false;
+ catch (NoSuchMethodException e) {
+ // thrown if the method is not there. fall through the loop
+ }
+ catch (Throwable t) {
+ // this should not happen, we are not executing any method code.
+ throw new RuntimeException("Error while checking whether function is a lambda.", t);
}
}
- catch (Exception e) {
- return false;
- }
+
+ return false;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
index c416765..6761c17 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.api.common.operators;
import org.apache.flink.api.common.functions.AbstractRichFunction;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
index 66bea7f..c91e6ab 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
@@ -48,6 +48,7 @@ import org.apache.flink.util.Visitor;
/**
*
*/
+@SuppressWarnings("deprecation")
public class BulkIterationBase<T> extends SingleInputOperator<T, T, AbstractRichFunction> implements IterationOperator {
private static String DEFAULT_NAME = "<Unnamed Bulk Iteration>";
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
index ef00a46..6909dd4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.api.common.operators.base;
import org.apache.flink.api.common.functions.GenericCollectorMap;
@@ -33,6 +32,7 @@ import org.apache.flink.api.common.operators.util.UserCodeWrapper;
*
* @see GenericCollectorMap
*/
+@Deprecated
public class CollectorMapOperatorBase<IN, OUT, FT extends GenericCollectorMap<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
public CollectorMapOperatorBase(UserCodeWrapper<FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java
index 7d6495b..2683583 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java
@@ -48,6 +48,7 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase;
/**
* Convenience methods when dealing with {@link Operator}s.
*/
+@SuppressWarnings("deprecation")
public class OperatorUtil {
@SuppressWarnings("rawtypes")
@@ -126,7 +127,7 @@ public class OperatorUtil {
* @param inputs
* all input contracts to this contract
*/
- @SuppressWarnings({ "deprecation", "rawtypes", "unchecked" })
+ @SuppressWarnings({ "rawtypes", "unchecked" })
public static void setInputs(final Operator<?> contract, final List<List<Operator>> inputs) {
if (contract instanceof GenericDataSinkBase) {
if (inputs.size() != 1) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java
index 30091ab..88809bb 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java
@@ -42,6 +42,7 @@ import org.junit.Test;
/**
* Tests {@link OperatorUtil}.
*/
+@SuppressWarnings("deprecation")
public class OperatorUtilTest {
/**
* Test {@link OperatorUtil#getContractClass(Class)}
@@ -115,13 +116,17 @@ public class OperatorUtilTest {
assertEquals(GenericDataSourceBase.class, result);
}
+ @SuppressWarnings("serial")
static abstract class CoGrouper implements CoGroupFunction<IntValue, IntValue, IntValue> {}
+ @SuppressWarnings("serial")
static abstract class Crosser implements CrossFunction<IntValue, IntValue, IntValue> {}
static abstract class Mapper implements GenericCollectorMap<IntValue, IntValue> {}
+ @SuppressWarnings("serial")
static abstract class Matcher implements FlatJoinFunction<IntValue, IntValue, IntValue> {}
+ @SuppressWarnings("serial")
static abstract class Reducer implements GroupReduceFunction<IntValue, IntValue> {}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index e7199f9..3d1238a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -50,9 +50,9 @@ import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.ProjectOperator.Projection;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.ReduceOperator;
+import org.apache.flink.api.java.operators.SortedGrouping;
import org.apache.flink.api.java.operators.UnionOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
-import org.apache.flink.api.java.record.functions.CrossFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
@@ -65,8 +65,8 @@ import org.apache.flink.types.TypeInformation;
* A DataSet represents a collection of elements of the same type.<br/>
* A DataSet can be transformed into another DataSet by applying a transformation as for example
* <ul>
- * <li>{@link DataSet#map(org.apache.flink.api.java.functions.RichMapFunction)},</li>
- * <li>{@link DataSet#reduce(org.apache.flink.api.java.functions.RichReduceFunction)},</li>
+ * <li>{@link DataSet#map(org.apache.flink.api.common.functions.MapFunction)},</li>
+ * <li>{@link DataSet#reduce(org.apache.flink.api.common.functions.ReduceFunction)},</li>
* <li>{@link DataSet#join(DataSet)}, or</li>
* <li>{@link DataSet#coGroup(DataSet)}.</li>
* </ul>
@@ -135,7 +135,7 @@ public abstract class DataSet<T> {
if (mapper == null) {
throw new NullPointerException("Map function must not be null.");
}
- if (FunctionUtils.isSerializedLambdaFunction(mapper)) {
+ if (FunctionUtils.isLambdaFunction(mapper)) {
throw new UnsupportedLambdaExpressionException();
}
return new MapOperator<T, R>(this, mapper);
@@ -157,7 +157,7 @@ public abstract class DataSet<T> {
if (flatMapper == null) {
throw new NullPointerException("FlatMap function must not be null.");
}
- if (FunctionUtils.isSerializedLambdaFunction(flatMapper)) {
+ if (FunctionUtils.isLambdaFunction(flatMapper)) {
throw new UnsupportedLambdaExpressionException();
}
return new FlatMapOperator<T, R>(this, flatMapper);
@@ -197,13 +197,13 @@ public abstract class DataSet<T> {
*
* @param fieldIndexes The field indexes of the input tuples that are retained.
* The order of fields in the output tuple corresponds to the order of field indexes.
- * @return A Projection that needs to be converted into a {@link ProjectOperator} to complete the
+ * @return A Projection that needs to be converted into a {@link org.apache.flink.api.java.operators.ProjectOperator} to complete the
* Project transformation by calling {@link Projection#types()}.
*
* @see Tuple
* @see DataSet
* @see Projection
- * @see ProjectOperator
+ * @see org.apache.flink.api.java.operators.ProjectOperator
*/
public Projection<T> project(int... fieldIndexes) {
return new Projection<T>(this, fieldIndexes);
@@ -303,7 +303,7 @@ public abstract class DataSet<T> {
if (reducer == null) {
throw new NullPointerException("GroupReduce function must not be null.");
}
- if (FunctionUtils.isSerializedLambdaFunction(reducer)) {
+ if (FunctionUtils.isLambdaFunction(reducer)) {
throw new UnsupportedLambdaExpressionException();
}
return new GroupReduceOperator<T, R>(this, reducer);
@@ -366,17 +366,15 @@ public abstract class DataSet<T> {
* <ul>
* <li>{@link UnsortedGrouping#sortGroup(int, org.apache.flink.api.common.operators.Order)} to get a {@link SortedGrouping}.
* <li>{@link UnsortedGrouping#aggregate(Aggregations, int)} to apply an Aggregate transformation.
- * <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.java.functions.RichReduceFunction)} to apply a Reduce transformation.
- * <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)} to apply a GroupReduce transformation.
+ * <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.common.functions.ReduceFunction)} to apply a Reduce transformation.
+ * <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)} to apply a GroupReduce transformation.
* </ul>
*
* @param keyExtractor The KeySelector function which extracts the key values from the DataSet on which it is grouped.
* @return An UnsortedGrouping on which a transformation needs to be applied to obtain a transformed DataSet.
*
* @see KeySelector
- * @see Grouping
* @see UnsortedGrouping
- * @see SortedGrouping
* @see AggregateOperator
* @see ReduceOperator
* @see org.apache.flink.api.java.operators.GroupReduceOperator
@@ -395,17 +393,15 @@ public abstract class DataSet<T> {
* <ul>
* <li>{@link UnsortedGrouping#sortGroup(int, org.apache.flink.api.common.operators.Order)} to get a {@link SortedGrouping}.
* <li>{@link UnsortedGrouping#aggregate(Aggregations, int)} to apply an Aggregate transformation.
- * <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.java.functions.RichReduceFunction)} to apply a Reduce transformation.
- * <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)} to apply a GroupReduce transformation.
+ * <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.common.functions.ReduceFunction)} to apply a Reduce transformation.
+ * <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)} to apply a GroupReduce transformation.
* </ul>
*
* @param fields One or more field positions on which the DataSet will be grouped.
* @return A Grouping on which a transformation needs to be applied to obtain a transformed DataSet.
*
* @see Tuple
- * @see Grouping
* @see UnsortedGrouping
- * @see SortedGrouping
* @see AggregateOperator
* @see ReduceOperator
* @see org.apache.flink.api.java.operators.GroupReduceOperator
@@ -424,17 +420,15 @@ public abstract class DataSet<T> {
* <ul>
* <li>{@link UnsortedGrouping#sortGroup(int, org.apache.flink.api.common.operators.Order)} to get a {@link SortedGrouping}.
* <li>{@link UnsortedGrouping#aggregate(Aggregations, int)} to apply an Aggregate transformation.
- * <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.java.functions.RichReduceFunction)} to apply a Reduce transformation.
- * <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)} to apply a GroupReduce transformation.
+ * <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.common.functions.ReduceFunction)} to apply a Reduce transformation.
+ * <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)} to apply a GroupReduce transformation.
* </ul>
*
* @param fields One or more field expressions on which the DataSet will be grouped.
* @return A Grouping on which a transformation needs to be applied to obtain a transformed DataSet.
*
* @see Tuple
- * @see Grouping
* @see UnsortedGrouping
- * @see SortedGrouping
* @see AggregateOperator
* @see ReduceOperator
* @see org.apache.flink.api.java.operators.GroupReduceOperator
@@ -462,7 +456,6 @@ public abstract class DataSet<T> {
* @return A JoinOperatorSets to continue the definition of the Join transformation.
*
* @see JoinOperatorSets
- * @see JoinOperator
* @see DataSet
*/
public <R> JoinOperatorSets<T, R> join(DataSet<R> other) {
@@ -484,7 +477,6 @@ public abstract class DataSet<T> {
* @return A JoinOperatorSets to continue the definition of the Join transformation.
*
* @see JoinOperatorSets
- * @see JoinOperator
* @see DataSet
*/
public <R> JoinOperatorSets<T, R> joinWithTiny(DataSet<R> other) {
@@ -506,7 +498,6 @@ public abstract class DataSet<T> {
* @return A JoinOperatorSet to continue the definition of the Join transformation.
*
* @see JoinOperatorSets
- * @see JoinOperator
* @see DataSet
*/
public <R> JoinOperatorSets<T, R> joinWithHuge(DataSet<R> other) {
@@ -570,7 +561,7 @@ public abstract class DataSet<T> {
* second input being the second field of the tuple.
*
* <p>
- * Call {@link DefaultCross.with(CrossFunction)} to define a {@link CrossFunction} which is called for
+ * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(CrossFunction)} to define a {@link CrossFunction} which is called for
* each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.</br>
*
* @param other The other DataSet with which this DataSet is crossed.
@@ -599,14 +590,15 @@ public abstract class DataSet<T> {
* second input being the second field of the tuple.
*
* <p>
- * Call {@link DefaultCross.with(CrossFunction)} to define a {@link CrossFunction} which is called for
+ * Call {@link DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a
+ * {@link org.apache.flink.api.common.functions.CrossFunction} which is called for
* each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.</br>
*
* @param other The other DataSet with which this DataSet is crossed.
* @return A DefaultCross that returns a Tuple2 for each pair of crossed elements.
*
* @see DefaultCross
- * @see CrossFunction
+ * @see org.apache.flink.api.common.functions.CrossFunction
* @see DataSet
* @see Tuple2
*/
@@ -628,14 +620,15 @@ public abstract class DataSet<T> {
* second input being the second field of the tuple.
*
* <p>
- * Call {@link DefaultCross.with(CrossFunction)} to define a {@link CrossFunction} which is called for
+ * Call {@link DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a
+ * {@link org.apache.flink.api.common.functions.CrossFunction} which is called for
* each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.</br>
*
* @param other The other DataSet with which this DataSet is crossed.
* @return A DefaultCross that returns a Tuple2 for each pair of crossed elements.
*
* @see DefaultCross
- * @see CrossFunction
+ * @see org.apache.flink.api.common.functions.CrossFunction
* @see DataSet
* @see Tuple2
*/
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index ebd1422..af0ea52 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -112,7 +112,7 @@ public abstract class ExecutionEnvironment {
* individually override this value to use a specific degree of parallelism via
* {@link Operator#setParallelism(int)}. Other operations may need to run with a different
* degree of parallelism - for example calling
- * {@link DataSet#reduce(org.apache.flink.api.java.functions.RichReduceFunction)} over the entire
+ * {@link DataSet#reduce(org.apache.flink.api.common.functions.ReduceFunction)} over the entire
* set will insert eventually an operation that runs non-parallel (degree of parallelism of one).
*
* @return The degree of parallelism used by operations, unless they override that value. This method
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
index 012ab57..0a83235 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
@@ -23,7 +23,7 @@ import java.util.Iterator;
import org.apache.flink.util.Collector;
/**
- * A variant of the {@link RichFlatMapFunction} that returns elements through an iterator, rather then
+ * A convenience variant of the {@link RichFlatMapFunction} that returns elements through an iterator, rather than
* through a collector. In all other respects, it behaves exactly like the FlatMapFunction.
* <p>
* The function needs to be serializable, as defined in {@link java.io.Serializable}.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
index 8aaaf86..e78c31e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
@@ -22,29 +22,14 @@ import java.util.Iterator;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.util.Collector;
/**
- * The abstract base class for CoGroup functions. CoGroup functions combine two data sets by first grouping each data set
- * after a key and then "joining" the groups by calling this function with the two sets for each key.
- * If a key is present in only one of the two inputs, it may be that one of the groups is empty.
- * <p>
- * The basic syntax for using CoGoup on two data sets is as follows:
- * <pre><blockquote>
- * DataSet<X> set1 = ...;
- * DataSet<Y> set2 = ...;
- *
- * set1.coGroup(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyCoGroupFunction());
- * </blockquote></pre>
- * <p>
- * {@code set1} is here considered the first input, {@code set2} the second input.
- * The keys can be defined through tuple field positions or key extractors.
- * See {@link org.apache.flink.api.java.operators.Keys} for details.
- * <p>
- * Some keys may only be contained in one of the two original data sets. In that case, the CoGroup function is invoked
- * with in empty input for the side of the data set that did not contain elements with that specific key.
- * <p>
- * All functions need to be serializable, as defined in {@link java.io.Serializable}.
+ * Rich variant of the {@link CoGroupFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
*
* @param <IN1> The type of the elements in the first input.
* @param <IN2> The type of the elements in the second input.
@@ -54,20 +39,6 @@ public abstract class RichCoGroupFunction<IN1, IN2, OUT> extends AbstractRichFun
private static final long serialVersionUID = 1L;
-
- /**
- * The core method of the CoGroupFunction. This method is called for each pair of groups that have the same
- * key. The elements of the groups are returned by the respective iterators.
- *
- * It is possible that one of the two groups is empty, in which case the respective iterator has no elements.
- *
- * @param first The group from the first input.
- * @param second The group from the second input.
- * @param out The collector through which to return the result elements.
- *
- * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
- * to fail and may trigger recovery.
- */
@Override
public abstract void coGroup(Iterator<IN1> first, Iterator<IN2> second, Collector<OUT> out) throws Exception;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCrossFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCrossFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCrossFunction.java
index a4e1248..58be279 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCrossFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCrossFunction.java
@@ -20,26 +20,13 @@ package org.apache.flink.api.java.functions;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.CrossFunction;
-
+import org.apache.flink.api.common.functions.RichFunction;
/**
- * The abstract base class for Cross functions. Cross functions build a Cartesian produce of their inputs
- * and call the function or each pair of elements.
- * They are a means of convenience and can be used to directly produce manipulate the
- * pair of elements, instead of having the operator build 2-tuples, and then using a
- * MapFunction over those 2-tuples.
- * <p>
- * The basic syntax for using Cross on two data sets is as follows:
- * <pre><blockquote>
- * DataSet<X> set1 = ...;
- * DataSet<Y> set2 = ...;
- *
- * set1.cross(set2).with(new MyCrossFunction());
- * </blockquote></pre>
- * <p>
- * {@code set1} is here considered the first input, {@code set2} the second input.
- * <p>
- * All functions need to be serializable, as defined in {@link java.io.Serializable}.
+ * Rich variant of the {@link CrossFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
*
* @param <IN1> The type of the elements in the first input.
* @param <IN2> The type of the elements in the second input.
@@ -48,19 +35,7 @@ import org.apache.flink.api.common.functions.CrossFunction;
public abstract class RichCrossFunction<IN1, IN2, OUT> extends AbstractRichFunction implements CrossFunction<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;
-
- /**
- * The core method of the cross operation. The method will be invoked for each pair of elements
- * in the Cartesian product.
- *
- * @param first The element from the first input.
- * @param second The element from the second input.
- * @return The result element.
- *
- * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
- * to fail and may trigger recovery.
- */
@Override
public abstract OUT cross(IN1 first, IN2 second) throws Exception;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFilterFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFilterFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFilterFunction.java
index e3baa74..9057a0f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFilterFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFilterFunction.java
@@ -20,19 +20,13 @@ package org.apache.flink.api.java.functions;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichFunction;
/**
- * The abstract base class for Filter functions. A filter function take elements and evaluates a
- * predicate on them to decide whether to keep the element, or to discard it.
- * <p>
- * The basic syntax for using a FilterFunction is as follows:
- * <pre><blockquote>
- * DataSet<X> input = ...;
- *
- * DataSet<X> result = input.filter(new MyFilterFunction());
- * </blockquote></pre>
- * <p>
- * Like all functions, the FilterFunction needs to be serializable, as defined in {@link java.io.Serializable}.
+ * Rich variant of the {@link FilterFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
*
* @param <T> The type of the filtered elements.
*/
@@ -40,18 +34,6 @@ public abstract class RichFilterFunction<T> extends AbstractRichFunction impleme
private static final long serialVersionUID = 1L;
- /**
- * The core method of the FilterFunction. The method is called for each element in the input,
- * and determines whether the element should be kept or filtered out. If the method returns true,
- * the element passes the filter and is kept, if the method returns false, the element is
- * filtered out.
- *
- * @param value The input value to be filtered.
- * @return Flag to indicate whether to keep the value (true) or to discard it (false).
- *
- * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
- * to fail and may trigger recovery.
- */
@Override
public abstract boolean filter(T value) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
index 8c326c6..97c15ff 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
@@ -21,10 +21,19 @@ package org.apache.flink.api.java.functions;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.util.Collector;
import java.util.Iterator;
+/**
+ * Rich variant of the {@link FlatCombineFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <T> The data type of the elements to be combined.
+ */
public abstract class RichFlatCombineFunction<T> extends AbstractRichFunction implements FlatCombineFunction<T> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatJoinFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatJoinFunction.java
index 15b4539..6918364 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatJoinFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatJoinFunction.java
@@ -20,36 +20,14 @@ package org.apache.flink.api.java.functions;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.util.Collector;
/**
- * The abstract base class for Join functions. Join functions combine two data sets by joining their
- * elements on specified keys and calling this function with each pair of joining elements.
- * By default, this follows strictly the semantics of an "inner join" in SQL.
- * the semantics are those of an "inner join", meaning that elements are filtered out
- * if their key is not contained in the other data set.
- * <p>
- * Per the semantics of an inner join, the function is
- * <p>
- * The basic syntax for using Join on two data sets is as follows:
- * <pre><blockquote>
- * DataSet<X> set1 = ...;
- * DataSet<Y> set2 = ...;
- *
- * set1.join(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyJoinFunction());
- * </blockquote></pre>
- * <p>
- * {@code set1} is here considered the first input, {@code set2} the second input.
- * The keys can be defined through tuple field positions or key extractors.
- * See {@link org.apache.flink.api.java.operators.Keys} for details.
- * <p>
- * The Join function is actually not a necessary part of a join operation. If no JoinFunction is provided,
- * the result of the operation is a sequence of Tuple2, where the elements in the tuple are those that
- * the JoinFunction would have been invoked with.
- * <P>
- * Note: You can use a {@link RichCoGroupFunction} to perform an outer join.
- * <p>
- * All functions need to be serializable, as defined in {@link java.io.Serializable}.
+ * Rich variant of the {@link FlatJoinFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
*
* @param <IN1> The type of the elements in the first input.
* @param <IN2> The type of the elements in the second input.
@@ -59,17 +37,6 @@ public abstract class RichFlatJoinFunction<IN1, IN2, OUT> extends AbstractRichFu
private static final long serialVersionUID = 1L;
- /**
- * The user-defined method for performing transformations after a join.
- * The method is called with matching pairs of elements from the inputs.
- *
- * @param first The element from first input.
- * @param second The element from second input.
- * @return The resulting element.
- *
- * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
- * to fail and may trigger recovery.
- */
@Override
public abstract void join(IN1 first, IN2 second, Collector<OUT> out) throws Exception;
}