You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/31 16:23:03 UTC
[3/4] flink git commit: [FLINK-7191] Activate checkstyle flink-java/operators/translation
[FLINK-7191] Activate checkstyle flink-java/operators/translation
This closes #4334.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e975362
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e975362
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e975362
Branch: refs/heads/master
Commit: 8e975362312c727fd602429778bc1c3628b95619
Parents: 0c9c9fb
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Fri Jul 14 10:42:57 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jul 31 17:04:26 2017 +0200
----------------------------------------------------------------------
.../CombineToGroupCombineWrapper.java | 1 +
.../translation/KeyExtractingMapper.java | 23 +--
.../translation/KeyRemovingMapper.java | 9 +-
.../PlanBothUnwrappingCoGroupOperator.java | 22 +--
.../translation/PlanFilterOperator.java | 17 +-
.../PlanLeftUnwrappingCoGroupOperator.java | 15 +-
.../translation/PlanProjectOperator.java | 25 +--
.../PlanRightUnwrappingCoGroupOperator.java | 11 +-
.../PlanUnwrappingGroupCombineOperator.java | 29 ++--
.../PlanUnwrappingReduceGroupOperator.java | 47 +++---
.../PlanUnwrappingReduceOperator.java | 12 +-
...lanUnwrappingSortedGroupCombineOperator.java | 15 +-
...PlanUnwrappingSortedReduceGroupOperator.java | 23 ++-
.../RichCombineToGroupCombineWrapper.java | 3 +-
.../translation/Tuple3UnwrappingIterator.java | 4 +-
.../translation/Tuple3WrappingCollector.java | 3 +-
.../translation/TupleLeftUnwrappingJoiner.java | 8 +
.../translation/TupleRightUnwrappingJoiner.java | 8 +
.../translation/TupleUnwrappingIterator.java | 14 +-
.../translation/TupleUnwrappingJoiner.java | 8 +
.../translation/TupleWrappingCollector.java | 13 +-
.../translation/TwoKeyExtractingMapper.java | 10 +-
.../operators/translation/WrappingFunction.java | 13 +-
.../translation/AggregateTranslationTest.java | 35 ++--
.../translation/CoGroupSortTranslationTest.java | 53 +++---
.../DeltaIterationTranslationTest.java | 160 ++++++++++---------
.../translation/DistinctTranslationTest.java | 15 +-
.../translation/ReduceTranslationTests.java | 98 ++++++------
tools/maven/suppressions-java.xml | 8 -
29 files changed, 375 insertions(+), 327 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/CombineToGroupCombineWrapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/CombineToGroupCombineWrapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/CombineToGroupCombineWrapper.java
index 408d4b3..f574218 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/CombineToGroupCombineWrapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/CombineToGroupCombineWrapper.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.api.java.operators.translation;
import org.apache.flink.annotation.Internal;
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
index f35b950..1f0cc1d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
@@ -19,34 +19,37 @@
package org.apache.flink.api.java.operators.translation;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
+/**
+ * Mapper that extracts keys.
+ * @param <T> type of value
+ * @param <K> type of key
+ */
@Internal
@ForwardedFields("*->1")
public final class KeyExtractingMapper<T, K> extends RichMapFunction<T, Tuple2<K, T>> {
-
+
private static final long serialVersionUID = 1L;
-
+
private final KeySelector<T, K> keySelector;
-
+
private final Tuple2<K, T> tuple = new Tuple2<K, T>();
-
-
+
public KeyExtractingMapper(KeySelector<T, K> keySelector) {
this.keySelector = keySelector;
}
-
-
+
@Override
public Tuple2<K, T> map(T value) throws Exception {
-
+
K key = keySelector.getKey(value);
tuple.f0 = key;
tuple.f1 = value;
-
+
return tuple;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
index 5f0de32..920f893 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
@@ -23,12 +23,17 @@ import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.tuple.Tuple2;
+/**
+ * Mapper that removes keys.
+ * @param <T> type of values
+ * @param <K> type of keys
+ */
@Internal
@ForwardedFields("1->*")
public final class KeyRemovingMapper<T, K> extends RichMapFunction<Tuple2<K, T>, T> {
-
+
private static final long serialVersionUID = 1L;
-
+
@Override
public T map(Tuple2<K, T> value) {
return value.f1;
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java
index 1814329..6ccf0f3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java
@@ -21,16 +21,18 @@ package org.apache.flink.api.java.operators.translation;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
+/**
+ * A co group operator that applies the operation only on the unwrapped values.
+ */
@Internal
public class PlanBothUnwrappingCoGroupOperator<I1, I2, OUT, K>
- extends CoGroupOperatorBase<Tuple2<K, I1>, Tuple2<K, I2>, OUT, CoGroupFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>>
-{
+ extends CoGroupOperatorBase<Tuple2<K, I1>, Tuple2<K, I2>, OUT, CoGroupFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>> {
public PlanBothUnwrappingCoGroupOperator(
CoGroupFunction<I1, I2, OUT> udf,
@@ -52,23 +54,21 @@ public class PlanBothUnwrappingCoGroupOperator<I1, I2, OUT, K>
name);
}
- public static final class TupleBothUnwrappingCoGrouper<I1, I2, OUT, K>
+ private static final class TupleBothUnwrappingCoGrouper<I1, I2, OUT, K>
extends WrappingFunction<CoGroupFunction<I1, I2, OUT>>
- implements CoGroupFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>
- {
+ implements CoGroupFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT> {
private static final long serialVersionUID = 1L;
-
+
private final TupleUnwrappingIterator<I1, K> iter1;
private final TupleUnwrappingIterator<I2, K> iter2;
-
+
private TupleBothUnwrappingCoGrouper(CoGroupFunction<I1, I2, OUT> wrapped) {
super(wrapped);
-
+
this.iter1 = new TupleUnwrappingIterator<I1, K>();
this.iter2 = new TupleUnwrappingIterator<I2, K>();
}
-
@Override
public void coGroup(
Iterable<Tuple2<K, I1>> records1,
@@ -79,6 +79,6 @@ public class PlanBothUnwrappingCoGroupOperator<I1, I2, OUT, K>
iter2.set(records2.iterator());
this.wrappedFunction.coGroup(iter1, iter2, out);
}
-
+
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
index c93191f..ecf1aac 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
@@ -27,26 +27,33 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.util.Collector;
+/**
+ * @see FilterOperatorBase
+ * @param <T>
+ */
@Internal
@ForwardedFields("*")
public class PlanFilterOperator<T> extends FilterOperatorBase<T, FlatMapFunction<T, T>> {
-
+
public PlanFilterOperator(FilterFunction<T> udf, String name, TypeInformation<T> type) {
super(new FlatMapFilter<T>(udf), new UnaryOperatorInformation<T, T>(type, type), name);
}
+ /**
+ * @see FlatMapFunction
+ * @param <T>
+ */
public static final class FlatMapFilter<T> extends WrappingFunction<FilterFunction<T>>
- implements FlatMapFunction<T, T>
- {
+ implements FlatMapFunction<T, T> {
private static final long serialVersionUID = 1L;
-
+
private FlatMapFilter(FilterFunction<T> wrapped) {
super(wrapped);
}
@Override
- public final void flatMap(T value, Collector<T> out) throws Exception {
+ public void flatMap(T value, Collector<T> out) throws Exception {
if (this.wrappedFunction.filter(value)) {
out.collect(value);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java
index 78840ce..b2a6937 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java
@@ -21,16 +21,18 @@ package org.apache.flink.api.java.operators.translation;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
+/**
+ * A co group operator that applies the operation only on the unwrapped values on the left.
+ */
@Internal
public class PlanLeftUnwrappingCoGroupOperator<I1, I2, OUT, K>
- extends CoGroupOperatorBase<Tuple2<K, I1>, I2, OUT, CoGroupFunction<Tuple2<K, I1>, I2, OUT>>
-{
+ extends CoGroupOperatorBase<Tuple2<K, I1>, I2, OUT, CoGroupFunction<Tuple2<K, I1>, I2, OUT>> {
public PlanLeftUnwrappingCoGroupOperator(
CoGroupFunction<I1, I2, OUT> udf,
@@ -52,21 +54,20 @@ public class PlanLeftUnwrappingCoGroupOperator<I1, I2, OUT, K>
name);
}
- public static final class TupleLeftUnwrappingCoGrouper<I1, I2, OUT, K>
+ private static final class TupleLeftUnwrappingCoGrouper<I1, I2, OUT, K>
extends WrappingFunction<CoGroupFunction<I1, I2, OUT>>
implements CoGroupFunction<Tuple2<K, I1>, I2, OUT> {
private static final long serialVersionUID = 1L;
-
+
private final TupleUnwrappingIterator<I1, K> iter1;
private TupleLeftUnwrappingCoGrouper(CoGroupFunction<I1, I2, OUT> wrapped) {
super(wrapped);
-
+
this.iter1 = new TupleUnwrappingIterator<I1, K>();
}
-
@Override
public void coGroup(
Iterable<Tuple2<K, I1>> records1,
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
index fe981a5..3960807 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
@@ -27,30 +27,33 @@ import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
+/**
+ * A map operator that retains a subset of fields from incoming tuples.
+ *
+ * @param <T> Input tuple type
+ * @param <R> Output tuple type
+ */
@Internal
public class PlanProjectOperator<T, R extends Tuple> extends MapOperatorBase<T, R, MapFunction<T, R>> {
public PlanProjectOperator(int[] fields, String name,
TypeInformation<T> inType, TypeInformation<R> outType,
- ExecutionConfig executionConfig)
- {
+ ExecutionConfig executionConfig) {
super(PlanProjectOperator.<T, R, Tuple>createTypedProjector(fields), new UnaryOperatorInformation<T, R>(inType, outType), name);
}
-
+
@SuppressWarnings("unchecked")
private static <T, R extends Tuple, X extends Tuple> MapFunction<T, R> createTypedProjector(int[] fields) {
return (MapFunction<T, R>) new MapProjector<X, R>(fields);
}
-
-
- public static final class MapProjector<T extends Tuple, R extends Tuple>
- extends AbstractRichFunction implements MapFunction<T, R>
- {
+
+ private static final class MapProjector<T extends Tuple, R extends Tuple>
+ extends AbstractRichFunction implements MapFunction<T, R> {
private static final long serialVersionUID = 1L;
-
+
private final int[] fields;
private final Tuple outTuple;
-
+
private MapProjector(int[] fields) {
this.fields = fields;
try {
@@ -69,7 +72,7 @@ public class PlanProjectOperator<T, R extends Tuple> extends MapOperatorBase<T,
for (int i = 0; i < fields.length; i++) {
outTuple.setField(inTuple.getField(fields[i]), i);
}
-
+
return (R) outTuple;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java
index faeca4e..f86deb7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java
@@ -21,16 +21,18 @@ package org.apache.flink.api.java.operators.translation;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
+/**
+ * A co group operator that applies the operation only on the unwrapped values on the right.
+ */
@Internal
public class PlanRightUnwrappingCoGroupOperator<I1, I2, OUT, K>
- extends CoGroupOperatorBase<I1, Tuple2<K, I2>, OUT, CoGroupFunction<I1, Tuple2<K, I2>, OUT>>
-{
+ extends CoGroupOperatorBase<I1, Tuple2<K, I2>, OUT, CoGroupFunction<I1, Tuple2<K, I2>, OUT>> {
public PlanRightUnwrappingCoGroupOperator(
CoGroupFunction<I1, I2, OUT> udf,
@@ -52,7 +54,7 @@ public class PlanRightUnwrappingCoGroupOperator<I1, I2, OUT, K>
name);
}
- public static final class TupleRightUnwrappingCoGrouper<I1, I2, OUT, K>
+ private static final class TupleRightUnwrappingCoGrouper<I1, I2, OUT, K>
extends WrappingFunction<CoGroupFunction<I1, I2, OUT>>
implements CoGroupFunction<I1, Tuple2<K, I2>, OUT> {
@@ -66,7 +68,6 @@ public class PlanRightUnwrappingCoGroupOperator<I1, I2, OUT, K>
this.iter2 = new TupleUnwrappingIterator<I2, K>();
}
-
@Override
public void coGroup(
Iterable<I1> records1,
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
index e9feb61..6e6f226 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
@@ -20,10 +20,10 @@ package org.apache.flink.api.java.operators.translation;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
@@ -35,35 +35,32 @@ import org.apache.flink.util.Collector;
public class PlanUnwrappingGroupCombineOperator<IN, OUT, K> extends GroupCombineOperatorBase<Tuple2<K, IN>, OUT, GroupCombineFunction<Tuple2<K, IN>, OUT>> {
public PlanUnwrappingGroupCombineOperator(GroupCombineFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K> key, String name,
- TypeInformation<OUT> outType, TypeInformation<Tuple2<K, IN>> typeInfoWithKey)
- {
+ TypeInformation<OUT> outType, TypeInformation<Tuple2<K, IN>> typeInfoWithKey) {
super(new TupleUnwrappingGroupCombiner<IN, OUT, K>(udf),
new UnaryOperatorInformation<Tuple2<K, IN>, OUT>(typeInfoWithKey, outType), key.computeLogicalKeyPositions(), name);
-
+
}
-
+
// --------------------------------------------------------------------------------------------
-
- public static final class TupleUnwrappingGroupCombiner<IN, OUT, K> extends WrappingFunction<GroupCombineFunction<IN, OUT>>
- implements GroupCombineFunction<Tuple2<K, IN>, OUT>
- {
-
+
+ private static final class TupleUnwrappingGroupCombiner<IN, OUT, K> extends WrappingFunction<GroupCombineFunction<IN, OUT>>
+ implements GroupCombineFunction<Tuple2<K, IN>, OUT> {
+
private static final long serialVersionUID = 1L;
-
- private final TupleUnwrappingIterator<IN, K> iter;
-
+
+ private final TupleUnwrappingIterator<IN, K> iter;
+
private TupleUnwrappingGroupCombiner(GroupCombineFunction<IN, OUT> wrapped) {
super(wrapped);
this.iter = new TupleUnwrappingIterator<IN, K>();
}
-
-
+
@Override
public void combine(Iterable<Tuple2<K, IN>> values, Collector<OUT> out) throws Exception {
iter.set(values.iterator());
this.wrappedFunction.combine(iter, out);
}
-
+
@Override
public String toString() {
return this.wrappedFunction.toString();
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
index 8568659..33c527d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
@@ -21,10 +21,10 @@ package org.apache.flink.api.java.operators.translation;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
@@ -33,7 +33,7 @@ import org.apache.flink.util.Collector;
* on the unwrapped values.
*/
@Internal
-public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOperatorBase<Tuple2<K, IN>, OUT, GroupReduceFunction<Tuple2<K, IN>,OUT>> {
+public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOperatorBase<Tuple2<K, IN>, OUT, GroupReduceFunction<Tuple2<K, IN>, OUT>> {
public PlanUnwrappingReduceGroupOperator(
GroupReduceFunction<IN, OUT> udf,
@@ -41,32 +41,30 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
String name,
TypeInformation<OUT> outType,
TypeInformation<Tuple2<K, IN>> typeInfoWithKey,
- boolean combinable)
- {
+ boolean combinable) {
super(
combinable ?
new TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K>(udf) :
new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf),
new UnaryOperatorInformation<>(typeInfoWithKey, outType), key.computeLogicalKeyPositions(), name);
-
+
super.setCombinable(combinable);
}
-
+
// --------------------------------------------------------------------------------------------
-
- public static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
- implements GroupReduceFunction<Tuple2<K, IN>, OUT>, GroupCombineFunction<Tuple2<K, IN>, Tuple2<K, IN>>
- {
+
+ private static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
+ implements GroupReduceFunction<Tuple2<K, IN>, OUT>, GroupCombineFunction<Tuple2<K, IN>, Tuple2<K, IN>> {
private static final long serialVersionUID = 1L;
-
+
private TupleUnwrappingIterator<IN, K> iter;
private TupleWrappingCollector<IN, K> coll;
private TupleUnwrappingGroupCombinableGroupReducer(GroupReduceFunction<IN, OUT> wrapped) {
super(wrapped);
- if(!GroupCombineFunction.class.isAssignableFrom(wrappedFunction.getClass())) {
+ if (!GroupCombineFunction.class.isAssignableFrom(wrappedFunction.getClass())) {
throw new IllegalArgumentException("Wrapped reduce function does not implement the GroupCombineFunction interface.");
}
@@ -74,7 +72,6 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
this.coll = new TupleWrappingCollector<>(this.iter);
}
-
@Override
public void reduce(Iterable<Tuple2<K, IN>> values, Collector<OUT> out) throws Exception {
iter.set(values.iterator());
@@ -87,35 +84,33 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
iter.set(values.iterator());
coll.set(out);
- ((GroupCombineFunction<IN, IN>)this.wrappedFunction).combine(iter, coll);
+ ((GroupCombineFunction<IN, IN>) this.wrappedFunction).combine(iter, coll);
}
-
+
@Override
public String toString() {
return this.wrappedFunction.toString();
}
}
-
- public static final class TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
- implements GroupReduceFunction<Tuple2<K, IN>, OUT>
- {
-
+
+ private static final class TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
+ implements GroupReduceFunction<Tuple2<K, IN>, OUT> {
+
private static final long serialVersionUID = 1L;
-
- private final TupleUnwrappingIterator<IN, K> iter;
-
+
+ private final TupleUnwrappingIterator<IN, K> iter;
+
private TupleUnwrappingNonCombinableGroupReducer(GroupReduceFunction<IN, OUT> wrapped) {
super(wrapped);
this.iter = new TupleUnwrappingIterator<>();
}
-
-
+
@Override
public void reduce(Iterable<Tuple2<K, IN>> values, Collector<OUT> out) throws Exception {
iter.set(values.iterator());
this.wrappedFunction.reduce(iter, out);
}
-
+
@Override
public String toString() {
return this.wrappedFunction.toString();
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
index 72dc41a..b2e614e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.java.operators.translation;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple2;
-
/**
* A reduce operator that takes 2-tuples (key-value pairs), and applies the reduce operation only
* on the unwrapped values.
@@ -35,16 +34,13 @@ import org.apache.flink.api.java.tuple.Tuple2;
public class PlanUnwrappingReduceOperator<T, K> extends ReduceOperatorBase<Tuple2<K, T>, ReduceFunction<Tuple2<K, T>>> {
public PlanUnwrappingReduceOperator(ReduceFunction<T> udf, Keys.SelectorFunctionKeys<T, K> key, String name,
- TypeInformation<T> type, TypeInformation<Tuple2<K, T>> typeInfoWithKey)
- {
+ TypeInformation<T> type, TypeInformation<Tuple2<K, T>> typeInfoWithKey) {
super(new ReduceWrapper<T, K>(udf), new UnaryOperatorInformation<Tuple2<K, T>, Tuple2<K, T>>(typeInfoWithKey, typeInfoWithKey), key.computeLogicalKeyPositions(), name);
}
- public static final class ReduceWrapper<T, K> extends WrappingFunction<ReduceFunction<T>>
- implements ReduceFunction<Tuple2<K, T>>
- {
+ private static final class ReduceWrapper<T, K> extends WrappingFunction<ReduceFunction<T>>
+ implements ReduceFunction<Tuple2<K, T>> {
private static final long serialVersionUID = 1L;
-
private ReduceWrapper(ReduceFunction<T> wrapped) {
super(wrapped);
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
index f65f169..a2a9010 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
@@ -20,10 +20,10 @@ package org.apache.flink.api.java.operators.translation;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
@@ -32,21 +32,19 @@ import org.apache.flink.util.Collector;
* operation only on the unwrapped values.
*/
@Internal
-public class PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> extends GroupCombineOperatorBase<Tuple3<K1, K2, IN>, OUT, GroupCombineFunction<Tuple3<K1, K2, IN>,OUT>> {
+public class PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> extends GroupCombineOperatorBase<Tuple3<K1, K2, IN>, OUT, GroupCombineFunction<Tuple3<K1, K2, IN>, OUT>> {
public PlanUnwrappingSortedGroupCombineOperator(GroupCombineFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K1> groupingKey, Keys.SelectorFunctionKeys<IN, K2> sortingKey, String name,
- TypeInformation<OUT> outType, TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey)
- {
+ TypeInformation<OUT> outType, TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey) {
super(new TupleUnwrappingGroupReducer<IN, OUT, K1, K2>(udf),
new UnaryOperatorInformation<Tuple3<K1, K2, IN>, OUT>(typeInfoWithKey, outType),
- groupingKey.computeLogicalKeyPositions(),
+ groupingKey.computeLogicalKeyPositions(),
name);
}
- public static final class TupleUnwrappingGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupCombineFunction<IN, OUT>>
- implements GroupCombineFunction<Tuple3<K1, K2, IN>, OUT>
- {
+ private static final class TupleUnwrappingGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupCombineFunction<IN, OUT>>
+ implements GroupCombineFunction<Tuple3<K1, K2, IN>, OUT> {
private static final long serialVersionUID = 1L;
@@ -57,7 +55,6 @@ public class PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> extends G
this.iter = new Tuple3UnwrappingIterator<IN, K1, K2>();
}
-
@Override
public void combine(Iterable<Tuple3<K1, K2, IN>> values, Collector<OUT> out) throws Exception {
iter.set(values.iterator());
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
index 8080477..7f81fee 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
@@ -21,10 +21,10 @@ package org.apache.flink.api.java.operators.translation;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
@@ -33,7 +33,7 @@ import org.apache.flink.util.Collector;
* operation only on the unwrapped values.
*/
@Internal
-public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends GroupReduceOperatorBase<Tuple3<K1, K2, IN>, OUT, GroupReduceFunction<Tuple3<K1, K2, IN>,OUT>> {
+public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends GroupReduceOperatorBase<Tuple3<K1, K2, IN>, OUT, GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>> {
public PlanUnwrappingSortedReduceGroupOperator(
GroupReduceFunction<IN, OUT> udf,
@@ -42,8 +42,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
String name,
TypeInformation<OUT> outType,
TypeInformation<Tuple3<K1, K2, IN>>
- typeInfoWithKey, boolean combinable)
- {
+ typeInfoWithKey, boolean combinable) {
super(
combinable ?
new TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2>(udf) :
@@ -55,9 +54,8 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
// --------------------------------------------------------------------------------------------
- public static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
- implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>, GroupCombineFunction<Tuple3<K1, K2, IN>, Tuple3<K1, K2, IN>>
- {
+ private static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
+ implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>, GroupCombineFunction<Tuple3<K1, K2, IN>, Tuple3<K1, K2, IN>> {
private static final long serialVersionUID = 1L;
@@ -67,7 +65,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
private TupleUnwrappingGroupCombinableGroupReducer(GroupReduceFunction<IN, OUT> wrapped) {
super(wrapped);
- if(!GroupCombineFunction.class.isAssignableFrom(wrappedFunction.getClass())) {
+ if (!GroupCombineFunction.class.isAssignableFrom(wrappedFunction.getClass())) {
throw new IllegalArgumentException("Wrapped reduce function does not implement the GroupCombineFunction interface.");
}
@@ -75,7 +73,6 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
this.coll = new Tuple3WrappingCollector<>(this.iter);
}
-
@Override
public void reduce(Iterable<Tuple3<K1, K2, IN>> values, Collector<OUT> out) throws Exception {
iter.set(values.iterator());
@@ -87,7 +84,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
public void combine(Iterable<Tuple3<K1, K2, IN>> values, Collector<Tuple3<K1, K2, IN>> out) throws Exception {
iter.set(values.iterator());
coll.set(out);
- ((GroupCombineFunction<IN, IN>)this.wrappedFunction).combine(iter, coll);
+ ((GroupCombineFunction<IN, IN>) this.wrappedFunction).combine(iter, coll);
}
@Override
@@ -96,9 +93,8 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
}
}
- public static final class TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
- implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>
- {
+ private static final class TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
+ implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT> {
private static final long serialVersionUID = 1L;
@@ -109,7 +105,6 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
this.iter = new Tuple3UnwrappingIterator<>();
}
-
@Override
public void reduce(Iterable<Tuple3<K1, K2, IN>> values, Collector<OUT> out) throws Exception {
iter.set(values.iterator());
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java
index d8c54d6..3f6463a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.api.java.operators.translation;
import org.apache.flink.api.common.functions.CombineFunction;
@@ -31,7 +32,7 @@ import org.apache.flink.util.Preconditions;
* and makes it look like a function that implements {@link GroupCombineFunction} and {@link GroupReduceFunction} to the runtime.
*/
public class RichCombineToGroupCombineWrapper<IN, OUT, F extends RichGroupReduceFunction<IN, OUT> & CombineFunction<IN, IN>>
- extends RichGroupCombineFunction<IN,IN> implements GroupReduceFunction<IN, OUT> {
+ extends RichGroupCombineFunction<IN, IN> implements GroupReduceFunction<IN, OUT> {
private final F wrappedFunction;
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java
index fd3b4f6..b697ac9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java
@@ -18,12 +18,12 @@
package org.apache.flink.api.java.operators.translation;
-import java.util.Iterator;
-
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.TraversableOnceException;
+import java.util.Iterator;
+
/**
* An iterator that reads 3-tuples (groupKey, sortKey, value) and returns only the values (third field).
* The iterator also tracks the groupKeys, as the triples flow though it.
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3WrappingCollector.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3WrappingCollector.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3WrappingCollector.java
index 189dcdb..57b6bc7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3WrappingCollector.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3WrappingCollector.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
/**
- * Needed to wrap tuples to {@code Tuple3<groupKey, sortKey, value>} for combine method of group reduce with key selector sorting
+ * Needed to wrap tuples to {@code Tuple3<groupKey, sortKey, value>} for combine method of group reduce with key selector sorting.
*/
@Internal
public class Tuple3WrappingCollector<IN, K1, K2> implements Collector<IN>, java.io.Serializable {
@@ -35,7 +35,6 @@ public class Tuple3WrappingCollector<IN, K1, K2> implements Collector<IN>, java.
private Collector<Tuple3<K1, K2, IN>> wrappedCollector;
-
public Tuple3WrappingCollector(Tuple3UnwrappingIterator<IN, K1, K2> tui) {
this.tui = tui;
this.outTuple = new Tuple3<K1, K2, IN>();
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java
index 2ff73ef..e39ec47 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java
@@ -23,6 +23,14 @@ import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
+/**
+ * Joiner that unwraps values from the left set before applying the join operation.
+ *
+ * @param <I1> type of values in the left set
+ * @param <I2> type of values in the right set
+ * @param <OUT> type of resulting values
+ * @param <K> type of key
+ */
@Internal
public final class TupleLeftUnwrappingJoiner<I1, I2, OUT, K>
extends WrappingFunction<FlatJoinFunction<I1, I2, OUT>>
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java
index c9b9c27..847acd7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java
@@ -23,6 +23,14 @@ import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
+/**
+ * Joiner that unwraps values from the right set before applying the join operation.
+ *
+ * @param <I1> type of values in the left set
+ * @param <I2> type of values in the right set
+ * @param <OUT> type of resulting values
+ * @param <K> type of key
+ */
@Internal
public final class TupleRightUnwrappingJoiner<I1, I2, OUT, K>
extends WrappingFunction<FlatJoinFunction<I1, I2, OUT>>
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java
index 5dbe266..16ebef8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java
@@ -18,12 +18,12 @@
package org.apache.flink.api.java.operators.translation;
-import java.util.Iterator;
-
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.TraversableOnceException;
+import java.util.Iterator;
+
/**
* An iterator that reads 2-tuples (key value pairs) and returns only the values (second field).
* The iterator also tracks the keys, as the pairs flow though it.
@@ -32,16 +32,16 @@ import org.apache.flink.util.TraversableOnceException;
public class TupleUnwrappingIterator<T, K> implements Iterator<T>, Iterable<T>, java.io.Serializable {
private static final long serialVersionUID = 1L;
-
- private K lastKey;
+
+ private K lastKey;
private Iterator<Tuple2<K, T>> iterator;
private boolean iteratorAvailable;
-
+
public void set(Iterator<Tuple2<K, T>> iterator) {
this.iterator = iterator;
this.iteratorAvailable = true;
}
-
+
public K getLastKey() {
return lastKey;
}
@@ -53,7 +53,7 @@ public class TupleUnwrappingIterator<T, K> implements Iterator<T>, Iterable<T>,
@Override
public T next() {
- Tuple2<K, T> t = iterator.next();
+ Tuple2<K, T> t = iterator.next();
this.lastKey = t.f0;
return t.f1;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java
index e0ee3b3..1e56dac 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java
@@ -23,6 +23,14 @@ import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
+/**
+ * Joiner that unwraps both values before applying the join operation.
+ *
+ * @param <I1> type of values in the left set
+ * @param <I2> type of values in the right set
+ * @param <OUT> type of resulting values
+ * @param <K> type of key
+ */
@Internal
public final class TupleUnwrappingJoiner<I1, I2, OUT, K>
extends WrappingFunction<FlatJoinFunction<I1, I2, OUT>>
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleWrappingCollector.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleWrappingCollector.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleWrappingCollector.java
index 4581bf2..7369804 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleWrappingCollector.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleWrappingCollector.java
@@ -23,28 +23,27 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
- * Needed to wrap tuples to {@code Tuple2<key, value>} pairs for combine method of group reduce with key selector function
+ * Needed to wrap tuples to {@code Tuple2<key, value>} pairs for combine method of group reduce with key selector function.
*/
@Internal
public class TupleWrappingCollector<IN, K> implements Collector<IN>, java.io.Serializable {
-
+
private static final long serialVersionUID = 1L;
private final TupleUnwrappingIterator<IN, K> tui;
private final Tuple2<K, IN> outTuple;
-
+
private Collector<Tuple2<K, IN>> wrappedCollector;
-
-
+
public TupleWrappingCollector(TupleUnwrappingIterator<IN, K> tui) {
this.tui = tui;
this.outTuple = new Tuple2<K, IN>();
}
-
+
public void set(Collector<Tuple2<K, IN>> wrappedCollector) {
this.wrappedCollector = wrappedCollector;
}
-
+
@Override
public void close() {
this.wrappedCollector.close();
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
index 7d5e39b..9449237 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
@@ -19,11 +19,17 @@
package org.apache.flink.api.java.operators.translation;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
+/**
+ * Mapper that extracts two keys of a value.
+ * @param <T> type of the values
+ * @param <K1> type of the first key
+ * @param <K2> type of the second key
+ */
@Internal
@ForwardedFields("*->2")
public final class TwoKeyExtractingMapper<T, K1, K2> extends RichMapFunction<T, Tuple3<K1, K2, T>> {
@@ -36,13 +42,11 @@ public final class TwoKeyExtractingMapper<T, K1, K2> extends RichMapFunction<T,
private final Tuple3<K1, K2, T> tuple = new Tuple3<K1, K2, T>();
-
public TwoKeyExtractingMapper(KeySelector<T, K1> keySelector1, KeySelector<T, K2> keySelector2) {
this.keySelector1 = keySelector1;
this.keySelector2 = keySelector2;
}
-
@Override
public Tuple3<K1, K2, T> map(T value) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
index 9851c42..6c52ace 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
@@ -25,9 +25,13 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.configuration.Configuration;
+/**
+ * Wrapper around {@link Function}.
+ * @param <T>
+ */
@Internal
public abstract class WrappingFunction<T extends Function> extends AbstractRichFunction {
-
+
private static final long serialVersionUID = 1L;
protected T wrappedFunction;
@@ -36,21 +40,20 @@ public abstract class WrappingFunction<T extends Function> extends AbstractRichF
this.wrappedFunction = wrappedFunction;
}
-
@Override
public void open(Configuration parameters) throws Exception {
FunctionUtils.openFunction(this.wrappedFunction, parameters);
}
-
+
@Override
public void close() throws Exception {
FunctionUtils.closeFunction(this.wrappedFunction);
}
-
+
@Override
public void setRuntimeContext(RuntimeContext t) {
super.setRuntimeContext(t);
-
+
FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, t);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
index 0ce79e3..2f74288 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
@@ -16,25 +16,28 @@
* limitations under the License.
*/
-
package org.apache.flink.api.java.operators.translation;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.GenericDataSourceBase;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.types.StringValue;
+
import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for translation of aggregations.
+ */
public class AggregateTranslationTest {
@Test
@@ -42,26 +45,26 @@ public class AggregateTranslationTest {
try {
final int parallelism = 8;
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
-
+
@SuppressWarnings("unchecked")
- DataSet<Tuple3<Double, StringValue, Long>> initialData =
+ DataSet<Tuple3<Double, StringValue, Long>> initialData =
env.fromElements(new Tuple3<Double, StringValue, Long>(3.141592, new StringValue("foobar"), Long.valueOf(77)));
-
+
initialData.groupBy(0).aggregate(Aggregations.MIN, 1).and(Aggregations.SUM, 2).output(new DiscardingOutputFormat<Tuple3<Double, StringValue, Long>>());
-
+
Plan p = env.createProgramPlan();
-
+
GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
-
+
GroupReduceOperatorBase<?, ?, ?> reducer = (GroupReduceOperatorBase<?, ?, ?>) sink.getInput();
-
+
// check keys
assertEquals(1, reducer.getKeyColumns(0).length);
assertEquals(0, reducer.getKeyColumns(0)[0]);
-
+
assertEquals(-1, reducer.getParallelism());
assertTrue(reducer.isCombinable());
-
+
assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>);
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
index 887173d..9c67e60 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
@@ -18,8 +18,6 @@
package org.apache.flink.api.java.operators.translation;
-import static org.junit.Assert.*;
-
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
@@ -31,8 +29,16 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
+
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for translation of co-group sort.
+ */
@SuppressWarnings({"serial", "unchecked"})
public class CoGroupSortTranslationTest implements java.io.Serializable {
@@ -40,35 +46,35 @@ public class CoGroupSortTranslationTest implements java.io.Serializable {
public void testGroupSortTuples() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
-
+
input1.coGroup(input2)
.where(1).equalTo(2)
.sortFirstGroup(0, Order.DESCENDING)
.sortSecondGroup(1, Order.ASCENDING).sortSecondGroup(0, Order.DESCENDING)
-
+
.with(new CoGroupFunction<Tuple2<Long, Long>, Tuple3<Long, Long, Long>, Long>() {
@Override
public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<Tuple3<Long, Long, Long>> second,
Collector<Long> out) {}
})
-
+
.output(new DiscardingOutputFormat<Long>());
-
+
Plan p = env.createProgramPlan();
-
+
GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
CoGroupOperatorBase<?, ?, ?, ?> coGroup = (CoGroupOperatorBase<?, ?, ?, ?>) sink.getInput();
-
+
assertNotNull(coGroup.getGroupOrderForInputOne());
assertNotNull(coGroup.getGroupOrderForInputTwo());
-
+
assertEquals(1, coGroup.getGroupOrderForInputOne().getNumberOfFields());
assertEquals(0, coGroup.getGroupOrderForInputOne().getFieldNumber(0).intValue());
assertEquals(Order.DESCENDING, coGroup.getGroupOrderForInputOne().getOrder(0));
-
+
assertEquals(2, coGroup.getGroupOrderForInputTwo().getNumberOfFields());
assertEquals(1, coGroup.getGroupOrderForInputTwo().getFieldNumber(0).intValue());
assertEquals(0, coGroup.getGroupOrderForInputTwo().getFieldNumber(1).intValue());
@@ -80,39 +86,39 @@ public class CoGroupSortTranslationTest implements java.io.Serializable {
fail(e.getMessage());
}
}
-
+
@Test
public void testSortTuplesAndPojos() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
DataSet<TestPoJo> input2 = env.fromElements(new TestPoJo());
-
+
input1.coGroup(input2)
.where(1).equalTo("b")
.sortFirstGroup(0, Order.DESCENDING)
.sortSecondGroup("c", Order.ASCENDING).sortSecondGroup("a", Order.DESCENDING)
-
+
.with(new CoGroupFunction<Tuple2<Long, Long>, TestPoJo, Long>() {
@Override
public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<TestPoJo> second, Collector<Long> out) {}
})
-
+
.output(new DiscardingOutputFormat<Long>());
-
+
Plan p = env.createProgramPlan();
-
+
GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
CoGroupOperatorBase<?, ?, ?, ?> coGroup = (CoGroupOperatorBase<?, ?, ?, ?>) sink.getInput();
-
+
assertNotNull(coGroup.getGroupOrderForInputOne());
assertNotNull(coGroup.getGroupOrderForInputTwo());
-
+
assertEquals(1, coGroup.getGroupOrderForInputOne().getNumberOfFields());
assertEquals(0, coGroup.getGroupOrderForInputOne().getFieldNumber(0).intValue());
assertEquals(Order.DESCENDING, coGroup.getGroupOrderForInputOne().getOrder(0));
-
+
assertEquals(2, coGroup.getGroupOrderForInputTwo().getNumberOfFields());
assertEquals(2, coGroup.getGroupOrderForInputTwo().getFieldNumber(0).intValue());
assertEquals(0, coGroup.getGroupOrderForInputTwo().getFieldNumber(1).intValue());
@@ -124,7 +130,10 @@ public class CoGroupSortTranslationTest implements java.io.Serializable {
fail(e.getMessage());
}
}
-
+
+ /**
+ * Sample test pojo.
+ */
public static class TestPoJo {
public long a;
public long b;
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
index fd60bc6..e4cb8c4 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
@@ -16,93 +16,95 @@
* limitations under the License.
*/
-
package org.apache.flink.api.java.operators.translation;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.util.Iterator;
-
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
+import org.apache.flink.api.common.functions.RichJoinFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.base.DeltaIterationBase;
import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.common.functions.RichCoGroupFunction;
-import org.apache.flink.api.common.functions.RichJoinFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
+
import org.junit.Test;
+import java.util.Iterator;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for translation of delta iterations.
+ */
@SuppressWarnings("serial")
public class DeltaIterationTranslationTest implements java.io.Serializable {
@Test
public void testCorrectTranslation() {
try {
- final String JOB_NAME = "Test JobName";
- final String ITERATION_NAME = "Test Name";
-
- final String BEFORE_NEXT_WORKSET_MAP = "Some Mapper";
-
- final String AGGREGATOR_NAME = "AggregatorName";
-
- final int[] ITERATION_KEYS = new int[] {2};
- final int NUM_ITERATIONS = 13;
-
- final int DEFAULT_parallelism= 133;
- final int ITERATION_parallelism = 77;
-
+ final String jobName = "Test JobName";
+ final String iterationName = "Test Name";
+
+ final String beforeNextWorksetMap = "Some Mapper";
+
+ final String aggregatorName = "AggregatorName";
+
+ final int[] iterationKeys = new int[] {2};
+ final int numIterations = 13;
+
+ final int defaultParallelism = 133;
+ final int iterationParallelism = 77;
+
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
// ------------ construct the test program ------------------
{
- env.setParallelism(DEFAULT_parallelism);
-
+ env.setParallelism(defaultParallelism);
+
@SuppressWarnings("unchecked")
DataSet<Tuple3<Double, Long, String>> initialSolutionSet = env.fromElements(new Tuple3<Double, Long, String>(3.44, 5L, "abc"));
-
+
@SuppressWarnings("unchecked")
DataSet<Tuple2<Double, String>> initialWorkSet = env.fromElements(new Tuple2<Double, String>(1.23, "abc"));
-
- DeltaIteration<Tuple3<Double, Long, String>, Tuple2<Double, String>> iteration = initialSolutionSet.iterateDelta(initialWorkSet, NUM_ITERATIONS, ITERATION_KEYS);
- iteration.name(ITERATION_NAME).parallelism(ITERATION_parallelism);
-
- iteration.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
-
+
+ DeltaIteration<Tuple3<Double, Long, String>, Tuple2<Double, String>> iteration = initialSolutionSet.iterateDelta(initialWorkSet, numIterations, iterationKeys);
+ iteration.name(iterationName).parallelism(iterationParallelism);
+
+ iteration.registerAggregator(aggregatorName, new LongSumAggregator());
+
// test that multiple workset consumers are supported
- DataSet<Tuple2<Double, String>> worksetSelfJoin =
+ DataSet<Tuple2<Double, String>> worksetSelfJoin =
iteration.getWorkset()
- .map(new IdentityMapper<Tuple2<Double,String>>())
+ .map(new IdentityMapper<Tuple2<Double, String>>())
.join(iteration.getWorkset()).where(1).equalTo(1).projectFirst(0, 1);
-
+
DataSet<Tuple3<Double, Long, String>> joined = worksetSelfJoin.join(iteration.getSolutionSet()).where(1).equalTo(2).with(new SolutionWorksetJoin());
DataSet<Tuple3<Double, Long, String>> result = iteration.closeWith(
joined,
- joined.map(new NextWorksetMapper()).name(BEFORE_NEXT_WORKSET_MAP));
-
+ joined.map(new NextWorksetMapper()).name(beforeNextWorksetMap));
+
result.output(new DiscardingOutputFormat<Tuple3<Double, Long, String>>());
result.writeAsText("/dev/null");
}
-
-
- Plan p = env.createProgramPlan(JOB_NAME);
-
+
+ Plan p = env.createProgramPlan(jobName);
+
// ------------- validate the plan ----------------
- assertEquals(JOB_NAME, p.getJobName());
- assertEquals(DEFAULT_parallelism, p.getDefaultParallelism());
-
+ assertEquals(jobName, p.getJobName());
+ assertEquals(defaultParallelism, p.getDefaultParallelism());
+
// validate the iteration
GenericDataSinkBase<?> sink1, sink2;
{
@@ -110,23 +112,23 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
sink1 = sinks.next();
sink2 = sinks.next();
}
-
+
DeltaIterationBase<?, ?> iteration = (DeltaIterationBase<?, ?>) sink1.getInput();
-
+
// check that multi consumer translation works for iterations
assertEquals(iteration, sink2.getInput());
-
+
// check the basic iteration properties
- assertEquals(NUM_ITERATIONS, iteration.getMaximumNumberOfIterations());
- assertArrayEquals(ITERATION_KEYS, iteration.getSolutionSetKeyFields());
- assertEquals(ITERATION_parallelism, iteration.getParallelism());
- assertEquals(ITERATION_NAME, iteration.getName());
-
+ assertEquals(numIterations, iteration.getMaximumNumberOfIterations());
+ assertArrayEquals(iterationKeys, iteration.getSolutionSetKeyFields());
+ assertEquals(iterationParallelism, iteration.getParallelism());
+ assertEquals(iterationName, iteration.getName());
+
MapOperatorBase<?, ?, ?> nextWorksetMapper = (MapOperatorBase<?, ?, ?>) iteration.getNextWorkset();
InnerJoinOperatorBase<?, ?, ?, ?> solutionSetJoin = (InnerJoinOperatorBase<?, ?, ?, ?>) iteration.getSolutionSetDelta();
InnerJoinOperatorBase<?, ?, ?, ?> worksetSelfJoin = (InnerJoinOperatorBase<?, ?, ?, ?>) solutionSetJoin.getFirstInput();
MapOperatorBase<?, ?, ?> worksetMapper = (MapOperatorBase<?, ?, ?>) worksetSelfJoin.getFirstInput();
-
+
assertEquals(IdentityMapper.class, worksetMapper.getUserCodeWrapper().getUserCodeClass());
assertEquals(NextWorksetMapper.class, nextWorksetMapper.getUserCodeWrapper().getUserCodeClass());
if (solutionSetJoin.getUserCodeWrapper().getUserCodeObject() instanceof WrappingFunction) {
@@ -137,9 +139,9 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
assertEquals(SolutionWorksetJoin.class, solutionSetJoin.getUserCodeWrapper().getUserCodeClass());
}
- assertEquals(BEFORE_NEXT_WORKSET_MAP, nextWorksetMapper.getName());
-
- assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
+ assertEquals(beforeNextWorksetMap, nextWorksetMapper.getName());
+
+ assertEquals(aggregatorName, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -147,20 +149,20 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
fail(e.getMessage());
}
}
-
+
@Test
public void testRejectWhenSolutionSetKeysDontMatchJoin() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
@SuppressWarnings("unchecked")
DataSet<Tuple3<Double, Long, String>> initialSolutionSet = env.fromElements(new Tuple3<Double, Long, String>(3.44, 5L, "abc"));
@SuppressWarnings("unchecked")
DataSet<Tuple2<Double, String>> initialWorkSet = env.fromElements(new Tuple2<Double, String>(1.23, "abc"));
-
+
DeltaIteration<Tuple3<Double, Long, String>, Tuple2<Double, String>> iteration = initialSolutionSet.iterateDelta(initialWorkSet, 10, 1);
-
+
try {
iteration.getWorkset().join(iteration.getSolutionSet()).where(1).equalTo(2);
fail("Accepted invalid program.");
@@ -168,7 +170,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
catch (InvalidProgramException e) {
// all good!
}
-
+
try {
iteration.getSolutionSet().join(iteration.getWorkset()).where(2).equalTo(1);
fail("Accepted invalid program.");
@@ -183,20 +185,20 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
fail(e.getMessage());
}
}
-
+
@Test
public void testRejectWhenSolutionSetKeysDontMatchCoGroup() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
@SuppressWarnings("unchecked")
DataSet<Tuple3<Double, Long, String>> initialSolutionSet = env.fromElements(new Tuple3<Double, Long, String>(3.44, 5L, "abc"));
@SuppressWarnings("unchecked")
DataSet<Tuple2<Double, String>> initialWorkSet = env.fromElements(new Tuple2<Double, String>(1.23, "abc"));
-
+
DeltaIteration<Tuple3<Double, Long, String>, Tuple2<Double, String>> iteration = initialSolutionSet.iterateDelta(initialWorkSet, 10, 1);
-
+
try {
iteration.getWorkset().coGroup(iteration.getSolutionSet()).where(1).equalTo(2).with(new SolutionWorksetCoGroup1());
fail("Accepted invalid program.");
@@ -204,7 +206,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
catch (InvalidProgramException e) {
// all good!
}
-
+
try {
iteration.getSolutionSet().coGroup(iteration.getWorkset()).where(2).equalTo(1).with(new SolutionWorksetCoGroup2());
fail("Accepted invalid program.");
@@ -219,40 +221,40 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
fail(e.getMessage());
}
}
-
+
// --------------------------------------------------------------------------------------------
-
- public static class SolutionWorksetJoin extends RichJoinFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> {
+
+ private static class SolutionWorksetJoin extends RichJoinFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> {
@Override
public Tuple3<Double, Long, String> join(Tuple2<Double, String> first, Tuple3<Double, Long, String> second){
return null;
}
}
-
- public static class NextWorksetMapper extends RichMapFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>> {
+
+ private static class NextWorksetMapper extends RichMapFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>> {
@Override
public Tuple2<Double, String> map(Tuple3<Double, Long, String> value) {
return null;
}
}
-
- public static class IdentityMapper<T> extends RichMapFunction<T, T> {
+
+ private static class IdentityMapper<T> extends RichMapFunction<T, T> {
@Override
public T map(T value) throws Exception {
return value;
}
}
-
- public static class SolutionWorksetCoGroup1 extends RichCoGroupFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> {
+
+ private static class SolutionWorksetCoGroup1 extends RichCoGroupFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> {
@Override
public void coGroup(Iterable<Tuple2<Double, String>> first, Iterable<Tuple3<Double, Long, String>> second,
Collector<Tuple3<Double, Long, String>> out) {
}
}
-
- public static class SolutionWorksetCoGroup2 extends RichCoGroupFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>, Tuple3<Double, Long, String>> {
+
+ private static class SolutionWorksetCoGroup2 extends RichCoGroupFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>, Tuple3<Double, Long, String>> {
@Override
public void coGroup(Iterable<Tuple3<Double, Long, String>> second, Iterable<Tuple2<Double, String>> first,
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
index 27c7b2f..6a98a8d 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.StringValue;
+
import org.junit.Test;
import java.io.Serializable;
@@ -45,6 +46,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+/**
+ * Tests for translation of distinct operation.
+ */
@SuppressWarnings("serial")
public class DistinctTranslationTest {
@@ -164,7 +168,7 @@ public class DistinctTranslationTest {
DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
- initialData.distinct(new KeySelector<Tuple3<Double,StringValue,LongValue>, StringValue>() {
+ initialData.distinct(new KeySelector<Tuple3<Double, StringValue, LongValue>, StringValue>() {
public StringValue getKey(Tuple3<Double, StringValue, LongValue> value) {
return value.f1;
}
@@ -183,7 +187,7 @@ public class DistinctTranslationTest {
assertEquals(4, reducer.getParallelism());
// check types
- TypeInformation<?> keyValueInfo = new TupleTypeInfo<Tuple2<StringValue, Tuple3<Double,StringValue,LongValue>>>(
+ TypeInformation<?> keyValueInfo = new TupleTypeInfo<Tuple2<StringValue, Tuple3<Double, StringValue, LongValue>>>(
new ValueTypeInfo<StringValue>(StringValue.class),
initialData.getType());
@@ -245,7 +249,7 @@ public class DistinctTranslationTest {
}
@SuppressWarnings("unchecked")
- private static final DataSet<Tuple3<Double, StringValue, LongValue>> getSourceDataSet(ExecutionEnvironment env) {
+ private static DataSet<Tuple3<Double, StringValue, LongValue>> getSourceDataSet(ExecutionEnvironment env) {
return env.fromElements(new Tuple3<Double, StringValue, LongValue>(3.141592, new StringValue("foobar"), new LongValue(77)))
.setParallelism(1);
}
@@ -256,6 +260,9 @@ public class DistinctTranslationTest {
return env.fromCollection(data);
}
+ /**
+ * Custom data type, for testing purposes.
+ */
public static class CustomType implements Serializable {
private static final long serialVersionUID = 1L;
@@ -269,7 +276,7 @@ public class DistinctTranslationTest {
@Override
public String toString() {
- return ""+myInt;
+ return "" + myInt;
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
index 3adbbb8..486cad4 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
@@ -16,20 +16,19 @@
* limitations under the License.
*/
-
package org.apache.flink.api.java.operators.translation;
-import static org.junit.Assert.*;
-
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.GenericDataSourceBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
@@ -37,10 +36,17 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.StringValue;
+
import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for translation of reduce operation.
+ */
@SuppressWarnings("serial")
public class ReduceTranslationTests implements java.io.Serializable {
@@ -49,31 +55,31 @@ public class ReduceTranslationTests implements java.io.Serializable {
try {
final int parallelism = 8;
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
-
+
DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
-
- initialData.reduce(new RichReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
+
+ initialData.reduce(new RichReduceFunction<Tuple3<Double, StringValue, LongValue>>() {
public Tuple3<Double, StringValue, LongValue> reduce(Tuple3<Double, StringValue, LongValue> value1, Tuple3<Double, StringValue, LongValue> value2) {
return value1;
}
}).output(new DiscardingOutputFormat<Tuple3<Double, StringValue, LongValue>>());
-
+
Plan p = env.createProgramPlan();
-
+
GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
-
+
ReduceOperatorBase<?, ?> reducer = (ReduceOperatorBase<?, ?>) sink.getInput();
-
+
// check types
assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType());
assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType());
-
+
// check keys
assertTrue(reducer.getKeyColumns(0) == null || reducer.getKeyColumns(0).length == 0);
-
+
// parallelism was not configured on the operator
assertTrue(reducer.getParallelism() == 1 || reducer.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT);
-
+
assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>);
}
catch (Exception e) {
@@ -82,40 +88,40 @@ public class ReduceTranslationTests implements java.io.Serializable {
fail("Test caused an error: " + e.getMessage());
}
}
-
+
@Test
public void translateGroupedReduceNoMapper() {
try {
final int parallelism = 8;
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
-
+
DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
-
+
initialData
.groupBy(2)
- .reduce(new RichReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
+ .reduce(new RichReduceFunction<Tuple3<Double, StringValue, LongValue>>() {
public Tuple3<Double, StringValue, LongValue> reduce(Tuple3<Double, StringValue, LongValue> value1, Tuple3<Double, StringValue, LongValue> value2) {
return value1;
}
})
.output(new DiscardingOutputFormat<Tuple3<Double, StringValue, LongValue>>());
-
+
Plan p = env.createProgramPlan();
-
+
GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
-
+
ReduceOperatorBase<?, ?> reducer = (ReduceOperatorBase<?, ?>) sink.getInput();
-
+
// check types
assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType());
assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType());
-
+
// parallelism was not configured on the operator
assertTrue(reducer.getParallelism() == parallelism || reducer.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT);
-
+
// check keys
assertArrayEquals(new int[] {2}, reducer.getKeyColumns(0));
-
+
assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>);
}
catch (Exception e) {
@@ -124,60 +130,58 @@ public class ReduceTranslationTests implements java.io.Serializable {
fail("Test caused an error: " + e.getMessage());
}
}
-
-
+
@Test
public void translateGroupedReduceWithkeyExtractor() {
try {
final int parallelism = 8;
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
-
+
DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
-
+
initialData
- .groupBy(new KeySelector<Tuple3<Double,StringValue,LongValue>, StringValue>() {
+ .groupBy(new KeySelector<Tuple3<Double, StringValue, LongValue>, StringValue>() {
public StringValue getKey(Tuple3<Double, StringValue, LongValue> value) {
return value.f1;
}
})
- .reduce(new RichReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
+ .reduce(new RichReduceFunction<Tuple3<Double, StringValue, LongValue>>() {
public Tuple3<Double, StringValue, LongValue> reduce(Tuple3<Double, StringValue, LongValue> value1, Tuple3<Double, StringValue, LongValue> value2) {
return value1;
}
}).setParallelism(4)
.output(new DiscardingOutputFormat<Tuple3<Double, StringValue, LongValue>>());
-
+
Plan p = env.createProgramPlan();
-
+
GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
-
-
+
MapOperatorBase<?, ?, ?> keyProjector = (MapOperatorBase<?, ?, ?>) sink.getInput();
PlanUnwrappingReduceOperator<?, ?> reducer = (PlanUnwrappingReduceOperator<?, ?>) keyProjector.getInput();
MapOperatorBase<?, ?, ?> keyExtractor = (MapOperatorBase<?, ?, ?>) reducer.getInput();
-
+
// check the parallelisms
assertEquals(1, keyExtractor.getParallelism());
assertEquals(4, reducer.getParallelism());
assertEquals(4, keyProjector.getParallelism());
-
+
// check types
- TypeInformation<?> keyValueInfo = new TupleTypeInfo<Tuple2<StringValue, Tuple3<Double,StringValue,LongValue>>>(
+ TypeInformation<?> keyValueInfo = new TupleTypeInfo<Tuple2<StringValue, Tuple3<Double, StringValue, LongValue>>>(
new ValueTypeInfo<StringValue>(StringValue.class),
initialData.getType());
-
+
assertEquals(initialData.getType(), keyExtractor.getOperatorInfo().getInputType());
assertEquals(keyValueInfo, keyExtractor.getOperatorInfo().getOutputType());
-
+
assertEquals(keyValueInfo, reducer.getOperatorInfo().getInputType());
assertEquals(keyValueInfo, reducer.getOperatorInfo().getOutputType());
-
+
assertEquals(keyValueInfo, keyProjector.getOperatorInfo().getInputType());
assertEquals(initialData.getType(), keyProjector.getOperatorInfo().getOutputType());
-
+
// check keys
assertEquals(KeyExtractingMapper.class, keyExtractor.getUserCodeWrapper().getUserCodeClass());
-
+
assertTrue(keyExtractor.getInput() instanceof GenericDataSourceBase<?, ?>);
}
catch (Exception e) {
@@ -186,9 +190,9 @@ public class ReduceTranslationTests implements java.io.Serializable {
fail("Test caused an error: " + e.getMessage());
}
}
-
+
@SuppressWarnings("unchecked")
- private static final DataSet<Tuple3<Double, StringValue, LongValue>> getSourceDataSet(ExecutionEnvironment env) {
+ private static DataSet<Tuple3<Double, StringValue, LongValue>> getSourceDataSet(ExecutionEnvironment env) {
return env.fromElements(new Tuple3<Double, StringValue, LongValue>(3.141592, new StringValue("foobar"), new LongValue(77)))
.setParallelism(1);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/tools/maven/suppressions-java.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions-java.xml b/tools/maven/suppressions-java.xml
index d7e42e5..3bb8556 100644
--- a/tools/maven/suppressions-java.xml
+++ b/tools/maven/suppressions-java.xml
@@ -33,14 +33,6 @@ under the License.
checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
<suppress
- files="(.*)api[/\\]java[/\\]operators[/\\]translation[/\\](.*)"
- checks="AvoidStarImport|NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
- <!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory-->
- <suppress
- files="(.*)test[/\\](.*)api[/\\]java[/\\]operators[/\\]translation[/\\](.*)"
- checks="AvoidStarImport"/>
-
- <suppress
files="(.*)api[/\\]java[/\\]operator[/\\]([^/\\]*\.java)"
checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>