You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/08/09 14:39:49 UTC
[18/39] [FLINK-701] Refactor Java API to use SAM interfaces.
Introduce RichFunction stubs for all UDFs.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index a41874c..e8ee0bb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -31,12 +31,13 @@ import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.Validate;
-import org.apache.flink.api.common.functions.GenericCoGrouper;
-import org.apache.flink.api.common.functions.GenericCrosser;
-import org.apache.flink.api.common.functions.GenericFlatMap;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
-import org.apache.flink.api.common.functions.GenericJoiner;
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.java.functions.InvalidTypesException;
import org.apache.flink.api.java.functions.KeySelector;
@@ -60,64 +61,75 @@ public class TypeExtractor {
// --------------------------------------------------------------------------------------------
@SuppressWarnings("unchecked")
- public static <IN, OUT> TypeInformation<OUT> getMapReturnTypes(GenericMap<IN, OUT> mapInterface, TypeInformation<IN> inType) {
- validateInputType(GenericMap.class, mapInterface.getClass(), 0, inType);
+ public static <IN, OUT> TypeInformation<OUT> getMapReturnTypes(MapFunction<IN, OUT> mapInterface, TypeInformation<IN> inType) {
+ validateInputType(MapFunction.class, mapInterface.getClass(), 0, inType);
if(mapInterface instanceof ResultTypeQueryable) {
return ((ResultTypeQueryable<OUT>) mapInterface).getProducedType();
}
- return new TypeExtractor().privateCreateTypeInfo(GenericMap.class, mapInterface.getClass(), 1, inType, null);
+ return new TypeExtractor().privateCreateTypeInfo(MapFunction.class, mapInterface.getClass(), 1, inType, null);
}
@SuppressWarnings("unchecked")
- public static <IN, OUT> TypeInformation<OUT> getFlatMapReturnTypes(GenericFlatMap<IN, OUT> flatMapInterface, TypeInformation<IN> inType) {
- validateInputType(GenericFlatMap.class, flatMapInterface.getClass(), 0, inType);
+ public static <IN, OUT> TypeInformation<OUT> getFlatMapReturnTypes(FlatMapFunction<IN, OUT> flatMapInterface, TypeInformation<IN> inType) {
+ validateInputType(FlatMapFunction.class, flatMapInterface.getClass(), 0, inType);
if(flatMapInterface instanceof ResultTypeQueryable) {
return ((ResultTypeQueryable<OUT>) flatMapInterface).getProducedType();
}
- return new TypeExtractor().privateCreateTypeInfo(GenericFlatMap.class, flatMapInterface.getClass(), 1, inType, null);
+ return new TypeExtractor().privateCreateTypeInfo(FlatMapFunction.class, flatMapInterface.getClass(), 1, inType, null);
}
@SuppressWarnings("unchecked")
- public static <IN, OUT> TypeInformation<OUT> getGroupReduceReturnTypes(GenericGroupReduce<IN, OUT> groupReduceInterface,
+ public static <IN, OUT> TypeInformation<OUT> getGroupReduceReturnTypes(GroupReduceFunction<IN, OUT> groupReduceInterface,
TypeInformation<IN> inType) {
- validateInputType(GenericGroupReduce.class, groupReduceInterface.getClass(), 0, inType);
+ validateInputType(GroupReduceFunction.class, groupReduceInterface.getClass(), 0, inType);
if(groupReduceInterface instanceof ResultTypeQueryable) {
return ((ResultTypeQueryable<OUT>) groupReduceInterface).getProducedType();
}
- return new TypeExtractor().privateCreateTypeInfo(GenericGroupReduce.class, groupReduceInterface.getClass(), 1, inType, null);
+ return new TypeExtractor().privateCreateTypeInfo(GroupReduceFunction.class, groupReduceInterface.getClass(), 1, inType, null);
}
@SuppressWarnings("unchecked")
- public static <IN1, IN2, OUT> TypeInformation<OUT> getJoinReturnTypes(GenericJoiner<IN1, IN2, OUT> joinInterface,
+ public static <IN1, IN2, OUT> TypeInformation<OUT> getJoinReturnTypes(FlatJoinFunction<IN1, IN2, OUT> joinInterface,
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
- validateInputType(GenericJoiner.class, joinInterface.getClass(), 0, in1Type);
- validateInputType(GenericJoiner.class, joinInterface.getClass(), 1, in2Type);
+ validateInputType(FlatJoinFunction.class, joinInterface.getClass(), 0, in1Type);
+ validateInputType(FlatJoinFunction.class, joinInterface.getClass(), 1, in2Type);
if(joinInterface instanceof ResultTypeQueryable) {
return ((ResultTypeQueryable<OUT>) joinInterface).getProducedType();
}
- return new TypeExtractor().privateCreateTypeInfo(GenericJoiner.class, joinInterface.getClass(), 2, in1Type, in2Type);
+ return new TypeExtractor().privateCreateTypeInfo(FlatJoinFunction.class, joinInterface.getClass(), 2, in1Type, in2Type);
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <IN1, IN2, OUT> TypeInformation<OUT> getJoinReturnTypes(JoinFunction<IN1, IN2, OUT> joinInterface,
+ TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
+ validateInputType(JoinFunction.class, joinInterface.getClass(), 0, in1Type);
+ validateInputType(JoinFunction.class, joinInterface.getClass(), 1, in2Type);
+ if(joinInterface instanceof ResultTypeQueryable) {
+ return ((ResultTypeQueryable<OUT>) joinInterface).getProducedType();
+ }
+ return new TypeExtractor().privateCreateTypeInfo(JoinFunction.class, joinInterface.getClass(), 2, in1Type, in2Type);
}
@SuppressWarnings("unchecked")
- public static <IN1, IN2, OUT> TypeInformation<OUT> getCoGroupReturnTypes(GenericCoGrouper<IN1, IN2, OUT> coGroupInterface,
+ public static <IN1, IN2, OUT> TypeInformation<OUT> getCoGroupReturnTypes(CoGroupFunction<IN1, IN2, OUT> coGroupInterface,
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
- validateInputType(GenericCoGrouper.class, coGroupInterface.getClass(), 0, in1Type);
- validateInputType(GenericCoGrouper.class, coGroupInterface.getClass(), 1, in2Type);
+ validateInputType(CoGroupFunction.class, coGroupInterface.getClass(), 0, in1Type);
+ validateInputType(CoGroupFunction.class, coGroupInterface.getClass(), 1, in2Type);
if(coGroupInterface instanceof ResultTypeQueryable) {
return ((ResultTypeQueryable<OUT>) coGroupInterface).getProducedType();
}
- return new TypeExtractor().privateCreateTypeInfo(GenericCoGrouper.class, coGroupInterface.getClass(), 2, in1Type, in2Type);
+ return new TypeExtractor().privateCreateTypeInfo(CoGroupFunction.class, coGroupInterface.getClass(), 2, in1Type, in2Type);
}
@SuppressWarnings("unchecked")
- public static <IN1, IN2, OUT> TypeInformation<OUT> getCrossReturnTypes(GenericCrosser<IN1, IN2, OUT> crossInterface,
+ public static <IN1, IN2, OUT> TypeInformation<OUT> getCrossReturnTypes(CrossFunction<IN1, IN2, OUT> crossInterface,
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
- validateInputType(GenericCrosser.class, crossInterface.getClass(), 0, in1Type);
- validateInputType(GenericCrosser.class, crossInterface.getClass(), 1, in2Type);
+ validateInputType(CrossFunction.class, crossInterface.getClass(), 0, in1Type);
+ validateInputType(CrossFunction.class, crossInterface.getClass(), 1, in2Type);
if(crossInterface instanceof ResultTypeQueryable) {
return ((ResultTypeQueryable<OUT>) crossInterface).getProducedType();
}
- return new TypeExtractor().privateCreateTypeInfo(GenericCrosser.class, crossInterface.getClass(), 2, in1Type, in2Type);
+ return new TypeExtractor().privateCreateTypeInfo(CrossFunction.class, crossInterface.getClass(), 2, in1Type, in2Type);
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
index 8e0abcb..c786345 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
@@ -235,7 +235,7 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im
try {
for (; i < keyPositions.length; i++) {
int keyPos = keyPositions[i];
- int cmp = comparators[i].compare(first.getField(keyPos), second.getField(keyPos));
+ int cmp = comparators[i].compare((T)first.getField(keyPos), (T)second.getField(keyPos));
if (cmp != 0) {
return cmp;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
index 1159512..474b022 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
@@ -28,8 +28,6 @@ import org.apache.flink.api.common.operators.base.GenericDataSinkBase;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.api.java.functions.JoinFunction;
-import org.apache.flink.api.java.functions.MapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields;
import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsFirst;
import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsSecond;
@@ -284,7 +282,7 @@ public class SemanticPropertiesTranslationTest {
@ConstantFields("*")
- public static class WildcardConstantMapper<T> extends MapFunction<T, T> {
+ public static class WildcardConstantMapper<T> extends RichMapFunction<T, T> {
@Override
public T map(T value) {
@@ -293,7 +291,7 @@ public class SemanticPropertiesTranslationTest {
}
@ConstantFields("0->0;1->1;2->2")
- public static class IndividualConstantMapper<X, Y, Z> extends MapFunction<Tuple3<X, Y, Z>, Tuple3<X, Y, Z>> {
+ public static class IndividualConstantMapper<X, Y, Z> extends RichMapFunction<Tuple3<X, Y, Z>, Tuple3<X, Y, Z>> {
@Override
public Tuple3<X, Y, Z> map(Tuple3<X, Y, Z> value) {
@@ -302,7 +300,7 @@ public class SemanticPropertiesTranslationTest {
}
@ConstantFields("0")
- public static class ZeroConstantMapper<T> extends MapFunction<T, T> {
+ public static class ZeroConstantMapper<T> extends RichMapFunction<T, T> {
@Override
public T map(T value) {
@@ -312,7 +310,7 @@ public class SemanticPropertiesTranslationTest {
@ConstantFieldsFirst("1 -> 0")
@ConstantFieldsSecond("1 -> 1")
- public static class ForwardingTupleJoin<A, B, C, D> extends JoinFunction<Tuple2<A, B>, Tuple2<C, D>, Tuple2<B, D>> {
+ public static class ForwardingTupleJoin<A, B, C, D> extends RichJoinFunction<Tuple2<A, B>, Tuple2<C, D>, Tuple2<B, D>> {
@Override
public Tuple2<B, D> join(Tuple2<A, B> first, Tuple2<C, D> second) {
@@ -322,7 +320,7 @@ public class SemanticPropertiesTranslationTest {
@ConstantFieldsFirst("0 -> 0")
@ConstantFieldsSecond("0 -> 1")
- public static class ForwardingBasicJoin<A, B> extends JoinFunction<A, B, Tuple2<A, B>> {
+ public static class ForwardingBasicJoin<A, B> extends RichJoinFunction<A, B, Tuple2<A, B>> {
@Override
public Tuple2<A, B> join(A first, B second) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
index db795d9..155bbd1 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
@@ -35,9 +35,9 @@ import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.DeltaIteration;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.CoGroupFunction;
-import org.apache.flink.api.java.functions.JoinFunction;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichCoGroupFunction;
+import org.apache.flink.api.java.functions.RichJoinFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
@@ -128,8 +128,14 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
assertEquals(IdentityMapper.class, worksetMapper.getUserCodeWrapper().getUserCodeClass());
assertEquals(NextWorksetMapper.class, nextWorksetMapper.getUserCodeWrapper().getUserCodeClass());
- assertEquals(SolutionWorksetJoin.class, solutionSetJoin.getUserCodeWrapper().getUserCodeClass());
-
+ if (solutionSetJoin.getUserCodeWrapper().getUserCodeObject() instanceof WrappingFunction) {
+ WrappingFunction wf = (WrappingFunction) solutionSetJoin.getUserCodeWrapper().getUserCodeObject();
+ assertEquals(SolutionWorksetJoin.class, wf.getWrappedFunction().getClass());
+ }
+ else {
+ assertEquals(SolutionWorksetJoin.class, solutionSetJoin.getUserCodeWrapper().getUserCodeClass());
+ }
+
assertEquals(BEFORE_NEXT_WORKSET_MAP, nextWorksetMapper.getName());
assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
@@ -215,21 +221,21 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
// --------------------------------------------------------------------------------------------
- public static class SolutionWorksetJoin extends JoinFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> {
+ public static class SolutionWorksetJoin extends RichJoinFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> {
@Override
public Tuple3<Double, Long, String> join(Tuple2<Double, String> first, Tuple3<Double, Long, String> second){
return null;
}
}
- public static class NextWorksetMapper extends MapFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>> {
+ public static class NextWorksetMapper extends RichMapFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>> {
@Override
public Tuple2<Double, String> map(Tuple3<Double, Long, String> value) {
return null;
}
}
- public static class IdentityMapper<T> extends MapFunction<T, T> {
+ public static class IdentityMapper<T> extends RichMapFunction<T, T> {
@Override
public T map(T value) throws Exception {
@@ -237,7 +243,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
}
}
- public static class SolutionWorksetCoGroup1 extends CoGroupFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> {
+ public static class SolutionWorksetCoGroup1 extends RichCoGroupFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> {
@Override
public void coGroup(Iterator<Tuple2<Double, String>> first, Iterator<Tuple3<Double, Long, String>> second,
@@ -245,7 +251,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
}
}
- public static class SolutionWorksetCoGroup2 extends CoGroupFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>, Tuple3<Double, Long, String>> {
+ public static class SolutionWorksetCoGroup2 extends RichCoGroupFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>, Tuple3<Double, Long, String>> {
@Override
public void coGroup(Iterator<Tuple3<Double, Long, String>> second, Iterator<Tuple2<Double, String>> first,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
index 9f6a6d8..8e457ce 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
@@ -27,9 +27,7 @@ import org.apache.flink.api.common.operators.base.GenericDataSourceBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.functions.ReduceFunction;
-import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
-import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceOperator;
+import org.apache.flink.api.java.functions.RichReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -53,7 +51,7 @@ public class ReduceTranslationTests implements java.io.Serializable {
DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
- initialData.reduce(new ReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
+ initialData.reduce(new RichReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
public Tuple3<Double, StringValue, LongValue> reduce(Tuple3<Double, StringValue, LongValue> value1, Tuple3<Double, StringValue, LongValue> value2) {
return value1;
}
@@ -94,7 +92,7 @@ public class ReduceTranslationTests implements java.io.Serializable {
initialData
.groupBy(2)
- .reduce(new ReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
+ .reduce(new RichReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
public Tuple3<Double, StringValue, LongValue> reduce(Tuple3<Double, StringValue, LongValue> value1, Tuple3<Double, StringValue, LongValue> value2) {
return value1;
}
@@ -141,7 +139,7 @@ public class ReduceTranslationTests implements java.io.Serializable {
return value.f1;
}
})
- .reduce(new ReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
+ .reduce(new RichReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
public Tuple3<Double, StringValue, LongValue> reduce(Tuple3<Double, StringValue, LongValue> value1, Tuple3<Double, StringValue, LongValue> value2) {
return value1;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
index b284052..c6ad73d 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
@@ -23,16 +23,16 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.functions.CoGroupFunction;
-import org.apache.flink.api.java.functions.CrossFunction;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
+import org.apache.flink.api.java.functions.RichCoGroupFunction;
+import org.apache.flink.api.java.functions.RichCrossFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.functions.InvalidTypesException;
-import org.apache.flink.api.java.functions.JoinFunction;
+import org.apache.flink.api.java.functions.RichFlatJoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -68,7 +68,7 @@ public class TypeExtractorTest {
@Test
public void testBasicType() {
// use getGroupReduceReturnTypes()
- GroupReduceFunction<?, ?> function = new GroupReduceFunction<Boolean, Boolean>() {
+ RichGroupReduceFunction<?, ?> function = new RichGroupReduceFunction<Boolean, Boolean>() {
private static final long serialVersionUID = 1L;
@Override
@@ -107,7 +107,7 @@ public class TypeExtractorTest {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testWritableType() {
- MapFunction<?, ?> function = new MapFunction<MyWritable, MyWritable>() {
+ RichMapFunction<?, ?> function = new RichMapFunction<MyWritable, MyWritable>() {
private static final long serialVersionUID = 1L;
@Override
@@ -127,7 +127,7 @@ public class TypeExtractorTest {
@Test
public void testTupleWithBasicTypes() throws Exception {
// use getMapReturnTypes()
- MapFunction<?, ?> function = new MapFunction<Tuple9<Integer, Long, Double, Float, Boolean, String, Character, Short, Byte>, Tuple9<Integer, Long, Double, Float, Boolean, String, Character, Short, Byte>>() {
+ RichMapFunction<?, ?> function = new RichMapFunction<Tuple9<Integer, Long, Double, Float, Boolean, String, Character, Short, Byte>, Tuple9<Integer, Long, Double, Float, Boolean, String, Character, Short, Byte>>() {
private static final long serialVersionUID = 1L;
@Override
@@ -192,7 +192,7 @@ public class TypeExtractorTest {
@Test
public void testTupleWithTuples() {
// use getFlatMapReturnTypes()
- FlatMapFunction<?, ?> function = new FlatMapFunction<Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>, Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>>() {
+ RichFlatMapFunction<?, ?> function = new RichFlatMapFunction<Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>, Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>>() {
private static final long serialVersionUID = 1L;
@Override
@@ -247,12 +247,12 @@ public class TypeExtractorTest {
@Test
public void testSubclassOfTuple() {
// use getJoinReturnTypes()
- JoinFunction<?, ?, ?> function = new JoinFunction<CustomTuple, String, CustomTuple>() {
+ RichFlatJoinFunction<?, ?, ?> function = new RichFlatJoinFunction<CustomTuple, String, CustomTuple>() {
private static final long serialVersionUID = 1L;
@Override
- public CustomTuple join(CustomTuple first, String second) throws Exception {
- return null;
+ public void join(CustomTuple first, String second, Collector<CustomTuple> out) throws Exception {
+ out.collect(null);
}
};
@@ -295,7 +295,7 @@ public class TypeExtractorTest {
@Test
public void testCustomType() {
// use getCrossReturnTypes()
- CrossFunction<?, ?, ?> function = new CrossFunction<CustomType, Integer, CustomType>() {
+ RichCrossFunction<?, ?, ?> function = new RichCrossFunction<CustomType, Integer, CustomType>() {
private static final long serialVersionUID = 1L;
@Override
@@ -342,7 +342,7 @@ public class TypeExtractorTest {
@Test
public void testTupleWithCustomType() {
// use getMapReturnTypes()
- MapFunction<?, ?> function = new MapFunction<Tuple2<Long, CustomType>, Tuple2<Long, CustomType>>() {
+ RichMapFunction<?, ?> function = new RichMapFunction<Tuple2<Long, CustomType>, Tuple2<Long, CustomType>>() {
private static final long serialVersionUID = 1L;
@Override
@@ -412,7 +412,7 @@ public class TypeExtractorTest {
@Test
public void testTupleOfValues() {
// use getMapReturnTypes()
- MapFunction<?, ?> function = new MapFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>() {
+ RichMapFunction<?, ?> function = new RichMapFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>() {
private static final long serialVersionUID = 1L;
@Override
@@ -451,7 +451,7 @@ public class TypeExtractorTest {
@Test
public void testGenericsNotInSuperclass() {
// use getMapReturnTypes()
- MapFunction<?, ?> function = new MapFunction<LongKeyValue<String>, LongKeyValue<String>>() {
+ RichMapFunction<?, ?> function = new RichMapFunction<LongKeyValue<String>, LongKeyValue<String>>() {
private static final long serialVersionUID = 1L;
@Override
@@ -494,7 +494,7 @@ public class TypeExtractorTest {
@Test
public void testChainedGenericsNotInSuperclass() {
// use TypeExtractor
- MapFunction<?, ?> function = new MapFunction<ChainedTwo<Integer>, ChainedTwo<Integer>>() {
+ RichMapFunction<?, ?> function = new RichMapFunction<ChainedTwo<Integer>, ChainedTwo<Integer>>() {
private static final long serialVersionUID = 1L;
@Override
@@ -536,7 +536,7 @@ public class TypeExtractorTest {
@Test
public void testGenericsInDirectSuperclass() {
// use TypeExtractor
- MapFunction<?, ?> function = new MapFunction<ChainedThree, ChainedThree>() {
+ RichMapFunction<?, ?> function = new RichMapFunction<ChainedThree, ChainedThree>() {
private static final long serialVersionUID = 1L;
@Override
@@ -562,7 +562,7 @@ public class TypeExtractorTest {
@Test
public void testGenericsNotInSuperclassWithNonGenericClassAtEnd() {
// use TypeExtractor
- MapFunction<?, ?> function = new MapFunction<ChainedFour, ChainedFour>() {
+ RichMapFunction<?, ?> function = new RichMapFunction<ChainedFour, ChainedFour>() {
private static final long serialVersionUID = 1L;
@Override
@@ -587,7 +587,7 @@ public class TypeExtractorTest {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testMissingTupleGenericsException() {
- MapFunction<?, ?> function = new MapFunction<String, Tuple2>() {
+ RichMapFunction<?, ?> function = new RichMapFunction<String, Tuple2>() {
private static final long serialVersionUID = 1L;
@Override
@@ -607,7 +607,7 @@ public class TypeExtractorTest {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testTupleSupertype() {
- MapFunction<?, ?> function = new MapFunction<String, Tuple>() {
+ RichMapFunction<?, ?> function = new RichMapFunction<String, Tuple>() {
private static final long serialVersionUID = 1L;
@Override
@@ -635,7 +635,7 @@ public class TypeExtractorTest {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testSameGenericVariable() {
- MapFunction<?, ?> function = new MapFunction<SameTypeVariable<String>, SameTypeVariable<String>>() {
+ RichMapFunction<?, ?> function = new RichMapFunction<SameTypeVariable<String>, SameTypeVariable<String>>() {
private static final long serialVersionUID = 1L;
@Override
@@ -667,7 +667,7 @@ public class TypeExtractorTest {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testNestedTupleGenerics() {
- MapFunction<?, ?> function = new MapFunction<Nested<String, Integer>, Nested<String, Integer>>() {
+ RichMapFunction<?, ?> function = new RichMapFunction<Nested<String, Integer>, Nested<String, Integer>>() {
private static final long serialVersionUID = 1L;
@Override
@@ -706,7 +706,7 @@ public class TypeExtractorTest {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testNestedTupleGenerics2() {
- MapFunction<?, ?> function = new MapFunction<Nested2<Boolean>, Nested2<Boolean>>() {
+ RichMapFunction<?, ?> function = new RichMapFunction<Nested2<Boolean>, Nested2<Boolean>>() {
private static final long serialVersionUID = 1L;
@Override
@@ -746,7 +746,7 @@ public class TypeExtractorTest {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testFunctionWithMissingGenerics() {
- MapFunction function = new MapFunction() {
+ RichMapFunction function = new RichMapFunction() {
private static final long serialVersionUID = 1L;
@Override
@@ -776,7 +776,7 @@ public class TypeExtractorTest {
Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti);
}
- public class IdentityMapper<T> extends MapFunction<T, T> {
+ public class IdentityMapper<T> extends RichMapFunction<T, T> {
private static final long serialVersionUID = 1L;
@Override
@@ -807,7 +807,7 @@ public class TypeExtractorTest {
}
}
- public class IdentityMapper2<T> extends MapFunction<Tuple2<T, String>, T> {
+ public class IdentityMapper2<T> extends RichMapFunction<Tuple2<T, String>, T> {
private static final long serialVersionUID = 1L;
@Override
@@ -843,7 +843,7 @@ public class TypeExtractorTest {
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(1));
}
- public class IdentityMapper3<T, V> extends MapFunction<T, V> {
+ public class IdentityMapper3<T, V> extends RichMapFunction<T, V> {
private static final long serialVersionUID = 1L;
@Override
@@ -916,7 +916,7 @@ public class TypeExtractorTest {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testFunctionWithNoGenericSuperclass() {
- MapFunction<?, ?> function = new Mapper2();
+ RichMapFunction<?, ?> function = new Mapper2();
TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("String"));
@@ -924,7 +924,7 @@ public class TypeExtractorTest {
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
}
- public class OneAppender<T> extends MapFunction<T, Tuple2<T, Integer>> {
+ public class OneAppender<T> extends RichMapFunction<T, Tuple2<T, Integer>> {
private static final long serialVersionUID = 1L;
public Tuple2<T, Integer> map(T value) {
@@ -935,7 +935,7 @@ public class TypeExtractorTest {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testFunctionDependingPartialOnInput() {
- MapFunction<?, ?> function = new OneAppender<DoubleValue>() {
+ RichMapFunction<?, ?> function = new OneAppender<DoubleValue>() {
private static final long serialVersionUID = 1L;
};
@@ -955,7 +955,7 @@ public class TypeExtractorTest {
@Test
public void testFunctionDependingPartialOnInput2() {
- MapFunction<DoubleValue, ?> function = new OneAppender<DoubleValue>();
+ RichMapFunction<DoubleValue, ?> function = new OneAppender<DoubleValue>();
TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, new ValueTypeInfo<DoubleValue>(DoubleValue.class));
@@ -971,7 +971,7 @@ public class TypeExtractorTest {
Assert.assertEquals(Integer.class , tti.getTypeAt(1).getTypeClass());
}
- public class FieldDuplicator<T> extends MapFunction<T, Tuple2<T, T>> {
+ public class FieldDuplicator<T> extends RichMapFunction<T, Tuple2<T, T>> {
private static final long serialVersionUID = 1L;
public Tuple2<T, T> map(T value) {
@@ -981,7 +981,7 @@ public class TypeExtractorTest {
@Test
public void testFunctionInputInOutputMultipleTimes() {
- MapFunction<Float, ?> function = new FieldDuplicator<Float>();
+ RichMapFunction<Float, ?> function = new FieldDuplicator<Float>();
TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, BasicTypeInfo.FLOAT_TYPE_INFO);
@@ -994,7 +994,7 @@ public class TypeExtractorTest {
@Test
public void testFunctionInputInOutputMultipleTimes2() {
- MapFunction<Tuple2<Float, Float>, ?> function = new FieldDuplicator<Tuple2<Float, Float>>();
+ RichMapFunction<Tuple2<Float, Float>, ?> function = new FieldDuplicator<Tuple2<Float, Float>>();
TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, new TupleTypeInfo<Tuple2<Float, Float>>(
BasicTypeInfo.FLOAT_TYPE_INFO, BasicTypeInfo.FLOAT_TYPE_INFO));
@@ -1023,7 +1023,7 @@ public class TypeExtractorTest {
@Test
public void testAbstractAndInterfaceTypesException() {
- MapFunction<String, ?> function = new MapFunction<String, Testable>() {
+ RichMapFunction<String, ?> function = new RichMapFunction<String, Testable>() {
private static final long serialVersionUID = 1L;
@Override
@@ -1039,7 +1039,7 @@ public class TypeExtractorTest {
// good
}
- MapFunction<String, ?> function2 = new MapFunction<String, AbstractClass>() {
+ RichMapFunction<String, ?> function2 = new RichMapFunction<String, AbstractClass>() {
private static final long serialVersionUID = 1L;
@Override
@@ -1059,7 +1059,7 @@ public class TypeExtractorTest {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testValueSupertypeException() {
- MapFunction<?, ?> function = new MapFunction<StringValue, Value>() {
+ RichMapFunction<?, ?> function = new RichMapFunction<StringValue, Value>() {
private static final long serialVersionUID = 1L;
@Override
@@ -1080,7 +1080,7 @@ public class TypeExtractorTest {
@Test
public void testBasicArray() {
// use getCoGroupReturnTypes()
- CoGroupFunction<?, ?, ?> function = new CoGroupFunction<String[], String[], String[]>() {
+ RichCoGroupFunction<?, ?, ?> function = new RichCoGroupFunction<String[], String[], String[]>() {
private static final long serialVersionUID = 1L;
@Override
@@ -1107,7 +1107,7 @@ public class TypeExtractorTest {
@Test
public void testBasicArray2() {
- MapFunction<Boolean[], ?> function = new IdentityMapper<Boolean[]>();
+ RichMapFunction<Boolean[], ?> function = new IdentityMapper<Boolean[]>();
TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO);
@@ -1122,7 +1122,7 @@ public class TypeExtractorTest {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testCustomArray() {
- MapFunction<?, ?> function = new MapFunction<CustomArrayObject[], CustomArrayObject[]>() {
+ RichMapFunction<?, ?> function = new RichMapFunction<CustomArrayObject[], CustomArrayObject[]>() {
private static final long serialVersionUID = 1L;
@Override
@@ -1140,7 +1140,7 @@ public class TypeExtractorTest {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testTupleArray() {
- MapFunction<?, ?> function = new MapFunction<Tuple2<String, String>[], Tuple2<String, String>[]>() {
+ RichMapFunction<?, ?> function = new RichMapFunction<Tuple2<String, String>[], Tuple2<String, String>[]>() {
private static final long serialVersionUID = 1L;
@Override
@@ -1167,7 +1167,7 @@ public class TypeExtractorTest {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testCustomArrayWithTypeVariable() {
- MapFunction<CustomArrayObject2<Boolean>[], ?> function = new IdentityMapper<CustomArrayObject2<Boolean>[]>();
+ RichMapFunction<CustomArrayObject2<Boolean>[], ?> function = new IdentityMapper<CustomArrayObject2<Boolean>[]>();
TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple1<Boolean>[]"));
@@ -1178,7 +1178,7 @@ public class TypeExtractorTest {
Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, tti.getTypeAt(0));
}
- public class GenericArrayClass<T> extends MapFunction<T[], T[]> {
+ public class GenericArrayClass<T> extends RichMapFunction<T[], T[]> {
private static final long serialVersionUID = 1L;
@Override
@@ -1207,7 +1207,7 @@ public class TypeExtractorTest {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testParamertizedCustomObject() {
- MapFunction<?, ?> function = new MapFunction<MyObject<String>, MyObject<String>>() {
+ RichMapFunction<?, ?> function = new RichMapFunction<MyObject<String>, MyObject<String>>() {
private static final long serialVersionUID = 1L;
@Override
@@ -1242,7 +1242,7 @@ public class TypeExtractorTest {
@Test
public void testInputMismatchExceptions() {
- MapFunction<?, ?> function = new MapFunction<Tuple2<String, String>, String>() {
+ RichMapFunction<?, ?> function = new RichMapFunction<Tuple2<String, String>, String>() {
private static final long serialVersionUID = 1L;
@Override
@@ -1265,7 +1265,7 @@ public class TypeExtractorTest {
// right
}
- MapFunction<?, ?> function2 = new MapFunction<StringValue, String>() {
+ RichMapFunction<?, ?> function2 = new RichMapFunction<StringValue, String>() {
private static final long serialVersionUID = 1L;
@Override
@@ -1281,7 +1281,7 @@ public class TypeExtractorTest {
// right
}
- MapFunction<?, ?> function3 = new MapFunction<Tuple1<Integer>[], String>() {
+ RichMapFunction<?, ?> function3 = new RichMapFunction<Tuple1<Integer>[], String>() {
private static final long serialVersionUID = 1L;
@Override
@@ -1297,7 +1297,7 @@ public class TypeExtractorTest {
// right
}
- MapFunction<?, ?> function4 = new MapFunction<Writable, String>() {
+ RichMapFunction<?, ?> function4 = new RichMapFunction<Writable, String>() {
private static final long serialVersionUID = 1L;
@Override
@@ -1314,7 +1314,7 @@ public class TypeExtractorTest {
}
}
- public static class DummyFlatMapFunction<A,B,C,D> extends FlatMapFunction<Tuple2<A,B>, Tuple2<C,D>> {
+ public static class DummyFlatMapFunction<A,B,C,D> extends RichFlatMapFunction<Tuple2<A,B>, Tuple2<C,D>> {
private static final long serialVersionUID = 1L;
@Override
@@ -1336,7 +1336,7 @@ public class TypeExtractorTest {
}
}
- public static class MyQueryableMapper<A> extends MapFunction<String, A> implements ResultTypeQueryable<A> {
+ public static class MyQueryableMapper<A> extends RichMapFunction<String, A> implements ResultTypeQueryable<A> {
private static final long serialVersionUID = 1L;
@SuppressWarnings("unchecked")
@@ -1359,7 +1359,7 @@ public class TypeExtractorTest {
@Test
public void testTupleWithPrimitiveArray() {
- MapFunction<Integer, Tuple9<int[],double[],long[],byte[],char[],float[],short[], boolean[], String[]>> function = new MapFunction<Integer, Tuple9<int[],double[],long[],byte[],char[],float[],short[], boolean[], String[]>>() {
+ RichMapFunction<Integer, Tuple9<int[],double[],long[],byte[],char[],float[],short[], boolean[], String[]>> function = new RichMapFunction<Integer, Tuple9<int[],double[],long[],byte[],char[],float[],short[], boolean[], String[]>>() {
private static final long serialVersionUID = 1L;
@Override
@@ -1382,8 +1382,8 @@ public class TypeExtractorTest {
}
@Test
- public void testInterface() {
- GenericMap<String, Boolean> mapInterface = new GenericMap<String, Boolean>() {
+ public void testFunction() {
+ RichMapFunction<String, Boolean> mapInterface = new RichMapFunction<String, Boolean>() {
@Override
public void setRuntimeContext(RuntimeContext t) {
@@ -1392,7 +1392,6 @@ public class TypeExtractorTest {
@Override
public void open(Configuration parameters) throws Exception {
-
}
@Override
@@ -1414,4 +1413,17 @@ public class TypeExtractorTest {
TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(mapInterface, BasicTypeInfo.STRING_TYPE_INFO);
Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti);
}
+
+ @Test
+ public void testInterface() {
+ MapFunction<String, Boolean> mapInterface = new MapFunction<String, Boolean>() {
+ @Override
+ public Boolean map(String record) throws Exception {
+ return null;
+ }
+ };
+
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(mapInterface, BasicTypeInfo.STRING_TYPE_INFO);
+ Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java8-tests/pom.xml b/flink-java8-tests/pom.xml
new file mode 100644
index 0000000..2587776
--- /dev/null
+++ b/flink-java8-tests/pom.xml
@@ -0,0 +1,145 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-parent</artifactId>
+ <version>0.6-incubating-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-java8-tests</artifactId>
+ <name>flink-java8-tests</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.7</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <!-- just define the Java version to be used for compiling and plugins -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version><!--$NO-MVN-MAN-VER$-->
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ <!-- High optimization, no debugging <compilerArgument>-g:none -O</compilerArgument> -->
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <log.level>WARN</log.level>
+ </systemPropertyVariables>
+ <forkMode>once</forkMode>
+ <argLine>-Xmx1024m</argLine>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <log.level>WARN</log.level>
+ </systemPropertyVariables>
+ <forkMode>always</forkMode>
+ <threadCount>1</threadCount>
+ <perCoreThreadCount>false</perCoreThreadCount>
+ </configuration>
+ </plugin>
+ </plugins>
+
+ <pluginManagement>
+ <plugins>
+ <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>
+ org.apache.maven.plugins
+ </groupId>
+ <artifactId>
+ maven-assembly-plugin
+ </artifactId>
+ <versionRange>
+ [2.4,)
+ </versionRange>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
new file mode 100644
index 0000000..c417249
--- /dev/null
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+public class CoGroupITCase implements Serializable {
+
+ @Test
+ public void testCoGroupLambda() {
+ try {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Integer, String>> left = env.fromElements(
+ new Tuple2<Integer, String>(1, "hello"),
+ new Tuple2<Integer, String>(2, "what's"),
+ new Tuple2<Integer, String>(2, "up")
+ );
+ DataSet<Tuple2<Integer, String>> right = env.fromElements(
+ new Tuple2<Integer, String>(1, "not"),
+ new Tuple2<Integer, String>(1, "much"),
+ new Tuple2<Integer, String>(2, "really")
+ );
+ DataSet<Tuple2<Integer,String>> joined = left.coGroup(right).where(0).equalTo(0)
+ .with((values1, values2, out) -> {
+ int sum = 0;
+ String conc = "";
+ while (values1.hasNext()) {
+ sum += values1.next().f0;
+ conc += values1.next().f1;
+ }
+ while (values2.hasNext()) {
+ sum += values2.next().f0;
+ conc += values2.next().f1;
+ }
+ });
+ env.execute();
+
+
+ } catch (UnsupportedLambdaExpressionException e) {
+ // Success
+ return;
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
new file mode 100644
index 0000000..f8d217e
--- /dev/null
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+public class CrossITCase implements Serializable {
+
+ @Test
+ public void testCrossLambda() {
+ try {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Integer, String>> left = env.fromElements(
+ new Tuple2<Integer, String>(1, "hello"),
+ new Tuple2<Integer, String>(2, "what's"),
+ new Tuple2<Integer, String>(2, "up")
+ );
+ DataSet<Tuple2<Integer, String>> right = env.fromElements(
+ new Tuple2<Integer, String>(1, "not"),
+ new Tuple2<Integer, String>(1, "much"),
+ new Tuple2<Integer, String>(2, "really")
+ );
+ DataSet<Tuple2<Integer,String>> joined = left.cross(right)
+ .with((t,s) -> new Tuple2<Integer, String> (t.f0 + s.f0, t.f1 + " " + s.f1));
+
+ } catch (UnsupportedLambdaExpressionException e) {
+ // Success
+ return;
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
new file mode 100644
index 0000000..c775425
--- /dev/null
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class FilterITCase extends JavaProgramTestBase {
+
+
+ public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env) {
+
+ List<Tuple3<Integer, Long, String>> data = new ArrayList<Tuple3<Integer, Long, String>>();
+ data.add(new Tuple3<Integer, Long, String>(1,1l,"Hi"));
+ data.add(new Tuple3<Integer, Long, String>(2,2l,"Hello"));
+ data.add(new Tuple3<Integer, Long, String>(3,2l,"Hello world"));
+ data.add(new Tuple3<Integer, Long, String>(4,3l,"Hello world, how are you?"));
+ data.add(new Tuple3<Integer, Long, String>(5,3l,"I am fine."));
+ data.add(new Tuple3<Integer, Long, String>(6,3l,"Luke Skywalker"));
+ data.add(new Tuple3<Integer, Long, String>(7,4l,"Comment#1"));
+ data.add(new Tuple3<Integer, Long, String>(8,4l,"Comment#2"));
+ data.add(new Tuple3<Integer, Long, String>(9,4l,"Comment#3"));
+ data.add(new Tuple3<Integer, Long, String>(10,4l,"Comment#4"));
+ data.add(new Tuple3<Integer, Long, String>(11,5l,"Comment#5"));
+ data.add(new Tuple3<Integer, Long, String>(12,5l,"Comment#6"));
+ data.add(new Tuple3<Integer, Long, String>(13,5l,"Comment#7"));
+ data.add(new Tuple3<Integer, Long, String>(14,5l,"Comment#8"));
+ data.add(new Tuple3<Integer, Long, String>(15,5l,"Comment#9"));
+ data.add(new Tuple3<Integer, Long, String>(16,6l,"Comment#10"));
+ data.add(new Tuple3<Integer, Long, String>(17,6l,"Comment#11"));
+ data.add(new Tuple3<Integer, Long, String>(18,6l,"Comment#12"));
+ data.add(new Tuple3<Integer, Long, String>(19,6l,"Comment#13"));
+ data.add(new Tuple3<Integer, Long, String>(20,6l,"Comment#14"));
+ data.add(new Tuple3<Integer, Long, String>(21,6l,"Comment#15"));
+
+ Collections.shuffle(data);
+
+ return env.fromCollection(data);
+ }
+
+ private static int NUM_PROGRAMS = 1;
+
+ private int curProgId = config.getInteger("ProgramId", -1);
+ private String resultPath;
+ private String expectedResult;
+
+ public FilterITCase(Configuration config) {
+ super(config);
+ }
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ expectedResult = FilterProgs.runProgram(curProgId, resultPath);
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(expectedResult, resultPath);
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+
+ LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+ for(int i=1; i <= NUM_PROGRAMS; i++) {
+ Configuration config = new Configuration();
+ config.setInteger("ProgramId", i);
+ tConfigs.add(config);
+ }
+
+ return toParameterList(tConfigs);
+ }
+
+ private static class FilterProgs {
+
+ public static String runProgram(int progId, String resultPath) throws Exception {
+
+ switch(progId) {
+ case 1: {
+ /*
+ * Test lambda filter
+ * Functionality identical to org.apache.flink.test.javaApiOperators.FilterITCase test 3
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
+ filter(value -> value.f2.contains("world"));
+ filterDs.writeAsCsv(resultPath);
+ env.execute();
+
+ // return expected result
+ return "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n";
+ }
+ default:
+ throw new IllegalArgumentException("Invalid program id");
+ }
+
+ }
+
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
new file mode 100644
index 0000000..043b4e8
--- /dev/null
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+public class FlatJoinITCase implements Serializable {
+
+ @Test
+ public void testFlatJoinLambda() {
+ try {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Integer, String>> left = env.fromElements(
+ new Tuple2<Integer, String>(1, "hello"),
+ new Tuple2<Integer, String>(2, "what's"),
+ new Tuple2<Integer, String>(2, "up")
+ );
+ DataSet<Tuple2<Integer, String>> right = env.fromElements(
+ new Tuple2<Integer, String>(1, "not"),
+ new Tuple2<Integer, String>(1, "much"),
+ new Tuple2<Integer, String>(2, "really")
+ );
+ DataSet<Tuple2<Integer,String>> joined = left.join(right).where(0).equalTo(0)
+ .with((t,s,out) -> out.collect(new Tuple2<Integer,String>(t.f0, t.f1 + " " + s.f1)));
+ } catch (UnsupportedLambdaExpressionException e) {
+ // Success
+ return;
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
new file mode 100644
index 0000000..55f507c
--- /dev/null
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+public class FlatMapITCase implements Serializable {
+
+ @Test
+ public void testFlatMapLambda() {
+ try {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
+ DataSet<String> flatMappedDs = stringDs.flatMap((s, out) -> out.collect(s.replace("a", "b")));
+ env.execute();
+ } catch (UnsupportedLambdaExpressionException e) {
+ // Success
+ return;
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
new file mode 100644
index 0000000..494aff6
--- /dev/null
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+public class GroupReduceITCase implements Serializable {
+
+ @Test
+ public void testAllGroupReduceLambda() {
+ try {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
+ DataSet<String> concatDs = stringDs.reduceGroup((values, out) -> {
+ String conc = "";
+ while (values.hasNext()) {
+ String s = values.next();
+ conc = conc.concat(s);
+ }
+ out.collect(conc);
+ });
+ env.execute();
+ } catch (UnsupportedLambdaExpressionException e) {
+ // Success
+ return;
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testGroupReduceLambda() {
+ try {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Integer,String>> stringDs = env.fromElements(
+ new Tuple2<Integer,String>(1, "aa"),
+ new Tuple2<Integer,String>(2, "ab"),
+ new Tuple2<Integer,String>(1, "ac"),
+ new Tuple2<Integer,String>(2, "ad")
+ );
+ DataSet<String> concatDs = stringDs
+ .groupBy(0)
+ .reduceGroup((values, out) -> {
+ String conc = "";
+ while (values.hasNext()) {
+ String s = values.next().f1;
+ conc = conc.concat(s);
+ }
+ out.collect(conc);
+ });
+ env.execute();
+ } catch (UnsupportedLambdaExpressionException e) {
+ // Success
+ return;
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
new file mode 100644
index 0000000..3f4f696
--- /dev/null
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+public class JoinITCase implements Serializable {
+
+ @Test
+ public void testJoinLambda() {
+ try {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Integer, String>> left = env.fromElements(
+ new Tuple2<Integer, String>(1, "hello"),
+ new Tuple2<Integer, String>(2, "what's"),
+ new Tuple2<Integer, String>(2, "up")
+ );
+ DataSet<Tuple2<Integer, String>> right = env.fromElements(
+ new Tuple2<Integer, String>(1, "not"),
+ new Tuple2<Integer, String>(1, "much"),
+ new Tuple2<Integer, String>(2, "really")
+ );
+ DataSet<Tuple2<Integer,String>> joined = left.join(right).where(0).equalTo(0)
+ .with((t,s) -> new Tuple2<Integer,String>(t.f0, t.f1 + " " + t.f1));
+
+ } catch (UnsupportedLambdaExpressionException e) {
+ // Success
+ return;
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
new file mode 100644
index 0000000..3af360b
--- /dev/null
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+public class MapITCase implements Serializable{
+
+ @Test
+ public void TestMapLambda () {
+ try {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
+ DataSet<String> mappedDs = stringDs.map (s -> s.replace("a", "b"));
+ env.execute();
+ }
+ catch (UnsupportedLambdaExpressionException e) {
+ // Success
+ return;
+ }
+ catch (Exception e) {
+ Assert.fail();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
new file mode 100644
index 0000000..ab27fe4
--- /dev/null
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.BasicTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class ReduceITCase extends JavaProgramTestBase {
+
+ public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env) {
+
+ List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<Tuple5<Integer, Long, Integer, String, Long>>();
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(1,1l,0,"Hallo",1l));
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(2,2l,1,"Hallo Welt",2l));
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(2,3l,2,"Hallo Welt wie",1l));
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(3,4l,3,"Hallo Welt wie gehts?",2l));
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(3,5l,4,"ABC",2l));
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(3,6l,5,"BCD",3l));
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(4,7l,6,"CDE",2l));
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(4,8l,7,"DEF",1l));
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(4,9l,8,"EFG",1l));
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(4,10l,9,"FGH",2l));
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(5,11l,10,"GHI",1l));
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(5,12l,11,"HIJ",3l));
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(5,13l,12,"IJK",3l));
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(5,14l,13,"JKL",2l));
+ data.add(new Tuple5<Integer, Long, Integer, String, Long>(5,15l,14,"KLM",2l));
+
+ Collections.shuffle(data);
+
+ TupleTypeInfo<Tuple5<Integer, Long, Integer, String, Long>> type = new
+ TupleTypeInfo<Tuple5<Integer, Long, Integer, String, Long>>(
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO
+ );
+
+ return env.fromCollection(data, type);
+ }
+
+ private static int NUM_PROGRAMS = 1;
+
+ private int curProgId = config.getInteger("ProgramId", -1);
+ private String resultPath;
+ private String expectedResult;
+
+ public ReduceITCase(Configuration config) {
+ super(config);
+ }
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ expectedResult = ReduceProgs.runProgram(curProgId, resultPath);
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(expectedResult, resultPath);
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+
+ LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+ for(int i=1; i <= NUM_PROGRAMS; i++) {
+ Configuration config = new Configuration();
+ config.setInteger("ProgramId", i);
+ tConfigs.add(config);
+ }
+
+ return toParameterList(tConfigs);
+ }
+
+ private static class ReduceProgs {
+
+ public static String runProgram(int progId, String resultPath) throws Exception {
+
+ switch(progId) {
+ case 1: {
+ /*
+ * Test reduce with lambda
+ * Functionality identical to org.apache.flink.test.javaApiOperators.ReduceITCase test 2
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = get5TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds
+ .groupBy(4, 0)
+ .reduce((in1, in2) -> {
+ Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>();
+ out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4);
+ return out;
+ });
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+
+ // return expected result
+ return "1,1,0,Hallo,1\n" +
+ "2,3,2,Hallo Welt wie,1\n" +
+ "2,2,1,Hallo Welt,2\n" +
+ "3,9,0,P-),2\n" +
+ "3,6,5,BCD,3\n" +
+ "4,17,0,P-),1\n" +
+ "4,17,0,P-),2\n" +
+ "5,11,10,GHI,1\n" +
+ "5,29,0,P-),2\n" +
+ "5,25,0,P-),3\n";
+ }
+ default:
+ throw new IllegalArgumentException("Invalid program id");
+ }
+
+ }
+
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
index 636c492..34cd232 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
@@ -23,7 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
@@ -54,7 +54,7 @@ import java.io.IOException;
/**
* The base class for all tasks able to participate in an iteration.
*/
-public abstract class AbstractIterativePactTask<S extends Function, OT> extends RegularPactTask<S, OT>
+public abstract class AbstractIterativePactTask<S extends RichFunction, OT> extends RegularPactTask<S, OT>
implements Terminable
{
private static final Log log = LogFactory.getLog(AbstractIterativePactTask.class);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
index d7f3b50..7a77cff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
@@ -25,7 +25,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -75,7 +75,7 @@ import org.apache.flink.util.MutableObjectIterator;
* The type of the feed-back data set (bulk partial solution / workset). For bulk iterations, {@code Y} is the
* same as {@code X}
*/
-public class IterationHeadPactTask<X, Y, S extends Function, OT> extends AbstractIterativePactTask<S, OT> {
+public class IterationHeadPactTask<X, Y, S extends RichFunction, OT> extends AbstractIterativePactTask<S, OT> {
private static final Log log = LogFactory.getLog(IterationHeadPactTask.class);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
index 25a6149..2a8325c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
@@ -23,7 +23,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.runtime.io.network.api.BufferWriter;
import org.apache.flink.runtime.io.network.channels.EndOfSuperstepEvent;
import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
@@ -41,7 +41,7 @@ import org.apache.flink.util.Collector;
* a {@link BlockingBackChannel} for the workset -XOR- a {@link MutableHashTable} for the solution set. In this case
* this task must be scheduled on the same instance as the head.
*/
-public class IterationIntermediatePactTask<S extends Function, OT> extends AbstractIterativePactTask<S, OT> {
+public class IterationIntermediatePactTask<S extends RichFunction, OT> extends AbstractIterativePactTask<S, OT> {
private static final Log log = LogFactory.getLog(IterationIntermediatePactTask.class);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
index 570630f..942e2f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.iterative.task;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
@@ -38,7 +38,7 @@ import org.apache.flink.util.Collector;
* <p/>
* If there is a separate solution set tail, the iteration head has to make sure to wait for it to finish.
*/
-public class IterationTailPactTask<S extends Function, OT> extends AbstractIterativePactTask<S, OT>
+public class IterationTailPactTask<S extends RichFunction, OT> extends AbstractIterativePactTask<S, OT>
implements PactTaskContext<S, OT> {
private static final Log log = LogFactory.getLog(IterationTailPactTask.class);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
index fe70171..d7d63af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.operators;
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
-public abstract class AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> extends MatchDriver<IT1, IT2, OT> implements ResettablePactDriver<GenericJoiner<IT1, IT2, OT>, OT> {
+public abstract class AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> extends MatchDriver<IT1, IT2, OT> implements ResettablePactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
private volatile JoinTaskIterator<IT1, IT2, OT> matchIterator;
@@ -110,7 +110,7 @@ public abstract class AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> extends M
@Override
public void run() throws Exception {
- final GenericJoiner<IT1, IT2, OT> matchStub = this.taskContext.getStub();
+ final FlatJoinFunction<IT1, IT2, OT> matchStub = this.taskContext.getStub();
final Collector<OT> collector = this.taskContext.getOutputCollector();
if (buildSideIndex == 0) {