You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/12 23:44:17 UTC
[17/22] flink git commit: [FLINK-6731] [tests] Activate strict
checkstyle for flink-tests
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java
index 6350533..add0cd1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java
@@ -18,7 +18,6 @@
package org.apache.flink.test.iterative;
-import java.util.List;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
@@ -27,6 +26,11 @@ import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.util.Collector;
+import java.util.List;
+
+/**
+ * Test iteration with termination criterion.
+ */
public class IterationTerminationWithTerminationTail extends JavaProgramTestBase {
private static final String EXPECTED = "22\n";
@@ -48,7 +52,7 @@ public class IterationTerminationWithTerminationTail extends JavaProgramTestBase
containsResultAsText(result, EXPECTED);
}
- public static final class SumReducer implements GroupReduceFunction<String, String> {
+ private static final class SumReducer implements GroupReduceFunction<String, String> {
private static final long serialVersionUID = 1L;
@Override
@@ -61,7 +65,7 @@ public class IterationTerminationWithTerminationTail extends JavaProgramTestBase
}
}
- public static class TerminationFilter implements FilterFunction<String> {
+ private static class TerminationFilter implements FilterFunction<String> {
private static final long serialVersionUID = 1L;
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
index 5a2df3f..4f749b6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
@@ -18,16 +18,19 @@
package org.apache.flink.test.iterative;
-import java.util.List;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.test.util.JavaProgramTestBase;
-import static org.apache.flink.test.util.TestBaseUtils.containsResultAsText;
import org.apache.flink.util.Collector;
+import java.util.List;
+
+/**
+ * Test iteration with termination criterion consuming the iteration tail.
+ */
public class IterationTerminationWithTwoTails extends JavaProgramTestBase {
private static final String EXPECTED = "22\n";
@@ -49,7 +52,7 @@ public class IterationTerminationWithTwoTails extends JavaProgramTestBase {
containsResultAsText(result, EXPECTED);
}
- public static final class SumReducer implements GroupReduceFunction<String, String> {
+ private static final class SumReducer implements GroupReduceFunction<String, String> {
private static final long serialVersionUID = 1L;
@Override
@@ -62,7 +65,7 @@ public class IterationTerminationWithTwoTails extends JavaProgramTestBase {
}
}
- public static class TerminationFilter implements FilterFunction<String> {
+ private static class TerminationFilter implements FilterFunction<String> {
private static final long serialVersionUID = 1L;
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
index ab66f31..3228b73 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
@@ -18,13 +18,17 @@
package org.apache.flink.test.iterative;
-import java.util.List;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.test.util.JavaProgramTestBase;
+import java.util.List;
+
+/**
+ * Test iterator with an all-reduce.
+ */
public class IterationWithAllReducerITCase extends JavaProgramTestBase {
private static final String EXPECTED = "1\n";
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
index c283df1..2ba1a8f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
@@ -24,12 +24,15 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.PointFormatter;
-import org.apache.flink.test.util.PointInFormat;
import org.apache.flink.test.util.CoordVector;
import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.PointFormatter;
+import org.apache.flink.test.util.PointInFormat;
import org.apache.flink.util.Collector;
+/**
+ * Test iteration with operator chaining.
+ */
public class IterationWithChainingITCase extends JavaProgramTestBase {
private static final String DATA_POINTS = "0|50.90|16.20|72.08|\n" + "1|73.65|61.76|62.89|\n" + "2|61.73|49.95|92.74|\n";
@@ -48,13 +51,13 @@ public class IterationWithChainingITCase extends JavaProgramTestBase {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
- DataSet<Tuple2<Integer, CoordVector>> initialInput
- = env.readFile(new PointInFormat(), dataPath).setParallelism(1).name("Input");
+ DataSet<Tuple2<Integer, CoordVector>> initialInput =
+ env.readFile(new PointInFormat(), dataPath).setParallelism(1).name("Input");
IterativeDataSet<Tuple2<Integer, CoordVector>> iteration = initialInput.iterate(2).name("Loop");
- DataSet<Tuple2<Integer, CoordVector>> identity
- = iteration.groupBy(0).reduceGroup(new GroupReduceFunction<Tuple2<Integer, CoordVector>, Tuple2<Integer, CoordVector>>() {
+ DataSet<Tuple2<Integer, CoordVector>> identity =
+ iteration.groupBy(0).reduceGroup(new GroupReduceFunction<Tuple2<Integer, CoordVector>, Tuple2<Integer, CoordVector>>() {
@Override
public void reduce(Iterable<Tuple2<Integer, CoordVector>> values, Collector<Tuple2<Integer, CoordVector>> out) throws Exception {
for (Tuple2<Integer, CoordVector> value : values) {
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
index 8756429..e44d870 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
@@ -18,8 +18,6 @@
package org.apache.flink.test.iterative;
-import java.io.Serializable;
-
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
@@ -27,11 +25,16 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.util.CoordVector;
+import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.test.util.PointFormatter;
import org.apache.flink.test.util.PointInFormat;
-import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.util.Collector;
+import java.io.Serializable;
+
+/**
+ * Test iteration with union.
+ */
public class IterationWithUnionITCase extends JavaProgramTestBase {
private static final String DATAPOINTS = "0|50.90|16.20|72.08|\n" + "1|73.65|61.76|62.89|\n" + "2|61.73|49.95|92.74|\n";
@@ -39,13 +42,12 @@ public class IterationWithUnionITCase extends JavaProgramTestBase {
protected String dataPath;
protected String resultPath;
-
@Override
protected void preSubmit() throws Exception {
dataPath = createTempFile("datapoints.txt", DATAPOINTS);
resultPath = getTempDirPath("union_iter_result");
}
-
+
@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(DATAPOINTS + DATAPOINTS + DATAPOINTS + DATAPOINTS, resultPath);
@@ -54,18 +56,18 @@ public class IterationWithUnionITCase extends JavaProgramTestBase {
@Override
protected void testProgram() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
DataSet<Tuple2<Integer, CoordVector>> initialInput = env.readFile(new PointInFormat(), this.dataPath).setParallelism(1);
-
+
IterativeDataSet<Tuple2<Integer, CoordVector>> iteration = initialInput.iterate(2);
-
+
DataSet<Tuple2<Integer, CoordVector>> result = iteration.union(iteration).map(new IdentityMapper());
-
+
iteration.closeWith(result).writeAsFormattedText(this.resultPath, new PointFormatter());
-
+
env.execute();
}
-
+
static final class IdentityMapper implements MapFunction<Tuple2<Integer, CoordVector>, Tuple2<Integer, CoordVector>>, Serializable {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansWithBroadcastSetITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansWithBroadcastSetITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansWithBroadcastSetITCase.java
index 7bd9934..77e2663 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansWithBroadcastSetITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansWithBroadcastSetITCase.java
@@ -23,14 +23,17 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.examples.java.clustering.KMeans;
-import org.apache.flink.examples.java.clustering.KMeans.Point;
import org.apache.flink.examples.java.clustering.KMeans.Centroid;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.examples.java.clustering.KMeans.Point;
import org.apache.flink.test.testdata.KMeansData;
+import org.apache.flink.test.util.JavaProgramTestBase;
import java.util.List;
import java.util.Locale;
+/**
+ * Test KMeans clustering with a broadcast set.
+ */
public class KMeansWithBroadcastSetITCase extends JavaProgramTestBase {
@SuppressWarnings("serial")
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/MultipleSolutionSetJoinsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/MultipleSolutionSetJoinsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/MultipleSolutionSetJoinsITCase.java
index e6e91f6..cc3fdc6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/MultipleSolutionSetJoinsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/MultipleSolutionSetJoinsITCase.java
@@ -18,41 +18,44 @@
package org.apache.flink.test.iterative;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.junit.Assert;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.optimizer.iterations.MultipleJoinsWithSolutionSetCompilerTest;
import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.Assert;
+
+import java.util.ArrayList;
+import java.util.List;
+/**
+ * Test multiple joins with the solution set.
+ */
public class MultipleSolutionSetJoinsITCase extends JavaProgramTestBase {
@Override
protected void testProgram() throws Exception {
-
- final int NUM_ITERS = 4;
- final double expectedFactor = (int) Math.pow(7, NUM_ITERS);
-
+
+ final int numIters = 4;
+ final double expectedFactor = (int) Math.pow(7, numIters);
+
// this is an artificial program, it does not compute anything sensical
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
@SuppressWarnings("unchecked")
DataSet<Tuple2<Long, Double>> initialData = env.fromElements(new Tuple2<Long, Double>(1L, 1.0), new Tuple2<Long, Double>(2L, 2.0),
new Tuple2<Long, Double>(3L, 3.0), new Tuple2<Long, Double>(4L, 4.0),
new Tuple2<Long, Double>(5L, 5.0), new Tuple2<Long, Double>(6L, 6.0));
-
- DataSet<Tuple2<Long, Double>> result = MultipleJoinsWithSolutionSetCompilerTest.constructPlan(initialData, NUM_ITERS);
-
- List<Tuple2<Long, Double>> resultCollector = new ArrayList<Tuple2<Long,Double>>();
- result.output(new LocalCollectionOutputFormat<Tuple2<Long,Double>>(resultCollector));
-
+
+ DataSet<Tuple2<Long, Double>> result = MultipleJoinsWithSolutionSetCompilerTest.constructPlan(initialData, numIters);
+
+ List<Tuple2<Long, Double>> resultCollector = new ArrayList<Tuple2<Long, Double>>();
+ result.output(new LocalCollectionOutputFormat<>(resultCollector));
+
env.execute();
-
+
for (Tuple2<Long, Double> tuple : resultCollector) {
Assert.assertEquals(expectedFactor * tuple.f0, tuple.f1.doubleValue(), 0.0);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/SolutionSetDuplicatesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/SolutionSetDuplicatesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/SolutionSetDuplicatesITCase.java
index c987dfd..a402747 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/SolutionSetDuplicatesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/SolutionSetDuplicatesITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.util.Collector;
+
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -37,6 +38,9 @@ import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+/**
+ * Test for duplicate elimination in the solution set.
+ */
@SuppressWarnings("serial")
@RunWith(Parameterized.class)
public class SolutionSetDuplicatesITCase extends MultipleProgramsTestBase {
@@ -66,7 +70,7 @@ public class SolutionSetDuplicatesITCase extends MultipleProgramsTestBase {
List<Integer> result = iter
.closeWith(iter.getWorkset(), iter.getWorkset())
- .map(new MapFunction<Tuple2<Long,Long>, Integer>() {
+ .map(new MapFunction<Tuple2<Long, Long>, Integer>() {
@Override
public Integer map(Tuple2<Long, Long> value) {
return value.f0.intValue();
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/StaticlyNestedIterationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/StaticlyNestedIterationsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/StaticlyNestedIterationsITCase.java
index 766a422..0228aef 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/StaticlyNestedIterationsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/StaticlyNestedIterationsITCase.java
@@ -27,34 +27,35 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.test.util.JavaProgramTestBase;
+/**
+ * Test iterations referenced from the static path of other iterations.
+ */
public class StaticlyNestedIterationsITCase extends JavaProgramTestBase {
-
@Override
protected void testProgram() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
DataSet<Long> data1 = env.generateSequence(1, 100);
DataSet<Long> data2 = env.generateSequence(1, 100);
-
+
IterativeDataSet<Long> firstIteration = data1.iterate(100);
-
+
DataSet<Long> firstResult = firstIteration.closeWith(firstIteration.map(new IdMapper()));
-
-
+
IterativeDataSet<Long> mainIteration = data2.map(new IdMapper()).iterate(100);
-
+
DataSet<Long> joined = mainIteration.join(firstResult)
.where(new IdKeyExtractor()).equalTo(new IdKeyExtractor())
.with(new Joiner());
-
+
DataSet<Long> mainResult = mainIteration.closeWith(joined);
-
+
mainResult.output(new DiscardingOutputFormat<Long>());
-
+
env.execute();
}
-
+
private static class IdKeyExtractor implements KeySelector<Long, Long> {
private static final long serialVersionUID = 1L;
@@ -64,21 +65,21 @@ public class StaticlyNestedIterationsITCase extends JavaProgramTestBase {
return value;
}
}
-
+
private static class IdMapper implements MapFunction<Long, Long> {
-
+
private static final long serialVersionUID = 1L;
-
+
@Override
public Long map(Long value) {
return value;
}
}
-
+
private static class Joiner implements JoinFunction<Long, Long, Long> {
-
+
private static final long serialVersionUID = 1L;
-
+
@Override
public Long join(Long first, Long second) {
return first;
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/UnionStaticDynamicIterationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/UnionStaticDynamicIterationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/UnionStaticDynamicIterationITCase.java
index fa8643f..01e578c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/UnionStaticDynamicIterationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/UnionStaticDynamicIterationITCase.java
@@ -18,36 +18,39 @@
package org.apache.flink.test.iterative;
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.test.util.JavaProgramTestBase;
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test union between static and dynamic path in an iteration.
+ */
public class UnionStaticDynamicIterationITCase extends JavaProgramTestBase {
-
+
private final ArrayList<Long> result = new ArrayList<Long>();
-
+
@Override
protected void testProgram() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
DataSet<Long> inputStatic = env.generateSequence(1, 4);
DataSet<Long> inputIteration = env.generateSequence(1, 4);
-
+
IterativeDataSet<Long> iteration = inputIteration.iterate(3);
-
+
DataSet<Long> result = iteration.closeWith(inputStatic.union(inputStatic).union(iteration.union(iteration)));
-
+
result.output(new LocalCollectionOutputFormat<Long>(this.result));
-
+
env.execute();
}
-
+
@Override
protected void postSubmit() throws Exception {
assertEquals(88, result.size());
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorConvergenceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorConvergenceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorConvergenceITCase.java
index 3bced25..2403cd9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorConvergenceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorConvergenceITCase.java
@@ -18,33 +18,33 @@
package org.apache.flink.test.iterative.aggregators;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichJoinFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import static org.junit.Assert.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
/**
- * Connected Components test case that uses a parameterizable convergence criterion
+ * Connected Components test case that uses a parameterizable convergence criterion.
*/
@RunWith(Parameterized.class)
@SuppressWarnings("serial")
@@ -55,181 +55,181 @@ public class AggregatorConvergenceITCase extends MultipleProgramsTestBase {
}
final List<Tuple2<Long, Long>> verticesInput = Arrays.asList(
- new Tuple2<>(1l,1l),
- new Tuple2<>(2l,2l),
- new Tuple2<>(3l,3l),
- new Tuple2<>(4l,4l),
- new Tuple2<>(5l,5l),
- new Tuple2<>(6l,6l),
- new Tuple2<>(7l,7l),
- new Tuple2<>(8l,8l),
- new Tuple2<>(9l,9l)
+ new Tuple2<>(1L, 1L),
+ new Tuple2<>(2L, 2L),
+ new Tuple2<>(3L, 3L),
+ new Tuple2<>(4L, 4L),
+ new Tuple2<>(5L, 5L),
+ new Tuple2<>(6L, 6L),
+ new Tuple2<>(7L, 7L),
+ new Tuple2<>(8L, 8L),
+ new Tuple2<>(9L, 9L)
);
final List<Tuple2<Long, Long>> edgesInput = Arrays.asList(
- new Tuple2<>(1l,2l),
- new Tuple2<>(1l,3l),
- new Tuple2<>(2l,3l),
- new Tuple2<>(2l,4l),
- new Tuple2<>(2l,1l),
- new Tuple2<>(3l,1l),
- new Tuple2<>(3l,2l),
- new Tuple2<>(4l,2l),
- new Tuple2<>(4l,6l),
- new Tuple2<>(5l,6l),
- new Tuple2<>(6l,4l),
- new Tuple2<>(6l,5l),
- new Tuple2<>(7l,8l),
- new Tuple2<>(7l,9l),
- new Tuple2<>(8l,7l),
- new Tuple2<>(8l,9l),
- new Tuple2<>(9l,7l),
- new Tuple2<>(9l,8l)
+ new Tuple2<>(1L, 2L),
+ new Tuple2<>(1L, 3L),
+ new Tuple2<>(2L, 3L),
+ new Tuple2<>(2L, 4L),
+ new Tuple2<>(2L, 1L),
+ new Tuple2<>(3L, 1L),
+ new Tuple2<>(3L, 2L),
+ new Tuple2<>(4L, 2L),
+ new Tuple2<>(4L, 6L),
+ new Tuple2<>(5L, 6L),
+ new Tuple2<>(6L, 4L),
+ new Tuple2<>(6L, 5L),
+ new Tuple2<>(7L, 8L),
+ new Tuple2<>(7L, 9L),
+ new Tuple2<>(8L, 7L),
+ new Tuple2<>(8L, 9L),
+ new Tuple2<>(9L, 7L),
+ new Tuple2<>(9L, 8L)
);
final List<Tuple2<Long, Long>> expectedResult = Arrays.asList(
- new Tuple2<>(1L,1L),
- new Tuple2<>(2L,1L),
- new Tuple2<>(3L,1L),
- new Tuple2<>(4L,1L),
- new Tuple2<>(5L,2L),
- new Tuple2<>(6L,1L),
- new Tuple2<>(7L,7L),
- new Tuple2<>(8L,7L),
- new Tuple2<>(9L,7L)
+ new Tuple2<>(1L, 1L),
+ new Tuple2<>(2L, 1L),
+ new Tuple2<>(3L, 1L),
+ new Tuple2<>(4L, 1L),
+ new Tuple2<>(5L, 2L),
+ new Tuple2<>(6L, 1L),
+ new Tuple2<>(7L, 7L),
+ new Tuple2<>(8L, 7L),
+ new Tuple2<>(9L, 7L)
);
@Test
public void testConnectedComponentsWithParametrizableConvergence() throws Exception {
- // name of the aggregator that checks for convergence
- final String UPDATED_ELEMENTS = "updated.elements.aggr";
+ // name of the aggregator that checks for convergence
+ final String updatedElements = "updated.elements.aggr";
+
+ // the iteration stops if less than this number of elements change value
+ final long convergenceThreshold = 3;
- // the iteration stops if less than this number of elements change value
- final long convergence_threshold = 3;
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple2<Long, Long>> initialSolutionSet = env.fromCollection(verticesInput);
+ DataSet<Tuple2<Long, Long>> edges = env.fromCollection(edgesInput);
- DataSet<Tuple2<Long, Long>> initialSolutionSet = env.fromCollection(verticesInput);
- DataSet<Tuple2<Long, Long>> edges = env.fromCollection(edgesInput);
+ IterativeDataSet<Tuple2<Long, Long>> iteration = initialSolutionSet.iterate(10);
- IterativeDataSet<Tuple2<Long, Long>> iteration = initialSolutionSet.iterate(10);
+ // register the convergence criterion
+ iteration.registerAggregationConvergenceCriterion(updatedElements,
+ new LongSumAggregator(), new UpdatedElementsConvergenceCriterion(convergenceThreshold));
- // register the convergence criterion
- iteration.registerAggregationConvergenceCriterion(UPDATED_ELEMENTS,
- new LongSumAggregator(), new UpdatedElementsConvergenceCriterion(convergence_threshold));
+ DataSet<Tuple2<Long, Long>> verticesWithNewComponents = iteration.join(edges).where(0).equalTo(0)
+ .with(new NeighborWithComponentIDJoin())
+ .groupBy(0).min(1);
- DataSet<Tuple2<Long, Long>> verticesWithNewComponents = iteration.join(edges).where(0).equalTo(0)
- .with(new NeighborWithComponentIDJoin())
- .groupBy(0).min(1);
+ DataSet<Tuple2<Long, Long>> updatedComponentId =
+ verticesWithNewComponents.join(iteration).where(0).equalTo(0)
+ .flatMap(new MinimumIdFilter(updatedElements));
- DataSet<Tuple2<Long, Long>> updatedComponentId =
- verticesWithNewComponents.join(iteration).where(0).equalTo(0)
- .flatMap(new MinimumIdFilter(UPDATED_ELEMENTS));
+ List<Tuple2<Long, Long>> result = iteration.closeWith(updatedComponentId).collect();
+ Collections.sort(result, new TestBaseUtils.TupleComparator<Tuple2<Long, Long>>());
- List<Tuple2<Long, Long>> result = iteration.closeWith(updatedComponentId).collect();
- Collections.sort(result, new TestBaseUtils.TupleComparator<Tuple2<Long, Long>>());
-
- assertEquals(expectedResult, result);
+ assertEquals(expectedResult, result);
}
@Test
public void testDeltaConnectedComponentsWithParametrizableConvergence() throws Exception {
- // name of the aggregator that checks for convergence
- final String UPDATED_ELEMENTS = "updated.elements.aggr";
+ // name of the aggregator that checks for convergence
+ final String updatedElements = "updated.elements.aggr";
- // the iteration stops if less than this number of elements change value
- final long convergence_threshold = 3;
+ // the iteration stops if less than this number of elements change value
+ final long convergenceThreshold = 3;
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSet<Tuple2<Long, Long>> initialSolutionSet = env.fromCollection(verticesInput);
- DataSet<Tuple2<Long, Long>> edges = env.fromCollection(edgesInput);
+ DataSet<Tuple2<Long, Long>> initialSolutionSet = env.fromCollection(verticesInput);
+ DataSet<Tuple2<Long, Long>> edges = env.fromCollection(edgesInput);
- DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
- initialSolutionSet.iterateDelta(initialSolutionSet, 10, 0);
+ DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
+ initialSolutionSet.iterateDelta(initialSolutionSet, 10, 0);
- // register the convergence criterion
- iteration.registerAggregationConvergenceCriterion(UPDATED_ELEMENTS,
- new LongSumAggregator(), new UpdatedElementsConvergenceCriterion(convergence_threshold));
+ // register the convergence criterion
+ iteration.registerAggregationConvergenceCriterion(updatedElements,
+ new LongSumAggregator(), new UpdatedElementsConvergenceCriterion(convergenceThreshold));
- DataSet<Tuple2<Long, Long>> verticesWithNewComponents = iteration.getWorkset().join(edges).where(0).equalTo(0)
- .with(new NeighborWithComponentIDJoin())
- .groupBy(0).min(1);
+ DataSet<Tuple2<Long, Long>> verticesWithNewComponents = iteration.getWorkset().join(edges).where(0).equalTo(0)
+ .with(new NeighborWithComponentIDJoin())
+ .groupBy(0).min(1);
- DataSet<Tuple2<Long, Long>> updatedComponentId =
- verticesWithNewComponents.join(iteration.getSolutionSet()).where(0).equalTo(0)
- .flatMap(new MinimumIdFilter(UPDATED_ELEMENTS));
+ DataSet<Tuple2<Long, Long>> updatedComponentId =
+ verticesWithNewComponents.join(iteration.getSolutionSet()).where(0).equalTo(0)
+ .flatMap(new MinimumIdFilter(updatedElements));
- List<Tuple2<Long, Long>> result = iteration.closeWith(updatedComponentId, updatedComponentId).collect();
- Collections.sort(result, new TestBaseUtils.TupleComparator<Tuple2<Long, Long>>());
+ List<Tuple2<Long, Long>> result = iteration.closeWith(updatedComponentId, updatedComponentId).collect();
+ Collections.sort(result, new TestBaseUtils.TupleComparator<Tuple2<Long, Long>>());
- assertEquals(expectedResult, result);
+ assertEquals(expectedResult, result);
}
-
+
@Test
public void testParameterizableAggregator() throws Exception {
- final int MAX_ITERATIONS = 5;
- final String AGGREGATOR_NAME = "elements.in.component.aggregator";
- final long componentId = 1l;
+ final int maxIterations = 5;
+ final String aggregatorName = "elements.in.component.aggregator";
+ final long componentId = 1L;
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSet<Tuple2<Long, Long>> initialSolutionSet = env.fromCollection(verticesInput);
- DataSet<Tuple2<Long, Long>> edges = env.fromCollection(edgesInput);
+ DataSet<Tuple2<Long, Long>> initialSolutionSet = env.fromCollection(verticesInput);
+ DataSet<Tuple2<Long, Long>> edges = env.fromCollection(edgesInput);
- IterativeDataSet<Tuple2<Long, Long>> iteration =
- initialSolutionSet.iterate(MAX_ITERATIONS);
+ IterativeDataSet<Tuple2<Long, Long>> iteration =
+ initialSolutionSet.iterate(maxIterations);
- // register the aggregator
- iteration.registerAggregator(AGGREGATOR_NAME, new LongSumAggregatorWithParameter(componentId));
+ // register the aggregator
+ iteration.registerAggregator(aggregatorName, new LongSumAggregatorWithParameter(componentId));
- DataSet<Tuple2<Long, Long>> verticesWithNewComponents = iteration.join(edges).where(0).equalTo(0)
- .with(new NeighborWithComponentIDJoin())
- .groupBy(0).min(1);
+ DataSet<Tuple2<Long, Long>> verticesWithNewComponents = iteration.join(edges).where(0).equalTo(0)
+ .with(new NeighborWithComponentIDJoin())
+ .groupBy(0).min(1);
- DataSet<Tuple2<Long, Long>> updatedComponentId =
- verticesWithNewComponents.join(iteration).where(0).equalTo(0)
- .flatMap(new MinimumIdFilterCounting(AGGREGATOR_NAME));
+ DataSet<Tuple2<Long, Long>> updatedComponentId =
+ verticesWithNewComponents.join(iteration).where(0).equalTo(0)
+ .flatMap(new MinimumIdFilterCounting(aggregatorName));
- List<Tuple2<Long, Long>> result = iteration.closeWith(updatedComponentId).collect();
+ List<Tuple2<Long, Long>> result = iteration.closeWith(updatedComponentId).collect();
- Collections.sort(result, new TestBaseUtils.TupleComparator<Tuple2<Long, Long>>());
+ Collections.sort(result, new TestBaseUtils.TupleComparator<Tuple2<Long, Long>>());
- List<Tuple2<Long, Long>> expectedResult = Arrays.asList(
- new Tuple2<>(1L,1L),
- new Tuple2<>(2L,1L),
- new Tuple2<>(3L,1L),
- new Tuple2<>(4L,1L),
- new Tuple2<>(5L,1L),
- new Tuple2<>(6L,1L),
- new Tuple2<>(7L,7L),
- new Tuple2<>(8L,7L),
- new Tuple2<>(9L,7L)
- );
+ List<Tuple2<Long, Long>> expectedResult = Arrays.asList(
+ new Tuple2<>(1L, 1L),
+ new Tuple2<>(2L, 1L),
+ new Tuple2<>(3L, 1L),
+ new Tuple2<>(4L, 1L),
+ new Tuple2<>(5L, 1L),
+ new Tuple2<>(6L, 1L),
+ new Tuple2<>(7L, 7L),
+ new Tuple2<>(8L, 7L),
+ new Tuple2<>(9L, 7L)
+ );
- // check program result
- assertEquals(expectedResult, result);
+ // check program result
+ assertEquals(expectedResult, result);
- // check aggregators
- long[] aggr_values = MinimumIdFilterCounting.aggr_value;
+ // check aggregators
+ long[] aggrValues = MinimumIdFilterCounting.aggr_value;
- // note that position 0 has the end result from superstep 1, retrieved at the start of iteration 2
- // position one as superstep 2, retrieved at the start of iteration 3.
- // the result from iteration 5 is not available, because no iteration 6 happens
- assertEquals(3, aggr_values[0]);
- assertEquals(4, aggr_values[1]);
- assertEquals(5, aggr_values[2]);
- assertEquals(6, aggr_values[3]);
+ // note that position 0 has the end result from superstep 1, retrieved at the start of iteration 2
+ // position one as superstep 2, retrieved at the start of iteration 3.
+ // the result from iteration 5 is not available, because no iteration 6 happens
+ assertEquals(3, aggrValues[0]);
+ assertEquals(4, aggrValues[1]);
+ assertEquals(5, aggrValues[2]);
+ assertEquals(6, aggrValues[3]);
}
-
+
// ------------------------------------------------------------------------
// Test Functions
// ------------------------------------------------------------------------
- public static final class NeighborWithComponentIDJoin extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ private static final class NeighborWithComponentIDJoin extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;
@@ -239,8 +239,8 @@ public class AggregatorConvergenceITCase extends MultipleProgramsTestBase {
return vertexWithCompId;
}
}
-
- public static class MinimumIdFilter extends RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
+
+ private static class MinimumIdFilter extends RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
private final String aggName;
private LongSumAggregator aggr;
@@ -261,7 +261,7 @@ public class AggregatorConvergenceITCase extends MultipleProgramsTestBase {
if (vertexWithNewAndOldId.f0.f1 < vertexWithNewAndOldId.f1.f1) {
out.collect(vertexWithNewAndOldId.f0);
- aggr.aggregate(1l);
+ aggr.aggregate(1L);
}
else {
out.collect(vertexWithNewAndOldId.f1);
@@ -269,24 +269,24 @@ public class AggregatorConvergenceITCase extends MultipleProgramsTestBase {
}
}
- public static final class MinimumIdFilterCounting
+ private static final class MinimumIdFilterCounting
extends RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
-
+
private static final long[] aggr_value = new long[5];
-
+
private final String aggName;
private LongSumAggregatorWithParameter aggr;
public MinimumIdFilterCounting(String aggName) {
this.aggName = aggName;
}
-
+
@Override
public void open(Configuration conf) {
final int superstep = getIterationRuntimeContext().getSuperstepNumber();
-
+
aggr = getIterationRuntimeContext().getIterationAggregator(aggName);
-
+
if (superstep > 1 && getIterationRuntimeContext().getIndexOfThisSubtask() == 0) {
LongValue val = getIterationRuntimeContext().getPreviousIterationAggregate(aggName);
aggr_value[superstep - 2] = val.getValue();
@@ -301,24 +301,26 @@ public class AggregatorConvergenceITCase extends MultipleProgramsTestBase {
if (vertexWithNewAndOldId.f0.f1 < vertexWithNewAndOldId.f1.f1) {
out.collect(vertexWithNewAndOldId.f0);
if (vertexWithNewAndOldId.f0.f1 == aggr.getComponentId()) {
- aggr.aggregate(1l);
+ aggr.aggregate(1L);
}
} else {
out.collect(vertexWithNewAndOldId.f1);
if (vertexWithNewAndOldId.f1.f1 == aggr.getComponentId()) {
- aggr.aggregate(1l);
+ aggr.aggregate(1L);
}
}
}
}
- /** A Convergence Criterion with one parameter */
- public static class UpdatedElementsConvergenceCriterion implements ConvergenceCriterion<LongValue> {
+ /**
+ * A Convergence Criterion with one parameter.
+ */
+ private static class UpdatedElementsConvergenceCriterion implements ConvergenceCriterion<LongValue> {
private final long threshold;
- public UpdatedElementsConvergenceCriterion(long u_threshold) {
- this.threshold = u_threshold;
+ public UpdatedElementsConvergenceCriterion(long uThreshold) {
+ this.threshold = uThreshold;
}
@Override
@@ -327,7 +329,7 @@ public class AggregatorConvergenceITCase extends MultipleProgramsTestBase {
}
}
- public static final class LongSumAggregatorWithParameter extends LongSumAggregator {
+ private static final class LongSumAggregatorWithParameter extends LongSumAggregator {
private long componentId;
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
index 042617d..64ee98a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
@@ -18,38 +18,38 @@
package org.apache.flink.test.iterative.aggregators;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.util.Random;
-
-import org.apache.flink.api.common.functions.RichFilterFunction;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Assert;
-
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;
+
+import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.util.Random;
+
import static org.junit.Assert.assertEquals;
/**
@@ -99,7 +99,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
env.registerCachedFile(resultPath, testName);
IterativeDataSet<Long> solution = env.fromElements(1L).iterate(2);
- solution.closeWith(env.generateSequence(1,2).filter(new RichFilterFunction<Long>() {
+ solution.closeWith(env.generateSequence(1, 2).filter(new RichFilterFunction<Long>() {
@Override
public void open(Configuration parameters) throws Exception{
File file = getRuntimeContext().getDistributedCache().getFile(testName);
@@ -108,6 +108,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
reader.close();
assertEquals(output, testString);
}
+
@Override
public boolean filter(Long value) throws Exception {
return false;
@@ -311,7 +312,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
- public static final class NegativeElementsConvergenceCriterion implements ConvergenceCriterion<LongValue> {
+ private static final class NegativeElementsConvergenceCriterion implements ConvergenceCriterion<LongValue> {
@Override
public boolean isConverged(int iteration, LongValue value) {
@@ -320,7 +321,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
- public static final class NegativeElementsConvergenceCriterionWithParam implements ConvergenceCriterion<LongValue> {
+ private static final class NegativeElementsConvergenceCriterionWithParam implements ConvergenceCriterion<LongValue> {
private int value;
@@ -339,7 +340,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
- public static final class SubtractOneMap extends RichMapFunction<Integer, Integer> {
+ private static final class SubtractOneMap extends RichMapFunction<Integer, Integer> {
private LongSumAggregator aggr;
@@ -354,14 +355,14 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
Integer newValue = value - 1;
// count negative numbers
if (newValue < 0) {
- aggr.aggregate(1l);
+ aggr.aggregate(1L);
}
return newValue;
}
}
@SuppressWarnings("serial")
- public static final class SubtractOneMapWithParam extends RichMapFunction<Integer, Integer> {
+ private static final class SubtractOneMapWithParam extends RichMapFunction<Integer, Integer> {
private LongSumAggregatorWithParameter aggr;
@@ -374,15 +375,15 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
public Integer map(Integer value) {
Integer newValue = value - 1;
// count numbers less than the aggregator parameter
- if ( newValue < aggr.getValue() ) {
- aggr.aggregate(1l);
+ if (newValue < aggr.getValue()) {
+ aggr.aggregate(1L);
}
return newValue;
}
}
@SuppressWarnings("serial")
- public static class LongSumAggregatorWithParameter extends LongSumAggregator {
+ private static class LongSumAggregatorWithParameter extends LongSumAggregator {
private int value;
@@ -396,7 +397,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
- public static final class TupleMakerMap extends RichMapFunction<Integer, Tuple2<Integer, Integer>> {
+ private static final class TupleMakerMap extends RichMapFunction<Integer, Tuple2<Integer, Integer>> {
private Random rnd;
@@ -414,7 +415,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
- public static final class AggregateMapDelta extends RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
+ private static final class AggregateMapDelta extends RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
private LongSumAggregator aggr;
private LongValue previousAggr;
@@ -437,14 +438,14 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) {
// count the elements that are equal to the superstep number
if (value.f1 == superstep) {
- aggr.aggregate(1l);
+ aggr.aggregate(1L);
}
return value;
}
}
@SuppressWarnings("serial")
- public static final class UpdateFilter extends RichFlatMapFunction<Tuple2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>,
+ private static final class UpdateFilter extends RichFlatMapFunction<Tuple2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>,
Tuple2<Integer, Integer>> {
private int superstep;
@@ -465,7 +466,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
- public static final class ProjectSecondMapper extends RichMapFunction<Tuple2<Integer, Integer>, Integer> {
+ private static final class ProjectSecondMapper extends RichMapFunction<Tuple2<Integer, Integer>, Integer> {
@Override
public Integer map(Tuple2<Integer, Integer> value) {
@@ -474,7 +475,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
- public static final class AggregateAndSubtractOneDelta extends RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
+ private static final class AggregateAndSubtractOneDelta extends RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
private LongSumAggregator aggr;
private LongValue previousAggr;
@@ -497,7 +498,7 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) {
// count the ones
if (value.f1 == 1) {
- aggr.aggregate(1l);
+ aggr.aggregate(1L);
}
value.f1--;
return value;
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
deleted file mode 100644
index fc01ce7..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators;
-
-import java.util.List;
-
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.javaApiOperators.util.ValueCollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.StringValue;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-@RunWith(Parameterized.class)
-public class AggregateITCase extends MultipleProgramsTestBase {
-
-
- public AggregateITCase(TestExecutionMode mode) {
- super(mode);
- }
-
- @Test
- public void testFullAggregate() throws Exception {
- /*
- * Full Aggregate
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple2<Integer, Long>> aggregateDs = ds
- .aggregate(Aggregations.SUM, 0)
- .and(Aggregations.MAX, 1)
- .project(0, 1);
-
- List<Tuple2<Integer, Long>> result = aggregateDs.collect();
-
- String expected = "231,6\n";
-
- compareResultAsTuples(result, expected);
- }
-
- @Test
- public void testFullAggregateOfMutableValueTypes() throws Exception {
- /*
- * Full Aggregate of mutable value types
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = ValueCollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple2<IntValue, LongValue>> aggregateDs = ds
- .aggregate(Aggregations.SUM, 0)
- .and(Aggregations.MAX, 1)
- .project(0, 1);
-
- List<Tuple2<IntValue, LongValue>> result = aggregateDs.collect();
-
- String expected = "231,6\n";
-
- compareResultAsTuples(result, expected);
- }
-
- @Test
- public void testGroupedAggregate() throws Exception {
- /*
- * Grouped Aggregate
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple2<Long, Integer>> aggregateDs = ds.groupBy(1)
- .aggregate(Aggregations.SUM, 0)
- .project(1, 0);
-
- List<Tuple2<Long, Integer>> result = aggregateDs.collect();
-
- String expected = "1,1\n" +
- "2,5\n" +
- "3,15\n" +
- "4,34\n" +
- "5,65\n" +
- "6,111\n";
-
- compareResultAsTuples(result, expected);
- }
-
- @Test
- public void testGroupedAggregateOfMutableValueTypes() throws Exception {
- /*
- * Grouped Aggregate of mutable value types
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = ValueCollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple2<IntValue, LongValue>> aggregateDs = ds.groupBy(1)
- .aggregate(Aggregations.SUM, 0)
- .project(1, 0);
-
- List<Tuple2<IntValue, LongValue>> result = aggregateDs.collect();
-
- String expected = "1,1\n" +
- "2,5\n" +
- "3,15\n" +
- "4,34\n" +
- "5,65\n" +
- "6,111\n";
-
- compareResultAsTuples(result, expected);
- }
-
- @Test
- public void testNestedAggregate() throws Exception {
- /*
- * Nested Aggregate
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple1<Integer>> aggregateDs = ds.groupBy(1)
- .aggregate(Aggregations.MIN, 0)
- .aggregate(Aggregations.MIN, 0)
- .project(0);
-
- List<Tuple1<Integer>> result = aggregateDs.collect();
-
- String expected = "1\n";
-
- compareResultAsTuples(result, expected);
- }
-
- @Test
- public void testNestedAggregateOfMutableValueTypes() throws Exception {
- /*
- * Nested Aggregate of mutable value types
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = ValueCollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple1<IntValue>> aggregateDs = ds.groupBy(1)
- .aggregate(Aggregations.MIN, 0)
- .aggregate(Aggregations.MIN, 0)
- .project(0);
-
- List<Tuple1<IntValue>> result = aggregateDs.collect();
-
- String expected = "1\n";
-
- compareResultAsTuples(result, expected);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupGroupSortITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupGroupSortITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupGroupSortITCase.java
deleted file mode 100644
index 35d615b..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupGroupSortITCase.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators;
-
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-
-@SuppressWarnings({"serial", "unchecked"})
-public class CoGroupGroupSortITCase extends JavaProgramTestBase {
-
- @Override
- protected void testProgram() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-
- DataSet<Tuple2<Long, Long>> input1 = env.fromElements(
- new Tuple2<Long, Long>(0L, 5L),
- new Tuple2<Long, Long>(0L, 4L),
- new Tuple2<Long, Long>(0L, 3L),
- new Tuple2<Long, Long>(0L, 2L),
- new Tuple2<Long, Long>(0L, 1L),
- new Tuple2<Long, Long>(1L, 10L),
- new Tuple2<Long, Long>(1L, 8L),
- new Tuple2<Long, Long>(1L, 9L),
- new Tuple2<Long, Long>(1L, 7L));
-
- DataSet<TestPojo> input2 = env.fromElements(
- new TestPojo(0L, 10L, 3L),
- new TestPojo(0L, 8L, 3L),
- new TestPojo(0L, 10L, 1L),
- new TestPojo(0L, 9L, 0L),
- new TestPojo(0L, 8L, 2L),
- new TestPojo(0L, 8L, 4L),
- new TestPojo(1L, 10L, 3L),
- new TestPojo(1L, 8L, 3L),
- new TestPojo(1L, 10L, 1L),
- new TestPojo(1L, 9L, 0L),
- new TestPojo(1L, 8L, 2L),
- new TestPojo(1L, 8L, 4L));
-
- input1.coGroup(input2)
- .where(1).equalTo("b")
- .sortFirstGroup(0, Order.DESCENDING)
- .sortSecondGroup("c", Order.ASCENDING).sortSecondGroup("a", Order.DESCENDING)
-
- .with(new ValidatingCoGroup())
- .output(new DiscardingOutputFormat<NullValue>());
-
- env.execute();
- }
-
-
- private static class ValidatingCoGroup implements CoGroupFunction<Tuple2<Long, Long>, TestPojo, NullValue> {
-
- @Override
- public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<TestPojo> second, Collector<NullValue> out) throws Exception {
- // validate the tuple input, field 1, descending
- {
- long lastValue = Long.MAX_VALUE;
-
- for (Tuple2<Long, Long> t : first) {
- long current = t.f1;
- Assert.assertTrue(current <= lastValue);
- lastValue = current;
- }
- }
-
-
- // validate the pojo input
- {
- TestPojo lastValue = new TestPojo(Long.MAX_VALUE, 0, Long.MIN_VALUE);
-
- for (TestPojo current : second) {
- Assert.assertTrue(current.c >= lastValue.c);
- Assert.assertTrue(current.c != lastValue.c || current.a <= lastValue.a);
-
- lastValue = current;
- }
- }
-
- }
- }
-
- public static class TestPojo implements Cloneable {
- public long a;
- public long b;
- public long c;
-
-
- public TestPojo() {}
-
- public TestPojo(long a, long b, long c) {
- this.a = a;
- this.b = b;
- this.c = c;
- }
- }
-}