You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/01 02:36:51 UTC
[10/16] [FLINK-701] Several cleanups after SAM refactoring. - Lambda
detection compiles on earlier java versions - Add lambda detection test. -
Fix JavaDocs
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatMapFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatMapFunction.java
index 2293b5e..c045508 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatMapFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatMapFunction.java
@@ -20,22 +20,14 @@ package org.apache.flink.api.java.functions;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.util.Collector;
/**
- * The abstract base class for flatMap functions. FlatMap functions take elements and transform them,
- * into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists
- * and arrays. Operations that produce multiple strictly one result element per input element can also
- * use the {@link RichMapFunction}.
- * <p>
- * The basic syntax for using a FlatMapFunction is as follows:
- * <pre><blockquote>
- * DataSet<X> input = ...;
- *
- * DataSet<Y> result = input.flatMap(new MyFlatMapFunction());
- * </blockquote></pre>
- * <p>
- * Like all functions, the FlatMapFunction needs to be serializable, as defined in {@link java.io.Serializable}.
+ * Rich variant of the {@link FlatMapFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
*
* @param <IN> Type of the input elements.
* @param <OUT> Type of the returned elements.
@@ -44,16 +36,6 @@ public abstract class RichFlatMapFunction<IN, OUT> extends AbstractRichFunction
private static final long serialVersionUID = 1L;
- /**
- * The core method of the FlatMapFunction. Takes an element from the input data set and transforms
- * it into zero, one, or more elements.
- *
- * @param value The input value.
- * @param out The collector for for emitting result values.
- *
- * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
- * to fail and may trigger recovery.
- */
@Override
public abstract void flatMap(IN value, Collector<OUT> out) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java
index eb75f53..801f592 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java
@@ -27,26 +27,14 @@ import java.util.Iterator;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.FlatCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.util.Collector;
/**
- * The abstract base class for group reduce functions. Group reduce functions process groups of elements.
- * They may aggregate them to a single value, or produce multiple result values for each group.
- * <p>
- * For a reduce functions that works incrementally by combining always two elements, see
- * {@link RichReduceFunction}, called via {@link org.apache.flink.api.java.DataSet#reduce(RichReduceFunction)}.
- * <p>
- * The basic syntax for using a grouped GroupReduceFunction is as follows:
- * <pre><blockquote>
- * DataSet<X> input = ...;
- *
- * DataSet<X> result = input.groupBy(<key-definition>).reduceGroup(new MyGroupReduceFunction());
- * </blockquote></pre>
- * <p>
- * GroupReduceFunctions may be "combinable", in which case they can pre-reduce partial groups in order to
- * reduce the data volume early. See the {@link #combine(Iterator, Collector)} function for details.
- * <p>
- * Like all functions, the GroupReduceFunction needs to be serializable, as defined in {@link java.io.Serializable}.
+ * Rich variant of the {@link GroupReduceFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
*
* @param <IN> Type of the elements that this function processes.
* @param <OUT> The type of the elements returned by the user-defined function.
@@ -55,16 +43,6 @@ public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunct
private static final long serialVersionUID = 1L;
- /**
- * Core method of the reduce function. It is called one per group of elements. If the reducer
- * is not grouped, than the entire data set is considered one group.
- *
- * @param values The iterator returning the group of values to be reduced.
- * @param out The collector to emit the returned values.
- *
- * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
- * to fail and may trigger recovery.
- */
@Override
public abstract void reduce(Iterator<IN> values, Collector<OUT> out) throws Exception;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichJoinFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichJoinFunction.java
index 7eaf44c..a0c28ee 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichJoinFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichJoinFunction.java
@@ -18,10 +18,20 @@
package org.apache.flink.api.java.functions;
-
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+/**
+ * Rich variant of the {@link JoinFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <IN1> The type of the elements in the first input.
+ * @param <IN2> The type of the elements in the second input.
+ * @param <OUT> The type of the result elements.
+ */
public abstract class RichJoinFunction<IN1,IN2,OUT> extends AbstractRichFunction implements JoinFunction<IN1,IN2,OUT> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapFunction.java
index 54de7d4..f6f5356 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapFunction.java
@@ -20,22 +20,13 @@ package org.apache.flink.api.java.functions;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFunction;
/**
- * The abstract base class for Map functions. Map functions take elements and transform them,
- * element wise. A Map function always produces a single result element for each input element.
- * Typical applications are parsing elements, converting data types, or projecting out fields.
- * Operations that produce multiple result elements from a single input element can be implemented
- * using the {@link RichFlatMapFunction}.
- * <p>
- * The basic syntax for using a MapFunction is as follows:
- * <pre><blockquote>
- * DataSet<X> input = ...;
- *
- * DataSet<Y> result = input.map(new MyMapFunction());
- * </blockquote></pre>
- * <p>
- * Like all functions, the MapFunction needs to be serializable, as defined in {@link java.io.Serializable}.
+ * Rich variant of the {@link MapFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
*
* @param <IN> Type of the input elements.
* @param <OUT> Type of the returned elements.
@@ -44,16 +35,6 @@ public abstract class RichMapFunction<IN, OUT> extends AbstractRichFunction impl
private static final long serialVersionUID = 1L;
- /**
- * The core method of the MapFunction. Takes an element from the input data set and transforms
- * it into another element.
- *
- * @param value The input value.
- * @return The value produced by the map function from the input value.
- *
- * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
- * to fail and may trigger recovery.
- */
@Override
public abstract OUT map(IN value) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichReduceFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichReduceFunction.java
index 35cb392..a63f8dc 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichReduceFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichReduceFunction.java
@@ -20,27 +20,13 @@ package org.apache.flink.api.java.functions;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichFunction;
/**
- * The abstract base class for Reduce functions. Reduce functions combine groups of elements to
- * a single value, by taking always two elements and combining them into one. Reduce functions
- * may be used on entire data sets, or on grouped data sets. In the latter case, each group is reduced
- * individually.
- * <p>
- * For a reduce functions that work on an entire group at the same time (such as the
- * MapReduce/Hadoop-style reduce), see {@link RichGroupReduceFunction}, called via
- * {@link org.apache.flink.api.java.DataSet#reduceGroup(RichGroupReduceFunction)}. In the general case,
- * ReduceFunctions are considered faster, because they allow the system to use hash-based
- * execution strategies.
- * <p>
- * The basic syntax for using a grouped ReduceFunction is as follows:
- * <pre><blockquote>
- * DataSet<X> input = ...;
- *
- * DataSet<X> result = input.groupBy(<key-definition>).reduce(new MyReduceFunction());
- * </blockquote></pre>
- * <p>
- * Like all functions, the ReduceFunction needs to be serializable, as defined in {@link java.io.Serializable}.
+ * Rich variant of the {@link ReduceFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
*
* @param <T> Type of the elements that this function processes.
*/
@@ -48,16 +34,5 @@ public abstract class RichReduceFunction<T> extends AbstractRichFunction impleme
private static final long serialVersionUID = 1L;
- /**
- * The core method of the ReduceFunction, combining two values into one value of the same type.
- * The reduce function is consecutively applied to all values of a group until only a single value remains.
- *
- * @param value1 The first value to combine.
- * @param value2 The second value to combine.
- * @return The combined value of both input values.
- *
- * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
- * to fail and may trigger recovery.
- */
public abstract T reduce(T value1, T value2) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index 89c3334..744893b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -515,7 +515,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
if (function == null) {
throw new NullPointerException("CoGroup function must not be null.");
}
- if (FunctionUtils.isSerializedLambdaFunction(function)) {
+ if (FunctionUtils.isLambdaFunction(function)) {
throw new UnsupportedLambdaExpressionException();
}
TypeInformation<R> returnType = TypeExtractor.getCoGroupReturnTypes(function, input1.getType(), input2.getType());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
index d1e99d6..a24a093 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
@@ -134,7 +134,7 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
if (function == null) {
throw new NullPointerException("Cross function must not be null.");
}
- if (FunctionUtils.isSerializedLambdaFunction(function)) {
+ if (FunctionUtils.isLambdaFunction(function)) {
throw new UnsupportedLambdaExpressionException();
}
TypeInformation<R> returnType = TypeExtractor.getCrossReturnTypes(function, input1.getType(), input2.getType());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index 591551f..7646fa0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -21,7 +21,6 @@ package org.apache.flink.api.java.operators;
import java.util.Iterator;
import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Operator;
@@ -29,6 +28,7 @@ import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable;
import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -80,7 +80,6 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
protected org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<?, T, ?> translateToDataFlow(Operator<T> input) {
final RichGroupReduceFunction<T, T> function = new DistinctFunction<T>();
- final FlatCombineFunction<T> combineFunction = new DistinctCombiner<T>();
String name = function.getClass().getName();
@@ -104,7 +103,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
PlanUnwrappingReduceGroupOperator<T, T, ?> po = translateSelectorFunctionDistinct(
- selectorKeys, function, combineFunction, getInputType(), getResultType(), name, input, true);
+ selectorKeys, function, getInputType(), getResultType(), name, input);
po.setDegreeOfParallelism(this.getParallelism());
@@ -118,9 +117,8 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
// --------------------------------------------------------------------------------------------
private static <IN, OUT, K> PlanUnwrappingReduceGroupOperator<IN, OUT, K> translateSelectorFunctionDistinct(
- Keys.SelectorFunctionKeys<IN, ?> rawKeys, RichGroupReduceFunction<IN, OUT> function, FlatCombineFunction<IN> combineFunction,
- TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input,
- boolean combinable)
+ Keys.SelectorFunctionKeys<IN, ?> rawKeys, RichGroupReduceFunction<IN, OUT> function,
+ TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input)
{
@SuppressWarnings("unchecked")
final Keys.SelectorFunctionKeys<IN, K> keys = (Keys.SelectorFunctionKeys<IN, K>) rawKeys;
@@ -131,7 +129,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer =
- new PlanUnwrappingReduceGroupOperator<IN, OUT, K>(function, keys, name, outputType, typeInfoWithKey, combinable);
+ new PlanUnwrappingReduceGroupOperator<IN, OUT, K>(function, keys, name, outputType, typeInfoWithKey, true);
MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, IN>>> mapper = new MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, IN>>>(extractor, new UnaryOperatorInformation<IN, Tuple2<K, IN>>(inputType, typeInfoWithKey), "Key Extractor");
@@ -144,26 +142,14 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
return reducer;
}
+ @Combinable
public static final class DistinctFunction<T> extends RichGroupReduceFunction<T, T> {
private static final long serialVersionUID = 1L;
@Override
- public void reduce(Iterator<T> values, Collector<T> out)
- throws Exception {
+ public void reduce(Iterator<T> values, Collector<T> out) {
out.collect(values.next());
}
}
-
- public static final class DistinctCombiner<T> implements FlatCombineFunction<T> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void combine(Iterator<T> values, Collector<T> out)
- throws Exception {
- out.collect(values.next());
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index 7ab0b11..e1424ad 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -51,9 +51,6 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
private final Grouping<IN> grouper;
- // reduceFunction is a GroupReduceFunction
- private boolean richFunction;
-
private boolean combinable;
/**
@@ -176,8 +173,8 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions();
UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
- GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>> po =
- new GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name);
+ GroupReduceOperatorBase<IN, OUT, GroupReduceFunction<IN, OUT>> po =
+ new GroupReduceOperatorBase<IN, OUT, GroupReduceFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name);
po.setCombinable(combinable);
po.setInput(input);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
index 3223f4d..200c4de 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
@@ -26,8 +26,8 @@ import org.apache.flink.api.java.DataSet;
* Grouping is an intermediate step for a transformation on a grouped DataSet.<br/>
* The following transformation can be applied on Grouping:
* <ul>
- * <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.java.functions.RichReduceFunction)},</li>
- * <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)}, and</li>
+ * <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.common.functions.ReduceFunction)},</li>
+ * <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)}, and</li>
* <li>{@link UnsortedGrouping#aggregate(org.apache.flink.api.java.aggregation.Aggregations, int)}.</li>
* </ul>
*
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index a07a157..ce0aea7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -467,7 +467,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
if (function == null) {
throw new NullPointerException("Join function must not be null.");
}
- if (FunctionUtils.isSerializedLambdaFunction(function)) {
+ if (FunctionUtils.isLambdaFunction(function)) {
throw new UnsupportedLambdaExpressionException();
}
TypeInformation<R> returnType = TypeExtractor.getJoinReturnTypes(function, getInput1Type(), getInput2Type());
@@ -478,10 +478,10 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
if (function == null) {
throw new NullPointerException("Join function must not be null.");
}
- if (FunctionUtils.isSerializedLambdaFunction(function)) {
+ if (FunctionUtils.isLambdaFunction(function)) {
throw new UnsupportedLambdaExpressionException();
}
- FlatJoinFunction generatedFunction = new WrappingFlatJoinFunction<I1, I2, R>(function);
+ FlatJoinFunction<I1, I2, R> generatedFunction = new WrappingFlatJoinFunction<I1, I2, R>(function);
TypeInformation<R> returnType = TypeExtractor.getJoinReturnTypes(function, getInput1Type(), getInput2Type());
return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), generatedFunction, function, returnType, getJoinHint());
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index 97b2417..767f75a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -32,7 +32,7 @@ import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
* SortedGrouping is an intermediate step for a transformation on a grouped and sorted DataSet.<br/>
* The following transformation can be applied on sorted groups:
* <ul>
- * <li>{@link SortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)},</li>
+ * <li>{@link SortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)},</li>
* </ul>
*
* @param <T> The type of the elements of the sorted and grouped DataSet.
@@ -82,7 +82,7 @@ public class SortedGrouping<T> extends Grouping<T> {
if (reducer == null) {
throw new NullPointerException("GroupReduce function must not be null.");
}
- if (FunctionUtils.isSerializedLambdaFunction(reducer)) {
+ if (FunctionUtils.isLambdaFunction(reducer)) {
throw new UnsupportedLambdaExpressionException();
}
return new GroupReduceOperator<T, R>(this, reducer);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
index 9e71ba0..87b1454 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
@@ -127,7 +127,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
if (reducer == null) {
throw new NullPointerException("GroupReduce function must not be null.");
}
- if (FunctionUtils.isSerializedLambdaFunction(reducer)) {
+ if (FunctionUtils.isLambdaFunction(reducer)) {
throw new UnsupportedLambdaExpressionException();
}
return new GroupReduceOperator<T, R>(this, reducer);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
index 5e80455..29eb5ed 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
@@ -39,7 +39,7 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
public PlanUnwrappingReduceGroupOperator(GroupReduceFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K> key, String name,
TypeInformation<OUT> outType, TypeInformation<Tuple2<K, IN>> typeInfoWithKey, boolean combinable)
{
- super(combinable ? new TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K>((RichGroupReduceFunction) udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf),
+ super(combinable ? new TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K>((RichGroupReduceFunction<IN, OUT>) udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf),
new UnaryOperatorInformation<Tuple2<K, IN>, OUT>(typeInfoWithKey, outType), key.computeLogicalKeyPositions(), name);
super.setCombinable(combinable);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
index fa0ca11..e9b5c25 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
@@ -26,7 +26,7 @@ import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
/**
- * The CoGroupFunction is the base class for functions that are invoked by a {@link org.apache.flink.api.java.operators.CoGroupOperator}.
+ * The CoGroupFunction is the base class for functions that are invoked by a {@link org.apache.flink.api.java.record.operators.CoGroupOperator}.
*/
public abstract class CoGroupFunction extends AbstractRichFunction implements org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record> {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java
index c4587fd..3a6c931 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.types.Record;
/**
- * The CrossFunction is the base class for functions that are invoked by a {@link org.apache.flink.api.java.operators.CrossOperator}.
+ * The CrossFunction is the base class for functions that are invoked by a {@link org.apache.flink.api.java.record.operators.CrossOperator}.
*/
public abstract class CrossFunction extends AbstractRichFunction implements org.apache.flink.api.common.functions.CrossFunction<Record, Record, Record> {
@@ -41,10 +41,6 @@ public abstract class CrossFunction extends AbstractRichFunction implements org.
* runtime catches an exception, it aborts the task and lets the fail-over logic
* decide whether to retry the task execution.
*/
-
- //@Override
- //public abstract void cross(Record record1, Record record2, Collector<Record> out) throws Exception;
-
@Override
public abstract Record cross(Record first, Record second) throws Exception;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java
index dce24a3..cc4f96b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.api.java.record.functions;
import org.apache.flink.api.common.functions.AbstractRichFunction;
@@ -25,25 +24,13 @@ import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
/**
- * The JoinFunction must implementation by functions of a {@link org.apache.flink.api.java.operators.JoinOperator}.
+ * The JoinFunction must be implemented by functions of a {@link org.apache.flink.api.java.record.operators.JoinOperator}.
* It resembles an equality join of both inputs on their key fields.
*/
public abstract class JoinFunction extends AbstractRichFunction implements FlatJoinFunction<Record, Record, Record> {
private static final long serialVersionUID = 1L;
- /**
- * This method must be implemented to provide a user implementation of a join.
- * It is called for each two records that share the same key and come from different inputs.
- *
- * @param value1 The record that comes from the first input.
- * @param value2 The record that comes from the second input.
- * @return The result of the join UDF as record
- *
- * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
- * runtime catches an exception, it aborts the combine task and lets the fail-over logic
- * decide whether to retry the combiner execution.
- */
@Override
public abstract void join(Record value1, Record value2, Collector<Record> out) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java
index 99c945d..b082e2d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.api.java.record.functions;
import org.apache.flink.api.common.functions.AbstractRichFunction;
@@ -28,6 +27,7 @@ import org.apache.flink.util.Collector;
* The MapFunction must be extended to provide a mapper implementation
* By definition, the mapper is called for each individual input record.
*/
+@SuppressWarnings("deprecation")
public abstract class MapFunction extends AbstractRichFunction implements GenericCollectorMap<Record, Record> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
index 073b11a..a1e6369 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
@@ -29,7 +29,7 @@ import org.apache.flink.util.Collector;
/**
* The ReduceFunction must be extended to provide a reducer implementation, as invoked by a
- * {@link org.apache.flink.api.java.operators.ReduceOperator}.
+ * {@link org.apache.flink.api.java.record.operators.ReduceOperator}.
*/
public abstract class ReduceFunction extends AbstractRichFunction implements GroupReduceFunction<Record, Record>, FlatCombineFunction<Record> {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapOperator.java
index 64f70f6..85afa64 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapOperator.java
@@ -41,6 +41,7 @@ import org.apache.flink.types.Record;
*
* @see MapFunction
*/
+@SuppressWarnings("deprecation")
public class MapOperator extends CollectorMapOperatorBase<Record, Record, MapFunction> implements RecordOperator {
private static String DEFAULT_NAME = "<Unnamed Mapper>";
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
index 155bbd1..be872e5 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
@@ -129,7 +129,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
assertEquals(IdentityMapper.class, worksetMapper.getUserCodeWrapper().getUserCodeClass());
assertEquals(NextWorksetMapper.class, nextWorksetMapper.getUserCodeWrapper().getUserCodeClass());
if (solutionSetJoin.getUserCodeWrapper().getUserCodeObject() instanceof WrappingFunction) {
- WrappingFunction wf = (WrappingFunction) solutionSetJoin.getUserCodeWrapper().getUserCodeObject();
+ WrappingFunction<?> wf = (WrappingFunction<?>) solutionSetJoin.getUserCodeWrapper().getUserCodeObject();
assertEquals(SolutionWorksetJoin.class, wf.getWrappedFunction().getClass());
}
else {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java
new file mode 100644
index 0000000..ec3898e
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.translation;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.DistinctOperator;
+import org.junit.Assert;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class DistrinctTranslationTest {
+
+ @Test
+ public void testCombinable() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<String> input = env.fromElements("1", "2", "1", "3");
+
+
+ DistinctOperator<String> op = input.distinct(new KeySelector<String, String>() {
+ public String getKey(String value) { return value; }
+ });
+
+ op.print();
+
+ Plan p = env.createProgramPlan();
+
+ GroupReduceOperatorBase<?, ?, ?> reduceOp = (GroupReduceOperatorBase<?, ?, ?>) p.getDataSinks().iterator().next().getInput();
+ Assert.assertTrue(reduceOp.isCombinable());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
index c6ad73d..8346d00 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
@@ -1385,6 +1385,8 @@ public class TypeExtractorTest {
public void testFunction() {
RichMapFunction<String, Boolean> mapInterface = new RichMapFunction<String, Boolean>() {
+ private static final long serialVersionUID = 1L;
+
@Override
public void setRuntimeContext(RuntimeContext t) {
@@ -1417,6 +1419,8 @@ public class TypeExtractorTest {
@Test
public void testInterface() {
MapFunction<String, Boolean> mapInterface = new MapFunction<String, Boolean>() {
+ private static final long serialVersionUID = 1L;
+
@Override
public Boolean map(String record) throws Exception {
return null;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
index c417249..7dd4dea 100644
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
@@ -27,6 +27,7 @@ import org.junit.Test;
import java.io.Serializable;
+@SuppressWarnings("serial")
public class CoGroupITCase implements Serializable {
@Test
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
index f8d217e..3875bab 100644
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
@@ -27,6 +27,7 @@ import org.junit.Test;
import java.io.Serializable;
+@SuppressWarnings("serial")
public class CrossITCase implements Serializable {
@Test
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
index c775425..bb04336 100644
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
@@ -23,8 +23,6 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.JavaProgramTestBase;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -34,9 +32,11 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-@RunWith(Parameterized.class)
+@SuppressWarnings("serial")
public class FilterITCase extends JavaProgramTestBase {
+ private static final String EXPECTED_RESULT = "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n";
public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env) {
@@ -68,15 +68,7 @@ public class FilterITCase extends JavaProgramTestBase {
return env.fromCollection(data);
}
- private static int NUM_PROGRAMS = 1;
-
- private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
- private String expectedResult;
-
- public FilterITCase(Configuration config) {
- super(config);
- }
@Override
protected void preSubmit() throws Exception {
@@ -85,58 +77,18 @@ public class FilterITCase extends JavaProgramTestBase {
@Override
protected void testProgram() throws Exception {
- expectedResult = FilterProgs.runProgram(curProgId, resultPath);
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
+ filter(value -> value.f2.contains("world"));
+ filterDs.writeAsCsv(resultPath);
+ env.execute();
}
@Override
protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(expectedResult, resultPath);
- }
-
- @Parameterized.Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
-
- LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
-
- for(int i=1; i <= NUM_PROGRAMS; i++) {
- Configuration config = new Configuration();
- config.setInteger("ProgramId", i);
- tConfigs.add(config);
- }
-
- return toParameterList(tConfigs);
+ compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
}
-
- private static class FilterProgs {
-
- public static String runProgram(int progId, String resultPath) throws Exception {
-
- switch(progId) {
- case 1: {
- /*
- * Test lambda filter
- * Functionality identical to org.apache.flink.test.javaApiOperators.FilterITCase test 3
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
- filter(value -> value.f2.contains("world"));
- filterDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "3,2,Hello world\n" +
- "4,3,Hello world, how are you?\n";
- }
- default:
- throw new IllegalArgumentException("Invalid program id");
- }
-
- }
-
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
index 043b4e8..431151e 100644
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
@@ -27,6 +27,7 @@ import org.junit.Test;
import java.io.Serializable;
+@SuppressWarnings("serial")
public class FlatJoinITCase implements Serializable {
@Test
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
index 55f507c..5cf7fc2 100644
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
@@ -26,6 +26,7 @@ import org.junit.Test;
import java.io.Serializable;
+@SuppressWarnings("serial")
public class FlatMapITCase implements Serializable {
@Test
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
index 494aff6..a86de1f 100644
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
@@ -27,6 +27,7 @@ import org.junit.Test;
import java.io.Serializable;
+@SuppressWarnings("serial")
public class GroupReduceITCase implements Serializable {
@Test
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
index 3f4f696..d44d116 100644
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
@@ -27,6 +27,7 @@ import org.junit.Test;
import java.io.Serializable;
+@SuppressWarnings("serial")
public class JoinITCase implements Serializable {
@Test
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/LambdaExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/LambdaExtractionTest.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/LambdaExtractionTest.java
new file mode 100644
index 0000000..4c8ee23
--- /dev/null
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/LambdaExtractionTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class LambdaExtractionTest {
+
+ @Test
+ public void testIdentifyLambdas() {
+ try {
+ MapFunction<?, ?> anonymousFromInterface = new MapFunction<String, Integer>() {
+ @Override
+ public Integer map(String value) { return Integer.parseInt(value); }
+ };
+
+ MapFunction<?, ?> anonymousFromClass = new RichMapFunction<String, Integer>() {
+ @Override
+ public Integer map(String value) { return Integer.parseInt(value); }
+ };
+
+ MapFunction<?, ?> fromProperClass = new StaticMapper();
+
+ MapFunction<?, ?> fromDerived = new ToTuple<Integer>() {
+ @Override
+ public Tuple2<Integer, Long> map(Integer value) {
+ return new Tuple2<Integer, Long>(value, 1L);
+ }
+ };
+
+ MapFunction<String, Integer> lambda = (str) -> Integer.parseInt(str);
+
+ assertFalse(FunctionUtils.isLambdaFunction(anonymousFromInterface));
+ assertFalse(FunctionUtils.isLambdaFunction(anonymousFromClass));
+ assertFalse(FunctionUtils.isLambdaFunction(fromProperClass));
+ assertFalse(FunctionUtils.isLambdaFunction(fromDerived));
+ assertTrue(FunctionUtils.isLambdaFunction(lambda));
+ assertTrue(FunctionUtils.isLambdaFunction(STATIC_LAMBDA));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ public static class StaticMapper implements MapFunction<String, Integer> {
+
+ @Override
+ public Integer map(String value) { return Integer.parseInt(value); }
+ }
+
+ public interface ToTuple<T> extends MapFunction<T, Tuple2<T, Long>> {
+
+ @Override
+ public Tuple2<T, Long> map(T value) throws Exception;
+ }
+
+ private static final MapFunction<String, Integer> STATIC_LAMBDA = (str) -> Integer.parseInt(str);
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
index 3af360b..5e9f732 100644
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
@@ -26,6 +26,7 @@ import org.junit.Test;
import java.io.Serializable;
+@SuppressWarnings("serial")
public class MapITCase implements Serializable{
@Test
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
index ab27fe4..1a34814 100644
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
@@ -36,9 +36,20 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-@RunWith(Parameterized.class)
+@SuppressWarnings("serial")
public class ReduceITCase extends JavaProgramTestBase {
+ private static final String EXPECTED_RESULT = "1,1,0,Hallo,1\n" +
+ "2,3,2,Hallo Welt wie,1\n" +
+ "2,2,1,Hallo Welt,2\n" +
+ "3,9,0,P-),2\n" +
+ "3,6,5,BCD,3\n" +
+ "4,17,0,P-),1\n" +
+ "4,17,0,P-),2\n" +
+ "5,11,10,GHI,1\n" +
+ "5,29,0,P-),2\n" +
+ "5,25,0,P-),3\n";
+
public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env) {
List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<Tuple5<Integer, Long, Integer, String, Long>>();
@@ -71,17 +82,9 @@ public class ReduceITCase extends JavaProgramTestBase {
return env.fromCollection(data, type);
}
-
- private static int NUM_PROGRAMS = 1;
-
- private int curProgId = config.getInteger("ProgramId", -1);
+
private String resultPath;
- private String expectedResult;
-
- public ReduceITCase(Configuration config) {
- super(config);
- }
-
+
@Override
protected void preSubmit() throws Exception {
resultPath = getTempDirPath("result");
@@ -89,72 +92,23 @@ public class ReduceITCase extends JavaProgramTestBase {
@Override
protected void testProgram() throws Exception {
- expectedResult = ReduceProgs.runProgram(curProgId, resultPath);
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = get5TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds
+ .groupBy(4, 0)
+ .reduce((in1, in2) -> {
+ Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>();
+ out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4);
+ return out;
+ });
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
}
@Override
protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(expectedResult, resultPath);
- }
-
- @Parameterized.Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
-
- LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
-
- for(int i=1; i <= NUM_PROGRAMS; i++) {
- Configuration config = new Configuration();
- config.setInteger("ProgramId", i);
- tConfigs.add(config);
- }
-
- return toParameterList(tConfigs);
- }
-
- private static class ReduceProgs {
-
- public static String runProgram(int progId, String resultPath) throws Exception {
-
- switch(progId) {
- case 1: {
- /*
- * Test reduce with lambda
- * Functionality identical to org.apache.flink.test.javaApiOperators.ReduceITCase test 2
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = get5TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds
- .groupBy(4, 0)
- .reduce((in1, in2) -> {
- Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>();
- out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4);
- return out;
- });
-
- reduceDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,1,0,Hallo,1\n" +
- "2,3,2,Hallo Welt wie,1\n" +
- "2,2,1,Hallo Welt,2\n" +
- "3,9,0,P-),2\n" +
- "3,6,5,BCD,3\n" +
- "4,17,0,P-),1\n" +
- "4,17,0,P-),2\n" +
- "5,11,10,GHI,1\n" +
- "5,29,0,P-),2\n" +
- "5,25,0,P-),3\n";
- }
- default:
- throw new IllegalArgumentException("Invalid program id");
- }
-
- }
-
+ compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
}
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
index 1db3524..9ff7181 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.io;
import java.io.IOException;
@@ -28,13 +27,12 @@ import org.apache.flink.util.Collector;
/**
* A {@link Collector} to update the solution set of a workset iteration.
* <p/>
- * The records are written to a {@link MutableHashTable} hash table to allow in-memory point updates.
+ * The records are written to a hash table to allow in-memory point updates.
* <p/>
* Assumption for fast updates: the build side iterator of the hash table is already positioned for the update. This
* is for example the case when a solution set update happens directly after a solution set join. If this assumption
* doesn't hold, use {@link SolutionSetUpdateOutputCollector}, which probes the hash table before updating.
- *
- * @see {SolutionSetUpdateOutputCollector}
+
*/
public class SolutionSetFastUpdateOutputCollector<T> implements Collector<T> {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
index 17670f1..89789c35 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
@@ -28,7 +28,7 @@ import org.apache.flink.util.Collector;
/**
* A {@link Collector} to update the solution set of a workset iteration.
* <p/>
- * The records are written to a {@link MutableHashTable} hash table to allow in-memory point updates.
+ * The records are written to a hash table to allow in-memory point updates.
* <p/>
* Records will only be collected, if there is a match after probing the hash table. If the build side iterator is
* already positioned for the update, use {@link SolutionSetFastUpdateOutputCollector} to the save re-probing.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
index 34cd232..636c492 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
@@ -23,7 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
-import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
@@ -54,7 +54,7 @@ import java.io.IOException;
/**
* The base class for all tasks able to participate in an iteration.
*/
-public abstract class AbstractIterativePactTask<S extends RichFunction, OT> extends RegularPactTask<S, OT>
+public abstract class AbstractIterativePactTask<S extends Function, OT> extends RegularPactTask<S, OT>
implements Terminable
{
private static final Log log = LogFactory.getLog(AbstractIterativePactTask.class);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
index 7a77cff..797bbb6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.task;
import java.io.IOException;
@@ -25,7 +24,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -57,7 +56,7 @@ import org.apache.flink.util.MutableObjectIterator;
* The head is responsible for coordinating an iteration and can run a
* {@link org.apache.flink.runtime.operators.PactDriver} inside. It will read
* the initial input and establish a {@link BlockingBackChannel} to the iteration's tail. After successfully processing
- * the input, it will send {@link EndOfSuperstepEvent} events to its outputs. It must also be connected to a
+ * the input, it will send EndOfSuperstep events to its outputs. It must also be connected to a
* synchronization task and after each superstep, it will wait
* until it receives an {@link AllWorkersDoneEvent} from the sync, which signals that all other heads have also finished
* their iteration. Starting with
@@ -75,7 +74,7 @@ import org.apache.flink.util.MutableObjectIterator;
* The type of the feed-back data set (bulk partial solution / workset). For bulk iterations, {@code Y} is the
* same as {@code X}
*/
-public class IterationHeadPactTask<X, Y, S extends RichFunction, OT> extends AbstractIterativePactTask<S, OT> {
+public class IterationHeadPactTask<X, Y, S extends Function, OT> extends AbstractIterativePactTask<S, OT> {
private static final Log log = LogFactory.getLog(IterationHeadPactTask.class);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
index 2a8325c..c23eae1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
@@ -16,14 +16,13 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.task;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.Function;
import org.apache.flink.runtime.io.network.api.BufferWriter;
import org.apache.flink.runtime.io.network.channels.EndOfSuperstepEvent;
import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
@@ -32,16 +31,16 @@ import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
import org.apache.flink.util.Collector;
/**
- * An intermediate iteration task, which runs a {@link PactDriver} inside.
+ * An intermediate iteration task, which runs a Driver inside.
* <p/>
* It will propagate {@link EndOfSuperstepEvent}s and {@link TerminationEvent}s to it's connected tasks. Furthermore
* intermediate tasks can also update the iteration state, either the workset or the solution set.
* <p/>
* If the iteration state is updated, the output of this task will be send back to the {@link IterationHeadPactTask} via
- * a {@link BlockingBackChannel} for the workset -XOR- a {@link MutableHashTable} for the solution set. In this case
+ * a {@link BlockingBackChannel} for the workset -XOR- a HashTable for the solution set. In this case
* this task must be scheduled on the same instance as the head.
*/
-public class IterationIntermediatePactTask<S extends RichFunction, OT> extends AbstractIterativePactTask<S, OT> {
+public class IterationIntermediatePactTask<S extends Function, OT> extends AbstractIterativePactTask<S, OT> {
private static final Log log = LogFactory.getLog(IterationIntermediatePactTask.class);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
index a06ef5d..c44f443 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
@@ -44,9 +44,9 @@ import org.apache.flink.types.Value;
import com.google.common.base.Preconditions;
/**
- * The task responsible for synchronizing all iteration heads, implemented as an {@link AbstractOutputTask}. This task
+ * The task responsible for synchronizing all iteration heads, implemented as an output task. This task
* will never see any data.
- * In each superstep, it simply waits until it has receiced a {@link WorkerDoneEvent} from each head and will send back
+ * In each superstep, it simply waits until it has received a {@link WorkerDoneEvent} from each head and will send back
* an {@link AllWorkersDoneEvent} to signal that the next superstep can begin.
*/
public class IterationSynchronizationSinkTask extends AbstractInvokable implements Terminable {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
index 942e2f6..90d732c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
@@ -16,12 +16,11 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.task;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.Function;
import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
@@ -29,16 +28,16 @@ import org.apache.flink.runtime.operators.PactTaskContext;
import org.apache.flink.util.Collector;
/**
- * An iteration tail, which runs a {@link PactDriver} inside.
+ * An iteration tail, which runs a driver inside.
* <p/>
* If the iteration state is updated, the output of this task will be send back to the {@link IterationHeadPactTask} via
- * a {@link BlockingBackChannel} for the workset -OR- a {@link MutableHashTable} for the solution set. Therefore this
+ * a BackChannel for the workset -OR- a HashTable for the solution set. Therefore this
* task must be scheduled on the same instance as the head. It's also possible for the tail to update *both* the workset
* and the solution set.
* <p/>
* If there is a separate solution set tail, the iteration head has to make sure to wait for it to finish.
*/
-public class IterationTailPactTask<S extends RichFunction, OT> extends AbstractIterativePactTask<S, OT>
+public class IterationTailPactTask<S extends Function, OT> extends AbstractIterativePactTask<S, OT>
implements PactTaskContext<S, OT> {
private static final Log log = LogFactory.getLog(IterationTailPactTask.class);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RuntimeExecutionContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RuntimeExecutionContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RuntimeExecutionContext.java
deleted file mode 100644
index 78cf1f5..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RuntimeExecutionContext.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators;
-
-import org.apache.flink.api.common.functions.ExecutionContext;
-import org.apache.flink.runtime.execution.Environment;
-
-
-/**
- * Default implementation of the {@link ExecutionContext} that delegates the calls to the nephele task
- * environment.
- *
- */
-public class RuntimeExecutionContext implements ExecutionContext
-{
- private final Environment env;
-
- public RuntimeExecutionContext(Environment env) {
- this.env = env;
- }
-
-
- @Override
- public String getTaskName() {
- return this.env.getTaskName();
- }
-
-
- @Override
- public int getNumberOfSubtasks() {
- return this.env.getCurrentNumberOfSubtasks();
- }
-
-
- @Override
- public int getSubtaskIndex() {
- return this.env.getIndexInSubtaskGroup() + 1;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
index aaad08c..99a59b1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
@@ -16,11 +16,8 @@
* limitations under the License.
*/
-
package org.apache.flink.test.operators;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.record.functions.CrossFunction;
import org.apache.flink.api.java.record.io.DelimitedInputFormat;
@@ -35,8 +32,6 @@ import org.apache.flink.test.util.RecordAPITestBase;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.Ignore;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@@ -47,17 +42,12 @@ import java.io.Serializable;
import java.util.Collection;
import java.util.LinkedList;
-/**
- */
@RunWith(Parameterized.class)
-//@Ignore("Test needs to be adapted to new cross signature")
public class CrossITCase extends RecordAPITestBase {
- private static final Log LOG = LogFactory.getLog(CrossITCase.class);
-
- String leftInPath = null;
- String rightInPath = null;
- String resultPath = null;
+ private String leftInPath = null;
+ private String rightInPath = null;
+ private String resultPath = null;
public CrossITCase(Configuration testConfig) {
super(testConfig);
@@ -112,8 +102,6 @@ public class CrossITCase extends RecordAPITestBase {
int key1 = Integer.parseInt(string.toString());
string = record2.getField(0, string);
int key2 = Integer.parseInt(string.toString());
-
- LOG.debug("Processing { [" + key1 + "," + val1 + "] , [" + key2 + "," + val2 + "] }");
string.setValue((key1 + key2 + 2) + "");
integer.setValue(val2 - val1 + 1);