You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/08/01 09:29:26 UTC
[10/22] [FLINK-701] Refactor Java API to use SAM interfaces.
Introduce RichFunction stubs for all UDFs.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index 5ca1068..a07a157 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -22,8 +22,10 @@ import java.security.InvalidParameterException;
import java.util.Arrays;
import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.functions.GenericJoiner;
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
import org.apache.flink.api.common.operators.DualInputSemanticProperties;
import org.apache.flink.api.common.operators.Operator;
@@ -32,19 +34,22 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.DeltaIteration.SolutionSetPlaceHolder;
-import org.apache.flink.api.java.functions.JoinFunction;
+import org.apache.flink.api.java.functions.RichFlatJoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.functions.SemanticPropUtil;
+import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
import org.apache.flink.api.java.operators.Keys.FieldPositionKeys;
import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
import org.apache.flink.api.java.operators.translation.PlanUnwrappingJoinOperator;
import org.apache.flink.api.java.operators.translation.TupleKeyExtractingMapper;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.types.TypeInformation;
//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
import org.apache.flink.api.java.tuple.*;
+import org.apache.flink.util.Collector;
//CHECKSTYLE.ON: AvoidStarImport
/**
@@ -147,12 +152,12 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
* @param <I2> The type of the second input DataSet of the Join transformation.
* @param <OUT> The type of the result of the Join transformation.
*
- * @see JoinFunction
+ * @see org.apache.flink.api.java.functions.RichFlatJoinFunction
* @see DataSet
*/
public static class EquiJoin<I1, I2, OUT> extends JoinOperator<I1, I2, OUT> {
- private final JoinFunction<I1, I2, OUT> function;
+ private final FlatJoinFunction<I1, I2, OUT> function;
@SuppressWarnings("unused")
private boolean preserve1;
@@ -160,7 +165,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
private boolean preserve2;
protected EquiJoin(DataSet<I1> input1, DataSet<I2> input2,
- Keys<I1> keys1, Keys<I2> keys2, JoinFunction<I1, I2, OUT> function,
+ Keys<I1> keys1, Keys<I2> keys2, FlatJoinFunction<I1, I2, OUT> function,
TypeInformation<OUT> returnType, JoinHint hint)
{
super(input1, input2, keys1, keys2, returnType, hint);
@@ -171,14 +176,33 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
this.function = function;
- if (!(function instanceof ProjectJoinFunction)) {
+ if (!(function instanceof ProjectFlatJoinFunction)) {
extractSemanticAnnotationsFromUdf(function.getClass());
} else {
- generateProjectionProperties(((ProjectJoinFunction<?, ?, ?>) function));
+ generateProjectionProperties(((ProjectFlatJoinFunction<?, ?, ?>) function));
}
}
- public void generateProjectionProperties(ProjectJoinFunction<?, ?, ?> pjf) {
+ protected EquiJoin(DataSet<I1> input1, DataSet<I2> input2,
+ Keys<I1> keys1, Keys<I2> keys2, FlatJoinFunction<I1, I2, OUT> generatedFunction, JoinFunction<I1, I2, OUT> function,
+ TypeInformation<OUT> returnType, JoinHint hint)
+ {
+ super(input1, input2, keys1, keys2, returnType, hint);
+
+ if (function == null) {
+ throw new NullPointerException();
+ }
+
+ this.function = generatedFunction;
+
+ if (!(generatedFunction instanceof ProjectFlatJoinFunction)) {
+ extractSemanticAnnotationsFromUdf(function.getClass());
+ } else {
+ generateProjectionProperties(((ProjectFlatJoinFunction<?, ?, ?>) generatedFunction));
+ }
+ }
+
+ public void generateProjectionProperties(ProjectFlatJoinFunction<?, ?, ?> pjf) {
DualInputSemanticProperties props = SemanticPropUtil.createProjectionPropertiesDual(pjf.getFields(), pjf.getIsFromFirst());
setSemanticProperties(props);
}
@@ -238,8 +262,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
int[] logicalKeyPositions1 = super.keys1.computeLogicalKeyPositions();
int[] logicalKeyPositions2 = super.keys2.computeLogicalKeyPositions();
- JoinOperatorBase<I1, I2, OUT, GenericJoiner<I1, I2, OUT>> po =
- new JoinOperatorBase<I1, I2, OUT, GenericJoiner<I1, I2, OUT>>(function,
+ JoinOperatorBase<I1, I2, OUT, FlatJoinFunction<I1, I2, OUT>> po =
+ new JoinOperatorBase<I1, I2, OUT, FlatJoinFunction<I1, I2, OUT>>(function,
new BinaryOperatorInformation<I1, I2, OUT>(getInput1Type(), getInput2Type(), getResultType()),
logicalKeyPositions1, logicalKeyPositions2,
name);
@@ -298,7 +322,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
private static <I1, I2, K, OUT> PlanUnwrappingJoinOperator<I1, I2, OUT, K> translateSelectorFunctionJoin(
Keys.SelectorFunctionKeys<I1, ?> rawKeys1, Keys.SelectorFunctionKeys<I2, ?> rawKeys2,
- JoinFunction<I1, I2, OUT> function,
+ FlatJoinFunction<I1, I2, OUT> function,
TypeInformation<I1> inputType1, TypeInformation<I2> inputType2, TypeInformation<OUT> outputType, String name,
Operator<I1> input1, Operator<I2> input2)
{
@@ -313,10 +337,10 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
final KeyExtractingMapper<I1, K> extractor1 = new KeyExtractingMapper<I1, K>(keys1.getKeyExtractor());
final KeyExtractingMapper<I2, K> extractor2 = new KeyExtractingMapper<I2, K>(keys2.getKeyExtractor());
- final MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>> keyMapper1 =
- new MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
- final MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>> keyMapper2 =
- new MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
+ final MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>> keyMapper1 =
+ new MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
+ final MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>> keyMapper2 =
+ new MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
final PlanUnwrappingJoinOperator<I1, I2, OUT, K> join = new PlanUnwrappingJoinOperator<I1, I2, OUT, K>(function, keys1, keys2, name, outputType, typeInfoWithKey1, typeInfoWithKey2);
join.setFirstInput(keyMapper1);
@@ -333,7 +357,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
private static <I1, I2, K, OUT> PlanUnwrappingJoinOperator<I1, I2, OUT, K> translateSelectorFunctionJoinRight(
int[] logicalKeyPositions1, Keys.SelectorFunctionKeys<I2, ?> rawKeys2,
- JoinFunction<I1, I2, OUT> function,
+ FlatJoinFunction<I1, I2, OUT> function,
TypeInformation<I1> inputType1, TypeInformation<I2> inputType2, TypeInformation<OUT> outputType, String name,
Operator<I1> input1, Operator<I2> input2)
{
@@ -350,10 +374,10 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
final TupleKeyExtractingMapper<I1, K> extractor1 = new TupleKeyExtractingMapper<I1, K>(logicalKeyPositions1[0]);
final KeyExtractingMapper<I2, K> extractor2 = new KeyExtractingMapper<I2, K>(keys2.getKeyExtractor());
- final MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>> keyMapper1 =
- new MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
- final MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>> keyMapper2 =
- new MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
+ final MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>> keyMapper1 =
+ new MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
+ final MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>> keyMapper2 =
+ new MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
final PlanUnwrappingJoinOperator<I1, I2, OUT, K> join = new PlanUnwrappingJoinOperator<I1, I2, OUT, K>(function, logicalKeyPositions1, keys2, name, outputType, typeInfoWithKey1, typeInfoWithKey2);
@@ -371,7 +395,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
private static <I1, I2, K, OUT> PlanUnwrappingJoinOperator<I1, I2, OUT, K> translateSelectorFunctionJoinLeft(
Keys.SelectorFunctionKeys<I1, ?> rawKeys1, int[] logicalKeyPositions2,
- JoinFunction<I1, I2, OUT> function,
+ FlatJoinFunction<I1, I2, OUT> function,
TypeInformation<I1> inputType1, TypeInformation<I2> inputType2, TypeInformation<OUT> outputType, String name,
Operator<I1> input1, Operator<I2> input2)
{
@@ -388,10 +412,10 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
final KeyExtractingMapper<I1, K> extractor1 = new KeyExtractingMapper<I1, K>(keys1.getKeyExtractor());
final TupleKeyExtractingMapper<I2, K> extractor2 = new TupleKeyExtractingMapper<I2, K>(logicalKeyPositions2[0]);
- final MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>> keyMapper1 =
- new MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
- final MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>> keyMapper2 =
- new MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
+ final MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>> keyMapper1 =
+ new MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
+ final MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>> keyMapper2 =
+ new MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
final PlanUnwrappingJoinOperator<I1, I2, OUT, K> join = new PlanUnwrappingJoinOperator<I1, I2, OUT, K>(function, keys1, logicalKeyPositions2, name, outputType, typeInfoWithKey1, typeInfoWithKey2);
@@ -424,28 +448,73 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
Keys<I1> keys1, Keys<I2> keys2, JoinHint hint)
{
super(input1, input2, keys1, keys2,
- (JoinFunction<I1, I2, Tuple2<I1, I2>>) new DefaultJoinFunction<I1, I2>(),
+ (RichFlatJoinFunction<I1, I2, Tuple2<I1, I2>>) new DefaultFlatJoinFunction<I1, I2>(),
new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), hint);
}
/**
- * Finalizes a Join transformation by applying a {@link JoinFunction} to each pair of joined elements.<br/>
+ * Finalizes a Join transformation by applying a {@link org.apache.flink.api.java.functions.RichFlatJoinFunction} to each pair of joined elements.<br/>
* Each JoinFunction call returns exactly one element.
*
* @param function The JoinFunction that is called for each pair of joined elements.
* @return An EquiJoin that represents the joined result DataSet
*
- * @see JoinFunction
+ * @see org.apache.flink.api.java.functions.RichFlatJoinFunction
* @see org.apache.flink.api.java.operators.JoinOperator.EquiJoin
* @see DataSet
*/
- public <R> EquiJoin<I1, I2, R> with(JoinFunction<I1, I2, R> function) {
+ public <R> EquiJoin<I1, I2, R> with(FlatJoinFunction<I1, I2, R> function) {
if (function == null) {
throw new NullPointerException("Join function must not be null.");
}
+ if (FunctionUtils.isSerializedLambdaFunction(function)) {
+ throw new UnsupportedLambdaExpressionException();
+ }
TypeInformation<R> returnType = TypeExtractor.getJoinReturnTypes(function, getInput1Type(), getInput2Type());
return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), function, returnType, getJoinHint());
}
+
+ public <R> EquiJoin<I1, I2, R> with (JoinFunction<I1, I2, R> function) {
+ if (function == null) {
+ throw new NullPointerException("Join function must not be null.");
+ }
+ if (FunctionUtils.isSerializedLambdaFunction(function)) {
+ throw new UnsupportedLambdaExpressionException();
+ }
+ FlatJoinFunction generatedFunction = new WrappingFlatJoinFunction<I1, I2, R>(function);
+ TypeInformation<R> returnType = TypeExtractor.getJoinReturnTypes(function, getInput1Type(), getInput2Type());
+ return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), generatedFunction, function, returnType, getJoinHint());
+ }
+
+ private static class WrappingFlatJoinFunction<IN1, IN2, OUT> extends WrappingFunction<JoinFunction<IN1,IN2,OUT>> implements FlatJoinFunction<IN1, IN2, OUT> {
+
+ private static final long serialVersionUID = 1L;
+
+ private WrappingFlatJoinFunction(JoinFunction<IN1, IN2, OUT> wrappedFunction) {
+ super(wrappedFunction);
+ }
+
+ @Override
+ public void join(IN1 left, IN2 right, Collector<OUT> out) throws Exception {
+ out.collect (this.wrappedFunction.join(left, right));
+ }
+ }
+
+ /*
+ private static class GeneratedFlatJoinFunction<IN1, IN2, OUT> extends FlatJoinFunction<IN1, IN2, OUT> {
+
+ private Joinable<IN1,IN2,OUT> function;
+
+ private GeneratedFlatJoinFunction(Joinable<IN1, IN2, OUT> function) {
+ this.function = function;
+ }
+
+ @Override
+ public void join(IN1 first, IN2 second, Collector<OUT> out) throws Exception {
+ out.collect(function.join(first, second));
+ }
+ }
+ */
/**
* Initiates a ProjectJoin transformation and projects the first join input<br/>
@@ -530,7 +599,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
protected ProjectJoin(DataSet<I1> input1, DataSet<I2> input2, Keys<I1> keys1, Keys<I2> keys2, JoinHint hint, int[] fields, boolean[] isFromFirst, TupleTypeInfo<OUT> returnType) {
super(input1, input2, keys1, keys2,
- new ProjectJoinFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer().createInstance()),
+ new ProjectFlatJoinFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer().createInstance()),
returnType, hint);
}
@@ -821,20 +890,20 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
// default join functions
// --------------------------------------------------------------------------------------------
- public static final class DefaultJoinFunction<T1, T2> extends JoinFunction<T1, T2, Tuple2<T1, T2>> {
+ public static final class DefaultFlatJoinFunction<T1, T2> extends RichFlatJoinFunction<T1, T2, Tuple2<T1, T2>> {
private static final long serialVersionUID = 1L;
private final Tuple2<T1, T2> outTuple = new Tuple2<T1, T2>();
@Override
- public Tuple2<T1, T2> join(T1 first, T2 second) throws Exception {
+ public void join(T1 first, T2 second, Collector<Tuple2<T1,T2>> out) throws Exception {
outTuple.f0 = first;
outTuple.f1 = second;
- return outTuple;
+ out.collect(outTuple);
}
}
- public static final class ProjectJoinFunction<T1, T2, R extends Tuple> extends JoinFunction<T1, T2, R> {
+ public static final class ProjectFlatJoinFunction<T1, T2, R extends Tuple> extends RichFlatJoinFunction<T1, T2, R> {
private static final long serialVersionUID = 1L;
@@ -851,7 +920,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
* @param isFromFirst List of flags indicating whether the field should be copied from the first (true) or the second (false) input.
* @param outTupleInstance An instance of an output tuple.
*/
- private ProjectJoinFunction(int[] fields, boolean[] isFromFirst, R outTupleInstance) {
+ private ProjectFlatJoinFunction(int[] fields, boolean[] isFromFirst, R outTupleInstance) {
if(fields.length != isFromFirst.length) {
throw new IllegalArgumentException("Fields and isFromFirst arrays must have same length!");
@@ -869,7 +938,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
return isFromFirst;
}
- public R join(T1 in1, T2 in2) {
+ public void join(T1 in1, T2 in2, Collector<R> out) {
for(int i=0; i<fields.length; i++) {
if(isFromFirst[i]) {
if(fields[i] >= 0) {
@@ -885,27 +954,33 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
}
}
}
- return outTuple;
+ out.collect(outTuple);
}
}
- public static final class LeftSemiJoinFunction<T1, T2> extends JoinFunction<T1, T2, T1> {
+ public static final class LeftSemiFlatJoinFunction<T1, T2> extends RichFlatJoinFunction<T1, T2, T1> {
private static final long serialVersionUID = 1L;
@Override
- public T1 join(T1 left, T2 right) throws Exception {
- return left;
+ //public T1 join(T1 left, T2 right) throws Exception {
+ // return left;
+ //}
+ public void join (T1 left, T2 right, Collector<T1> out) {
+ out.collect(left);
}
}
- public static final class RightSemiJoinFunction<T1, T2> extends JoinFunction<T1, T2, T2> {
+ public static final class RightSemiFlatJoinFunction<T1, T2> extends RichFlatJoinFunction<T1, T2, T2> {
private static final long serialVersionUID = 1L;
@Override
- public T2 join(T1 left, T2 right) throws Exception {
- return right;
+ //public T2 join(T1 left, T2 right) throws Exception {
+ // return right;
+ //}
+ public void join (T1 left, T2 right, Collector<T2> out) {
+ out.collect(right);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
index 03c6037..eccdeec 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
@@ -18,11 +18,10 @@
package org.apache.flink.api.java.operators;
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
-import org.apache.flink.api.java.functions.MapFunction;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.DataSet;
@@ -34,7 +33,7 @@ import org.apache.flink.api.java.DataSet;
* @param <IN> The type of the data set consumed by the operator.
* @param <OUT> The type of the data set created by the operator.
*
- * @see MapFunction
+ * @see org.apache.flink.api.common.functions.MapFunction
*/
public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOperator<IN, OUT>> {
@@ -42,6 +41,7 @@ public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOpe
public MapOperator(DataSet<IN> input, MapFunction<IN, OUT> function) {
+
super(input, TypeExtractor.getMapReturnTypes(function, input.getType()));
this.function = function;
@@ -49,11 +49,11 @@ public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOpe
}
@Override
- protected org.apache.flink.api.common.operators.base.MapOperatorBase<IN, OUT, GenericMap<IN, OUT>> translateToDataFlow(Operator<IN> input) {
+ protected org.apache.flink.api.common.operators.base.MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
String name = getName() != null ? getName() : function.getClass().getName();
// create operator
- MapOperatorBase<IN, OUT, GenericMap<IN, OUT>> po = new MapOperatorBase<IN, OUT, GenericMap<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
+ MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> po = new MapOperatorBase<IN, OUT, MapFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
// set input
po.setInput(input);
// set dop
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
index 9e94670..dd5a3bd 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
@@ -20,7 +20,7 @@ package org.apache.flink.api.java.operators;
import java.util.Arrays;
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.SemanticPropUtil;
@@ -51,7 +51,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
}
@Override
- protected org.apache.flink.api.common.operators.base.MapOperatorBase<IN, OUT, GenericMap<IN,OUT>> translateToDataFlow(Operator<IN> input) {
+ protected org.apache.flink.api.common.operators.base.MapOperatorBase<IN, OUT, MapFunction<IN,OUT>> translateToDataFlow(Operator<IN> input) {
String name = getName() != null ? getName() : "Projection " + Arrays.toString(fields);
// create operator
PlanProjectOperator<IN, OUT> ppo = new PlanProjectOperator<IN, OUT>(fields, name, getInputType(), getResultType());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java
deleted file mode 100644
index d88d43d..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.operators;
-
-import org.apache.flink.api.common.functions.GenericCombine;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
-import org.apache.flink.api.common.functions.GenericMap;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.UnaryOperatorInformation;
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
-import org.apache.flink.api.common.operators.base.MapOperatorBase;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction.Combinable;
-import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
-import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.types.TypeInformation;
-
-import org.apache.flink.api.java.DataSet;
-
-/**
- * This operator represents the application of a "reduceGroup" function on a data set, and the
- * result data set produced by the function.
- *
- * @param <IN> The type of the data set consumed by the operator.
- * @param <OUT> The type of the data set created by the operator.
- */
-public class ReduceGroupOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, ReduceGroupOperator<IN, OUT>> {
-
- private final GroupReduceFunction<IN, OUT> function;
-
- private final Grouping<IN> grouper;
-
- private boolean combinable;
-
-
- /**
- * Constructor for a non-grouped reduce (all reduce).
- *
- * @param input The input data set to the groupReduce function.
- * @param function The user-defined GroupReduce function.
- */
- public ReduceGroupOperator(DataSet<IN> input, GroupReduceFunction<IN, OUT> function) {
- super(input, TypeExtractor.getGroupReduceReturnTypes(function, input.getType()));
-
- this.function = function;
- this.grouper = null;
- checkCombinability();
- }
-
- /**
- * Constructor for a grouped reduce.
- *
- * @param input The grouped input to be processed group-wise by the groupReduce function.
- * @param function The user-defined GroupReduce function.
- */
- public ReduceGroupOperator(Grouping<IN> input, GroupReduceFunction<IN, OUT> function) {
- super(input != null ? input.getDataSet() : null, TypeExtractor.getGroupReduceReturnTypes(function, input.getDataSet().getType()));
-
- this.function = function;
- this.grouper = input;
- checkCombinability();
-
- extractSemanticAnnotationsFromUdf(function.getClass());
- }
-
- private void checkCombinability() {
- if (function instanceof GenericCombine && function.getClass().getAnnotation(Combinable.class) != null) {
- this.combinable = true;
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Properties
- // --------------------------------------------------------------------------------------------
-
- public boolean isCombinable() {
- return combinable;
- }
-
- public void setCombinable(boolean combinable) {
- // sanity check that the function is a subclass of the combine interface
- if (combinable && !(function instanceof GenericCombine)) {
- throw new IllegalArgumentException("The function does not implement the combine interface.");
- }
-
- this.combinable = combinable;
- }
-
- @Override
- protected org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<?, OUT, ?> translateToDataFlow(Operator<IN> input) {
-
- String name = getName() != null ? getName() : function.getClass().getName();
-
- // distinguish between grouped reduce and non-grouped reduce
- if (grouper == null) {
- // non grouped reduce
- UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
- GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>> po =
- new GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>>(function, operatorInfo, new int[0], name);
-
- po.setCombinable(combinable);
- // set input
- po.setInput(input);
- // the degree of parallelism for a non grouped reduce can only be 1
- po.setDegreeOfParallelism(1);
- return po;
- }
-
- if (grouper.getKeys() instanceof Keys.SelectorFunctionKeys) {
-
- @SuppressWarnings("unchecked")
- Keys.SelectorFunctionKeys<IN, ?> selectorKeys = (Keys.SelectorFunctionKeys<IN, ?>) grouper.getKeys();
-
- PlanUnwrappingReduceGroupOperator<IN, OUT, ?> po = translateSelectorFunctionReducer(
- selectorKeys, function, getInputType(), getResultType(), name, input, isCombinable());
-
- po.setDegreeOfParallelism(this.getParallelism());
-
- return po;
- }
- else if (grouper.getKeys() instanceof Keys.FieldPositionKeys) {
-
- int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions();
- UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
- GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>> po =
- new GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name);
-
- po.setCombinable(combinable);
- po.setInput(input);
- po.setDegreeOfParallelism(this.getParallelism());
-
- // set group order
- if (grouper instanceof SortedGrouping) {
- SortedGrouping<IN> sortedGrouper = (SortedGrouping<IN>) grouper;
-
- int[] sortKeyPositions = sortedGrouper.getGroupSortKeyPositions();
- Order[] sortOrders = sortedGrouper.getGroupSortOrders();
-
- Ordering o = new Ordering();
- for(int i=0; i < sortKeyPositions.length; i++) {
- o.appendOrdering(sortKeyPositions[i], null, sortOrders[i]);
- }
- po.setGroupOrder(o);
- }
-
- return po;
- }
- else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {
-
- int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions();
- UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
- GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>> po =
- new GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name);
-
- po.setCombinable(combinable);
- po.setInput(input);
- po.setDegreeOfParallelism(this.getParallelism());
-
- return po;
- }
- else {
- throw new UnsupportedOperationException("Unrecognized key type.");
- }
-
- }
-
-
- // --------------------------------------------------------------------------------------------
-
- private static <IN, OUT, K> PlanUnwrappingReduceGroupOperator<IN, OUT, K> translateSelectorFunctionReducer(
- Keys.SelectorFunctionKeys<IN, ?> rawKeys, GroupReduceFunction<IN, OUT> function,
- TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input,
- boolean combinable)
- {
- @SuppressWarnings("unchecked")
- final Keys.SelectorFunctionKeys<IN, K> keys = (Keys.SelectorFunctionKeys<IN, K>) rawKeys;
-
- TypeInformation<Tuple2<K, IN>> typeInfoWithKey = new TupleTypeInfo<Tuple2<K, IN>>(keys.getKeyType(), inputType);
-
- KeyExtractingMapper<IN, K> extractor = new KeyExtractingMapper<IN, K>(keys.getKeyExtractor());
-
- PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer = new PlanUnwrappingReduceGroupOperator<IN, OUT, K>(function, keys, name, outputType, typeInfoWithKey, combinable);
-
- MapOperatorBase<IN, Tuple2<K, IN>, GenericMap<IN, Tuple2<K, IN>>> mapper = new MapOperatorBase<IN, Tuple2<K, IN>, GenericMap<IN, Tuple2<K, IN>>>(extractor, new UnaryOperatorInformation<IN, Tuple2<K, IN>>(inputType, typeInfoWithKey), "Key Extractor");
-
- reducer.setInput(mapper);
- mapper.setInput(input);
-
- // set the mapper's parallelism to the input parallelism to make sure it is chained
- mapper.setDegreeOfParallelism(input.getDegreeOfParallelism());
-
- return reducer;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
index 12e0f89..13a6c91 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
@@ -18,13 +18,12 @@
package org.apache.flink.api.java.operators;
-import org.apache.flink.api.common.functions.GenericMap;
-import org.apache.flink.api.common.functions.GenericReduce;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
-import org.apache.flink.api.java.functions.ReduceFunction;
import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceOperator;
@@ -40,7 +39,7 @@ import org.apache.flink.api.java.DataSet;
*
* @param <IN> The type of the data set reduced by the operator.
*
- * @see ReduceFunction
+ * @see org.apache.flink.api.common.functions.ReduceFunction
*/
public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOperator<IN>> {
@@ -83,8 +82,8 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
if (grouper == null) {
// non grouped reduce
UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<IN, IN>(getInputType(), getInputType());
- ReduceOperatorBase<IN, GenericReduce<IN>> po =
- new ReduceOperatorBase<IN, GenericReduce<IN>>(function, operatorInfo, new int[0], name);
+ ReduceOperatorBase<IN, ReduceFunction<IN>> po =
+ new ReduceOperatorBase<IN, ReduceFunction<IN>>(function, operatorInfo, new int[0], name);
// set input
po.setInput(input);
@@ -109,8 +108,8 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
// reduce with field positions
int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions();
UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<IN, IN>(getInputType(), getInputType());
- ReduceOperatorBase<IN, GenericReduce<IN>> po =
- new ReduceOperatorBase<IN, GenericReduce<IN>>(function, operatorInfo, logicalKeyPositions, name);
+ ReduceOperatorBase<IN, ReduceFunction<IN>> po =
+ new ReduceOperatorBase<IN, ReduceFunction<IN>>(function, operatorInfo, logicalKeyPositions, name);
// set input
po.setInput(input);
@@ -139,8 +138,8 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
PlanUnwrappingReduceOperator<T, K> reducer = new PlanUnwrappingReduceOperator<T, K>(function, keys, name, inputType, typeInfoWithKey);
- MapOperatorBase<T, Tuple2<K, T>, GenericMap<T, Tuple2<K, T>>> keyExtractingMap = new MapOperatorBase<T, Tuple2<K, T>, GenericMap<T, Tuple2<K, T>>>(extractor, new UnaryOperatorInformation<T, Tuple2<K, T>>(inputType, typeInfoWithKey), "Key Extractor");
- MapOperatorBase<Tuple2<K, T>, T, GenericMap<Tuple2<K, T>, T>> keyRemovingMap = new MapOperatorBase<Tuple2<K, T>, T, GenericMap<Tuple2<K, T>, T>>(new KeyRemovingMapper<T, K>(), new UnaryOperatorInformation<Tuple2<K, T>, T>(typeInfoWithKey, inputType), "Key Extractor");
+ MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>> keyExtractingMap = new MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>>(extractor, new UnaryOperatorInformation<T, Tuple2<K, T>>(inputType, typeInfoWithKey), "Key Extractor");
+ MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>> keyRemovingMap = new MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>>(new KeyRemovingMapper<T, K>(), new UnaryOperatorInformation<Tuple2<K, T>, T>(typeInfoWithKey, inputType), "Key Extractor");
keyExtractingMap.setInput(input);
reducer.setInput(keyExtractingMap);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
index fa2c1aa..dcdbed4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
@@ -35,8 +35,8 @@ import org.apache.flink.api.java.DataSet;
/**
* The <tt>SingleInputUdfOperator</tt> is the base class of all unary operators that execute
* user-defined functions (UDFs). The UDFs encapsulated by this operator are naturally UDFs that
- * have one input (such as {@link org.apache.flink.api.java.functions.MapFunction} or
- * {@link org.apache.flink.api.java.functions.ReduceFunction}).
+ * have one input (such as {@link org.apache.flink.api.java.functions.RichMapFunction} or
+ * {@link org.apache.flink.api.java.functions.RichReduceFunction}).
* <p>
* This class encapsulates utilities for the UDFs, such as broadcast variables, parameterization
* through configuration objects, and semantic properties.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index 89c8bb2..97b2417 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -21,16 +21,18 @@ package org.apache.flink.api.java.operators;
import java.util.Arrays;
import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
/**
* SortedGrouping is an intermediate step for a transformation on a grouped and sorted DataSet.<br/>
* The following transformation can be applied on sorted groups:
* <ul>
- * <li>{@link SortedGrouping#reduceGroup(GroupReduceFunction)},</li>
+ * <li>{@link SortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)},</li>
* </ul>
*
* @param <T> The type of the elements of the sorted and grouped DataSet.
@@ -65,23 +67,27 @@ public class SortedGrouping<T> extends Grouping<T> {
/**
* Applies a GroupReduce transformation on a grouped and sorted {@link DataSet}.<br/>
- * The transformation calls a {@link GroupReduceFunction} for each group of the DataSet.
+ * The transformation calls a {@link org.apache.flink.api.java.functions.RichGroupReduceFunction} for each group of the DataSet.
* A GroupReduceFunction can iterate over all elements of a group and emit any
* number of output elements including none.
*
* @param reducer The GroupReduceFunction that is applied on each group of the DataSet.
* @return A GroupReduceOperator that represents the reduced DataSet.
*
- * @see GroupReduceFunction
- * @see ReduceGroupOperator
+ * @see org.apache.flink.api.java.functions.RichGroupReduceFunction
+ * @see GroupReduceOperator
* @see DataSet
*/
- public <R> ReduceGroupOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reducer) {
+ public <R> GroupReduceOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reducer) {
if (reducer == null) {
throw new NullPointerException("GroupReduce function must not be null.");
}
- return new ReduceGroupOperator<T, R>(this, reducer);
+ if (FunctionUtils.isSerializedLambdaFunction(reducer)) {
+ throw new UnsupportedLambdaExpressionException();
+ }
+ return new GroupReduceOperator<T, R>(this, reducer);
}
+
// --------------------------------------------------------------------------------------------
// Group Operations
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
index a85ca3f..f347fef 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
@@ -35,8 +35,8 @@ import org.apache.flink.api.java.DataSet;
/**
* The <tt>TwoInputUdfOperator</tt> is the base class of all binary operators that execute
* user-defined functions (UDFs). The UDFs encapsulated by this operator are naturally UDFs that
- * have two inputs (such as {@link org.apache.flink.api.java.functions.JoinFunction} or
- * {@link org.apache.flink.api.java.functions.CoGroupFunction}).
+ * have two inputs (such as {@link org.apache.flink.api.java.functions.RichJoinFunction} or
+ * {@link org.apache.flink.api.java.functions.RichCoGroupFunction}).
* <p>
* This class encapsulates utilities for the UDFs, such as broadcast variables, parameterization
* through configuration objects, and semantic properties.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
index 2040a27..bf33f4e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
@@ -27,8 +27,8 @@ import org.apache.flink.api.java.DataSet;
/**
* This interface marks operators as operators that execute user-defined functions (UDFs), such as
- * {@link org.apache.flink.api.java.functions.MapFunction}, {@link org.apache.flink.api.java.functions.ReduceFunction},
- * or {@link org.apache.flink.api.java.functions.CoGroupFunction}.
+ * {@link org.apache.flink.api.java.functions.RichMapFunction}, {@link org.apache.flink.api.java.functions.RichReduceFunction},
+ * or {@link org.apache.flink.api.java.functions.RichCoGroupFunction}.
* The UDF operators stand in contrast to operators that execute built-in operations, like aggregations.
*/
public interface UdfOperator<O extends UdfOperator<O>> {
@@ -39,7 +39,7 @@ public interface UdfOperator<O extends UdfOperator<O>> {
/**
* Gets the configuration parameters that will be passed to the UDF's open method
- * {@link org.apache.flink.api.common.functions.AbstractFunction#open(Configuration)}.
+ * {@link org.apache.flink.api.common.functions.AbstractRichFunction#open(Configuration)}.
* The configuration is set via the {@link #withParameters(Configuration)}
* method.
*
@@ -69,7 +69,7 @@ public interface UdfOperator<O extends UdfOperator<O>> {
/**
* Sets the configuration parameters for the UDF. These are optional parameters that are passed
- * to the UDF in the {@link org.apache.flink.api.common.functions.AbstractFunction#open(Configuration)} method.
+ * to the UDF in the {@link org.apache.flink.api.common.functions.AbstractRichFunction#open(Configuration)} method.
*
* @param parameters The configuration parameters for the UDF.
* @return The operator itself, to allow chaining function calls.
@@ -83,7 +83,7 @@ public interface UdfOperator<O extends UdfOperator<O>> {
* {@link org.apache.flink.api.common.functions.RuntimeContext#getBroadcastVariable(String)}.
*
* The runtime context itself is available in all UDFs via
- * {@link org.apache.flink.api.common.functions.AbstractFunction#getRuntimeContext()}.
+ * {@link org.apache.flink.api.common.functions.AbstractRichFunction#getRuntimeContext()}.
*
* @param data The data set to be broadcasted.
* @param name The name under which the broadcast data set retrieved.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
index 1d9d70d..9e71ba0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
@@ -18,12 +18,14 @@
package org.apache.flink.api.java.operators;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
public class UnsortedGrouping<T> extends Grouping<T> {
@@ -90,14 +92,14 @@ public class UnsortedGrouping<T> extends Grouping<T> {
/**
* Applies a Reduce transformation on a grouped {@link DataSet}.<br/>
- * For each group, the transformation consecutively calls a {@link ReduceFunction}
+ * For each group, the transformation consecutively calls a {@link org.apache.flink.api.java.functions.RichReduceFunction}
* until only a single element for each group remains.
* A ReduceFunction combines two elements into one new element of the same type.
*
* @param reducer The ReduceFunction that is applied on each group of the DataSet.
* @return A ReduceOperator that represents the reduced DataSet.
*
- * @see ReduceFunction
+ * @see org.apache.flink.api.java.functions.RichReduceFunction
* @see ReduceOperator
* @see DataSet
*/
@@ -110,24 +112,28 @@ public class UnsortedGrouping<T> extends Grouping<T> {
/**
* Applies a GroupReduce transformation on a grouped {@link DataSet}.<br/>
- * The transformation calls a {@link GroupReduceFunction} for each group of the DataSet.
+ * The transformation calls a {@link org.apache.flink.api.java.functions.RichGroupReduceFunction} for each group of the DataSet.
* A GroupReduceFunction can iterate over all elements of a group and emit any
* number of output elements including none.
*
* @param reducer The GroupReduceFunction that is applied on each group of the DataSet.
* @return A GroupReduceOperator that represents the reduced DataSet.
*
- * @see GroupReduceFunction
- * @see ReduceGroupOperator
+ * @see org.apache.flink.api.java.functions.RichGroupReduceFunction
+ * @see GroupReduceOperator
* @see DataSet
*/
- public <R> ReduceGroupOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reducer) {
+ public <R> GroupReduceOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reducer) {
if (reducer == null) {
throw new NullPointerException("GroupReduce function must not be null.");
}
- return new ReduceGroupOperator<T, R>(this, reducer);
+ if (FunctionUtils.isSerializedLambdaFunction(reducer)) {
+ throw new UnsupportedLambdaExpressionException();
+ }
+ return new GroupReduceOperator<T, R>(this, reducer);
}
+
// --------------------------------------------------------------------------------------------
// Group Operations
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
index aea99e3..c7f65f0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
@@ -19,11 +19,11 @@
package org.apache.flink.api.java.operators.translation;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
-public final class KeyExtractingMapper<T, K> extends MapFunction<T, Tuple2<K, T>> {
+public final class KeyExtractingMapper<T, K> extends RichMapFunction<T, Tuple2<K, T>> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
index 52cbcd3..a6cd837 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
@@ -18,11 +18,11 @@
package org.apache.flink.api.java.operators.translation;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
-public final class KeyRemovingMapper<T, K> extends MapFunction<Tuple2<K, T>, T> {
+public final class KeyRemovingMapper<T, K> extends RichMapFunction<Tuple2<K, T>, T> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
index 7fb9c0f..8ac2d01 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
@@ -18,22 +18,22 @@
package org.apache.flink.api.java.operators.translation;
-import org.apache.flink.api.common.functions.GenericFlatMap;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.FilterOperatorBase;
-import org.apache.flink.api.java.functions.FilterFunction;
import org.apache.flink.types.TypeInformation;
import org.apache.flink.util.Collector;
-public class PlanFilterOperator<T> extends FilterOperatorBase<T, GenericFlatMap<T, T>> {
+public class PlanFilterOperator<T> extends FilterOperatorBase<T, FlatMapFunction<T, T>> {
public PlanFilterOperator(FilterFunction<T> udf, String name, TypeInformation<T> type) {
super(new FlatMapFilter<T>(udf), new UnaryOperatorInformation<T, T>(type, type), name);
}
public static final class FlatMapFilter<T> extends WrappingFunction<FilterFunction<T>>
- implements GenericFlatMap<T, T>
+ implements FlatMapFunction<T, T>
{
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
index 521814c..4de7311 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
@@ -18,22 +18,22 @@
package org.apache.flink.api.java.operators.translation;
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.types.TypeInformation;
-public class PlanProjectOperator<T, R extends Tuple> extends MapOperatorBase<T, R, GenericMap<T, R>> {
+public class PlanProjectOperator<T, R extends Tuple> extends MapOperatorBase<T, R, MapFunction<T, R>> {
public PlanProjectOperator(int[] fields, String name, TypeInformation<T> inType, TypeInformation<R> outType) {
super(new MapProjector<T, R>(fields, outType.createSerializer().createInstance()), new UnaryOperatorInformation<T, R>(inType, outType), name);
}
public static final class MapProjector<T, R extends Tuple>
- extends AbstractFunction
- implements GenericMap<T, R>
+ extends AbstractRichFunction
+ implements MapFunction<T, R>
{
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java
index 20bd3b0..89290f0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java
@@ -20,20 +20,19 @@ package org.apache.flink.api.java.operators.translation;
import java.util.Iterator;
-import org.apache.flink.api.common.functions.GenericCoGrouper;
+import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
-import org.apache.flink.api.java.functions.CoGroupFunction;
import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.TypeInformation;
import org.apache.flink.util.Collector;
public class PlanUnwrappingCoGroupOperator<I1, I2, OUT, K>
- extends CoGroupOperatorBase<Tuple2<K, I1>, Tuple2<K, I2>, OUT, GenericCoGrouper<Tuple2<K, I1>, Tuple2<K, I2>, OUT>>
+ extends CoGroupOperatorBase<Tuple2<K, I1>, Tuple2<K, I2>, OUT, CoGroupFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>>
{
- public PlanUnwrappingCoGroupOperator(CoGroupFunction<I1, I2, OUT> udf,
+ public PlanUnwrappingCoGroupOperator(CoGroupFunction<I1, I2, OUT> udf,
Keys.SelectorFunctionKeys<I1, K> key1, Keys.SelectorFunctionKeys<I2, K> key2, String name,
TypeInformation<OUT> type, TypeInformation<Tuple2<K, I1>> typeInfoWithKey1, TypeInformation<Tuple2<K, I2>> typeInfoWithKey2)
{
@@ -42,7 +41,7 @@ public class PlanUnwrappingCoGroupOperator<I1, I2, OUT, K>
key1.computeLogicalKeyPositions(), key2.computeLogicalKeyPositions(), name);
}
- public PlanUnwrappingCoGroupOperator(CoGroupFunction<I1, I2, OUT> udf,
+ public PlanUnwrappingCoGroupOperator(CoGroupFunction<I1, I2, OUT> udf,
int[] key1, Keys.SelectorFunctionKeys<I2, K> key2, String name,
TypeInformation<OUT> type, TypeInformation<Tuple2<K, I1>> typeInfoWithKey1, TypeInformation<Tuple2<K, I2>> typeInfoWithKey2)
{
@@ -51,7 +50,7 @@ public class PlanUnwrappingCoGroupOperator<I1, I2, OUT, K>
new int[]{0}, key2.computeLogicalKeyPositions(), name);
}
- public PlanUnwrappingCoGroupOperator(CoGroupFunction<I1, I2, OUT> udf,
+ public PlanUnwrappingCoGroupOperator(CoGroupFunction<I1, I2, OUT> udf,
Keys.SelectorFunctionKeys<I1, K> key1, int[] key2, String name,
TypeInformation<OUT> type, TypeInformation<Tuple2<K, I1>> typeInfoWithKey1, TypeInformation<Tuple2<K, I2>> typeInfoWithKey2)
{
@@ -63,7 +62,7 @@ public class PlanUnwrappingCoGroupOperator<I1, I2, OUT, K>
// --------------------------------------------------------------------------------------------
public static final class TupleUnwrappingCoGrouper<I1, I2, OUT, K> extends WrappingFunction<CoGroupFunction<I1, I2, OUT>>
- implements GenericCoGrouper<Tuple2<K, I1>, Tuple2<K, I2>, OUT>
+ implements CoGroupFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>
{
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java
index c121efe..73ea004 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java
@@ -18,20 +18,19 @@
package org.apache.flink.api.java.operators.translation;
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
-import org.apache.flink.api.java.functions.JoinFunction;
import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.TypeInformation;
import org.apache.flink.util.Collector;
public class PlanUnwrappingJoinOperator<I1, I2, OUT, K>
- extends JoinOperatorBase<Tuple2<K, I1>, Tuple2<K, I2>, OUT, GenericJoiner<Tuple2<K, I1>, Tuple2<K, I2>, OUT>>
+ extends JoinOperatorBase<Tuple2<K, I1>, Tuple2<K, I2>, OUT, FlatJoinFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>>
{
- public PlanUnwrappingJoinOperator(JoinFunction<I1, I2, OUT> udf,
+ public PlanUnwrappingJoinOperator(FlatJoinFunction<I1, I2, OUT> udf,
Keys.SelectorFunctionKeys<I1, K> key1, Keys.SelectorFunctionKeys<I2, K> key2, String name,
TypeInformation<OUT> type, TypeInformation<Tuple2<K, I1>> typeInfoWithKey1, TypeInformation<Tuple2<K, I2>> typeInfoWithKey2)
{
@@ -40,7 +39,7 @@ public class PlanUnwrappingJoinOperator<I1, I2, OUT, K>
key1.computeLogicalKeyPositions(), key2.computeLogicalKeyPositions(), name);
}
- public PlanUnwrappingJoinOperator(JoinFunction<I1, I2, OUT> udf,
+ public PlanUnwrappingJoinOperator(FlatJoinFunction<I1, I2, OUT> udf,
int[] key1, Keys.SelectorFunctionKeys<I2, K> key2, String name,
TypeInformation<OUT> type, TypeInformation<Tuple2<K, I1>> typeInfoWithKey1, TypeInformation<Tuple2<K, I2>> typeInfoWithKey2)
{
@@ -49,7 +48,7 @@ public class PlanUnwrappingJoinOperator<I1, I2, OUT, K>
new int[]{0}, key2.computeLogicalKeyPositions(), name);
}
- public PlanUnwrappingJoinOperator(JoinFunction<I1, I2, OUT> udf,
+ public PlanUnwrappingJoinOperator(FlatJoinFunction<I1, I2, OUT> udf,
Keys.SelectorFunctionKeys<I1, K> key1, int[] key2, String name,
TypeInformation<OUT> type, TypeInformation<Tuple2<K, I1>> typeInfoWithKey1, TypeInformation<Tuple2<K, I2>> typeInfoWithKey2)
{
@@ -59,21 +58,26 @@ public class PlanUnwrappingJoinOperator<I1, I2, OUT, K>
}
public static final class TupleUnwrappingJoiner<I1, I2, OUT, K>
- extends WrappingFunction<JoinFunction<I1, I2, OUT>>
- implements GenericJoiner<Tuple2<K, I1>, Tuple2<K, I2>, OUT>
+ extends WrappingFunction<FlatJoinFunction<I1, I2, OUT>>
+ implements FlatJoinFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>
{
private static final long serialVersionUID = 1L;
- private TupleUnwrappingJoiner(JoinFunction<I1, I2, OUT> wrapped) {
+ private TupleUnwrappingJoiner(FlatJoinFunction<I1, I2, OUT> wrapped) {
super(wrapped);
}
+ //@SuppressWarnings("unchecked")
+ //@Override
+ //public OUT join(Tuple2<K, I1> value1, Tuple2<K, I2> value2) throws Exception {
+ // return wrappedFunction.join((I1)(value1.getField(1)), (I2)(value2.getField(1)));
+ //}
+
@SuppressWarnings("unchecked")
@Override
- public void join(Tuple2<K, I1> value1, Tuple2<K, I2> value2,
- Collector<OUT> out) throws Exception {
- out.collect(wrappedFunction.join((I1)(value1.getField(1)), (I2)(value2.getField(1))));
+ public void join (Tuple2<K, I1> value1, Tuple2<K, I2> value2, Collector<OUT> collector) throws Exception {
+ wrappedFunction.join ((I1)(value1.getField(1)), (I2)(value2.getField(1)), collector);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
index 5a59664..5e80455 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
@@ -20,12 +20,11 @@ package org.apache.flink.api.java.operators.translation;
import java.util.Iterator;
-import org.apache.flink.api.common.functions.GenericCombine;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction.Combinable;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.TypeInformation;
@@ -35,12 +34,12 @@ import org.apache.flink.util.Collector;
* A reduce operator that takes 2-tuples (key-value pairs), and applies the group reduce operation only
* on the unwrapped values.
*/
-public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOperatorBase<Tuple2<K, IN>, OUT, GenericGroupReduce<Tuple2<K, IN>,OUT>> {
+public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOperatorBase<Tuple2<K, IN>, OUT, GroupReduceFunction<Tuple2<K, IN>,OUT>> {
public PlanUnwrappingReduceGroupOperator(GroupReduceFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K> key, String name,
TypeInformation<OUT> outType, TypeInformation<Tuple2<K, IN>> typeInfoWithKey, boolean combinable)
{
- super(combinable ? new TupleUnwrappingCombinableGroupReducer<IN, OUT, K>(udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf),
+ super(combinable ? new TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K>((RichGroupReduceFunction) udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf),
new UnaryOperatorInformation<Tuple2<K, IN>, OUT>(typeInfoWithKey, outType), key.computeLogicalKeyPositions(), name);
super.setCombinable(combinable);
@@ -48,9 +47,9 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
// --------------------------------------------------------------------------------------------
- @Combinable
- public static final class TupleUnwrappingCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
- implements GenericGroupReduce<Tuple2<K, IN>, OUT>, GenericCombine<Tuple2<K, IN>>
+ @RichGroupReduceFunction.Combinable
+ public static final class TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<RichGroupReduceFunction<IN, OUT>>
+ implements GroupReduceFunction<Tuple2<K, IN>, OUT>, FlatCombineFunction<Tuple2<K, IN>>
{
private static final long serialVersionUID = 1L;
@@ -58,7 +57,7 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
private TupleUnwrappingIterator<IN, K> iter;
private TupleWrappingCollector<IN, K> coll;
- private TupleUnwrappingCombinableGroupReducer(GroupReduceFunction<IN, OUT> wrapped) {
+ private TupleUnwrappingFlatCombinableGroupReducer(RichGroupReduceFunction<IN, OUT> wrapped) {
super(wrapped);
this.iter = new TupleUnwrappingIterator<IN, K>();
this.coll = new TupleWrappingCollector<IN, K>(this.iter);
@@ -85,7 +84,7 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
}
public static final class TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
- implements GenericGroupReduce<Tuple2<K, IN>, OUT>
+ implements GroupReduceFunction<Tuple2<K, IN>, OUT>
{
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
index 66aa430..4da981c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
@@ -18,10 +18,9 @@
package org.apache.flink.api.java.operators.translation;
-import org.apache.flink.api.common.functions.GenericReduce;
+import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
-import org.apache.flink.api.java.functions.ReduceFunction;
import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.TypeInformation;
@@ -31,7 +30,7 @@ import org.apache.flink.types.TypeInformation;
* A reduce operator that takes 2-tuples (key-value pairs), and applies the reduce operation only
* on the unwrapped values.
*/
-public class PlanUnwrappingReduceOperator<T, K> extends ReduceOperatorBase<Tuple2<K, T>, GenericReduce<Tuple2<K, T>>> {
+public class PlanUnwrappingReduceOperator<T, K> extends ReduceOperatorBase<Tuple2<K, T>, ReduceFunction<Tuple2<K, T>>> {
public PlanUnwrappingReduceOperator(ReduceFunction<T> udf, Keys.SelectorFunctionKeys<T, K> key, String name,
TypeInformation<T> type, TypeInformation<Tuple2<K, T>> typeInfoWithKey)
@@ -40,7 +39,7 @@ public class PlanUnwrappingReduceOperator<T, K> extends ReduceOperatorBase<Tuple
}
public static final class ReduceWrapper<T, K> extends WrappingFunction<ReduceFunction<T>>
- implements GenericReduce<Tuple2<K, T>>
+ implements ReduceFunction<Tuple2<K, T>>
{
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java
index a915d1c..ecac775 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java
@@ -18,12 +18,12 @@
package org.apache.flink.api.java.operators.translation;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
-public final class TupleKeyExtractingMapper<T, K> extends MapFunction<T, Tuple2<K, T>> {
+public final class TupleKeyExtractingMapper<T, K> extends RichMapFunction<T, Tuple2<K, T>> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
index c98df6b..267d879 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
@@ -29,20 +29,21 @@ import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.cache.DistributedCache;
-import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Value;
-public abstract class WrappingFunction<T extends AbstractFunction> extends AbstractFunction {
+public abstract class WrappingFunction<T extends Function> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
protected final T wrappedFunction;
-
-
+
protected WrappingFunction(T wrappedFunction) {
this.wrappedFunction = wrappedFunction;
}
@@ -50,12 +51,12 @@ public abstract class WrappingFunction<T extends AbstractFunction> extends Abstr
@Override
public void open(Configuration parameters) throws Exception {
- this.wrappedFunction.open(parameters);
+ FunctionUtils.openFunction(this.wrappedFunction, parameters);
}
@Override
public void close() throws Exception {
- this.wrappedFunction.close();
+ FunctionUtils.closeFunction(this.wrappedFunction);
}
@Override
@@ -63,13 +64,16 @@ public abstract class WrappingFunction<T extends AbstractFunction> extends Abstr
super.setRuntimeContext(t);
if (t instanceof IterationRuntimeContext) {
- this.wrappedFunction.setRuntimeContext(new WrappingIterationRuntimeContext(t));
+ FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, new WrappingIterationRuntimeContext(t));
}
else{
- this.wrappedFunction.setRuntimeContext(new WrappingRuntimeContext(t));
+ FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, new WrappingRuntimeContext(t));
}
}
-
+
+ public T getWrappedFunction () {
+ return this.wrappedFunction;
+ }
private static class WrappingRuntimeContext implements RuntimeContext {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
index 633adab..fa0ca11 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
@@ -21,15 +21,14 @@ package org.apache.flink.api.java.record.functions;
import java.util.Iterator;
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericCoGrouper;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
/**
* The CoGroupFunction is the base class for functions that are invoked by a {@link org.apache.flink.api.java.operators.CoGroupOperator}.
*/
-public abstract class CoGroupFunction extends AbstractFunction implements GenericCoGrouper<Record, Record, Record> {
+public abstract class CoGroupFunction extends AbstractRichFunction implements org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java
index b2185a2..c4587fd 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java
@@ -19,15 +19,13 @@
package org.apache.flink.api.java.record.functions;
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericCrosser;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
/**
* The CrossFunction is the base class for functions that are invoked by a {@link org.apache.flink.api.java.operators.CrossOperator}.
*/
-public abstract class CrossFunction extends AbstractFunction implements GenericCrosser<Record, Record, Record> {
+public abstract class CrossFunction extends AbstractRichFunction implements org.apache.flink.api.common.functions.CrossFunction<Record, Record, Record> {
private static final long serialVersionUID = 1L;
@@ -35,14 +33,19 @@ public abstract class CrossFunction extends AbstractFunction implements GenericC
* This method must be implemented to provide a user implementation of a cross.
* It is called for each element of the Cartesian product of both input sets.
- * @param record1 The record from the second input.
- * @param record2 The record from the second input.
- * @param out A collector that collects all output records.
+ * @param first The record from the first input.
+ * @param second The record from the second input.
+ * @return The result of the cross UDF
*
* @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
* runtime catches an exception, it aborts the task and lets the fail-over logic
* decide whether to retry the task execution.
*/
+
+ //@Override
+ //public abstract void cross(Record record1, Record record2, Collector<Record> out) throws Exception;
+
@Override
- public abstract void cross(Record record1, Record record2, Collector<Record> out) throws Exception;
+ public abstract Record cross(Record first, Record second) throws Exception;
+
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java
index 0222c63..dce24a3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java
@@ -19,8 +19,8 @@
package org.apache.flink.api.java.record.functions;
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
@@ -28,7 +28,7 @@ import org.apache.flink.util.Collector;
* The JoinFunction must be implemented by functions of a {@link org.apache.flink.api.java.operators.JoinOperator}.
* It resembles an equality join of both inputs on their key fields.
*/
-public abstract class JoinFunction extends AbstractFunction implements GenericJoiner<Record, Record, Record> {
+public abstract class JoinFunction extends AbstractRichFunction implements FlatJoinFunction<Record, Record, Record> {
private static final long serialVersionUID = 1L;
@@ -38,7 +38,7 @@ public abstract class JoinFunction extends AbstractFunction implements GenericJo
*
* @param value1 The record that comes from the first input.
* @param value2 The record that comes from the second input.
- * @param out A collector that collects all output pairs.
+ * @return The result of the join UDF as record
*
* @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
* runtime catches an exception, it aborts the task and lets the fail-over logic
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java
index 88b6282..99c945d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java
@@ -19,7 +19,7 @@
package org.apache.flink.api.java.record.functions;
-import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.GenericCollectorMap;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
@@ -28,7 +28,7 @@ import org.apache.flink.util.Collector;
* The MapFunction must be extended to provide a mapper implementation
* By definition, the mapper is called for each individual input record.
*/
-public abstract class MapFunction extends AbstractFunction implements GenericCollectorMap<Record, Record> {
+public abstract class MapFunction extends AbstractRichFunction implements GenericCollectorMap<Record, Record> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
index 4b1dbb3..073b11a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
@@ -21,9 +21,9 @@ package org.apache.flink.api.java.record.functions;
import java.util.Iterator;
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericCombine;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
@@ -31,7 +31,7 @@ import org.apache.flink.util.Collector;
* The ReduceFunction must be extended to provide a reducer implementation, as invoked by a
* {@link org.apache.flink.api.java.operators.ReduceOperator}.
*/
-public abstract class ReduceFunction extends AbstractFunction implements GenericGroupReduce<Record, Record>, GenericCombine<Record> {
+public abstract class ReduceFunction extends AbstractRichFunction implements GroupReduceFunction<Record, Record>, FlatCombineFunction<Record> {
private static final long serialVersionUID = 1L;